diff options
| author | hilimd <68371223+hilimd@users.noreply.github.com> | 2020-08-31 18:02:11 +0800 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2020-08-31 18:02:11 +0800 |
| commit | 44a56b158e4637bd70d3fcf8ddc9107973b60558 (patch) | |
| tree | 4cf59d290d346c6ea06d617531c90d2653f3bc03 | |
| parent | b0d6330cf44dbb0664f6ede0dbc82865879dcfe0 (diff) | |
| parent | 408e339c53b9b6626e81f1c3f0f2399494bf4ce6 (diff) | |
| download | seaweedfs-44a56b158e4637bd70d3fcf8ddc9107973b60558.tar.xz seaweedfs-44a56b158e4637bd70d3fcf8ddc9107973b60558.zip | |
Merge pull request #13 from chrislusf/master
sync
80 files changed, 909 insertions, 428 deletions
diff --git a/k8s/seaweedfs/Chart.yaml b/k8s/seaweedfs/Chart.yaml index 0744d5051..39de78deb 100644 --- a/k8s/seaweedfs/Chart.yaml +++ b/k8s/seaweedfs/Chart.yaml @@ -1,4 +1,4 @@ apiVersion: v1 description: SeaweedFS name: seaweedfs -version: 1.90
\ No newline at end of file +version: 1.92
\ No newline at end of file diff --git a/k8s/seaweedfs/values.yaml b/k8s/seaweedfs/values.yaml index e87bc0fcb..ebbcd55b5 100644 --- a/k8s/seaweedfs/values.yaml +++ b/k8s/seaweedfs/values.yaml @@ -4,7 +4,7 @@ global: registry: "" repository: "" imageName: chrislusf/seaweedfs - imageTag: "1.90" + imageTag: "1.92" imagePullPolicy: IfNotPresent imagePullSecrets: imagepullsecret restartPolicy: Always diff --git a/other/java/client/src/main/proto/filer.proto b/other/java/client/src/main/proto/filer.proto index dcc18f2a5..65947e674 100644 --- a/other/java/client/src/main/proto/filer.proto +++ b/other/java/client/src/main/proto/filer.proto @@ -102,6 +102,7 @@ message EventNotification { bool delete_chunks = 3; string new_parent_path = 4; bool is_from_other_cluster = 5; + repeated int32 signatures = 6; } message FileChunk { @@ -150,6 +151,7 @@ message CreateEntryRequest { Entry entry = 2; bool o_excl = 3; bool is_from_other_cluster = 4; + repeated int32 signatures = 5; } message CreateEntryResponse { @@ -160,6 +162,7 @@ message UpdateEntryRequest { string directory = 1; Entry entry = 2; bool is_from_other_cluster = 3; + repeated int32 signatures = 4; } message UpdateEntryResponse { } @@ -180,6 +183,7 @@ message DeleteEntryRequest { bool is_recursive = 5; bool ignore_recursive_error = 6; bool is_from_other_cluster = 7; + repeated int32 signatures = 8; } message DeleteEntryResponse { @@ -268,6 +272,7 @@ message SubscribeMetadataRequest { string client_name = 1; string path_prefix = 2; int64 since_ns = 3; + int32 signature = 4; } message SubscribeMetadataResponse { string directory = 1; diff --git a/test/random_access/src/test/java/seaewedfs/mmap/MmapFileTest.java b/test/random_access/src/test/java/seaweedfs/file/MmapFileTest.java index e64fb85f0..1d741ee2f 100644 --- a/test/random_access/src/test/java/seaewedfs/mmap/MmapFileTest.java +++ b/test/random_access/src/test/java/seaweedfs/file/MmapFileTest.java @@ -1,4 +1,4 @@ -package seaewedfs.mmap; +package seaweedfs.file; import org.junit.Test; @@ -11,7 +11,7 @@ import java.nio.channels.FileChannel; public class MmapFileTest { - File dir = new File("/Users/chris/tmp/mm/dev"); + static File dir = new File("/Users/chris/tmp/mm/dev"); @Test public void testMmap() { diff --git a/test/random_access/src/test/java/seaweedfs/file/RandomeAccessFileTest.java b/test/random_access/src/test/java/seaweedfs/file/RandomeAccessFileTest.java new file mode 100644 index 000000000..cb5847567 --- /dev/null +++ b/test/random_access/src/test/java/seaweedfs/file/RandomeAccessFileTest.java @@ -0,0 +1,70 @@ +package seaweedfs.file; + +import org.junit.Assert; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.ByteBuffer; +import java.util.Random; + +public class RandomeAccessFileTest { + + @Test + public void testRandomWriteAndRead() throws IOException { + + File f = new File(MmapFileTest.dir, "mmap_file.txt"); + + RandomAccessFile af = new RandomAccessFile(f, "rw"); + af.setLength(0); + af.close(); + + Random r = new Random(); + + int maxLength = 5000; + + byte[] data = new byte[maxLength]; + byte[] readData = new byte[maxLength]; + + for (int i = 4096; i < maxLength; i++) { + + RandomAccessFile raf = new RandomAccessFile(f, "rw"); + long fileSize = raf.length(); + + raf.readFully(readData, 0, (int)fileSize); + + for (int x=0;x<fileSize;x++){ + Assert.assertEquals(data[x], readData[x]); + } + + int start = r.nextInt(i); + int stop = r.nextInt(i); + if (start > stop) { + int t = stop; + stop = start; + start = t; + } + if (stop > fileSize) { + fileSize = stop; + raf.setLength(fileSize); + } + + randomize(r, data, start, stop); + raf.seek(start); + raf.write(data, start, stop-start); + + raf.close(); + } + + } + + private static void randomize(Random r, byte[] bytes, int start, int stop) { + for (int i = start; i < stop; i++) { + int rnd = r.nextInt(); + bytes[i] = (byte) rnd; + } + } + + +} diff --git a/weed/command/server.go b/weed/command/server.go index d16095075..4aeecbff0 100644 --- a/weed/command/server.go +++ b/weed/command/server.go @@ -98,6 +98,7 @@ func init() { serverOptions.v.compactionMBPerSecond = cmdServer.Flag.Int("volume.compactionMBps", 0, "limit compaction speed in mega bytes per second") serverOptions.v.fileSizeLimitMB = cmdServer.Flag.Int("volume.fileSizeLimitMB", 1024, "limit file size to avoid out of memory") serverOptions.v.publicUrl = cmdServer.Flag.String("volume.publicUrl", "", "publicly accessible address") + serverOptions.v.preStopSeconds = cmdServer.Flag.Int("volume.preStopSeconds", 30, "number of seconds between stop send heartbeats and stop volume server") serverOptions.v.pprof = &False s3Options.port = cmdServer.Flag.Int("s3.port", 8333, "s3 server http listen port") diff --git a/weed/command/volume.go b/weed/command/volume.go index 6fb7447e7..c8f24802c 100644 --- a/weed/command/volume.go +++ b/weed/command/volume.go @@ -55,6 +55,7 @@ type VolumeServerOptions struct { fileSizeLimitMB *int minFreeSpacePercents []float32 pprof *bool + preStopSeconds *int // pulseSeconds *int } @@ -66,6 +67,7 @@ func init() { v.publicUrl = cmdVolume.Flag.String("publicUrl", "", "Publicly accessible address") v.bindIp = cmdVolume.Flag.String("ip.bind", "0.0.0.0", "ip address to bind to") v.masters = cmdVolume.Flag.String("mserver", "localhost:9333", "comma-separated master servers") + v.preStopSeconds = cmdVolume.Flag.Int("preStopSeconds", 30, "number of seconds between stop send heartbeats and stop volume server") // v.pulseSeconds = cmdVolume.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats, must be smaller than or equal to the master's setting") v.idleConnectionTimeout = cmdVolume.Flag.Int("idleTimeout", 30, "connection idle seconds") v.dataCenter = cmdVolume.Flag.String("dataCenter", "", "current volume server's data center name") @@ -206,7 +208,6 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v *v.compactionMBPerSecond, *v.fileSizeLimitMB, ) - // starting grpc server grpcS := v.startGrpcService(volumeServer) @@ -227,6 +228,11 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v fmt.Println("volume server has be killed") var startTime time.Time + // Stop heartbeats + glog.V(0).Infof("stop send heartbeat and wait %d seconds until shutdown ...", *v.preStopSeconds) + volumeServer.SendHeartbeat = false + time.Sleep(time.Duration(*v.preStopSeconds) * time.Second) + glog.V(0).Infof("end sleep %d sec", *v.preStopSeconds) // firstly, stop the public http service to prevent from receiving new user request if nil != publicHttpDown { startTime = time.Now() diff --git a/weed/command/watch.go b/weed/command/watch.go index b46707a62..9340db141 100644 --- a/weed/command/watch.go +++ b/weed/command/watch.go @@ -4,6 +4,8 @@ import ( "context" "fmt" "io" + "path/filepath" + "strings" "time" "github.com/chrislusf/seaweedfs/weed/pb" @@ -17,7 +19,7 @@ func init() { } var cmdWatch = &Command{ - UsageLine: "watch <wip> [-filer=localhost:8888] [-target=/]", + UsageLine: "watch [-filer=localhost:8888] [-target=/]", Short: "see recent changes on a filer", Long: `See recent changes on a filer. @@ -25,15 +27,55 @@ var cmdWatch = &Command{ } var ( - watchFiler = cmdWatch.Flag.String("filer", "localhost:8888", "filer hostname:port") - watchTarget = cmdWatch.Flag.String("pathPrefix", "/", "path to a folder or file, or common prefix for the folders or files on filer") - watchStart = cmdWatch.Flag.Duration("timeAgo", 0, "start time before now. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"") + watchFiler = cmdWatch.Flag.String("filer", "localhost:8888", "filer hostname:port") + watchTarget = cmdWatch.Flag.String("pathPrefix", "/", "path to a folder or file, or common prefix for the folders or files on filer") + watchStart = cmdWatch.Flag.Duration("timeAgo", 0, "start time before now. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"") + watchPattern = cmdWatch.Flag.String("pattern", "", "full path or just filename pattern, ex: \"/home/?opher\", \"*.pdf\", see https://golang.org/pkg/path/filepath/#Match ") ) func runWatch(cmd *Command, args []string) bool { grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client") + var filterFunc func(dir, fname string) bool + if *watchPattern != "" { + if strings.Contains(*watchPattern, "/") { + println("watch path pattern", *watchPattern) + filterFunc = func(dir, fname string) bool { + matched, err := filepath.Match(*watchPattern, dir+"/"+fname) + if err != nil { + fmt.Printf("error: %v", err) + } + return matched + } + } else { + println("watch file pattern", *watchPattern) + filterFunc = func(dir, fname string) bool { + matched, err := filepath.Match(*watchPattern, fname) + if err != nil { + fmt.Printf("error: %v", err) + } + return matched + } + } + } + + shouldPrint := func(resp *filer_pb.SubscribeMetadataResponse) bool { + if filterFunc == nil { + return true + } + if resp.EventNotification.OldEntry == nil && resp.EventNotification.NewEntry == nil { + return false + } + if resp.EventNotification.OldEntry != nil && filterFunc(resp.Directory, resp.EventNotification.OldEntry.Name) { + return true + } + if resp.EventNotification.NewEntry != nil && filterFunc(resp.EventNotification.NewParentPath, resp.EventNotification.NewEntry.Name) { + return true + } + return false + } + watchErr := pb.WithFilerClient(*watchFiler, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { stream, err := client.SubscribeMetadata(context.Background(), &filer_pb.SubscribeMetadataRequest{ @@ -53,7 +95,10 @@ func runWatch(cmd *Command, args []string) bool { if listenErr != nil { return listenErr } - fmt.Printf("events: %+v\n", resp.EventNotification) + if !shouldPrint(resp) { + continue + } + fmt.Printf("%+v\n", resp.EventNotification) } }) diff --git a/weed/filer2/abstract_sql/abstract_sql_store.go b/weed/filer2/abstract_sql/abstract_sql_store.go index 5ade18960..3dd4af103 100644 --- a/weed/filer2/abstract_sql/abstract_sql_store.go +++ b/weed/filer2/abstract_sql/abstract_sql_store.go @@ -72,11 +72,25 @@ func (store *AbstractSqlStore) InsertEntry(ctx context.Context, entry *filer2.En return fmt.Errorf("insert %s: %s", entry.FullPath, err) } + affectedRows, err := res.RowsAffected() + if err == nil && affectedRows > 0 { + return nil + } + + // now the insert failed possibly due to duplication constraints + glog.V(1).Infof("insert %s falls back to update: %s", entry.FullPath, err) + + res, err = store.getTxOrDB(ctx).ExecContext(ctx, store.SqlUpdate, meta, util.HashStringToLong(dir), name, dir) + if err != nil { + return fmt.Errorf("upsert %s: %s", entry.FullPath, err) + } + _, err = res.RowsAffected() if err != nil { - return fmt.Errorf("insert %s but no rows affected: %s", entry.FullPath, err) + return fmt.Errorf("upsert %s but no rows affected: %s", entry.FullPath, err) } return nil + } func (store *AbstractSqlStore) UpdateEntry(ctx context.Context, entry *filer2.Entry) (err error) { diff --git a/weed/filer2/filechunk_manifest.go b/weed/filer2/filechunk_manifest.go index 037b0c1e8..ba4625bab 100644 --- a/weed/filer2/filechunk_manifest.go +++ b/weed/filer2/filechunk_manifest.go @@ -26,7 +26,18 @@ func HasChunkManifest(chunks []*filer_pb.FileChunk) bool { return false } -func ResolveChunkManifest(lookupFileIdFn LookupFileIdFunctionType, chunks []*filer_pb.FileChunk) (dataChunks, manifestChunks []*filer_pb.FileChunk, manefestResolveErr error) { +func SeparateManifestChunks(chunks []*filer_pb.FileChunk) (manifestChunks, nonManifestChunks []*filer_pb.FileChunk) { + for _, c := range chunks { + if c.IsChunkManifest { + manifestChunks = append(manifestChunks, c) + } else { + nonManifestChunks = append(nonManifestChunks, c) + } + } + return +} + +func ResolveChunkManifest(lookupFileIdFn LookupFileIdFunctionType, chunks []*filer_pb.FileChunk) (dataChunks, manifestChunks []*filer_pb.FileChunk, manifestResolveErr error) { // TODO maybe parallel this for _, chunk := range chunks { if !chunk.IsChunkManifest { @@ -34,19 +45,14 @@ func ResolveChunkManifest(lookupFileIdFn LookupFileIdFunctionType, chunks []*fil continue } - // IsChunkManifest - data, err := fetchChunk(lookupFileIdFn, chunk.FileId, chunk.CipherKey, chunk.IsCompressed) + resolvedChunks, err := ResolveOneChunkManifest(lookupFileIdFn, chunk) if err != nil { - return chunks, nil, fmt.Errorf("fail to read manifest %s: %v", chunk.FileId, err) - } - m := &filer_pb.FileChunkManifest{} - if err := proto.Unmarshal(data, m); err != nil { - return chunks, nil, fmt.Errorf("fail to unmarshal manifest %s: %v", chunk.FileId, err) + return chunks, nil, err } + manifestChunks = append(manifestChunks, chunk) // recursive - filer_pb.AfterEntryDeserialization(m.Chunks) - dchunks, mchunks, subErr := ResolveChunkManifest(lookupFileIdFn, m.Chunks) + dchunks, mchunks, subErr := ResolveChunkManifest(lookupFileIdFn, resolvedChunks) if subErr != nil { return chunks, nil, subErr } @@ -56,6 +62,26 @@ func ResolveChunkManifest(lookupFileIdFn LookupFileIdFunctionType, chunks []*fil return } +func ResolveOneChunkManifest(lookupFileIdFn LookupFileIdFunctionType, chunk *filer_pb.FileChunk) (dataChunks []*filer_pb.FileChunk, manifestResolveErr error) { + if !chunk.IsChunkManifest { + return + } + + // IsChunkManifest + data, err := fetchChunk(lookupFileIdFn, chunk.GetFileIdString(), chunk.CipherKey, chunk.IsCompressed) + if err != nil { + return nil, fmt.Errorf("fail to read manifest %s: %v", chunk.GetFileIdString(), err) + } + m := &filer_pb.FileChunkManifest{} + if err := proto.Unmarshal(data, m); err != nil { + return nil, fmt.Errorf("fail to unmarshal manifest %s: %v", chunk.GetFileIdString(), err) + } + + // recursive + filer_pb.AfterEntryDeserialization(m.Chunks) + return m.Chunks, nil +} + // TODO fetch from cache for weed mount? func fetchChunk(lookupFileIdFn LookupFileIdFunctionType, fileId string, cipherKey []byte, isGzipped bool) ([]byte, error) { urlString, err := lookupFileIdFn(fileId) diff --git a/weed/filer2/filechunks.go b/weed/filer2/filechunks.go index 1ab6679f9..53c679d6b 100644 --- a/weed/filer2/filechunks.go +++ b/weed/filer2/filechunks.go @@ -158,7 +158,7 @@ func logPrintf(name string, visibles []VisibleInterval) { /* glog.V(0).Infof("%s len %d", name, len(visibles)) for _, v := range visibles { - glog.V(0).Infof("%s: [%d,%d)", name, v.start, v.stop) + glog.V(0).Infof("%s: [%d,%d) %s %d", name, v.start, v.stop, v.fileId, v.chunkOffset) } */ } @@ -169,7 +169,7 @@ var bufPool = sync.Pool{ }, } -func MergeIntoVisibles(visibles, newVisibles []VisibleInterval, chunk *filer_pb.FileChunk) []VisibleInterval { +func MergeIntoVisibles(visibles []VisibleInterval, chunk *filer_pb.FileChunk) (newVisibles []VisibleInterval) { newV := newVisibleInterval(chunk.Offset, chunk.Offset+int64(chunk.Size), chunk.GetFileIdString(), chunk.Mtime, 0, chunk.Size, chunk.CipherKey, chunk.IsCompressed) @@ -183,16 +183,22 @@ func MergeIntoVisibles(visibles, newVisibles []VisibleInterval, chunk *filer_pb. } logPrintf(" before", visibles) + // glog.V(0).Infof("newVisibles %d adding chunk [%d,%d) %s size:%d", len(newVisibles), chunk.Offset, chunk.Offset+int64(chunk.Size), chunk.GetFileIdString(), chunk.Size) chunkStop := chunk.Offset + int64(chunk.Size) for _, v := range visibles { if v.start < chunk.Offset && chunk.Offset < v.stop { - newVisibles = append(newVisibles, newVisibleInterval(v.start, chunk.Offset, v.fileId, v.modifiedTime, v.chunkOffset, v.chunkSize, v.cipherKey, v.isGzipped)) + t := newVisibleInterval(v.start, chunk.Offset, v.fileId, v.modifiedTime, v.chunkOffset, v.chunkSize, v.cipherKey, v.isGzipped) + newVisibles = append(newVisibles, t) + // glog.V(0).Infof("visible %d [%d,%d) =1> [%d,%d)", i, v.start, v.stop, t.start, t.stop) } if v.start < chunkStop && chunkStop < v.stop { - newVisibles = append(newVisibles, newVisibleInterval(chunkStop, v.stop, v.fileId, v.modifiedTime, v.chunkOffset+(chunkStop-v.start), v.chunkSize, v.cipherKey, v.isGzipped)) + t := newVisibleInterval(chunkStop, v.stop, v.fileId, v.modifiedTime, v.chunkOffset+(chunkStop-v.start), v.chunkSize, v.cipherKey, v.isGzipped) + newVisibles = append(newVisibles, t) + // glog.V(0).Infof("visible %d [%d,%d) =2> [%d,%d)", i, v.start, v.stop, t.start, t.stop) } if chunkStop <= v.start || v.stop <= chunk.Offset { newVisibles = append(newVisibles, v) + // glog.V(0).Infof("visible %d [%d,%d) =3> [%d,%d)", i, v.start, v.stop, v.start, v.stop) } } newVisibles = append(newVisibles, newV) @@ -219,17 +225,16 @@ func NonOverlappingVisibleIntervals(lookupFileIdFn LookupFileIdFunctionType, chu chunks, _, err = ResolveChunkManifest(lookupFileIdFn, chunks) sort.Slice(chunks, func(i, j int) bool { - return chunks[i].Mtime < chunks[j].Mtime + if chunks[i].Mtime == chunks[j].Mtime { + return chunks[i].Fid.FileKey < chunks[j].Fid.FileKey + } + return chunks[i].Mtime < chunks[j].Mtime // keep this to make tests run }) - var newVisibles []VisibleInterval for _, chunk := range chunks { // glog.V(0).Infof("merge [%d,%d)", chunk.Offset, chunk.Offset+int64(chunk.Size)) - newVisibles = MergeIntoVisibles(visibles, newVisibles, chunk) - t := visibles[:0] - visibles = newVisibles - newVisibles = t + visibles = MergeIntoVisibles(visibles, chunk) logPrintf("add", visibles) diff --git a/weed/filer2/filechunks2_test.go b/weed/filer2/filechunks2_test.go new file mode 100644 index 000000000..d896da3cc --- /dev/null +++ b/weed/filer2/filechunks2_test.go @@ -0,0 +1,46 @@ +package filer2 + +import ( + "sort" + "testing" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" +) + +func TestCompactFileChunksRealCase(t *testing.T) { + + chunks := []*filer_pb.FileChunk{ + {FileId: "2,512f31f2c0700a", Offset: 0, Size: 25 - 0, Mtime: 5320497}, + {FileId: "6,512f2c2e24e9e8", Offset: 868352, Size: 917585 - 868352, Mtime: 5320492}, + {FileId: "7,514468dd5954ca", Offset: 884736, Size: 901120 - 884736, Mtime: 5325928}, + {FileId: "5,5144463173fe77", Offset: 917504, Size: 2297856 - 917504, Mtime: 5325894}, + {FileId: "4,51444c7ab54e2d", Offset: 2301952, Size: 2367488 - 2301952, Mtime: 5325900}, + {FileId: "4,514450e643ad22", Offset: 2371584, Size: 2420736 - 2371584, Mtime: 5325904}, + {FileId: "6,514456a5e9e4d7", Offset: 2449408, Size: 2490368 - 2449408, Mtime: 5325910}, + {FileId: "3,51444f8d53eebe", Offset: 2494464, Size: 2555904 - 2494464, Mtime: 5325903}, + {FileId: "4,5144578b097c7e", Offset: 2560000, Size: 2596864 - 2560000, Mtime: 5325911}, + {FileId: "3,51445500b6b4ac", Offset: 2637824, Size: 2678784 - 2637824, Mtime: 5325909}, + {FileId: "1,51446285e52a61", Offset: 2695168, Size: 2715648 - 2695168, Mtime: 5325922}, + } + + printChunks("before", chunks) + + compacted, garbage := CompactFileChunks(nil, chunks) + + printChunks("compacted", compacted) + printChunks("garbage", garbage) + +} + +func printChunks(name string, chunks []*filer_pb.FileChunk) { + sort.Slice(chunks, func(i, j int) bool { + if chunks[i].Offset == chunks[j].Offset { + return chunks[i].Mtime < chunks[j].Mtime + } + return chunks[i].Offset < chunks[j].Offset + }) + for _, chunk := range chunks { + glog.V(0).Infof("%s chunk %s [%10d,%10d)", name, chunk.GetFileIdString(), chunk.Offset, chunk.Offset+int64(chunk.Size)) + } +} diff --git a/weed/filer2/filechunks_test.go b/weed/filer2/filechunks_test.go index 70da6e16c..31b74a22a 100644 --- a/weed/filer2/filechunks_test.go +++ b/weed/filer2/filechunks_test.go @@ -4,6 +4,8 @@ import ( "fmt" "log" "math" + "math/rand" + "strconv" "testing" "github.com/stretchr/testify/assert" @@ -62,6 +64,42 @@ func TestCompactFileChunks2(t *testing.T) { } } +func TestRandomFileChunksCompact(t *testing.T) { + + data := make([]byte, 1024) + + var chunks []*filer_pb.FileChunk + for i := 0; i < 15; i++ { + start, stop := rand.Intn(len(data)), rand.Intn(len(data)) + if start > stop { + start, stop = stop, start + } + if start+16 < stop { + stop = start + 16 + } + chunk := &filer_pb.FileChunk{ + FileId: strconv.Itoa(i), + Offset: int64(start), + Size: uint64(stop - start), + Mtime: int64(i), + Fid: &filer_pb.FileId{FileKey: uint64(i)}, + } + chunks = append(chunks, chunk) + for x := start; x < stop; x++ { + data[x] = byte(i) + } + } + + visibles, _ := NonOverlappingVisibleIntervals(nil, chunks) + + for _, v := range visibles { + for x := v.start; x < v.stop; x++ { + assert.Equal(t, strconv.Itoa(int(data[x])), v.fileId) + } + } + +} + func TestIntervalMerging(t *testing.T) { testcases := []struct { @@ -142,12 +180,12 @@ func TestIntervalMerging(t *testing.T) { // case 6: same updates { Chunks: []*filer_pb.FileChunk{ - {Offset: 0, Size: 100, FileId: "abc", Mtime: 123}, - {Offset: 0, Size: 100, FileId: "abc", Mtime: 123}, - {Offset: 0, Size: 100, FileId: "abc", Mtime: 123}, + {Offset: 0, Size: 100, FileId: "abc", Fid: &filer_pb.FileId{FileKey: 1}, Mtime: 123}, + {Offset: 0, Size: 100, FileId: "axf", Fid: &filer_pb.FileId{FileKey: 2}, Mtime: 123}, + {Offset: 0, Size: 100, FileId: "xyz", Fid: &filer_pb.FileId{FileKey: 3}, Mtime: 123}, }, Expected: []*VisibleInterval{ - {start: 0, stop: 100, fileId: "abc"}, + {start: 0, stop: 100, fileId: "xyz"}, }, }, // case 7: real updates @@ -314,14 +352,14 @@ func TestChunksReading(t *testing.T) { // case 6: same updates { Chunks: []*filer_pb.FileChunk{ - {Offset: 0, Size: 100, FileId: "abc", Mtime: 123}, - {Offset: 0, Size: 100, FileId: "abc", Mtime: 123}, - {Offset: 0, Size: 100, FileId: "abc", Mtime: 123}, + {Offset: 0, Size: 100, FileId: "abc", Fid: &filer_pb.FileId{FileKey: 1}, Mtime: 123}, + {Offset: 0, Size: 100, FileId: "def", Fid: &filer_pb.FileId{FileKey: 2}, Mtime: 123}, + {Offset: 0, Size: 100, FileId: "xyz", Fid: &filer_pb.FileId{FileKey: 3}, Mtime: 123}, }, Offset: 0, Size: 100, Expected: []*ChunkView{ - {Offset: 0, Size: 100, FileId: "abc", LogicOffset: 0}, + {Offset: 0, Size: 100, FileId: "xyz", LogicOffset: 0}, }, }, // case 7: edge cases diff --git a/weed/filer2/filer.go b/weed/filer2/filer.go index d3dfa5a6f..a3b7709ad 100644 --- a/weed/filer2/filer.go +++ b/weed/filer2/filer.go @@ -16,7 +16,10 @@ import ( "github.com/chrislusf/seaweedfs/weed/wdclient" ) -const PaginationSize = 1024 * 256 +const ( + LogFlushInterval = time.Minute + PaginationSize = 1024 * 256 +) var ( OS_UID = uint32(os.Getuid()) @@ -36,6 +39,7 @@ type Filer struct { metaLogCollection string metaLogReplication string MetaAggregator *MetaAggregator + Signature int32 } func NewFiler(masters []string, grpcDialOption grpc.DialOption, @@ -44,8 +48,9 @@ func NewFiler(masters []string, grpcDialOption grpc.DialOption, MasterClient: wdclient.NewMasterClient(grpcDialOption, "filer", filerHost, filerGrpcPort, masters), fileIdDeletionQueue: util.NewUnboundedQueue(), GrpcDialOption: grpcDialOption, + Signature: util.RandomInt32(), } - f.LocalMetaLogBuffer = log_buffer.NewLogBuffer(time.Minute, f.logFlushFunc, notifyFn) + f.LocalMetaLogBuffer = log_buffer.NewLogBuffer(LogFlushInterval, f.logFlushFunc, notifyFn) f.metaLogCollection = collection f.metaLogReplication = replication @@ -93,7 +98,7 @@ func (f *Filer) RollbackTransaction(ctx context.Context) error { return f.Store.RollbackTransaction(ctx) } -func (f *Filer) CreateEntry(ctx context.Context, entry *Entry, o_excl bool, isFromOtherCluster bool) error { +func (f *Filer) CreateEntry(ctx context.Context, entry *Entry, o_excl bool, isFromOtherCluster bool, signatures []int32) error { if string(entry.FullPath) == "/" { return nil @@ -143,7 +148,7 @@ func (f *Filer) CreateEntry(ctx context.Context, entry *Entry, o_excl bool, isFr } } else { f.maybeAddBucket(dirEntry) - f.NotifyUpdateEvent(ctx, nil, dirEntry, false, isFromOtherCluster) + f.NotifyUpdateEvent(ctx, nil, dirEntry, false, isFromOtherCluster, nil) } } else if !dirEntry.IsDirectory() { @@ -191,7 +196,7 @@ func (f *Filer) CreateEntry(ctx context.Context, entry *Entry, o_excl bool, isFr } f.maybeAddBucket(entry) - f.NotifyUpdateEvent(ctx, oldEntry, entry, true, isFromOtherCluster) + f.NotifyUpdateEvent(ctx, oldEntry, entry, true, isFromOtherCluster, signatures) f.deleteChunksIfNotNew(oldEntry, entry) diff --git a/weed/filer2/filer_delete_entry.go b/weed/filer2/filer_delete_entry.go index 926569b30..797b0f651 100644 --- a/weed/filer2/filer_delete_entry.go +++ b/weed/filer2/filer_delete_entry.go @@ -10,7 +10,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/util" ) -func (f *Filer) DeleteEntryMetaAndData(ctx context.Context, p util.FullPath, isRecursive, ignoreRecursiveError, shouldDeleteChunks, isFromOtherCluster bool) (err error) { +func (f *Filer) DeleteEntryMetaAndData(ctx context.Context, p util.FullPath, isRecursive, ignoreRecursiveError, shouldDeleteChunks, isFromOtherCluster bool, signatures []int32) (err error) { if p == "/" { return nil } @@ -36,7 +36,7 @@ func (f *Filer) DeleteEntryMetaAndData(ctx context.Context, p util.FullPath, isR } // delete the file or folder - err = f.doDeleteEntryMetaAndData(ctx, entry, shouldDeleteChunks, isFromOtherCluster) + err = f.doDeleteEntryMetaAndData(ctx, entry, shouldDeleteChunks, isFromOtherCluster, signatures) if err != nil { return fmt.Errorf("delete file %s: %v", p, err) } @@ -76,7 +76,7 @@ func (f *Filer) doBatchDeleteFolderMetaAndData(ctx context.Context, entry *Entry dirChunks, err = f.doBatchDeleteFolderMetaAndData(ctx, sub, isRecursive, ignoreRecursiveError, shouldDeleteChunks, false) chunks = append(chunks, dirChunks...) } else { - f.NotifyUpdateEvent(ctx, sub, nil, shouldDeleteChunks, isFromOtherCluster) + f.NotifyUpdateEvent(ctx, sub, nil, shouldDeleteChunks, isFromOtherCluster, nil) chunks = append(chunks, sub.Chunks...) } if err != nil && !ignoreRecursiveError { @@ -95,12 +95,12 @@ func (f *Filer) doBatchDeleteFolderMetaAndData(ctx context.Context, entry *Entry return nil, fmt.Errorf("filer store delete: %v", storeDeletionErr) } - f.NotifyUpdateEvent(ctx, entry, nil, shouldDeleteChunks, isFromOtherCluster) + f.NotifyUpdateEvent(ctx, entry, nil, shouldDeleteChunks, isFromOtherCluster, nil) return chunks, nil } -func (f *Filer) doDeleteEntryMetaAndData(ctx context.Context, entry *Entry, shouldDeleteChunks bool, isFromOtherCluster bool) (err error) { +func (f *Filer) doDeleteEntryMetaAndData(ctx context.Context, entry *Entry, shouldDeleteChunks bool, isFromOtherCluster bool, signatures []int32) (err error) { glog.V(3).Infof("deleting entry %v, delete chunks: %v", entry.FullPath, shouldDeleteChunks) @@ -108,7 +108,7 @@ func (f *Filer) doDeleteEntryMetaAndData(ctx context.Context, entry *Entry, shou return fmt.Errorf("filer store delete: %v", storeDeletionErr) } if !entry.IsDirectory() { - f.NotifyUpdateEvent(ctx, entry, nil, shouldDeleteChunks, isFromOtherCluster) + f.NotifyUpdateEvent(ctx, entry, nil, shouldDeleteChunks, isFromOtherCluster, signatures) } return nil diff --git a/weed/filer2/filer_deletion.go b/weed/filer2/filer_deletion.go index 2ff9dac63..dbee4a61d 100644 --- a/weed/filer2/filer_deletion.go +++ b/weed/filer2/filer_deletion.go @@ -70,16 +70,21 @@ func (f *Filer) loopProcessingDeletion() { func (f *Filer) DeleteChunks(chunks []*filer_pb.FileChunk) { for _, chunk := range chunks { + if !chunk.IsChunkManifest { + f.fileIdDeletionQueue.EnQueue(chunk.GetFileIdString()) + continue + } + dataChunks, manifestResolveErr := ResolveOneChunkManifest(f.MasterClient.LookupFileId, chunk) + if manifestResolveErr != nil { + glog.V(0).Infof("failed to resolve manifest %s: %v", chunk.FileId, manifestResolveErr) + } + for _, dChunk := range dataChunks { + f.fileIdDeletionQueue.EnQueue(dChunk.GetFileIdString()) + } f.fileIdDeletionQueue.EnQueue(chunk.GetFileIdString()) } } -// DeleteFileByFileId direct delete by file id. -// Only used when the fileId is not being managed by snapshots. -func (f *Filer) DeleteFileByFileId(fileId string) { - f.fileIdDeletionQueue.EnQueue(fileId) -} - func (f *Filer) deleteChunksIfNotNew(oldEntry, newEntry *Entry) { if oldEntry == nil { diff --git a/weed/filer2/filer_notify.go b/weed/filer2/filer_notify.go index e5f9eba0a..96776a359 100644 --- a/weed/filer2/filer_notify.go +++ b/weed/filer2/filer_notify.go @@ -15,7 +15,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/util" ) -func (f *Filer) NotifyUpdateEvent(ctx context.Context, oldEntry, newEntry *Entry, deleteChunks, isFromOtherCluster bool) { +func (f *Filer) NotifyUpdateEvent(ctx context.Context, oldEntry, newEntry *Entry, deleteChunks, isFromOtherCluster bool, signatures []int32) { var fullpath string if oldEntry != nil { fullpath = string(oldEntry.FullPath) @@ -41,6 +41,7 @@ func (f *Filer) NotifyUpdateEvent(ctx context.Context, oldEntry, newEntry *Entry DeleteChunks: deleteChunks, NewParentPath: newParentPath, IsFromOtherCluster: isFromOtherCluster, + Signatures: append(signatures, f.Signature), } if notification.Queue != nil { @@ -67,12 +68,14 @@ func (f *Filer) logMetaEvent(ctx context.Context, fullpath string, eventNotifica return } - f.LocalMetaLogBuffer.AddToBuffer([]byte(dir), data) + f.LocalMetaLogBuffer.AddToBuffer([]byte(dir), data, event.TsNs) } func (f *Filer) logFlushFunc(startTime, stopTime time.Time, buf []byte) { + startTime, stopTime = startTime.UTC(), stopTime.UTC() + targetFile := fmt.Sprintf("%s/%04d-%02d-%02d/%02d-%02d.segment", SystemLogDir, startTime.Year(), startTime.Month(), startTime.Day(), startTime.Hour(), startTime.Minute(), // startTime.Second(), startTime.Nanosecond(), @@ -90,6 +93,7 @@ func (f *Filer) logFlushFunc(startTime, stopTime time.Time, buf []byte) { func (f *Filer) ReadPersistedLogBuffer(startTime time.Time, eachLogEntryFn func(logEntry *filer_pb.LogEntry) error) (lastTsNs int64, err error) { + startTime = startTime.UTC() startDate := fmt.Sprintf("%04d-%02d-%02d", startTime.Year(), startTime.Month(), startTime.Day()) startHourMinute := fmt.Sprintf("%02d-%02d.segment", startTime.Hour(), startTime.Minute()) @@ -118,7 +122,7 @@ func (f *Filer) ReadPersistedLogBuffer(startTime time.Time, eachLogEntryFn func( if lastTsNs, err = ReadEachLogEntry(chunkedFileReader, sizeBuf, startTsNs, eachLogEntryFn); err != nil { chunkedFileReader.Close() if err == io.EOF { - break + continue } return lastTsNs, fmt.Errorf("reading %s: %v", hourMinuteEntry.FullPath, err) } diff --git a/weed/filer2/filer_notify_append.go b/weed/filer2/filer_notify_append.go index 61bbc9c45..31cdb3c1c 100644 --- a/weed/filer2/filer_notify_append.go +++ b/weed/filer2/filer_notify_append.go @@ -41,7 +41,7 @@ func (f *Filer) appendToFile(targetFile string, data []byte) error { entry.Chunks = append(entry.Chunks, uploadResult.ToPbFileChunk(assignResult.Fid, offset)) // update the entry - err = f.CreateEntry(context.Background(), entry, false, false) + err = f.CreateEntry(context.Background(), entry, false, false, nil) return err } diff --git a/weed/filer2/filerstore.go b/weed/filer2/filerstore.go index 7c518c6fe..ab1d0e659 100644 --- a/weed/filer2/filerstore.go +++ b/weed/filer2/filerstore.go @@ -63,6 +63,9 @@ func (fsw *FilerStoreWrapper) InsertEntry(ctx context.Context, entry *Entry) err }() filer_pb.BeforeEntrySerialization(entry.Chunks) + if entry.Mime == "application/octet-stream" { + entry.Mime = "" + } return fsw.ActualStore.InsertEntry(ctx, entry) } @@ -74,6 +77,9 @@ func (fsw *FilerStoreWrapper) UpdateEntry(ctx context.Context, entry *Entry) err }() filer_pb.BeforeEntrySerialization(entry.Chunks) + if entry.Mime == "application/octet-stream" { + entry.Mime = "" + } return fsw.ActualStore.UpdateEntry(ctx, entry) } diff --git a/weed/filer2/leveldb/leveldb_store_test.go b/weed/filer2/leveldb/leveldb_store_test.go index 81c761f56..5c064ee5a 100644 --- a/weed/filer2/leveldb/leveldb_store_test.go +++ b/weed/filer2/leveldb/leveldb_store_test.go @@ -31,7 +31,7 @@ func TestCreateAndFind(t *testing.T) { }, } - if err := filer.CreateEntry(ctx, entry1, false, false); err != nil { + if err := filer.CreateEntry(ctx, entry1, false, false, nil); err != nil { t.Errorf("create entry %v: %v", entry1.FullPath, err) return } diff --git a/weed/filer2/leveldb2/leveldb2_store_test.go b/weed/filer2/leveldb2/leveldb2_store_test.go index 27c1c954b..5faecf93e 100644 --- a/weed/filer2/leveldb2/leveldb2_store_test.go +++ b/weed/filer2/leveldb2/leveldb2_store_test.go @@ -31,7 +31,7 @@ func TestCreateAndFind(t *testing.T) { }, } - if err := filer.CreateEntry(ctx, entry1, false, false); err != nil { + if err := filer.CreateEntry(ctx, entry1, false, false, nil); err != nil { t.Errorf("create entry %v: %v", entry1.FullPath, err) return } diff --git a/weed/filer2/meta_aggregator.go b/weed/filer2/meta_aggregator.go index 00fcf27d1..f2792bd26 100644 --- a/weed/filer2/meta_aggregator.go +++ b/weed/filer2/meta_aggregator.go @@ -25,13 +25,15 @@ type MetaAggregator struct { ListenersCond *sync.Cond } +// MetaAggregator only aggregates data "on the fly". The logs are not re-persisted to disk. +// The old data comes from what each LocalMetadata persisted on disk. func NewMetaAggregator(filers []string, grpcDialOption grpc.DialOption) *MetaAggregator { t := &MetaAggregator{ filers: filers, grpcDialOption: grpcDialOption, } t.ListenersCond = sync.NewCond(&t.ListenersLock) - t.MetaLogBuffer = log_buffer.NewLogBuffer(time.Minute, nil, func() { + t.MetaLogBuffer = log_buffer.NewLogBuffer(LogFlushInterval, nil, func() { t.ListenersCond.Broadcast() }) return t @@ -48,7 +50,7 @@ func (ma *MetaAggregator) subscribeToOneFiler(f *Filer, self string, filer strin var maybeReplicateMetadataChange func(*filer_pb.SubscribeMetadataResponse) lastPersistTime := time.Now() changesSinceLastPersist := 0 - lastTsNs := int64(0) + lastTsNs := time.Now().Add(-LogFlushInterval).UnixNano() MaxChangeLimit := 100 @@ -88,7 +90,7 @@ func (ma *MetaAggregator) subscribeToOneFiler(f *Filer, self string, filer strin } dir := event.Directory // println("received meta change", dir, "size", len(data)) - ma.MetaLogBuffer.AddToBuffer([]byte(dir), data) + ma.MetaLogBuffer.AddToBuffer([]byte(dir), data, event.TsNs) if maybeReplicateMetadataChange != nil { maybeReplicateMetadataChange(event) } diff --git a/weed/filer2/reader_at.go b/weed/filer2/reader_at.go index 0bf528a42..0cea83ff9 100644 --- a/weed/filer2/reader_at.go +++ b/weed/filer2/reader_at.go @@ -109,7 +109,7 @@ func (c *ChunkReadAt) doReadAt(p []byte, offset int64) (n int, err error) { glog.V(4).Infof("doReadAt [%d,%d), n:%v, err:%v", offset, offset+int64(len(p)), n, err) if err == nil && remaining > 0 && c.fileSize > startOffset { - delta := int(min(remaining, c.fileSize - startOffset)) + delta := int(min(remaining, c.fileSize-startOffset)) glog.V(4).Infof("zero2 [%d,%d) of file size %d bytes", startOffset, startOffset+int64(delta), c.fileSize) n += delta } @@ -129,7 +129,7 @@ func (c *ChunkReadAt) readFromWholeChunkData(chunkView *ChunkView) (chunkData [] chunkData = c.chunkCache.GetChunk(chunkView.FileId, chunkView.ChunkSize) if chunkData != nil { - glog.V(5).Infof("cache hit %s [%d,%d)", chunkView.FileId, chunkView.LogicOffset-chunkView.Offset, chunkView.LogicOffset-chunkView.Offset+int64(len(chunkData))) + glog.V(4).Infof("cache hit %s [%d,%d)", chunkView.FileId, chunkView.LogicOffset-chunkView.Offset, chunkView.LogicOffset-chunkView.Offset+int64(len(chunkData))) } else { glog.V(4).Infof("doFetchFullChunkData %s", chunkView.FileId) chunkData, err = c.doFetchFullChunkData(chunkView.FileId, chunkView.CipherKey, chunkView.IsGzipped) diff --git a/weed/filer2/reader_at_test.go b/weed/filer2/reader_at_test.go index 7377c5dbc..7bfc9a972 100644 --- a/weed/filer2/reader_at_test.go +++ b/weed/filer2/reader_at_test.go @@ -27,33 +27,33 @@ func TestReaderAt(t *testing.T) { visibles := []VisibleInterval{ { - start: 1, - stop: 2, - fileId: "1", + start: 1, + stop: 2, + fileId: "1", chunkSize: 9, }, { - start: 3, - stop: 4, - fileId: "3", + start: 3, + stop: 4, + fileId: "3", chunkSize: 1, }, { - start: 5, - stop: 6, - fileId: "5", + start: 5, + stop: 6, + fileId: "5", chunkSize: 2, }, { - start: 7, - stop: 9, - fileId: "7", + start: 7, + stop: 9, + fileId: "7", chunkSize: 2, }, { - start: 9, - stop: 10, - fileId: "9", + start: 9, + stop: 10, + fileId: "9", chunkSize: 2, }, } @@ -95,15 +95,15 @@ func TestReaderAt0(t *testing.T) { visibles := []VisibleInterval{ { - start: 2, - stop: 5, - fileId: "1", + start: 2, + stop: 5, + fileId: "1", chunkSize: 9, }, { - start: 7, - stop: 9, - fileId: "2", + start: 7, + stop: 9, + fileId: "2", chunkSize: 9, }, } @@ -129,9 +129,9 @@ func TestReaderAt1(t *testing.T) { visibles := []VisibleInterval{ { - start: 2, - stop: 5, - fileId: "1", + start: 2, + stop: 5, + fileId: "1", chunkSize: 9, }, } diff --git a/weed/filesys/dir.go b/weed/filesys/dir.go index 46f5f22ed..f85b90a5d 100644 --- a/weed/filesys/dir.go +++ b/weed/filesys/dir.go @@ -63,7 +63,7 @@ func (dir *Dir) Attr(ctx context.Context, attr *fuse.Attr) error { attr.Gid = dir.entry.Attributes.Gid attr.Uid = dir.entry.Attributes.Uid - glog.V(5).Infof("dir Attr %s, attr: %+v", dir.FullPath(), attr) + glog.V(4).Infof("dir Attr %s, attr: %+v", dir.FullPath(), attr) return nil } @@ -142,7 +142,8 @@ func (dir *Dir) Create(ctx context.Context, req *fuse.CreateRequest, TtlSec: dir.wfs.option.TtlSec, }, }, - OExcl: req.Flags&fuse.OpenExclusive != 0, + OExcl: req.Flags&fuse.OpenExclusive != 0, + Signatures: []int32{dir.wfs.signature}, } glog.V(1).Infof("create %s/%s: %v", dir.FullPath(), req.Name, req.Flags) @@ -151,6 +152,7 @@ func (dir *Dir) Create(ctx context.Context, req *fuse.CreateRequest, if strings.Contains(err.Error(), "EEXIST") { return fuse.EEXIST } + glog.V(0).Infof("create %s/%s: %v", dir.FullPath(), req.Name, err) return fuse.EIO } @@ -168,7 +170,6 @@ func (dir *Dir) Create(ctx context.Context, req *fuse.CreateRequest, node = dir.newFile(req.Name, request.Entry) file := node.(*File) - file.isOpen++ fh := dir.wfs.AcquireHandle(file, req.Uid, req.Gid) return file, fh, nil @@ -193,8 +194,9 @@ func (dir *Dir) Mkdir(ctx context.Context, req *fuse.MkdirRequest) (fs.Node, err err := dir.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.CreateEntryRequest{ - Directory: dir.FullPath(), - Entry: newEntry, + Directory: dir.FullPath(), + Entry: newEntry, + Signatures: []int32{dir.wfs.signature}, } glog.V(1).Infof("mkdir: %v", request) @@ -221,7 +223,7 @@ func (dir *Dir) Mkdir(ctx context.Context, req *fuse.MkdirRequest) (fs.Node, err func (dir *Dir) Lookup(ctx context.Context, req *fuse.LookupRequest, resp *fuse.LookupResponse) (node fs.Node, err error) { - glog.V(5).Infof("dir Lookup %s: %s by %s", dir.FullPath(), req.Name, req.Header.String()) + glog.V(4).Infof("dir Lookup %s: %s by %s", dir.FullPath(), req.Name, req.Header.String()) fullFilePath := util.NewFullPath(dir.FullPath(), req.Name) dirPath := util.FullPath(dir.FullPath()) @@ -240,7 +242,7 @@ func (dir *Dir) Lookup(ctx context.Context, req *fuse.LookupRequest, resp *fuse. return nil, fuse.ENOENT } } else { - glog.V(5).Infof("dir Lookup cache hit %s", fullFilePath) + glog.V(4).Infof("dir Lookup cache hit %s", fullFilePath) } if entry != nil { @@ -268,7 +270,7 @@ func (dir *Dir) Lookup(ctx context.Context, req *fuse.LookupRequest, resp *fuse. func (dir *Dir) ReadDirAll(ctx context.Context) (ret []fuse.Dirent, err error) { - glog.V(5).Infof("dir ReadDirAll %s", dir.FullPath()) + glog.V(4).Infof("dir ReadDirAll %s", dir.FullPath()) processEachEntryFn := func(entry *filer_pb.Entry, isLast bool) error { fullpath := util.NewFullPath(dir.FullPath(), entry.Name) @@ -317,10 +319,9 @@ func (dir *Dir) removeOneFile(req *fuse.RemoveRequest) error { return nil } - // first, ensure the filer store can correctly delete glog.V(3).Infof("remove file: %v", req) - err = filer_pb.Remove(dir.wfs, dir.FullPath(), req.Name, false, false, false, false) + err = filer_pb.Remove(dir.wfs, dir.FullPath(), req.Name, false, false, false, false, dir.wfs.signature) if err != nil { glog.V(3).Infof("not found remove file %s/%s: %v", dir.FullPath(), req.Name, err) return fuse.ENOENT @@ -340,10 +341,10 @@ func (dir *Dir) removeOneFile(req *fuse.RemoveRequest) error { func (dir *Dir) removeFolder(req *fuse.RemoveRequest) error { glog.V(3).Infof("remove directory entry: %v", req) - err := filer_pb.Remove(dir.wfs, dir.FullPath(), req.Name, true, false, false, false) + err := filer_pb.Remove(dir.wfs, dir.FullPath(), req.Name, true, false, false, false, dir.wfs.signature) if err != nil { glog.V(0).Infof("remove %s/%s: %v", dir.FullPath(), req.Name, err) - if strings.Contains(err.Error(), "non-empty"){ + if strings.Contains(err.Error(), "non-empty") { return fuse.EEXIST } return fuse.ENOENT @@ -434,7 +435,7 @@ func (dir *Dir) Listxattr(ctx context.Context, req *fuse.ListxattrRequest, resp } func (dir *Dir) Forget() { - glog.V(5).Infof("Forget dir %s", dir.FullPath()) + glog.V(4).Infof("Forget dir %s", dir.FullPath()) dir.wfs.fsNodeCache.DeleteFsNode(util.FullPath(dir.FullPath())) } @@ -458,8 +459,9 @@ func (dir *Dir) saveEntry() error { return dir.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.UpdateEntryRequest{ - Directory: parentDir, - Entry: dir.entry, + Directory: parentDir, + Entry: dir.entry, + Signatures: []int32{dir.wfs.signature}, } glog.V(1).Infof("save dir entry: %v", request) diff --git a/weed/filesys/dir_link.go b/weed/filesys/dir_link.go index bd564f413..d813dd96a 100644 --- a/weed/filesys/dir_link.go +++ b/weed/filesys/dir_link.go @@ -34,6 +34,7 @@ func (dir *Dir) Symlink(ctx context.Context, req *fuse.SymlinkRequest) (fs.Node, SymlinkTarget: req.Target, }, }, + Signatures: []int32{dir.wfs.signature}, } err := dir.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { diff --git a/weed/filesys/dirty_page.go b/weed/filesys/dirty_page.go index 88a1a4f55..1ab7d0961 100644 --- a/weed/filesys/dirty_page.go +++ b/weed/filesys/dirty_page.go @@ -29,10 +29,7 @@ var counter = int32(0) func (pages *ContinuousDirtyPages) AddPage(offset int64, data []byte) (chunks []*filer_pb.FileChunk, err error) { - pages.lock.Lock() - defer pages.lock.Unlock() - - glog.V(5).Infof("%s AddPage [%d,%d) of %d bytes", pages.f.fullpath(), offset, offset+int64(len(data)), pages.f.entry.Attributes.FileSize) + glog.V(4).Infof("%s AddPage [%d,%d) of %d bytes", pages.f.fullpath(), offset, offset+int64(len(data)), pages.f.entry.Attributes.FileSize) if len(data) > int(pages.f.wfs.option.ChunkSizeLimit) { // this is more than what buffer can hold. @@ -82,14 +79,6 @@ func (pages *ContinuousDirtyPages) flushAndSave(offset int64, data []byte) (chun return } -func (pages *ContinuousDirtyPages) FlushToStorage() (chunks []*filer_pb.FileChunk, err error) { - - pages.lock.Lock() - defer pages.lock.Unlock() - - return pages.saveExistingPagesToStorage() -} - func (pages *ContinuousDirtyPages) saveExistingPagesToStorage() (chunks []*filer_pb.FileChunk, err error) { var hasSavedData bool @@ -103,7 +92,9 @@ func (pages *ContinuousDirtyPages) saveExistingPagesToStorage() (chunks []*filer } if err == nil { - chunks = append(chunks, chunk) + if chunk != nil { + chunks = append(chunks, chunk) + } } else { return } @@ -121,9 +112,14 @@ func (pages *ContinuousDirtyPages) saveExistingLargestPageToStorage() (chunk *fi fileSize := int64(pages.f.entry.Attributes.FileSize) for { chunkSize := min(maxList.Size(), fileSize-maxList.Offset()) + if chunkSize == 0 { + return + } chunk, err = pages.saveToStorage(maxList.ToReader(), maxList.Offset(), chunkSize) if err == nil { - hasSavedData = true + if chunk != nil { + hasSavedData = true + } glog.V(4).Infof("saveToStorage %s %s [%d,%d) of %d bytes", pages.f.fullpath(), chunk.GetFileIdString(), maxList.Offset(), maxList.Offset()+chunkSize, fileSize) return } else { @@ -170,10 +166,5 @@ func min(x, y int64) int64 { } func (pages *ContinuousDirtyPages) ReadDirtyDataAt(data []byte, startOffset int64) (maxStop int64) { - - pages.lock.Lock() - defer pages.lock.Unlock() - return pages.intervals.ReadDataAt(data, startOffset) - } diff --git a/weed/filesys/dirty_page_interval_test.go b/weed/filesys/dirty_page_interval_test.go index ab3b37b7c..d02ad27fd 100644 --- a/weed/filesys/dirty_page_interval_test.go +++ b/weed/filesys/dirty_page_interval_test.go @@ -2,6 +2,7 @@ package filesys import ( "bytes" + "math/rand" "testing" ) @@ -66,6 +67,29 @@ func TestContinuousIntervals_RealCase1(t *testing.T) { } +func TestRandomWrites(t *testing.T) { + + c := &ContinuousIntervals{} + + data := make([]byte, 1024) + + for i := 0; i < 1024; i++ { + + start, stop := rand.Intn(len(data)), rand.Intn(len(data)) + if start > stop { + start, stop = stop, start + } + + rand.Read(data[start : stop+1]) + + c.AddInterval(data[start:stop+1], int64(start)) + + expectedData(t, c, 0, data...) + + } + +} + func expectedData(t *testing.T, c *ContinuousIntervals, offset int, data ...byte) { start, stop := int64(offset), int64(offset+len(data)) for _, list := range c.lists { diff --git a/weed/filesys/file.go b/weed/filesys/file.go index d57f6cc57..d2117bfbb 100644 --- a/weed/filesys/file.go +++ b/weed/filesys/file.go @@ -45,7 +45,7 @@ func (file *File) fullpath() util.FullPath { func (file *File) Attr(ctx context.Context, attr *fuse.Attr) error { - glog.V(5).Infof("file Attr %s, open:%v, existing attr: %+v", file.fullpath(), file.isOpen, attr) + glog.V(4).Infof("file Attr %s, open:%v, existing attr: %+v", file.fullpath(), file.isOpen, attr) if file.isOpen <= 0 { if err := file.maybeLoadEntry(ctx); err != nil { @@ -87,8 +87,6 @@ func (file *File) Open(ctx context.Context, req *fuse.OpenRequest, resp *fuse.Op glog.V(4).Infof("file %v open %+v", file.fullpath(), req) - file.isOpen++ - handle := file.wfs.AcquireHandle(file, req.Uid, req.Gid) resp.Handle = fuse.HandleID(handle.handle) @@ -101,16 +99,26 @@ func (file *File) Open(ctx context.Context, req *fuse.OpenRequest, resp *fuse.Op func (file *File) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *fuse.SetattrResponse) error { - glog.V(5).Infof("%v file setattr %+v", file.fullpath(), req) + glog.V(4).Infof("%v file setattr %+v", file.fullpath(), req) if err := file.maybeLoadEntry(ctx); err != nil { return err } + if file.isOpen > 0 { + file.wfs.handlesLock.Lock() + fileHandle := file.wfs.handles[file.fullpath().AsInode()] + file.wfs.handlesLock.Unlock() + + if fileHandle != nil { + fileHandle.Lock() + defer fileHandle.Unlock() + } + } if req.Valid.Size() { glog.V(4).Infof("%v file setattr set size=%v chunks=%d", file.fullpath(), req.Size, len(file.entry.Chunks)) - if req.Size < filer2.TotalSize(file.entry.Chunks) { + if req.Size < filer2.FileSize(file.entry) { // fmt.Printf("truncate %v \n", fullPath) var chunks []*filer_pb.FileChunk var truncatedChunks []*filer_pb.FileChunk @@ -129,10 +137,10 @@ func (file *File) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *f } } } - file.wfs.deleteFileChunks(truncatedChunks) file.entry.Chunks = chunks file.entryViewCache = nil file.reader = nil + file.wfs.deleteFileChunks(truncatedChunks) } file.entry.Attributes.FileSize = req.Size file.dirtyMetadata = true @@ -237,12 +245,12 @@ func (file *File) Fsync(ctx context.Context, req *fuse.FsyncRequest) error { func (file *File) Forget() { t := util.NewFullPath(file.dir.FullPath(), file.Name) - glog.V(5).Infof("Forget file %s", t) + glog.V(4).Infof("Forget file %s", t) file.wfs.fsNodeCache.DeleteFsNode(t) } func (file *File) maybeLoadEntry(ctx context.Context) error { - if file.entry == nil || file.isOpen <= 0 { + if file.entry == nil && file.isOpen <= 0 { entry, err := file.wfs.maybeLoadEntry(file.dir.FullPath(), file.Name) if err != nil { glog.V(3).Infof("maybeLoadEntry file %s/%s: %v", file.dir.FullPath(), file.Name, err) @@ -258,15 +266,14 @@ func (file *File) maybeLoadEntry(ctx context.Context) error { func (file *File) addChunks(chunks []*filer_pb.FileChunk) { sort.Slice(chunks, func(i, j int) bool { + if chunks[i].Mtime == chunks[j].Mtime { + return chunks[i].Fid.FileKey < chunks[j].Fid.FileKey + } return chunks[i].Mtime < chunks[j].Mtime }) - var newVisibles []filer2.VisibleInterval for _, chunk := range chunks { - newVisibles = filer2.MergeIntoVisibles(file.entryViewCache, newVisibles, chunk) - t := file.entryViewCache[:0] - file.entryViewCache = newVisibles - newVisibles = t + file.entryViewCache = filer2.MergeIntoVisibles(file.entryViewCache, chunk) } file.reader = nil @@ -286,8 +293,9 @@ func (file *File) saveEntry() error { return file.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.UpdateEntryRequest{ - Directory: file.dir.FullPath(), - Entry: file.entry, + Directory: file.dir.FullPath(), + Entry: file.entry, + Signatures: []int32{file.wfs.signature}, } glog.V(4).Infof("save file entry: %v", request) diff --git a/weed/filesys/filehandle.go b/weed/filesys/filehandle.go index 94590f842..a1f18df6f 100644 --- a/weed/filesys/filehandle.go +++ b/weed/filesys/filehandle.go @@ -7,6 +7,7 @@ import ( "math" "net/http" "os" + "sync" "time" "github.com/seaweedfs/fuse" @@ -22,6 +23,7 @@ type FileHandle struct { dirtyPages *ContinuousDirtyPages contentType string handle uint64 + sync.RWMutex f *File RequestId fuse.RequestID // unique ID for request @@ -41,6 +43,7 @@ func newFileHandle(file *File, uid, gid uint32) *FileHandle { if fh.f.entry != nil { fh.f.entry.Attributes.FileSize = filer2.FileSize(fh.f.entry) } + return fh } @@ -55,6 +58,12 @@ var _ = fs.HandleReleaser(&FileHandle{}) func (fh *FileHandle) Read(ctx context.Context, req *fuse.ReadRequest, resp *fuse.ReadResponse) error { glog.V(4).Infof("%s read fh %d: [%d,%d) size %d resp.Data cap=%d", fh.f.fullpath(), fh.handle, req.Offset, req.Offset+int64(req.Size), req.Size, cap(resp.Data)) + fh.RLock() + defer fh.RUnlock() + + if req.Size <= 0 { + return nil + } buff := resp.Data[:cap(resp.Data)] if req.Size > cap(resp.Data) { @@ -65,25 +74,27 @@ func (fh *FileHandle) Read(ctx context.Context, req *fuse.ReadRequest, resp *fus totalRead, err := fh.readFromChunks(buff, req.Offset) if err == nil { maxStop := fh.readFromDirtyPages(buff, req.Offset) - totalRead = max(maxStop - req.Offset, totalRead) + totalRead = max(maxStop-req.Offset, totalRead) } if err != nil { - glog.Errorf("file handle read %s: %v", fh.f.fullpath(), err) - return fuse.EIO + glog.Warningf("file handle read %s %d: %v", fh.f.fullpath(), totalRead, err) + return nil } if totalRead > int64(len(buff)) { glog.Warningf("%s FileHandle Read %d: [%d,%d) size %d totalRead %d", fh.f.fullpath(), fh.handle, req.Offset, req.Offset+int64(req.Size), req.Size, totalRead) totalRead = min(int64(len(buff)), totalRead) } - resp.Data = buff[:totalRead] + // resp.Data = buff[:totalRead] + resp.Data = buff return err } func (fh *FileHandle) readFromDirtyPages(buff []byte, startOffset int64) (maxStop int64) { - return fh.dirtyPages.ReadDirtyDataAt(buff, startOffset) + maxStop = fh.dirtyPages.ReadDirtyDataAt(buff, startOffset) + return } func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, error) { @@ -127,6 +138,9 @@ func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, error) { // Write to the file handle func (fh *FileHandle) Write(ctx context.Context, req *fuse.WriteRequest, resp *fuse.WriteResponse) error { + fh.Lock() + defer fh.Unlock() + // write the request to volume servers data := make([]byte, len(req.Data)) copy(data, req.Data) @@ -162,34 +176,46 @@ func (fh *FileHandle) Release(ctx context.Context, req *fuse.ReleaseRequest) err glog.V(4).Infof("Release %v fh %d", fh.f.fullpath(), fh.handle) + fh.Lock() + defer fh.Unlock() + fh.f.isOpen-- - if fh.f.isOpen <= 0 { + if fh.f.isOpen < 0 { + glog.V(0).Infof("Release reset %s open count %d => %d", fh.f.Name, fh.f.isOpen, 0) + fh.f.isOpen = 0 + return nil + } + + if fh.f.isOpen == 0 { fh.doFlush(ctx, req.Header) fh.f.wfs.ReleaseHandle(fh.f.fullpath(), fuse.HandleID(fh.handle)) - fh.f.entryViewCache = nil - fh.f.reader = nil } return nil } func (fh *FileHandle) Flush(ctx context.Context, req *fuse.FlushRequest) error { + + fh.Lock() + defer fh.Unlock() + return fh.doFlush(ctx, req.Header) } func (fh *FileHandle) doFlush(ctx context.Context, header fuse.Header) error { // fflush works at fh level // send the data to the OS - glog.V(4).Infof("doFlush %s fh %d %v", fh.f.fullpath(), fh.handle, header) + glog.V(4).Infof("doFlush %s fh %d", fh.f.fullpath(), fh.handle) - chunks, err := fh.dirtyPages.FlushToStorage() + chunks, err := fh.dirtyPages.saveExistingPagesToStorage() if err != nil { glog.Errorf("flush %s: %v", fh.f.fullpath(), err) return fuse.EIO } if len(chunks) > 0 { + fh.f.addChunks(chunks) fh.f.dirtyMetadata = true } @@ -218,8 +244,9 @@ func (fh *FileHandle) doFlush(ctx context.Context, header fuse.Header) error { } request := &filer_pb.CreateEntryRequest{ - Directory: fh.f.dir.FullPath(), - Entry: fh.f.entry, + Directory: fh.f.dir.FullPath(), + Entry: fh.f.entry, + Signatures: []int32{fh.f.wfs.signature}, } glog.V(4).Infof("%s set chunks: %v", fh.f.fullpath(), len(fh.f.entry.Chunks)) @@ -227,18 +254,16 @@ func (fh *FileHandle) doFlush(ctx context.Context, header fuse.Header) error { glog.V(4).Infof("%s chunks %d: %v [%d,%d)", fh.f.fullpath(), i, chunk.GetFileIdString(), chunk.Offset, chunk.Offset+int64(chunk.Size)) } - chunks, garbages := filer2.CompactFileChunks(filer2.LookupFn(fh.f.wfs), fh.f.entry.Chunks) + manifestChunks, nonManifestChunks := filer2.SeparateManifestChunks(fh.f.entry.Chunks) + + chunks, _ := filer2.CompactFileChunks(filer2.LookupFn(fh.f.wfs), nonManifestChunks) chunks, manifestErr := filer2.MaybeManifestize(fh.f.wfs.saveDataAsChunk(fh.f.dir.FullPath()), chunks) if manifestErr != nil { // not good, but should be ok glog.V(0).Infof("MaybeManifestize: %v", manifestErr) } - fh.f.entry.Chunks = chunks - // fh.f.entryViewCache = nil - - // special handling of one chunk md5 - if len(chunks) == 1 { - } + fh.f.entry.Chunks = append(chunks, manifestChunks...) + fh.f.entryViewCache = nil if err := filer_pb.CreateEntry(client, request); err != nil { glog.Errorf("fh flush create %s: %v", fh.f.fullpath(), err) @@ -247,11 +272,6 @@ func (fh *FileHandle) doFlush(ctx context.Context, header fuse.Header) error { fh.f.wfs.metaCache.InsertEntry(context.Background(), filer2.FromPbEntry(request.Directory, request.Entry)) - fh.f.wfs.deleteFileChunks(garbages) - for i, chunk := range garbages { - glog.V(4).Infof("garbage %s chunks %d: %v [%d,%d)", fh.f.fullpath(), i, chunk.GetFileIdString(), chunk.Offset, chunk.Offset+int64(chunk.Size)) - } - return nil }) diff --git a/weed/filesys/fscache_test.go b/weed/filesys/fscache_test.go index 8bfae1472..1152eb32e 100644 --- a/weed/filesys/fscache_test.go +++ b/weed/filesys/fscache_test.go @@ -95,7 +95,6 @@ func TestFsCacheMove(t *testing.T) { } - func TestFsCacheMove2(t *testing.T) { cache := newFsCache(nil) @@ -114,4 +113,3 @@ func TestFsCacheMove2(t *testing.T) { } } - diff --git a/weed/filesys/meta_cache/meta_cache.go b/weed/filesys/meta_cache/meta_cache.go index 15ec0903d..c0bb75f4a 100644 --- a/weed/filesys/meta_cache/meta_cache.go +++ b/weed/filesys/meta_cache/meta_cache.go @@ -50,6 +50,10 @@ func openMetaStore(dbFolder string) filer2.FilerStore { func (mc *MetaCache) InsertEntry(ctx context.Context, entry *filer2.Entry) error { mc.Lock() defer mc.Unlock() + return mc.doInsertEntry(ctx, entry) +} + +func (mc *MetaCache) doInsertEntry(ctx context.Context, entry *filer2.Entry) error { filer_pb.BeforeEntrySerialization(entry.Chunks) return mc.actualStore.InsertEntry(ctx, entry) } diff --git a/weed/filesys/meta_cache/meta_cache_init.go b/weed/filesys/meta_cache/meta_cache_init.go index cd98f4a7c..05983ec7d 100644 --- a/weed/filesys/meta_cache/meta_cache_init.go +++ b/weed/filesys/meta_cache/meta_cache_init.go @@ -14,11 +14,11 @@ func EnsureVisited(mc *MetaCache, client filer_pb.FilerClient, dirPath util.Full mc.visitedBoundary.EnsureVisited(dirPath, func(path util.FullPath) (childDirectories []string, err error) { - glog.V(5).Infof("ReadDirAllEntries %s ...", path) + glog.V(4).Infof("ReadDirAllEntries %s ...", path) err = filer_pb.ReadDirAllEntries(client, dirPath, "", func(pbEntry *filer_pb.Entry, isLast bool) error { entry := filer2.FromPbEntry(string(dirPath), pbEntry) - if err := mc.InsertEntry(context.Background(), entry); err != nil { + if err := mc.doInsertEntry(context.Background(), entry); err != nil { glog.V(0).Infof("read %s: %v", entry.FullPath, err) return err } diff --git a/weed/filesys/meta_cache/meta_cache_subscribe.go b/weed/filesys/meta_cache/meta_cache_subscribe.go index ca18411e0..3c0a9c2ac 100644 --- a/weed/filesys/meta_cache/meta_cache_subscribe.go +++ b/weed/filesys/meta_cache/meta_cache_subscribe.go @@ -12,10 +12,17 @@ import ( "github.com/chrislusf/seaweedfs/weed/util" ) -func SubscribeMetaEvents(mc *MetaCache, client filer_pb.FilerClient, dir string, lastTsNs int64) error { +func SubscribeMetaEvents(mc *MetaCache, selfSignature int32, client filer_pb.FilerClient, dir string, lastTsNs int64) error { processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error { message := resp.EventNotification + + for _, sig := range message.Signatures { + if sig == selfSignature && selfSignature != 0 { + return nil + } + } + var oldPath util.FullPath var newEntry *filer2.Entry if message.OldEntry != nil { @@ -41,6 +48,7 @@ func SubscribeMetaEvents(mc *MetaCache, client filer_pb.FilerClient, dir string, ClientName: "mount", PathPrefix: dir, SinceNs: lastTsNs, + Signature: selfSignature, }) if err != nil { return fmt.Errorf("subscribe: %v", err) diff --git a/weed/filesys/wfs.go b/weed/filesys/wfs.go index f147d7548..93819dfa4 100644 --- a/weed/filesys/wfs.go +++ b/weed/filesys/wfs.go @@ -67,6 +67,7 @@ type WFS struct { chunkCache *chunk_cache.TieredChunkCache metaCache *meta_cache.MetaCache + signature int32 } type statsCache struct { filer_pb.StatisticsResponse @@ -82,6 +83,7 @@ func NewSeaweedFileSystem(option *Option) *WFS { return make([]byte, option.ChunkSizeLimit) }, }, + signature: util.RandomInt32(), } cacheUniqueId := util.Md5String([]byte(option.FilerGrpcAddress + option.FilerMountRootPath + util.Version()))[0:4] cacheDir := path.Join(option.CacheDir, cacheUniqueId) @@ -92,7 +94,7 @@ func NewSeaweedFileSystem(option *Option) *WFS { wfs.metaCache = meta_cache.NewMetaCache(path.Join(cacheDir, "meta")) startTime := time.Now() - go meta_cache.SubscribeMetaEvents(wfs.metaCache, wfs, wfs.option.FilerMountRootPath, startTime.UnixNano()) + go meta_cache.SubscribeMetaEvents(wfs.metaCache, wfs.signature, wfs, wfs.option.FilerMountRootPath, startTime.UnixNano()) grace.OnInterrupt(func() { wfs.metaCache.Shutdown() }) @@ -118,10 +120,14 @@ func (wfs *WFS) AcquireHandle(file *File, uid, gid uint32) (fileHandle *FileHand inodeId := file.fullpath().AsInode() existingHandle, found := wfs.handles[inodeId] if found && existingHandle != nil { + file.isOpen++ return existingHandle } fileHandle = newFileHandle(file, uid, gid) + file.maybeLoadEntry(context.Background()) + file.isOpen++ + wfs.handles[inodeId] = fileHandle fileHandle.handle = inodeId @@ -142,7 +148,7 @@ func (wfs *WFS) ReleaseHandle(fullpath util.FullPath, handleId fuse.HandleID) { // Statfs is called to obtain file system metadata. Implements fuse.FSStatfser func (wfs *WFS) Statfs(ctx context.Context, req *fuse.StatfsRequest, resp *fuse.StatfsResponse) error { - glog.V(5).Infof("reading fs stats: %+v", req) + glog.V(4).Infof("reading fs stats: %+v", req) if wfs.stats.lastChecked < time.Now().Unix()-20 { @@ -154,13 +160,13 @@ func (wfs *WFS) Statfs(ctx context.Context, req *fuse.StatfsRequest, resp *fuse. Ttl: fmt.Sprintf("%ds", wfs.option.TtlSec), } - glog.V(5).Infof("reading filer stats: %+v", request) + glog.V(4).Infof("reading filer stats: %+v", request) resp, err := client.Statistics(context.Background(), request) if err != nil { glog.V(0).Infof("reading filer stats %v: %v", request, err) return err } - glog.V(5).Infof("read filer stats: %+v", resp) + glog.V(4).Infof("read filer stats: %+v", resp) wfs.stats.TotalSize = resp.TotalSize wfs.stats.UsedSize = resp.UsedSize diff --git a/weed/filesys/wfs_deletion.go b/weed/filesys/wfs_deletion.go index 203ebdad1..87a4e907f 100644 --- a/weed/filesys/wfs_deletion.go +++ b/weed/filesys/wfs_deletion.go @@ -18,6 +18,17 @@ func (wfs *WFS) deleteFileChunks(chunks []*filer_pb.FileChunk) { var fileIds []string for _, chunk := range chunks { + if !chunk.IsChunkManifest { + fileIds = append(fileIds, chunk.GetFileIdString()) + continue + } + dataChunks, manifestResolveErr := filer2.ResolveOneChunkManifest(filer2.LookupFn(wfs), chunk) + if manifestResolveErr != nil { + glog.V(0).Infof("failed to resolve manifest %s: %v", chunk.FileId, manifestResolveErr) + } + for _, dChunk := range dataChunks { + fileIds = append(fileIds, dChunk.GetFileIdString()) + } fileIds = append(fileIds, chunk.GetFileIdString()) } @@ -38,7 +49,7 @@ func (wfs *WFS) deleteFileIds(grpcDialOption grpc.DialOption, client filer_pb.Se m := make(map[string]operation.LookupResult) - glog.V(5).Infof("deleteFileIds lookup volume id locations: %v", vids) + glog.V(4).Infof("deleteFileIds lookup volume id locations: %v", vids) resp, err := client.LookupVolume(context.Background(), &filer_pb.LookupVolumeRequest{ VolumeIds: vids, }) diff --git a/weed/filesys/xattr.go b/weed/filesys/xattr.go index 091a70fa3..92e43b675 100644 --- a/weed/filesys/xattr.go +++ b/weed/filesys/xattr.go @@ -119,5 +119,5 @@ func (wfs *WFS) maybeLoadEntry(dir, name string) (entry *filer_pb.Entry, err err if cacheErr == filer_pb.ErrNotFound { return nil, fuse.ENOENT } - return cachedEntry.ToProtoEntry(), nil + return cachedEntry.ToProtoEntry(), cacheErr } diff --git a/weed/messaging/broker/broker_grpc_server.go b/weed/messaging/broker/broker_grpc_server.go index abd5c9f73..1950326ec 100644 --- a/weed/messaging/broker/broker_grpc_server.go +++ b/weed/messaging/broker/broker_grpc_server.go @@ -19,7 +19,7 @@ func (broker *MessageBroker) DeleteTopic(c context.Context, request *messaging_p if exists, err := filer_pb.Exists(broker, dir, entry, true); err != nil { return nil, err } else if exists { - err = filer_pb.Remove(broker, dir, entry, true, true, true, false) + err = filer_pb.Remove(broker, dir, entry, true, true, true, false, 0) } return resp, nil } diff --git a/weed/messaging/broker/broker_grpc_server_publish.go b/weed/messaging/broker/broker_grpc_server_publish.go index dc11061af..154bf8a44 100644 --- a/weed/messaging/broker/broker_grpc_server_publish.go +++ b/weed/messaging/broker/broker_grpc_server_publish.go @@ -85,7 +85,7 @@ func (broker *MessageBroker) Publish(stream messaging_pb.SeaweedMessaging_Publis continue } - tl.logBuffer.AddToBuffer(in.Data.Key, data) + tl.logBuffer.AddToBuffer(in.Data.Key, data, in.Data.EventTimeNs) if in.Data.IsClose { // println("server received closing") diff --git a/weed/messaging/broker/broker_grpc_server_subscribe.go b/weed/messaging/broker/broker_grpc_server_subscribe.go index 9a7d653b5..8cc5a928c 100644 --- a/weed/messaging/broker/broker_grpc_server_subscribe.go +++ b/weed/messaging/broker/broker_grpc_server_subscribe.go @@ -125,6 +125,7 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs } func (broker *MessageBroker) readPersistedLogBuffer(tp *TopicPartition, startTime time.Time, eachLogEntryFn func(logEntry *filer_pb.LogEntry) error) (err error) { + startTime = startTime.UTC() startDate := fmt.Sprintf("%04d-%02d-%02d", startTime.Year(), startTime.Month(), startTime.Day()) startHourMinute := fmt.Sprintf("%02d-%02d.segment", startTime.Hour(), startTime.Minute()) diff --git a/weed/messaging/broker/topic_manager.go b/weed/messaging/broker/topic_manager.go index b563fffa1..93815f8f4 100644 --- a/weed/messaging/broker/topic_manager.go +++ b/weed/messaging/broker/topic_manager.go @@ -56,6 +56,7 @@ func (tm *TopicManager) buildLogBuffer(tl *TopicControl, tp TopicPartition, topi // fmt.Printf("flushing with topic config %+v\n", topicConfig) + startTime, stopTime = startTime.UTC(), stopTime.UTC() targetFile := fmt.Sprintf( "%s/%s/%s/%04d-%02d-%02d/%02d-%02d.part%02d", filer2.TopicsDir, tp.Namespace, tp.Topic, diff --git a/weed/operation/upload_content.go b/weed/operation/upload_content.go index f59c7e1a9..5c1b946f5 100644 --- a/weed/operation/upload_content.go +++ b/weed/operation/upload_content.go @@ -3,7 +3,6 @@ package operation import ( "bytes" "encoding/json" - "errors" "fmt" "io" "io/ioutil" @@ -210,8 +209,8 @@ func upload_content(uploadUrl string, fillBufferFunction func(w io.Writer) error req, postErr := http.NewRequest("POST", uploadUrl, body_buf) if postErr != nil { - glog.V(1).Infof("failing to upload to %s: %v", uploadUrl, postErr) - return nil, fmt.Errorf("failing to upload to %s: %v", uploadUrl, postErr) + glog.V(1).Infof("create upload request %s: %v", uploadUrl, postErr) + return nil, fmt.Errorf("create upload request %s: %v", uploadUrl, postErr) } req.Header.Set("Content-Type", content_type) for k, v := range pairMap { @@ -222,10 +221,10 @@ func upload_content(uploadUrl string, fillBufferFunction func(w io.Writer) error } resp, post_err := HttpClient.Do(req) if post_err != nil { - glog.V(1).Infof("failing to upload to %v: %v", uploadUrl, post_err) - return nil, fmt.Errorf("failing to upload to %v: %v", uploadUrl, post_err) + glog.Errorf("upload to %v: %v", uploadUrl, post_err) + return nil, fmt.Errorf("upload to %v: %v", uploadUrl, post_err) } - defer resp.Body.Close() + defer util.CloseResponse(resp) var ret UploadResult etag := getEtag(resp) @@ -233,17 +232,19 @@ func upload_content(uploadUrl string, fillBufferFunction func(w io.Writer) error ret.ETag = etag return &ret, nil } + resp_body, ra_err := ioutil.ReadAll(resp.Body) if ra_err != nil { - return nil, ra_err + return nil, fmt.Errorf("read response body %v: %v", uploadUrl, ra_err) } + unmarshal_err := json.Unmarshal(resp_body, &ret) if unmarshal_err != nil { - glog.V(0).Infoln("failing to read upload response", uploadUrl, string(resp_body)) - return nil, unmarshal_err + glog.Errorf("unmarshal %s: %v", uploadUrl, string(resp_body)) + return nil, fmt.Errorf("unmarshal %v: %v", uploadUrl, unmarshal_err) } if ret.Error != "" { - return nil, errors.New(ret.Error) + return nil, fmt.Errorf("unmarshalled error %v: %v", uploadUrl, ret.Error) } ret.ETag = etag ret.ContentMd5 = resp.Header.Get("Content-MD5") diff --git a/weed/pb/filer.proto b/weed/pb/filer.proto index dcc18f2a5..65947e674 100644 --- a/weed/pb/filer.proto +++ b/weed/pb/filer.proto @@ -102,6 +102,7 @@ message EventNotification { bool delete_chunks = 3; string new_parent_path = 4; bool is_from_other_cluster = 5; + repeated int32 signatures = 6; } message FileChunk { @@ -150,6 +151,7 @@ message CreateEntryRequest { Entry entry = 2; bool o_excl = 3; bool is_from_other_cluster = 4; + repeated int32 signatures = 5; } message CreateEntryResponse { @@ -160,6 +162,7 @@ message UpdateEntryRequest { string directory = 1; Entry entry = 2; bool is_from_other_cluster = 3; + repeated int32 signatures = 4; } message UpdateEntryResponse { } @@ -180,6 +183,7 @@ message DeleteEntryRequest { bool is_recursive = 5; bool ignore_recursive_error = 6; bool is_from_other_cluster = 7; + repeated int32 signatures = 8; } message DeleteEntryResponse { @@ -268,6 +272,7 @@ message SubscribeMetadataRequest { string client_name = 1; string path_prefix = 2; int64 since_ns = 3; + int32 signature = 4; } message SubscribeMetadataResponse { string directory = 1; diff --git a/weed/pb/filer_pb/filer.pb.go b/weed/pb/filer_pb/filer.pb.go index 127f3f358..3ce561eb2 100644 --- a/weed/pb/filer_pb/filer.pb.go +++ b/weed/pb/filer_pb/filer.pb.go @@ -396,11 +396,12 @@ type EventNotification struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - OldEntry *Entry `protobuf:"bytes,1,opt,name=old_entry,json=oldEntry,proto3" json:"old_entry,omitempty"` - NewEntry *Entry `protobuf:"bytes,2,opt,name=new_entry,json=newEntry,proto3" json:"new_entry,omitempty"` - DeleteChunks bool `protobuf:"varint,3,opt,name=delete_chunks,json=deleteChunks,proto3" json:"delete_chunks,omitempty"` - NewParentPath string `protobuf:"bytes,4,opt,name=new_parent_path,json=newParentPath,proto3" json:"new_parent_path,omitempty"` - IsFromOtherCluster bool `protobuf:"varint,5,opt,name=is_from_other_cluster,json=isFromOtherCluster,proto3" json:"is_from_other_cluster,omitempty"` + OldEntry *Entry `protobuf:"bytes,1,opt,name=old_entry,json=oldEntry,proto3" json:"old_entry,omitempty"` + NewEntry *Entry `protobuf:"bytes,2,opt,name=new_entry,json=newEntry,proto3" json:"new_entry,omitempty"` + DeleteChunks bool `protobuf:"varint,3,opt,name=delete_chunks,json=deleteChunks,proto3" json:"delete_chunks,omitempty"` + NewParentPath string `protobuf:"bytes,4,opt,name=new_parent_path,json=newParentPath,proto3" json:"new_parent_path,omitempty"` + IsFromOtherCluster bool `protobuf:"varint,5,opt,name=is_from_other_cluster,json=isFromOtherCluster,proto3" json:"is_from_other_cluster,omitempty"` + Signatures []int32 `protobuf:"varint,6,rep,packed,name=signatures,proto3" json:"signatures,omitempty"` } func (x *EventNotification) Reset() { @@ -470,6 +471,13 @@ func (x *EventNotification) GetIsFromOtherCluster() bool { return false } +func (x *EventNotification) GetSignatures() []int32 { + if x != nil { + return x.Signatures + } + return nil +} + type FileChunk struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -863,10 +871,11 @@ type CreateEntryRequest struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - Directory string `protobuf:"bytes,1,opt,name=directory,proto3" json:"directory,omitempty"` - Entry *Entry `protobuf:"bytes,2,opt,name=entry,proto3" json:"entry,omitempty"` - OExcl bool `protobuf:"varint,3,opt,name=o_excl,json=oExcl,proto3" json:"o_excl,omitempty"` - IsFromOtherCluster bool `protobuf:"varint,4,opt,name=is_from_other_cluster,json=isFromOtherCluster,proto3" json:"is_from_other_cluster,omitempty"` + Directory string `protobuf:"bytes,1,opt,name=directory,proto3" json:"directory,omitempty"` + Entry *Entry `protobuf:"bytes,2,opt,name=entry,proto3" json:"entry,omitempty"` + OExcl bool `protobuf:"varint,3,opt,name=o_excl,json=oExcl,proto3" json:"o_excl,omitempty"` + IsFromOtherCluster bool `protobuf:"varint,4,opt,name=is_from_other_cluster,json=isFromOtherCluster,proto3" json:"is_from_other_cluster,omitempty"` + Signatures []int32 `protobuf:"varint,5,rep,packed,name=signatures,proto3" json:"signatures,omitempty"` } func (x *CreateEntryRequest) Reset() { @@ -929,6 +938,13 @@ func (x *CreateEntryRequest) GetIsFromOtherCluster() bool { return false } +func (x *CreateEntryRequest) GetSignatures() []int32 { + if x != nil { + return x.Signatures + } + return nil +} + type CreateEntryResponse struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -981,9 +997,10 @@ type UpdateEntryRequest struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - Directory string `protobuf:"bytes,1,opt,name=directory,proto3" json:"directory,omitempty"` - Entry *Entry `protobuf:"bytes,2,opt,name=entry,proto3" json:"entry,omitempty"` - IsFromOtherCluster bool `protobuf:"varint,3,opt,name=is_from_other_cluster,json=isFromOtherCluster,proto3" json:"is_from_other_cluster,omitempty"` + Directory string `protobuf:"bytes,1,opt,name=directory,proto3" json:"directory,omitempty"` + Entry *Entry `protobuf:"bytes,2,opt,name=entry,proto3" json:"entry,omitempty"` + IsFromOtherCluster bool `protobuf:"varint,3,opt,name=is_from_other_cluster,json=isFromOtherCluster,proto3" json:"is_from_other_cluster,omitempty"` + Signatures []int32 `protobuf:"varint,4,rep,packed,name=signatures,proto3" json:"signatures,omitempty"` } func (x *UpdateEntryRequest) Reset() { @@ -1039,6 +1056,13 @@ func (x *UpdateEntryRequest) GetIsFromOtherCluster() bool { return false } +func (x *UpdateEntryRequest) GetSignatures() []int32 { + if x != nil { + return x.Signatures + } + return nil +} + type UpdateEntryResponse struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -1186,10 +1210,11 @@ type DeleteEntryRequest struct { Directory string `protobuf:"bytes,1,opt,name=directory,proto3" json:"directory,omitempty"` Name string `protobuf:"bytes,2,opt,name=name,proto3" json:"name,omitempty"` // bool is_directory = 3; - IsDeleteData bool `protobuf:"varint,4,opt,name=is_delete_data,json=isDeleteData,proto3" json:"is_delete_data,omitempty"` - IsRecursive bool `protobuf:"varint,5,opt,name=is_recursive,json=isRecursive,proto3" json:"is_recursive,omitempty"` - IgnoreRecursiveError bool `protobuf:"varint,6,opt,name=ignore_recursive_error,json=ignoreRecursiveError,proto3" json:"ignore_recursive_error,omitempty"` - IsFromOtherCluster bool `protobuf:"varint,7,opt,name=is_from_other_cluster,json=isFromOtherCluster,proto3" json:"is_from_other_cluster,omitempty"` + IsDeleteData bool `protobuf:"varint,4,opt,name=is_delete_data,json=isDeleteData,proto3" json:"is_delete_data,omitempty"` + IsRecursive bool `protobuf:"varint,5,opt,name=is_recursive,json=isRecursive,proto3" json:"is_recursive,omitempty"` + IgnoreRecursiveError bool `protobuf:"varint,6,opt,name=ignore_recursive_error,json=ignoreRecursiveError,proto3" json:"ignore_recursive_error,omitempty"` + IsFromOtherCluster bool `protobuf:"varint,7,opt,name=is_from_other_cluster,json=isFromOtherCluster,proto3" json:"is_from_other_cluster,omitempty"` + Signatures []int32 `protobuf:"varint,8,rep,packed,name=signatures,proto3" json:"signatures,omitempty"` } func (x *DeleteEntryRequest) Reset() { @@ -1266,6 +1291,13 @@ func (x *DeleteEntryRequest) GetIsFromOtherCluster() bool { return false } +func (x *DeleteEntryRequest) GetSignatures() []int32 { + if x != nil { + return x.Signatures + } + return nil +} + type DeleteEntryResponse struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -2176,6 +2208,7 @@ type SubscribeMetadataRequest struct { ClientName string `protobuf:"bytes,1,opt,name=client_name,json=clientName,proto3" json:"client_name,omitempty"` PathPrefix string `protobuf:"bytes,2,opt,name=path_prefix,json=pathPrefix,proto3" json:"path_prefix,omitempty"` SinceNs int64 `protobuf:"varint,3,opt,name=since_ns,json=sinceNs,proto3" json:"since_ns,omitempty"` + Signature int32 `protobuf:"varint,4,opt,name=signature,proto3" json:"signature,omitempty"` } func (x *SubscribeMetadataRequest) Reset() { @@ -2231,6 +2264,13 @@ func (x *SubscribeMetadataRequest) GetSinceNs() int64 { return 0 } +func (x *SubscribeMetadataRequest) GetSignature() int32 { + if x != nil { + return x.Signature + } + return 0 +} + type SubscribeMetadataResponse struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -2669,7 +2709,7 @@ var file_filer_proto_rawDesc = []byte{ 0x69, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x64, 0x69, 0x72, 0x12, 0x25, 0x0a, 0x05, 0x65, 0x6e, 0x74, 0x72, 0x79, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x66, 0x69, 0x6c, 0x65, 0x72, 0x5f, 0x70, 0x62, 0x2e, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x05, 0x65, - 0x6e, 0x74, 0x72, 0x79, 0x22, 0xef, 0x01, 0x0a, 0x11, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x4e, 0x6f, + 0x6e, 0x74, 0x72, 0x79, 0x22, 0x8f, 0x02, 0x0a, 0x11, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x4e, 0x6f, 0x74, 0x69, 0x66, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x2c, 0x0a, 0x09, 0x6f, 0x6c, 0x64, 0x5f, 0x65, 0x6e, 0x74, 0x72, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x66, 0x69, 0x6c, 0x65, 0x72, 0x5f, 0x70, 0x62, 0x2e, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x08, @@ -2684,7 +2724,9 @@ var file_filer_proto_rawDesc = []byte{ 0x61, 0x74, 0x68, 0x12, 0x31, 0x0a, 0x15, 0x69, 0x73, 0x5f, 0x66, 0x72, 0x6f, 0x6d, 0x5f, 0x6f, 0x74, 0x68, 0x65, 0x72, 0x5f, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x18, 0x05, 0x20, 0x01, 0x28, 0x08, 0x52, 0x12, 0x69, 0x73, 0x46, 0x72, 0x6f, 0x6d, 0x4f, 0x74, 0x68, 0x65, 0x72, 0x43, - 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x22, 0xe6, 0x02, 0x0a, 0x09, 0x46, 0x69, 0x6c, 0x65, 0x43, + 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x12, 0x1e, 0x0a, 0x0a, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x74, + 0x75, 0x72, 0x65, 0x73, 0x18, 0x06, 0x20, 0x03, 0x28, 0x05, 0x52, 0x0a, 0x73, 0x69, 0x67, 0x6e, + 0x61, 0x74, 0x75, 0x72, 0x65, 0x73, 0x22, 0xe6, 0x02, 0x0a, 0x09, 0x46, 0x69, 0x6c, 0x65, 0x43, 0x68, 0x75, 0x6e, 0x6b, 0x12, 0x17, 0x0a, 0x07, 0x66, 0x69, 0x6c, 0x65, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x66, 0x69, 0x6c, 0x65, 0x49, 0x64, 0x12, 0x16, 0x0a, 0x06, 0x6f, 0x66, 0x66, 0x73, 0x65, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x03, 0x52, 0x06, 0x6f, @@ -2740,7 +2782,7 @@ var file_filer_proto_rawDesc = []byte{ 0x70, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x25, 0x0a, 0x0e, 0x73, 0x79, 0x6d, 0x6c, 0x69, 0x6e, 0x6b, 0x5f, 0x74, 0x61, 0x72, 0x67, 0x65, 0x74, 0x18, 0x0d, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0d, 0x73, 0x79, 0x6d, 0x6c, 0x69, 0x6e, 0x6b, 0x54, 0x61, 0x72, 0x67, 0x65, 0x74, 0x12, 0x10, 0x0a, 0x03, - 0x6d, 0x64, 0x35, 0x18, 0x0e, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x03, 0x6d, 0x64, 0x35, 0x22, 0xa3, + 0x6d, 0x64, 0x35, 0x18, 0x0e, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x03, 0x6d, 0x64, 0x35, 0x22, 0xc3, 0x01, 0x0a, 0x12, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1c, 0x0a, 0x09, 0x64, 0x69, 0x72, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x64, 0x69, 0x72, 0x65, 0x63, 0x74, @@ -2751,10 +2793,12 @@ var file_filer_proto_rawDesc = []byte{ 0x6c, 0x12, 0x31, 0x0a, 0x15, 0x69, 0x73, 0x5f, 0x66, 0x72, 0x6f, 0x6d, 0x5f, 0x6f, 0x74, 0x68, 0x65, 0x72, 0x5f, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x18, 0x04, 0x20, 0x01, 0x28, 0x08, 0x52, 0x12, 0x69, 0x73, 0x46, 0x72, 0x6f, 0x6d, 0x4f, 0x74, 0x68, 0x65, 0x72, 0x43, 0x6c, 0x75, - 0x73, 0x74, 0x65, 0x72, 0x22, 0x2b, 0x0a, 0x13, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x45, 0x6e, + 0x73, 0x74, 0x65, 0x72, 0x12, 0x1e, 0x0a, 0x0a, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x74, 0x75, 0x72, + 0x65, 0x73, 0x18, 0x05, 0x20, 0x03, 0x28, 0x05, 0x52, 0x0a, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x74, + 0x75, 0x72, 0x65, 0x73, 0x22, 0x2b, 0x0a, 0x13, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x65, 0x72, 0x72, 0x6f, - 0x72, 0x22, 0x8c, 0x01, 0x0a, 0x12, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x45, 0x6e, 0x74, 0x72, + 0x72, 0x22, 0xac, 0x01, 0x0a, 0x12, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1c, 0x0a, 0x09, 0x64, 0x69, 0x72, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x64, 0x69, 0x72, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x79, 0x12, 0x25, 0x0a, 0x05, 0x65, 0x6e, 0x74, 0x72, 0x79, 0x18, @@ -2763,6 +2807,8 @@ var file_filer_proto_rawDesc = []byte{ 0x15, 0x69, 0x73, 0x5f, 0x66, 0x72, 0x6f, 0x6d, 0x5f, 0x6f, 0x74, 0x68, 0x65, 0x72, 0x5f, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x18, 0x03, 0x20, 0x01, 0x28, 0x08, 0x52, 0x12, 0x69, 0x73, 0x46, 0x72, 0x6f, 0x6d, 0x4f, 0x74, 0x68, 0x65, 0x72, 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, + 0x12, 0x1e, 0x0a, 0x0a, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x74, 0x75, 0x72, 0x65, 0x73, 0x18, 0x04, + 0x20, 0x03, 0x28, 0x05, 0x52, 0x0a, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x74, 0x75, 0x72, 0x65, 0x73, 0x22, 0x15, 0x0a, 0x13, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x80, 0x01, 0x0a, 0x14, 0x41, 0x70, 0x70, 0x65, 0x6e, 0x64, 0x54, 0x6f, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, @@ -2774,7 +2820,7 @@ var file_filer_proto_rawDesc = []byte{ 0x66, 0x69, 0x6c, 0x65, 0x72, 0x5f, 0x70, 0x62, 0x2e, 0x46, 0x69, 0x6c, 0x65, 0x43, 0x68, 0x75, 0x6e, 0x6b, 0x52, 0x06, 0x63, 0x68, 0x75, 0x6e, 0x6b, 0x73, 0x22, 0x17, 0x0a, 0x15, 0x41, 0x70, 0x70, 0x65, 0x6e, 0x64, 0x54, 0x6f, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, - 0x6e, 0x73, 0x65, 0x22, 0xf8, 0x01, 0x0a, 0x12, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x45, 0x6e, + 0x6e, 0x73, 0x65, 0x22, 0x98, 0x02, 0x0a, 0x12, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1c, 0x0a, 0x09, 0x64, 0x69, 0x72, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x64, 0x69, 0x72, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x79, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, @@ -2789,7 +2835,9 @@ var file_filer_proto_rawDesc = []byte{ 0x75, 0x72, 0x73, 0x69, 0x76, 0x65, 0x45, 0x72, 0x72, 0x6f, 0x72, 0x12, 0x31, 0x0a, 0x15, 0x69, 0x73, 0x5f, 0x66, 0x72, 0x6f, 0x6d, 0x5f, 0x6f, 0x74, 0x68, 0x65, 0x72, 0x5f, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x18, 0x07, 0x20, 0x01, 0x28, 0x08, 0x52, 0x12, 0x69, 0x73, 0x46, 0x72, - 0x6f, 0x6d, 0x4f, 0x74, 0x68, 0x65, 0x72, 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x22, 0x2b, + 0x6f, 0x6d, 0x4f, 0x74, 0x68, 0x65, 0x72, 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x12, 0x1e, + 0x0a, 0x0a, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x74, 0x75, 0x72, 0x65, 0x73, 0x18, 0x08, 0x20, 0x03, + 0x28, 0x05, 0x52, 0x0a, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x74, 0x75, 0x72, 0x65, 0x73, 0x22, 0x2b, 0x0a, 0x13, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x22, 0x9a, 0x01, 0x0a, 0x18, @@ -2894,149 +2942,151 @@ var file_filer_proto_rawDesc = []byte{ 0x62, 0x75, 0x63, 0x6b, 0x65, 0x74, 0x73, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x64, 0x69, 0x72, 0x42, 0x75, 0x63, 0x6b, 0x65, 0x74, 0x73, 0x12, 0x16, 0x0a, 0x06, 0x63, 0x69, 0x70, 0x68, 0x65, 0x72, 0x18, 0x07, 0x20, 0x01, 0x28, 0x08, 0x52, 0x06, 0x63, 0x69, 0x70, 0x68, 0x65, - 0x72, 0x22, 0x77, 0x0a, 0x18, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x4d, 0x65, - 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1f, 0x0a, - 0x0b, 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, - 0x28, 0x09, 0x52, 0x0a, 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x1f, - 0x0a, 0x0b, 0x70, 0x61, 0x74, 0x68, 0x5f, 0x70, 0x72, 0x65, 0x66, 0x69, 0x78, 0x18, 0x02, 0x20, - 0x01, 0x28, 0x09, 0x52, 0x0a, 0x70, 0x61, 0x74, 0x68, 0x50, 0x72, 0x65, 0x66, 0x69, 0x78, 0x12, - 0x19, 0x0a, 0x08, 0x73, 0x69, 0x6e, 0x63, 0x65, 0x5f, 0x6e, 0x73, 0x18, 0x03, 0x20, 0x01, 0x28, - 0x03, 0x52, 0x07, 0x73, 0x69, 0x6e, 0x63, 0x65, 0x4e, 0x73, 0x22, 0x9a, 0x01, 0x0a, 0x19, 0x53, - 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, - 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x1c, 0x0a, 0x09, 0x64, 0x69, 0x72, 0x65, - 0x63, 0x74, 0x6f, 0x72, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x64, 0x69, 0x72, - 0x65, 0x63, 0x74, 0x6f, 0x72, 0x79, 0x12, 0x4a, 0x0a, 0x12, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x5f, - 0x6e, 0x6f, 0x74, 0x69, 0x66, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x02, 0x20, 0x01, - 0x28, 0x0b, 0x32, 0x1b, 0x2e, 0x66, 0x69, 0x6c, 0x65, 0x72, 0x5f, 0x70, 0x62, 0x2e, 0x45, 0x76, - 0x65, 0x6e, 0x74, 0x4e, 0x6f, 0x74, 0x69, 0x66, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, - 0x11, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x4e, 0x6f, 0x74, 0x69, 0x66, 0x69, 0x63, 0x61, 0x74, 0x69, - 0x6f, 0x6e, 0x12, 0x13, 0x0a, 0x05, 0x74, 0x73, 0x5f, 0x6e, 0x73, 0x18, 0x03, 0x20, 0x01, 0x28, - 0x03, 0x52, 0x04, 0x74, 0x73, 0x4e, 0x73, 0x22, 0x61, 0x0a, 0x08, 0x4c, 0x6f, 0x67, 0x45, 0x6e, - 0x74, 0x72, 0x79, 0x12, 0x13, 0x0a, 0x05, 0x74, 0x73, 0x5f, 0x6e, 0x73, 0x18, 0x01, 0x20, 0x01, - 0x28, 0x03, 0x52, 0x04, 0x74, 0x73, 0x4e, 0x73, 0x12, 0x2c, 0x0a, 0x12, 0x70, 0x61, 0x72, 0x74, - 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x6b, 0x65, 0x79, 0x5f, 0x68, 0x61, 0x73, 0x68, 0x18, 0x02, - 0x20, 0x01, 0x28, 0x05, 0x52, 0x10, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x4b, - 0x65, 0x79, 0x48, 0x61, 0x73, 0x68, 0x12, 0x12, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x03, - 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x22, 0x65, 0x0a, 0x14, 0x4b, 0x65, - 0x65, 0x70, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x65, 0x64, 0x52, 0x65, 0x71, 0x75, 0x65, - 0x73, 0x74, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, - 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x1b, 0x0a, 0x09, 0x67, 0x72, 0x70, 0x63, 0x5f, 0x70, - 0x6f, 0x72, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x08, 0x67, 0x72, 0x70, 0x63, 0x50, - 0x6f, 0x72, 0x74, 0x12, 0x1c, 0x0a, 0x09, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x73, - 0x18, 0x03, 0x20, 0x03, 0x28, 0x09, 0x52, 0x09, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, - 0x73, 0x22, 0x17, 0x0a, 0x15, 0x4b, 0x65, 0x65, 0x70, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, - 0x65, 0x64, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x31, 0x0a, 0x13, 0x4c, 0x6f, - 0x63, 0x61, 0x74, 0x65, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, - 0x74, 0x12, 0x1a, 0x0a, 0x08, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x18, 0x01, 0x20, - 0x01, 0x28, 0x09, 0x52, 0x08, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x22, 0xcd, 0x01, - 0x0a, 0x14, 0x4c, 0x6f, 0x63, 0x61, 0x74, 0x65, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x52, 0x65, - 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x66, 0x6f, 0x75, 0x6e, 0x64, 0x18, - 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x05, 0x66, 0x6f, 0x75, 0x6e, 0x64, 0x12, 0x45, 0x0a, 0x09, - 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, - 0x27, 0x2e, 0x66, 0x69, 0x6c, 0x65, 0x72, 0x5f, 0x70, 0x62, 0x2e, 0x4c, 0x6f, 0x63, 0x61, 0x74, - 0x65, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x2e, - 0x52, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x52, 0x09, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, - 0x63, 0x65, 0x73, 0x1a, 0x58, 0x0a, 0x08, 0x52, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x12, - 0x25, 0x0a, 0x0e, 0x67, 0x72, 0x70, 0x63, 0x5f, 0x61, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x65, - 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0d, 0x67, 0x72, 0x70, 0x63, 0x41, 0x64, 0x64, - 0x72, 0x65, 0x73, 0x73, 0x65, 0x73, 0x12, 0x25, 0x0a, 0x0e, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, - 0x63, 0x65, 0x5f, 0x63, 0x6f, 0x75, 0x6e, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x05, 0x52, 0x0d, - 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x43, 0x6f, 0x75, 0x6e, 0x74, 0x32, 0x8d, 0x0b, - 0x0a, 0x0c, 0x53, 0x65, 0x61, 0x77, 0x65, 0x65, 0x64, 0x46, 0x69, 0x6c, 0x65, 0x72, 0x12, 0x67, - 0x0a, 0x14, 0x4c, 0x6f, 0x6f, 0x6b, 0x75, 0x70, 0x44, 0x69, 0x72, 0x65, 0x63, 0x74, 0x6f, 0x72, - 0x79, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x25, 0x2e, 0x66, 0x69, 0x6c, 0x65, 0x72, 0x5f, 0x70, - 0x62, 0x2e, 0x4c, 0x6f, 0x6f, 0x6b, 0x75, 0x70, 0x44, 0x69, 0x72, 0x65, 0x63, 0x74, 0x6f, 0x72, - 0x79, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x26, 0x2e, - 0x66, 0x69, 0x6c, 0x65, 0x72, 0x5f, 0x70, 0x62, 0x2e, 0x4c, 0x6f, 0x6f, 0x6b, 0x75, 0x70, 0x44, - 0x69, 0x72, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x79, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x65, 0x73, - 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x4e, 0x0a, 0x0b, 0x4c, 0x69, 0x73, 0x74, 0x45, - 0x6e, 0x74, 0x72, 0x69, 0x65, 0x73, 0x12, 0x1c, 0x2e, 0x66, 0x69, 0x6c, 0x65, 0x72, 0x5f, 0x70, - 0x62, 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x45, 0x6e, 0x74, 0x72, 0x69, 0x65, 0x73, 0x52, 0x65, 0x71, - 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1d, 0x2e, 0x66, 0x69, 0x6c, 0x65, 0x72, 0x5f, 0x70, 0x62, 0x2e, - 0x4c, 0x69, 0x73, 0x74, 0x45, 0x6e, 0x74, 0x72, 0x69, 0x65, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, - 0x6e, 0x73, 0x65, 0x22, 0x00, 0x30, 0x01, 0x12, 0x4c, 0x0a, 0x0b, 0x43, 0x72, 0x65, 0x61, 0x74, - 0x65, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x1c, 0x2e, 0x66, 0x69, 0x6c, 0x65, 0x72, 0x5f, 0x70, - 0x62, 0x2e, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x65, 0x71, - 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1d, 0x2e, 0x66, 0x69, 0x6c, 0x65, 0x72, 0x5f, 0x70, 0x62, 0x2e, - 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, - 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x4c, 0x0a, 0x0b, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x45, - 0x6e, 0x74, 0x72, 0x79, 0x12, 0x1c, 0x2e, 0x66, 0x69, 0x6c, 0x65, 0x72, 0x5f, 0x70, 0x62, 0x2e, - 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x65, 0x71, 0x75, 0x65, - 0x73, 0x74, 0x1a, 0x1d, 0x2e, 0x66, 0x69, 0x6c, 0x65, 0x72, 0x5f, 0x70, 0x62, 0x2e, 0x55, 0x70, - 0x64, 0x61, 0x74, 0x65, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, - 0x65, 0x22, 0x00, 0x12, 0x52, 0x0a, 0x0d, 0x41, 0x70, 0x70, 0x65, 0x6e, 0x64, 0x54, 0x6f, 0x45, - 0x6e, 0x74, 0x72, 0x79, 0x12, 0x1e, 0x2e, 0x66, 0x69, 0x6c, 0x65, 0x72, 0x5f, 0x70, 0x62, 0x2e, - 0x41, 0x70, 0x70, 0x65, 0x6e, 0x64, 0x54, 0x6f, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x65, 0x71, - 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1f, 0x2e, 0x66, 0x69, 0x6c, 0x65, 0x72, 0x5f, 0x70, 0x62, 0x2e, - 0x41, 0x70, 0x70, 0x65, 0x6e, 0x64, 0x54, 0x6f, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x65, 0x73, - 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x4c, 0x0a, 0x0b, 0x44, 0x65, 0x6c, 0x65, 0x74, - 0x65, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x1c, 0x2e, 0x66, 0x69, 0x6c, 0x65, 0x72, 0x5f, 0x70, - 0x62, 0x2e, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x65, 0x71, - 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1d, 0x2e, 0x66, 0x69, 0x6c, 0x65, 0x72, 0x5f, 0x70, 0x62, 0x2e, - 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, - 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x5e, 0x0a, 0x11, 0x41, 0x74, 0x6f, 0x6d, 0x69, 0x63, 0x52, - 0x65, 0x6e, 0x61, 0x6d, 0x65, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x22, 0x2e, 0x66, 0x69, 0x6c, - 0x65, 0x72, 0x5f, 0x70, 0x62, 0x2e, 0x41, 0x74, 0x6f, 0x6d, 0x69, 0x63, 0x52, 0x65, 0x6e, 0x61, - 0x6d, 0x65, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x23, - 0x2e, 0x66, 0x69, 0x6c, 0x65, 0x72, 0x5f, 0x70, 0x62, 0x2e, 0x41, 0x74, 0x6f, 0x6d, 0x69, 0x63, - 0x52, 0x65, 0x6e, 0x61, 0x6d, 0x65, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, - 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x4f, 0x0a, 0x0c, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x56, + 0x72, 0x22, 0x95, 0x01, 0x0a, 0x18, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x4d, + 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1f, + 0x0a, 0x0b, 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, + 0x01, 0x28, 0x09, 0x52, 0x0a, 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x4e, 0x61, 0x6d, 0x65, 0x12, + 0x1f, 0x0a, 0x0b, 0x70, 0x61, 0x74, 0x68, 0x5f, 0x70, 0x72, 0x65, 0x66, 0x69, 0x78, 0x18, 0x02, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x70, 0x61, 0x74, 0x68, 0x50, 0x72, 0x65, 0x66, 0x69, 0x78, + 0x12, 0x19, 0x0a, 0x08, 0x73, 0x69, 0x6e, 0x63, 0x65, 0x5f, 0x6e, 0x73, 0x18, 0x03, 0x20, 0x01, + 0x28, 0x03, 0x52, 0x07, 0x73, 0x69, 0x6e, 0x63, 0x65, 0x4e, 0x73, 0x12, 0x1c, 0x0a, 0x09, 0x73, + 0x69, 0x67, 0x6e, 0x61, 0x74, 0x75, 0x72, 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, 0x05, 0x52, 0x09, + 0x73, 0x69, 0x67, 0x6e, 0x61, 0x74, 0x75, 0x72, 0x65, 0x22, 0x9a, 0x01, 0x0a, 0x19, 0x53, 0x75, + 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x52, + 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x1c, 0x0a, 0x09, 0x64, 0x69, 0x72, 0x65, 0x63, + 0x74, 0x6f, 0x72, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x64, 0x69, 0x72, 0x65, + 0x63, 0x74, 0x6f, 0x72, 0x79, 0x12, 0x4a, 0x0a, 0x12, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x5f, 0x6e, + 0x6f, 0x74, 0x69, 0x66, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x02, 0x20, 0x01, 0x28, + 0x0b, 0x32, 0x1b, 0x2e, 0x66, 0x69, 0x6c, 0x65, 0x72, 0x5f, 0x70, 0x62, 0x2e, 0x45, 0x76, 0x65, + 0x6e, 0x74, 0x4e, 0x6f, 0x74, 0x69, 0x66, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x11, + 0x65, 0x76, 0x65, 0x6e, 0x74, 0x4e, 0x6f, 0x74, 0x69, 0x66, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, + 0x6e, 0x12, 0x13, 0x0a, 0x05, 0x74, 0x73, 0x5f, 0x6e, 0x73, 0x18, 0x03, 0x20, 0x01, 0x28, 0x03, + 0x52, 0x04, 0x74, 0x73, 0x4e, 0x73, 0x22, 0x61, 0x0a, 0x08, 0x4c, 0x6f, 0x67, 0x45, 0x6e, 0x74, + 0x72, 0x79, 0x12, 0x13, 0x0a, 0x05, 0x74, 0x73, 0x5f, 0x6e, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, + 0x03, 0x52, 0x04, 0x74, 0x73, 0x4e, 0x73, 0x12, 0x2c, 0x0a, 0x12, 0x70, 0x61, 0x72, 0x74, 0x69, + 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x6b, 0x65, 0x79, 0x5f, 0x68, 0x61, 0x73, 0x68, 0x18, 0x02, 0x20, + 0x01, 0x28, 0x05, 0x52, 0x10, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x4b, 0x65, + 0x79, 0x48, 0x61, 0x73, 0x68, 0x12, 0x12, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x03, 0x20, + 0x01, 0x28, 0x0c, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x22, 0x65, 0x0a, 0x14, 0x4b, 0x65, 0x65, + 0x70, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x65, 0x64, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x1b, 0x0a, 0x09, 0x67, 0x72, 0x70, 0x63, 0x5f, 0x70, 0x6f, + 0x72, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x08, 0x67, 0x72, 0x70, 0x63, 0x50, 0x6f, + 0x72, 0x74, 0x12, 0x1c, 0x0a, 0x09, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x73, 0x18, + 0x03, 0x20, 0x03, 0x28, 0x09, 0x52, 0x09, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x73, + 0x22, 0x17, 0x0a, 0x15, 0x4b, 0x65, 0x65, 0x70, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x65, + 0x64, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x31, 0x0a, 0x13, 0x4c, 0x6f, 0x63, + 0x61, 0x74, 0x65, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, + 0x12, 0x1a, 0x0a, 0x08, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x18, 0x01, 0x20, 0x01, + 0x28, 0x09, 0x52, 0x08, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x22, 0xcd, 0x01, 0x0a, + 0x14, 0x4c, 0x6f, 0x63, 0x61, 0x74, 0x65, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x52, 0x65, 0x73, + 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x66, 0x6f, 0x75, 0x6e, 0x64, 0x18, 0x01, + 0x20, 0x01, 0x28, 0x08, 0x52, 0x05, 0x66, 0x6f, 0x75, 0x6e, 0x64, 0x12, 0x45, 0x0a, 0x09, 0x72, + 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x27, + 0x2e, 0x66, 0x69, 0x6c, 0x65, 0x72, 0x5f, 0x70, 0x62, 0x2e, 0x4c, 0x6f, 0x63, 0x61, 0x74, 0x65, + 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x2e, 0x52, + 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x52, 0x09, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, + 0x65, 0x73, 0x1a, 0x58, 0x0a, 0x08, 0x52, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x12, 0x25, + 0x0a, 0x0e, 0x67, 0x72, 0x70, 0x63, 0x5f, 0x61, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x65, 0x73, + 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0d, 0x67, 0x72, 0x70, 0x63, 0x41, 0x64, 0x64, 0x72, + 0x65, 0x73, 0x73, 0x65, 0x73, 0x12, 0x25, 0x0a, 0x0e, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, + 0x65, 0x5f, 0x63, 0x6f, 0x75, 0x6e, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x05, 0x52, 0x0d, 0x72, + 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x43, 0x6f, 0x75, 0x6e, 0x74, 0x32, 0x8d, 0x0b, 0x0a, + 0x0c, 0x53, 0x65, 0x61, 0x77, 0x65, 0x65, 0x64, 0x46, 0x69, 0x6c, 0x65, 0x72, 0x12, 0x67, 0x0a, + 0x14, 0x4c, 0x6f, 0x6f, 0x6b, 0x75, 0x70, 0x44, 0x69, 0x72, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x79, + 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x25, 0x2e, 0x66, 0x69, 0x6c, 0x65, 0x72, 0x5f, 0x70, 0x62, + 0x2e, 0x4c, 0x6f, 0x6f, 0x6b, 0x75, 0x70, 0x44, 0x69, 0x72, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x79, + 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x26, 0x2e, 0x66, + 0x69, 0x6c, 0x65, 0x72, 0x5f, 0x70, 0x62, 0x2e, 0x4c, 0x6f, 0x6f, 0x6b, 0x75, 0x70, 0x44, 0x69, + 0x72, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x79, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x65, 0x73, 0x70, + 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x4e, 0x0a, 0x0b, 0x4c, 0x69, 0x73, 0x74, 0x45, 0x6e, + 0x74, 0x72, 0x69, 0x65, 0x73, 0x12, 0x1c, 0x2e, 0x66, 0x69, 0x6c, 0x65, 0x72, 0x5f, 0x70, 0x62, + 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x45, 0x6e, 0x74, 0x72, 0x69, 0x65, 0x73, 0x52, 0x65, 0x71, 0x75, + 0x65, 0x73, 0x74, 0x1a, 0x1d, 0x2e, 0x66, 0x69, 0x6c, 0x65, 0x72, 0x5f, 0x70, 0x62, 0x2e, 0x4c, + 0x69, 0x73, 0x74, 0x45, 0x6e, 0x74, 0x72, 0x69, 0x65, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, + 0x73, 0x65, 0x22, 0x00, 0x30, 0x01, 0x12, 0x4c, 0x0a, 0x0b, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, + 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x1c, 0x2e, 0x66, 0x69, 0x6c, 0x65, 0x72, 0x5f, 0x70, 0x62, + 0x2e, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x65, 0x71, 0x75, + 0x65, 0x73, 0x74, 0x1a, 0x1d, 0x2e, 0x66, 0x69, 0x6c, 0x65, 0x72, 0x5f, 0x70, 0x62, 0x2e, 0x43, + 0x72, 0x65, 0x61, 0x74, 0x65, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, + 0x73, 0x65, 0x22, 0x00, 0x12, 0x4c, 0x0a, 0x0b, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x45, 0x6e, + 0x74, 0x72, 0x79, 0x12, 0x1c, 0x2e, 0x66, 0x69, 0x6c, 0x65, 0x72, 0x5f, 0x70, 0x62, 0x2e, 0x55, + 0x70, 0x64, 0x61, 0x74, 0x65, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x1a, 0x1d, 0x2e, 0x66, 0x69, 0x6c, 0x65, 0x72, 0x5f, 0x70, 0x62, 0x2e, 0x55, 0x70, 0x64, + 0x61, 0x74, 0x65, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, + 0x22, 0x00, 0x12, 0x52, 0x0a, 0x0d, 0x41, 0x70, 0x70, 0x65, 0x6e, 0x64, 0x54, 0x6f, 0x45, 0x6e, + 0x74, 0x72, 0x79, 0x12, 0x1e, 0x2e, 0x66, 0x69, 0x6c, 0x65, 0x72, 0x5f, 0x70, 0x62, 0x2e, 0x41, + 0x70, 0x70, 0x65, 0x6e, 0x64, 0x54, 0x6f, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x65, 0x71, 0x75, + 0x65, 0x73, 0x74, 0x1a, 0x1f, 0x2e, 0x66, 0x69, 0x6c, 0x65, 0x72, 0x5f, 0x70, 0x62, 0x2e, 0x41, + 0x70, 0x70, 0x65, 0x6e, 0x64, 0x54, 0x6f, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x65, 0x73, 0x70, + 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x4c, 0x0a, 0x0b, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, + 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x1c, 0x2e, 0x66, 0x69, 0x6c, 0x65, 0x72, 0x5f, 0x70, 0x62, + 0x2e, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x65, 0x71, 0x75, + 0x65, 0x73, 0x74, 0x1a, 0x1d, 0x2e, 0x66, 0x69, 0x6c, 0x65, 0x72, 0x5f, 0x70, 0x62, 0x2e, 0x44, + 0x65, 0x6c, 0x65, 0x74, 0x65, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, + 0x73, 0x65, 0x22, 0x00, 0x12, 0x5e, 0x0a, 0x11, 0x41, 0x74, 0x6f, 0x6d, 0x69, 0x63, 0x52, 0x65, + 0x6e, 0x61, 0x6d, 0x65, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x22, 0x2e, 0x66, 0x69, 0x6c, 0x65, + 0x72, 0x5f, 0x70, 0x62, 0x2e, 0x41, 0x74, 0x6f, 0x6d, 0x69, 0x63, 0x52, 0x65, 0x6e, 0x61, 0x6d, + 0x65, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x23, 0x2e, + 0x66, 0x69, 0x6c, 0x65, 0x72, 0x5f, 0x70, 0x62, 0x2e, 0x41, 0x74, 0x6f, 0x6d, 0x69, 0x63, 0x52, + 0x65, 0x6e, 0x61, 0x6d, 0x65, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, + 0x73, 0x65, 0x22, 0x00, 0x12, 0x4f, 0x0a, 0x0c, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x56, 0x6f, + 0x6c, 0x75, 0x6d, 0x65, 0x12, 0x1d, 0x2e, 0x66, 0x69, 0x6c, 0x65, 0x72, 0x5f, 0x70, 0x62, 0x2e, + 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x56, 0x6f, 0x6c, 0x75, 0x6d, 0x65, 0x52, 0x65, 0x71, 0x75, + 0x65, 0x73, 0x74, 0x1a, 0x1e, 0x2e, 0x66, 0x69, 0x6c, 0x65, 0x72, 0x5f, 0x70, 0x62, 0x2e, 0x41, + 0x73, 0x73, 0x69, 0x67, 0x6e, 0x56, 0x6f, 0x6c, 0x75, 0x6d, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, + 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x4f, 0x0a, 0x0c, 0x4c, 0x6f, 0x6f, 0x6b, 0x75, 0x70, 0x56, 0x6f, 0x6c, 0x75, 0x6d, 0x65, 0x12, 0x1d, 0x2e, 0x66, 0x69, 0x6c, 0x65, 0x72, 0x5f, 0x70, 0x62, - 0x2e, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x56, 0x6f, 0x6c, 0x75, 0x6d, 0x65, 0x52, 0x65, 0x71, + 0x2e, 0x4c, 0x6f, 0x6f, 0x6b, 0x75, 0x70, 0x56, 0x6f, 0x6c, 0x75, 0x6d, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1e, 0x2e, 0x66, 0x69, 0x6c, 0x65, 0x72, 0x5f, 0x70, 0x62, 0x2e, - 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x56, 0x6f, 0x6c, 0x75, 0x6d, 0x65, 0x52, 0x65, 0x73, 0x70, - 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x4f, 0x0a, 0x0c, 0x4c, 0x6f, 0x6f, 0x6b, 0x75, 0x70, - 0x56, 0x6f, 0x6c, 0x75, 0x6d, 0x65, 0x12, 0x1d, 0x2e, 0x66, 0x69, 0x6c, 0x65, 0x72, 0x5f, 0x70, - 0x62, 0x2e, 0x4c, 0x6f, 0x6f, 0x6b, 0x75, 0x70, 0x56, 0x6f, 0x6c, 0x75, 0x6d, 0x65, 0x52, 0x65, - 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1e, 0x2e, 0x66, 0x69, 0x6c, 0x65, 0x72, 0x5f, 0x70, 0x62, - 0x2e, 0x4c, 0x6f, 0x6f, 0x6b, 0x75, 0x70, 0x56, 0x6f, 0x6c, 0x75, 0x6d, 0x65, 0x52, 0x65, 0x73, - 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x5b, 0x0a, 0x10, 0x44, 0x65, 0x6c, 0x65, 0x74, - 0x65, 0x43, 0x6f, 0x6c, 0x6c, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x21, 0x2e, 0x66, 0x69, - 0x6c, 0x65, 0x72, 0x5f, 0x70, 0x62, 0x2e, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x43, 0x6f, 0x6c, - 0x6c, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x22, - 0x2e, 0x66, 0x69, 0x6c, 0x65, 0x72, 0x5f, 0x70, 0x62, 0x2e, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, - 0x43, 0x6f, 0x6c, 0x6c, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, - 0x73, 0x65, 0x22, 0x00, 0x12, 0x49, 0x0a, 0x0a, 0x53, 0x74, 0x61, 0x74, 0x69, 0x73, 0x74, 0x69, - 0x63, 0x73, 0x12, 0x1b, 0x2e, 0x66, 0x69, 0x6c, 0x65, 0x72, 0x5f, 0x70, 0x62, 0x2e, 0x53, 0x74, - 0x61, 0x74, 0x69, 0x73, 0x74, 0x69, 0x63, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, - 0x1c, 0x2e, 0x66, 0x69, 0x6c, 0x65, 0x72, 0x5f, 0x70, 0x62, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x69, - 0x73, 0x74, 0x69, 0x63, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, - 0x6a, 0x0a, 0x15, 0x47, 0x65, 0x74, 0x46, 0x69, 0x6c, 0x65, 0x72, 0x43, 0x6f, 0x6e, 0x66, 0x69, - 0x67, 0x75, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x26, 0x2e, 0x66, 0x69, 0x6c, 0x65, 0x72, - 0x5f, 0x70, 0x62, 0x2e, 0x47, 0x65, 0x74, 0x46, 0x69, 0x6c, 0x65, 0x72, 0x43, 0x6f, 0x6e, 0x66, - 0x69, 0x67, 0x75, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, - 0x1a, 0x27, 0x2e, 0x66, 0x69, 0x6c, 0x65, 0x72, 0x5f, 0x70, 0x62, 0x2e, 0x47, 0x65, 0x74, 0x46, - 0x69, 0x6c, 0x65, 0x72, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x75, 0x72, 0x61, 0x74, 0x69, 0x6f, - 0x6e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x60, 0x0a, 0x11, 0x53, + 0x4c, 0x6f, 0x6f, 0x6b, 0x75, 0x70, 0x56, 0x6f, 0x6c, 0x75, 0x6d, 0x65, 0x52, 0x65, 0x73, 0x70, + 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x5b, 0x0a, 0x10, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, + 0x43, 0x6f, 0x6c, 0x6c, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x21, 0x2e, 0x66, 0x69, 0x6c, + 0x65, 0x72, 0x5f, 0x70, 0x62, 0x2e, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x43, 0x6f, 0x6c, 0x6c, + 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x22, 0x2e, + 0x66, 0x69, 0x6c, 0x65, 0x72, 0x5f, 0x70, 0x62, 0x2e, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x43, + 0x6f, 0x6c, 0x6c, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, + 0x65, 0x22, 0x00, 0x12, 0x49, 0x0a, 0x0a, 0x53, 0x74, 0x61, 0x74, 0x69, 0x73, 0x74, 0x69, 0x63, + 0x73, 0x12, 0x1b, 0x2e, 0x66, 0x69, 0x6c, 0x65, 0x72, 0x5f, 0x70, 0x62, 0x2e, 0x53, 0x74, 0x61, + 0x74, 0x69, 0x73, 0x74, 0x69, 0x63, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1c, + 0x2e, 0x66, 0x69, 0x6c, 0x65, 0x72, 0x5f, 0x70, 0x62, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x69, 0x73, + 0x74, 0x69, 0x63, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x6a, + 0x0a, 0x15, 0x47, 0x65, 0x74, 0x46, 0x69, 0x6c, 0x65, 0x72, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, + 0x75, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x26, 0x2e, 0x66, 0x69, 0x6c, 0x65, 0x72, 0x5f, + 0x70, 0x62, 0x2e, 0x47, 0x65, 0x74, 0x46, 0x69, 0x6c, 0x65, 0x72, 0x43, 0x6f, 0x6e, 0x66, 0x69, + 0x67, 0x75, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, + 0x27, 0x2e, 0x66, 0x69, 0x6c, 0x65, 0x72, 0x5f, 0x70, 0x62, 0x2e, 0x47, 0x65, 0x74, 0x46, 0x69, + 0x6c, 0x65, 0x72, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x75, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, + 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x60, 0x0a, 0x11, 0x53, 0x75, + 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x12, + 0x22, 0x2e, 0x66, 0x69, 0x6c, 0x65, 0x72, 0x5f, 0x70, 0x62, 0x2e, 0x53, 0x75, 0x62, 0x73, 0x63, + 0x72, 0x69, 0x62, 0x65, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x52, 0x65, 0x71, 0x75, + 0x65, 0x73, 0x74, 0x1a, 0x23, 0x2e, 0x66, 0x69, 0x6c, 0x65, 0x72, 0x5f, 0x70, 0x62, 0x2e, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, - 0x12, 0x22, 0x2e, 0x66, 0x69, 0x6c, 0x65, 0x72, 0x5f, 0x70, 0x62, 0x2e, 0x53, 0x75, 0x62, 0x73, - 0x63, 0x72, 0x69, 0x62, 0x65, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x52, 0x65, 0x71, - 0x75, 0x65, 0x73, 0x74, 0x1a, 0x23, 0x2e, 0x66, 0x69, 0x6c, 0x65, 0x72, 0x5f, 0x70, 0x62, 0x2e, - 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, - 0x61, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x30, 0x01, 0x12, 0x65, 0x0a, - 0x16, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x4c, 0x6f, 0x63, 0x61, 0x6c, 0x4d, - 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x12, 0x22, 0x2e, 0x66, 0x69, 0x6c, 0x65, 0x72, 0x5f, - 0x70, 0x62, 0x2e, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x4d, 0x65, 0x74, 0x61, - 0x64, 0x61, 0x74, 0x61, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x23, 0x2e, 0x66, 0x69, - 0x6c, 0x65, 0x72, 0x5f, 0x70, 0x62, 0x2e, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, - 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, - 0x22, 0x00, 0x30, 0x01, 0x12, 0x56, 0x0a, 0x0d, 0x4b, 0x65, 0x65, 0x70, 0x43, 0x6f, 0x6e, 0x6e, - 0x65, 0x63, 0x74, 0x65, 0x64, 0x12, 0x1e, 0x2e, 0x66, 0x69, 0x6c, 0x65, 0x72, 0x5f, 0x70, 0x62, - 0x2e, 0x4b, 0x65, 0x65, 0x70, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x65, 0x64, 0x52, 0x65, - 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1f, 0x2e, 0x66, 0x69, 0x6c, 0x65, 0x72, 0x5f, 0x70, 0x62, - 0x2e, 0x4b, 0x65, 0x65, 0x70, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x65, 0x64, 0x52, 0x65, - 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x28, 0x01, 0x30, 0x01, 0x12, 0x4f, 0x0a, 0x0c, - 0x4c, 0x6f, 0x63, 0x61, 0x74, 0x65, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x12, 0x1d, 0x2e, 0x66, - 0x69, 0x6c, 0x65, 0x72, 0x5f, 0x70, 0x62, 0x2e, 0x4c, 0x6f, 0x63, 0x61, 0x74, 0x65, 0x42, 0x72, - 0x6f, 0x6b, 0x65, 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1e, 0x2e, 0x66, 0x69, + 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x30, 0x01, 0x12, 0x65, 0x0a, 0x16, + 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x4c, 0x6f, 0x63, 0x61, 0x6c, 0x4d, 0x65, + 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x12, 0x22, 0x2e, 0x66, 0x69, 0x6c, 0x65, 0x72, 0x5f, 0x70, + 0x62, 0x2e, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x4d, 0x65, 0x74, 0x61, 0x64, + 0x61, 0x74, 0x61, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x23, 0x2e, 0x66, 0x69, 0x6c, + 0x65, 0x72, 0x5f, 0x70, 0x62, 0x2e, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x4d, + 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, + 0x00, 0x30, 0x01, 0x12, 0x56, 0x0a, 0x0d, 0x4b, 0x65, 0x65, 0x70, 0x43, 0x6f, 0x6e, 0x6e, 0x65, + 0x63, 0x74, 0x65, 0x64, 0x12, 0x1e, 0x2e, 0x66, 0x69, 0x6c, 0x65, 0x72, 0x5f, 0x70, 0x62, 0x2e, + 0x4b, 0x65, 0x65, 0x70, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x65, 0x64, 0x52, 0x65, 0x71, + 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1f, 0x2e, 0x66, 0x69, 0x6c, 0x65, 0x72, 0x5f, 0x70, 0x62, 0x2e, + 0x4b, 0x65, 0x65, 0x70, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x65, 0x64, 0x52, 0x65, 0x73, + 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x28, 0x01, 0x30, 0x01, 0x12, 0x4f, 0x0a, 0x0c, 0x4c, + 0x6f, 0x63, 0x61, 0x74, 0x65, 0x42, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x12, 0x1d, 0x2e, 0x66, 0x69, 0x6c, 0x65, 0x72, 0x5f, 0x70, 0x62, 0x2e, 0x4c, 0x6f, 0x63, 0x61, 0x74, 0x65, 0x42, 0x72, 0x6f, - 0x6b, 0x65, 0x72, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x42, 0x4f, 0x0a, - 0x10, 0x73, 0x65, 0x61, 0x77, 0x65, 0x65, 0x64, 0x66, 0x73, 0x2e, 0x63, 0x6c, 0x69, 0x65, 0x6e, - 0x74, 0x42, 0x0a, 0x46, 0x69, 0x6c, 0x65, 0x72, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x5a, 0x2f, 0x67, - 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x63, 0x68, 0x72, 0x69, 0x73, 0x6c, - 0x75, 0x73, 0x66, 0x2f, 0x73, 0x65, 0x61, 0x77, 0x65, 0x65, 0x64, 0x66, 0x73, 0x2f, 0x77, 0x65, - 0x65, 0x64, 0x2f, 0x70, 0x62, 0x2f, 0x66, 0x69, 0x6c, 0x65, 0x72, 0x5f, 0x70, 0x62, 0x62, 0x06, - 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x6b, 0x65, 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1e, 0x2e, 0x66, 0x69, 0x6c, + 0x65, 0x72, 0x5f, 0x70, 0x62, 0x2e, 0x4c, 0x6f, 0x63, 0x61, 0x74, 0x65, 0x42, 0x72, 0x6f, 0x6b, + 0x65, 0x72, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x42, 0x4f, 0x0a, 0x10, + 0x73, 0x65, 0x61, 0x77, 0x65, 0x65, 0x64, 0x66, 0x73, 0x2e, 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, + 0x42, 0x0a, 0x46, 0x69, 0x6c, 0x65, 0x72, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x5a, 0x2f, 0x67, 0x69, + 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x63, 0x68, 0x72, 0x69, 0x73, 0x6c, 0x75, + 0x73, 0x66, 0x2f, 0x73, 0x65, 0x61, 0x77, 0x65, 0x65, 0x64, 0x66, 0x73, 0x2f, 0x77, 0x65, 0x65, + 0x64, 0x2f, 0x70, 0x62, 0x2f, 0x66, 0x69, 0x6c, 0x65, 0x72, 0x5f, 0x70, 0x62, 0x62, 0x06, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( diff --git a/weed/pb/filer_pb/filer_client.go b/weed/pb/filer_pb/filer_client.go index c5a8c311a..8ff276ab7 100644 --- a/weed/pb/filer_pb/filer_client.go +++ b/weed/pb/filer_pb/filer_client.go @@ -83,7 +83,7 @@ func doList(filerClient FilerClient, fullDirPath util.FullPath, prefix string, f InclusiveStartFrom: inclusive, } - glog.V(5).Infof("read directory: %v", request) + glog.V(4).Infof("read directory: %v", request) ctx, cancel := context.WithCancel(context.Background()) stream, err := client.ListEntries(ctx, request) if err != nil { @@ -214,24 +214,28 @@ func MkFile(filerClient FilerClient, parentDirectoryPath string, fileName string }) } -func Remove(filerClient FilerClient, parentDirectoryPath, name string, isDeleteData, isRecursive, ignoreRecursiveErr, isFromOtherCluster bool) error { +func Remove(filerClient FilerClient, parentDirectoryPath, name string, isDeleteData, isRecursive, ignoreRecursiveErr, isFromOtherCluster bool, signature int32) error { return filerClient.WithFilerClient(func(client SeaweedFilerClient) error { - if resp, err := client.DeleteEntry(context.Background(), &DeleteEntryRequest{ + deleteEntryRequest := &DeleteEntryRequest{ Directory: parentDirectoryPath, Name: name, IsDeleteData: isDeleteData, IsRecursive: isRecursive, IgnoreRecursiveError: ignoreRecursiveErr, IsFromOtherCluster: isFromOtherCluster, - }); err != nil { - if strings.Contains(err.Error(), ErrNotFound.Error()){ + } + if signature != 0 { + deleteEntryRequest.Signatures = []int32{signature} + } + if resp, err := client.DeleteEntry(context.Background(), deleteEntryRequest); err != nil { + if strings.Contains(err.Error(), ErrNotFound.Error()) { return nil } return err } else { if resp.Error != "" { - if strings.Contains(resp.Error, ErrNotFound.Error()){ + if strings.Contains(resp.Error, ErrNotFound.Error()) { return nil } return errors.New(resp.Error) diff --git a/weed/pb/filer_pb/signature.go b/weed/pb/filer_pb/signature.go new file mode 100644 index 000000000..e13afc656 --- /dev/null +++ b/weed/pb/filer_pb/signature.go @@ -0,0 +1,13 @@ +package filer_pb + +func (r *CreateEntryRequest) AddSignature(sig int32) { + r.Signatures = append(r.Signatures, sig) +} +func (r *CreateEntryRequest) HasSigned(sig int32) bool { + for _, s := range r.Signatures { + if s == sig { + return true + } + } + return false +} diff --git a/weed/pb/volume_server_pb/volume_server.pb.go b/weed/pb/volume_server_pb/volume_server.pb.go index bf12d5edb..4609d6dd8 100644 --- a/weed/pb/volume_server_pb/volume_server.pb.go +++ b/weed/pb/volume_server_pb/volume_server.pb.go @@ -1,6 +1,6 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.25.0-devel +// protoc-gen-go v1.24.0 // protoc v3.12.3 // source: volume_server.proto diff --git a/weed/replication/sink/filersink/filer_sink.go b/weed/replication/sink/filersink/filer_sink.go index 6429859b4..b90a642c9 100644 --- a/weed/replication/sink/filersink/filer_sink.go +++ b/weed/replication/sink/filersink/filer_sink.go @@ -25,6 +25,7 @@ type FilerSink struct { ttlSec int32 dataCenter string grpcDialOption grpc.DialOption + signature int32 } func init() { @@ -61,6 +62,7 @@ func (fs *FilerSink) initialize(grpcAddress string, dir string, fs.collection = collection fs.ttlSec = int32(ttlSec) fs.grpcDialOption = security.LoadClientTLS(util.GetViper(), "grpc.client") + fs.signature = util.RandomInt32() return nil } @@ -69,7 +71,7 @@ func (fs *FilerSink) DeleteEntry(key string, isDirectory, deleteIncludeChunks bo dir, name := util.FullPath(key).DirAndName() glog.V(1).Infof("delete entry: %v", key) - err := filer_pb.Remove(fs, dir, name, deleteIncludeChunks, false, false, true) + err := filer_pb.Remove(fs, dir, name, deleteIncludeChunks, false, false, true, fs.signature) if err != nil { glog.V(0).Infof("delete entry %s: %v", key, err) return fmt.Errorf("delete entry %s: %v", key, err) @@ -114,6 +116,7 @@ func (fs *FilerSink) CreateEntry(key string, entry *filer_pb.Entry) error { Chunks: replicatedChunks, }, IsFromOtherCluster: true, + Signatures: []int32{fs.signature}, } glog.V(1).Infof("create: %v", request) @@ -193,6 +196,7 @@ func (fs *FilerSink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParent Directory: newParentPath, Entry: existingEntry, IsFromOtherCluster: true, + Signatures: []int32{fs.signature}, } if _, err := client.UpdateEntry(context.Background(), request); err != nil { diff --git a/weed/s3api/filer_multipart.go b/weed/s3api/filer_multipart.go index 24bbafe1d..a67a86454 100644 --- a/weed/s3api/filer_multipart.go +++ b/weed/s3api/filer_multipart.go @@ -57,8 +57,8 @@ func (s3a *S3ApiServer) completeMultipartUpload(input *s3.CompleteMultipartUploa uploadDirectory := s3a.genUploadsFolder(*input.Bucket) + "/" + *input.UploadId entries, err := s3a.list(uploadDirectory, "", "", false, 0) - if err != nil { - glog.Errorf("completeMultipartUpload %s %s error: %v", *input.Bucket, *input.UploadId, err) + if err != nil || len(entries) == 0 { + glog.Errorf("completeMultipartUpload %s %s error: %v, entries:%d", *input.Bucket, *input.UploadId, err, len(entries)) return nil, ErrNoSuchUpload } diff --git a/weed/s3api/s3api_objects_list_handlers.go b/weed/s3api/s3api_objects_list_handlers.go index 3354dd2b3..254a99275 100644 --- a/weed/s3api/s3api_objects_list_handlers.go +++ b/weed/s3api/s3api_objects_list_handlers.go @@ -17,18 +17,18 @@ import ( ) type ListBucketResultV2 struct { - XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ ListBucketResult"` - Name string `xml:"Name"` - Prefix string `xml:"Prefix"` - MaxKeys int `xml:"MaxKeys"` - Delimiter string `xml:"Delimiter,omitempty"` - IsTruncated bool `xml:"IsTruncated"` - Contents []ListEntry `xml:"Contents,omitempty"` - CommonPrefixes []PrefixEntry `xml:"CommonPrefixes,omitempty"` - ContinuationToken string `xml:"ContinuationToken,omitempty"` - NextContinuationToken string `xml:"NextContinuationToken,omitempty"` - KeyCount int `xml:"KeyCount"` - StartAfter string `xml:"StartAfter,omitempty"` + XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ ListBucketResult"` + Name string `xml:"Name"` + Prefix string `xml:"Prefix"` + MaxKeys int `xml:"MaxKeys"` + Delimiter string `xml:"Delimiter,omitempty"` + IsTruncated bool `xml:"IsTruncated"` + Contents []ListEntry `xml:"Contents,omitempty"` + CommonPrefixes []PrefixEntry `xml:"CommonPrefixes,omitempty"` + ContinuationToken string `xml:"ContinuationToken,omitempty"` + NextContinuationToken string `xml:"NextContinuationToken,omitempty"` + KeyCount int `xml:"KeyCount"` + StartAfter string `xml:"StartAfter,omitempty"` } func (s3a *S3ApiServer) ListObjectsV2Handler(w http.ResponseWriter, r *http.Request) { diff --git a/weed/s3api/s3api_server.go b/weed/s3api/s3api_server.go index 010958245..25561447f 100644 --- a/weed/s3api/s3api_server.go +++ b/weed/s3api/s3api_server.go @@ -111,7 +111,7 @@ func (s3a *S3ApiServer) registerRouter(router *mux.Router) { } // ListBuckets - apiRouter.Methods("GET").Path("/").HandlerFunc(s3a.iam.Auth(s3a.ListBucketsHandler, ACTION_ADMIN)) + apiRouter.Methods("GET").Path("/").HandlerFunc(s3a.iam.Auth(s3a.ListBucketsHandler, ACTION_READ)) // NotFound apiRouter.NotFoundHandler = http.HandlerFunc(notFoundHandler) diff --git a/weed/server/filer_grpc_server.go b/weed/server/filer_grpc_server.go index 48e9253f0..405742e1e 100644 --- a/weed/server/filer_grpc_server.go +++ b/weed/server/filer_grpc_server.go @@ -171,7 +171,7 @@ func (fs *FilerServer) CreateEntry(ctx context.Context, req *filer_pb.CreateEntr FullPath: util.JoinPath(req.Directory, req.Entry.Name), Attr: filer2.PbToEntryAttribute(req.Entry.Attributes), Chunks: chunks, - }, req.OExcl, req.IsFromOtherCluster) + }, req.OExcl, req.IsFromOtherCluster, req.Signatures) if createErr == nil { fs.filer.DeleteChunks(garbage) @@ -235,28 +235,26 @@ func (fs *FilerServer) UpdateEntry(ctx context.Context, req *filer_pb.UpdateEntr glog.V(3).Infof("UpdateEntry %s: %v", filepath.Join(req.Directory, req.Entry.Name), err) } - fs.filer.NotifyUpdateEvent(ctx, entry, newEntry, true, req.IsFromOtherCluster) + fs.filer.NotifyUpdateEvent(ctx, entry, newEntry, true, req.IsFromOtherCluster, req.Signatures) return &filer_pb.UpdateEntryResponse{}, err } func (fs *FilerServer) cleanupChunks(existingEntry *filer2.Entry, newEntry *filer_pb.Entry) (chunks, garbage []*filer_pb.FileChunk, err error) { - chunks = newEntry.Chunks // remove old chunks if not included in the new ones if existingEntry != nil { garbage, err = filer2.MinusChunks(fs.lookupFileId, existingEntry.Chunks, newEntry.Chunks) if err != nil { - return chunks, nil, fmt.Errorf("MinusChunks: %v", err) + return newEntry.Chunks, nil, fmt.Errorf("MinusChunks: %v", err) } } // files with manifest chunks are usually large and append only, skip calculating covered chunks - var coveredChunks []*filer_pb.FileChunk - if !filer2.HasChunkManifest(newEntry.Chunks) { - chunks, coveredChunks = filer2.CompactFileChunks(fs.lookupFileId, newEntry.Chunks) - garbage = append(garbage, coveredChunks...) - } + manifestChunks, nonManifestChunks := filer2.SeparateManifestChunks(newEntry.Chunks) + + chunks, coveredChunks := filer2.CompactFileChunks(fs.lookupFileId, nonManifestChunks) + garbage = append(garbage, coveredChunks...) chunks, err = filer2.MaybeManifestize(fs.saveAsChunk( newEntry.Attributes.Replication, @@ -268,6 +266,9 @@ func (fs *FilerServer) cleanupChunks(existingEntry *filer2.Entry, newEntry *file // not good, but should be ok glog.V(0).Infof("MaybeManifestize: %v", err) } + + chunks = append(chunks, manifestChunks...) + return } @@ -311,7 +312,7 @@ func (fs *FilerServer) AppendToEntry(ctx context.Context, req *filer_pb.AppendTo glog.V(0).Infof("MaybeManifestize: %v", err) } - err = fs.filer.CreateEntry(context.Background(), entry, false, false) + err = fs.filer.CreateEntry(context.Background(), entry, false, false, nil) return &filer_pb.AppendToEntryResponse{}, err } @@ -320,7 +321,7 @@ func (fs *FilerServer) DeleteEntry(ctx context.Context, req *filer_pb.DeleteEntr glog.V(4).Infof("DeleteEntry %v", req) - err = fs.filer.DeleteEntryMetaAndData(ctx, util.JoinPath(req.Directory, req.Name), req.IsRecursive, req.IgnoreRecursiveError, req.IsDeleteData, req.IsFromOtherCluster) + err = fs.filer.DeleteEntryMetaAndData(ctx, util.JoinPath(req.Directory, req.Name), req.IsRecursive, req.IgnoreRecursiveError, req.IsDeleteData, req.IsFromOtherCluster, nil) resp = &filer_pb.DeleteEntryResponse{} if err != nil { resp.Error = err.Error() diff --git a/weed/server/filer_grpc_server_rename.go b/weed/server/filer_grpc_server_rename.go index 9642fec24..cbb71682c 100644 --- a/weed/server/filer_grpc_server_rename.go +++ b/weed/server/filer_grpc_server_rename.go @@ -110,7 +110,7 @@ func (fs *FilerServer) moveSelfEntry(ctx context.Context, oldParent util.FullPat Attr: entry.Attr, Chunks: entry.Chunks, } - createErr := fs.filer.CreateEntry(ctx, newEntry, false, false) + createErr := fs.filer.CreateEntry(ctx, newEntry, false, false, nil) if createErr != nil { return createErr } @@ -124,7 +124,7 @@ func (fs *FilerServer) moveSelfEntry(ctx context.Context, oldParent util.FullPat } // delete old entry - deleteErr := fs.filer.DeleteEntryMetaAndData(ctx, oldPath, false, false, false, false) + deleteErr := fs.filer.DeleteEntryMetaAndData(ctx, oldPath, false, false, false, false, nil) if deleteErr != nil { return deleteErr } diff --git a/weed/server/filer_grpc_server_sub_meta.go b/weed/server/filer_grpc_server_sub_meta.go index 4341f2091..2ad12e9c8 100644 --- a/weed/server/filer_grpc_server_sub_meta.go +++ b/weed/server/filer_grpc_server_sub_meta.go @@ -24,7 +24,7 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest, lastReadTime := time.Unix(0, req.SinceNs) glog.V(0).Infof(" %v starts to subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime) - eachEventNotificationFn := eachEventNotificationFn(req, stream, clientName) + eachEventNotificationFn := eachEventNotificationFn(req, stream, clientName, req.Signature) eachLogEntryFn := eachLogEntryFn(eachEventNotificationFn) @@ -59,7 +59,7 @@ func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataReq lastReadTime := time.Unix(0, req.SinceNs) glog.V(0).Infof(" %v local subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime) - eachEventNotificationFn := eachEventNotificationFn(req, stream, clientName) + eachEventNotificationFn := eachEventNotificationFn(req, stream, clientName, req.Signature) eachLogEntryFn := eachLogEntryFn(eachEventNotificationFn) @@ -104,9 +104,15 @@ func eachLogEntryFn(eachEventNotificationFn func(dirPath string, eventNotificati } } -func eachEventNotificationFn(req *filer_pb.SubscribeMetadataRequest, stream filer_pb.SeaweedFiler_SubscribeMetadataServer, clientName string) func(dirPath string, eventNotification *filer_pb.EventNotification, tsNs int64) error { +func eachEventNotificationFn(req *filer_pb.SubscribeMetadataRequest, stream filer_pb.SeaweedFiler_SubscribeMetadataServer, clientName string, clientSignature int32) func(dirPath string, eventNotification *filer_pb.EventNotification, tsNs int64) error { return func(dirPath string, eventNotification *filer_pb.EventNotification, tsNs int64) error { + for _, sig := range eventNotification.Signatures { + if sig == clientSignature && clientSignature != 0 { + return nil + } + } + // get complete path to the file or directory var entryName string if eventNotification.OldEntry != nil { diff --git a/weed/server/filer_server_handlers_write.go b/weed/server/filer_server_handlers_write.go index d22376a45..0091ae3ce 100644 --- a/weed/server/filer_server_handlers_write.go +++ b/weed/server/filer_server_handlers_write.go @@ -111,7 +111,7 @@ func (fs *FilerServer) DeleteHandler(w http.ResponseWriter, r *http.Request) { objectPath = objectPath[0 : len(objectPath)-1] } - err := fs.filer.DeleteEntryMetaAndData(context.Background(), util.FullPath(objectPath), isRecursive, ignoreRecursiveError, !skipChunkDeletion, false) + err := fs.filer.DeleteEntryMetaAndData(context.Background(), util.FullPath(objectPath), isRecursive, ignoreRecursiveError, !skipChunkDeletion, false, nil) if err != nil { glog.V(1).Infoln("deleting", objectPath, ":", err.Error()) httpStatus := http.StatusInternalServerError diff --git a/weed/server/filer_server_handlers_write_autochunk.go b/weed/server/filer_server_handlers_write_autochunk.go index 0365ea3ab..1d037f85f 100644 --- a/weed/server/filer_server_handlers_write_autochunk.go +++ b/weed/server/filer_server_handlers_write_autochunk.go @@ -148,7 +148,6 @@ func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileNa crTime = existingEntry.Crtime } - glog.V(4).Infoln("saving", path) entry := &filer2.Entry{ FullPath: util.FullPath(path), @@ -172,7 +171,7 @@ func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileNa Size: chunkOffset, } - if dbErr := fs.filer.CreateEntry(ctx, entry, false, false); dbErr != nil { + if dbErr := fs.filer.CreateEntry(ctx, entry, false, false, nil); dbErr != nil { fs.filer.DeleteChunks(entry.Chunks) replyerr = dbErr filerResult.Error = dbErr.Error() diff --git a/weed/server/filer_server_handlers_write_cipher.go b/weed/server/filer_server_handlers_write_cipher.go index 6ec06d3de..670399425 100644 --- a/weed/server/filer_server_handlers_write_cipher.go +++ b/weed/server/filer_server_handlers_write_cipher.go @@ -80,7 +80,7 @@ func (fs *FilerServer) encrypt(ctx context.Context, w http.ResponseWriter, r *ht Size: int64(pu.OriginalDataSize), } - if dbErr := fs.filer.CreateEntry(ctx, entry, false, false); dbErr != nil { + if dbErr := fs.filer.CreateEntry(ctx, entry, false, false, nil); dbErr != nil { fs.filer.DeleteChunks(entry.Chunks) err = dbErr filerResult.Error = dbErr.Error() diff --git a/weed/server/master_grpc_server.go b/weed/server/master_grpc_server.go index 579b30a6a..d310a27d4 100644 --- a/weed/server/master_grpc_server.go +++ b/weed/server/master_grpc_server.go @@ -87,7 +87,7 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ dn.UpAdjustMaxVolumeCountDelta(delta) } - glog.V(5).Infof("master received heartbeat %s", heartbeat.String()) + glog.V(4).Infof("master received heartbeat %s", heartbeat.String()) message := &master_pb.VolumeLocation{ Url: dn.Url(), PublicUrl: dn.PublicUrl, diff --git a/weed/server/master_server.go b/weed/server/master_server.go index 377fac26f..ae59636ad 100644 --- a/weed/server/master_server.go +++ b/weed/server/master_server.go @@ -7,7 +7,6 @@ import ( "net/url" "os" "regexp" - "strconv" "strings" "sync" "time" @@ -210,7 +209,7 @@ func (ms *MasterServer) startAdminScripts() { scriptLines = append(scriptLines, "unlock") } - masterAddress := "localhost:" + strconv.Itoa(ms.option.Port) + masterAddress := fmt.Sprintf("%s:%d",ms.option.Host, ms.option.Port) var shellOptions shell.ShellOptions shellOptions.GrpcDialOption = security.LoadClientTLS(v, "grpc.master") diff --git a/weed/server/volume_grpc_client_to_master.go b/weed/server/volume_grpc_client_to_master.go index c62a4a388..3a1b95f26 100644 --- a/weed/server/volume_grpc_client_to_master.go +++ b/weed/server/volume_grpc_client_to_master.go @@ -168,13 +168,17 @@ func (vs *VolumeServer) doHeartbeat(masterNode, masterGrpcAddress string, grpcDi return "", err } case <-volumeTickChan: - glog.V(5).Infof("volume server %s:%d heartbeat", vs.store.Ip, vs.store.Port) - if err = stream.Send(vs.store.CollectHeartbeat()); err != nil { - glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", masterNode, err) - return "", err + if vs.SendHeartbeat { + glog.V(4).Infof("volume server %s:%d heartbeat", vs.store.Ip, vs.store.Port) + if err = stream.Send(vs.store.CollectHeartbeat()); err != nil { + glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", masterNode, err) + return "", err + } + } else { + glog.V(4).Infof("volume server %s:%d skip send heartbeat", vs.store.Ip, vs.store.Port) } case <-ecShardTickChan: - glog.V(5).Infof("volume server %s:%d ec heartbeat", vs.store.Ip, vs.store.Port) + glog.V(4).Infof("volume server %s:%d ec heartbeat", vs.store.Ip, vs.store.Port) if err = stream.Send(vs.store.CollectErasureCodingHeartbeat()); err != nil { glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", masterNode, err) return "", err diff --git a/weed/server/volume_server.go b/weed/server/volume_server.go index 3af37b491..6612e9045 100644 --- a/weed/server/volume_server.go +++ b/weed/server/volume_server.go @@ -31,6 +31,7 @@ type VolumeServer struct { MetricsAddress string MetricsIntervalSec int fileSizeLimitBytes int64 + SendHeartbeat bool } func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string, @@ -66,16 +67,17 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string, grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.volume"), compactionBytePerSecond: int64(compactionMBPerSecond) * 1024 * 1024, fileSizeLimitBytes: int64(fileSizeLimitMB) * 1024 * 1024, + SendHeartbeat: true, } vs.SeedMasterNodes = masterNodes vs.store = storage.NewStore(vs.grpcDialOption, port, ip, publicUrl, folders, maxCounts, minFreeSpacePercents, vs.needleMapKind) vs.guard = security.NewGuard(whiteList, signingKey, expiresAfterSec, readSigningKey, readExpiresAfterSec) handleStaticResources(adminMux) + adminMux.HandleFunc("/status", vs.statusHandler) if signingKey == "" || enableUiAccess { // only expose the volume server details for safe environments adminMux.HandleFunc("/ui/index.html", vs.uiStatusHandler) - adminMux.HandleFunc("/status", vs.guard.WhiteList(vs.statusHandler)) /* adminMux.HandleFunc("/stats/counter", vs.guard.WhiteList(statsCounterHandler)) adminMux.HandleFunc("/stats/memory", vs.guard.WhiteList(statsMemoryHandler)) diff --git a/weed/server/webdav_server.go b/weed/server/webdav_server.go index 3d2629c19..f06189e34 100644 --- a/weed/server/webdav_server.go +++ b/weed/server/webdav_server.go @@ -70,6 +70,7 @@ type WebDavFileSystem struct { filer *filer2.Filer grpcDialOption grpc.DialOption chunkCache *chunk_cache.TieredChunkCache + signature int32 } type FileInfo struct { @@ -103,6 +104,7 @@ func NewWebDavFileSystem(option *WebDavOption) (webdav.FileSystem, error) { return &WebDavFileSystem{ option: option, chunkCache: chunkCache, + signature: util.RandomInt32(), }, nil } @@ -165,6 +167,7 @@ func (fs *WebDavFileSystem) Mkdir(ctx context.Context, fullDirPath string, perm Gid: fs.option.Gid, }, }, + Signatures: []int32{fs.signature}, } glog.V(1).Infof("mkdir: %v", request) @@ -216,6 +219,7 @@ func (fs *WebDavFileSystem) OpenFile(ctx context.Context, fullFilePath string, f TtlSec: 0, }, }, + Signatures: []int32{fs.signature}, }); err != nil { return fmt.Errorf("create %s: %v", fullFilePath, err) } @@ -255,7 +259,7 @@ func (fs *WebDavFileSystem) removeAll(ctx context.Context, fullFilePath string) dir, name := util.FullPath(fullFilePath).DirAndName() - return filer_pb.Remove(fs, dir, name, true, false, false, false) + return filer_pb.Remove(fs, dir, name, true, false, false, false, fs.signature) } @@ -422,8 +426,9 @@ func (f *WebDavFile) Write(buf []byte) (int, error) { f.entry.Attributes.Replication = replication request := &filer_pb.UpdateEntryRequest{ - Directory: dir, - Entry: f.entry, + Directory: dir, + Entry: f.entry, + Signatures: []int32{f.fs.signature}, } if _, err := client.UpdateEntry(ctx, request); err != nil { diff --git a/weed/shell/command_bucket_delete.go b/weed/shell/command_bucket_delete.go index 8f5f63b46..03c878e6a 100644 --- a/weed/shell/command_bucket_delete.go +++ b/weed/shell/command_bucket_delete.go @@ -49,6 +49,6 @@ func (c *commandBucketDelete) Do(args []string, commandEnv *CommandEnv, writer i return fmt.Errorf("read buckets: %v", err) } - return filer_pb.Remove(commandEnv, filerBucketsPath, *bucketName, false, true, true, false) + return filer_pb.Remove(commandEnv, filerBucketsPath, *bucketName, false, true, true, false, 0) } diff --git a/weed/shell/command_fs_meta_cat.go b/weed/shell/command_fs_meta_cat.go index 0679ec075..8cba2d520 100644 --- a/weed/shell/command_fs_meta_cat.go +++ b/weed/shell/command_fs_meta_cat.go @@ -3,6 +3,7 @@ package shell import ( "fmt" "io" + "sort" "github.com/golang/protobuf/jsonpb" @@ -54,6 +55,13 @@ func (c *commandFsMetaCat) Do(args []string, commandEnv *CommandEnv, writer io.W Indent: " ", } + sort.Slice(respLookupEntry.Entry.Chunks, func(i, j int) bool { + if respLookupEntry.Entry.Chunks[i].Offset == respLookupEntry.Entry.Chunks[j].Offset { + return respLookupEntry.Entry.Chunks[i].Mtime < respLookupEntry.Entry.Chunks[j].Mtime + } + return respLookupEntry.Entry.Chunks[i].Offset < respLookupEntry.Entry.Chunks[j].Offset + }) + text, marshalErr := m.MarshalToString(respLookupEntry.Entry) if marshalErr != nil { return fmt.Errorf("marshal meta: %v", marshalErr) diff --git a/weed/shell/command_volume_fix_replication.go b/weed/shell/command_volume_fix_replication.go index 6b5e4e735..e17f35c67 100644 --- a/weed/shell/command_volume_fix_replication.go +++ b/weed/shell/command_volume_fix_replication.go @@ -27,7 +27,7 @@ func (c *commandVolumeFixReplication) Name() string { func (c *commandVolumeFixReplication) Help() string { return `add replicas to volumes that are missing replicas - This command file all under-replicated volumes, and find volume servers with free slots. + This command finds all under-replicated volumes, and finds volume servers with free slots. If the free slots satisfy the replication requirement, the volume content is copied over and mounted. volume.fix.replication -n # do not take action diff --git a/weed/storage/disk_location_ec.go b/weed/storage/disk_location_ec.go index 72d3e2b3e..07fab96d9 100644 --- a/weed/storage/disk_location_ec.go +++ b/weed/storage/disk_location_ec.go @@ -3,6 +3,7 @@ package storage import ( "fmt" "io/ioutil" + "os" "path" "regexp" "sort" @@ -58,6 +59,9 @@ func (l *DiskLocation) LoadEcShard(collection string, vid needle.VolumeId, shard ecVolumeShard, err := erasure_coding.NewEcVolumeShard(l.Directory, collection, vid, shardId) if err != nil { + if err == os.ErrNotExist { + return os.ErrNotExist + } return fmt.Errorf("failed to create ec shard %d.%d: %v", vid, shardId, err) } l.ecVolumesLock.Lock() diff --git a/weed/storage/erasure_coding/ec_shard.go b/weed/storage/erasure_coding/ec_shard.go index 47e6d3d1e..74ed99198 100644 --- a/weed/storage/erasure_coding/ec_shard.go +++ b/weed/storage/erasure_coding/ec_shard.go @@ -5,6 +5,7 @@ import ( "os" "path" "strconv" + "strings" "github.com/chrislusf/seaweedfs/weed/stats" "github.com/chrislusf/seaweedfs/weed/storage/needle" @@ -29,11 +30,14 @@ func NewEcVolumeShard(dirname string, collection string, id needle.VolumeId, sha // open ecd file if v.ecdFile, e = os.OpenFile(baseFileName+ToExt(int(shardId)), os.O_RDONLY, 0644); e != nil { - return nil, fmt.Errorf("cannot read ec volume shard %s.%s: %v", baseFileName, ToExt(int(shardId)), e) + if e == os.ErrNotExist || strings.Contains(e.Error(), "no such file or directory") { + return nil, os.ErrNotExist + } + return nil, fmt.Errorf("cannot read ec volume shard %s%s: %v", baseFileName, ToExt(int(shardId)), e) } ecdFi, statErr := v.ecdFile.Stat() if statErr != nil { - return nil, fmt.Errorf("can not stat ec volume shard %s.%s: %v", baseFileName, ToExt(int(shardId)), statErr) + return nil, fmt.Errorf("can not stat ec volume shard %s%s: %v", baseFileName, ToExt(int(shardId)), statErr) } v.ecdFileSize = ecdFi.Size() diff --git a/weed/storage/needle/needle.go b/weed/storage/needle/needle.go index 0d962886b..34d29ab6e 100644 --- a/weed/storage/needle/needle.go +++ b/weed/storage/needle/needle.go @@ -24,7 +24,7 @@ const ( type Needle struct { Cookie Cookie `comment:"random number to mitigate brute force lookups"` Id NeedleId `comment:"needle id"` - Size Size `comment:"sum of DataSize,Data,NameSize,Name,MimeSize,Mime"` + Size Size `comment:"sum of DataSize,Data,NameSize,Name,MimeSize,Mime"` DataSize uint32 `comment:"Data size"` //version2 Data []byte `comment:"The actual file data"` diff --git a/weed/storage/needle_map/compact_map.go b/weed/storage/needle_map/compact_map.go index 81ff27c45..2b1a471bc 100644 --- a/weed/storage/needle_map/compact_map.go +++ b/weed/storage/needle_map/compact_map.go @@ -18,7 +18,7 @@ const SectionalNeedleIdLimit = 1<<32 - 1 type SectionalNeedleValue struct { Key SectionalNeedleId OffsetLower OffsetLower `comment:"Volume offset"` //since aligned to 8 bytes, range is 4G*8=32G - Size Size `comment:"Size of the data portion"` + Size Size `comment:"Size of the data portion"` } type SectionalNeedleValueExtra struct { @@ -116,7 +116,7 @@ func (cs *CompactSection) deleteOverflowEntry(key SectionalNeedleId) { }) if deleteCandidate != length && cs.overflow[deleteCandidate].Key == key { if cs.overflow[deleteCandidate].Size.IsValid() { - cs.overflow[deleteCandidate].Size = - cs.overflow[deleteCandidate].Size + cs.overflow[deleteCandidate].Size = -cs.overflow[deleteCandidate].Size } } } diff --git a/weed/storage/needle_map/needle_value.go b/weed/storage/needle_map/needle_value.go index f4687cb79..f8d614660 100644 --- a/weed/storage/needle_map/needle_value.go +++ b/weed/storage/needle_map/needle_value.go @@ -9,7 +9,7 @@ import ( type NeedleValue struct { Key NeedleId Offset Offset `comment:"Volume offset"` //since aligned to 8 bytes, range is 4G*8=32G - Size Size `comment:"Size of the data portion"` + Size Size `comment:"Size of the data portion"` } func (this NeedleValue) Less(than btree.Item) bool { diff --git a/weed/storage/store_ec.go b/weed/storage/store_ec.go index 8b4388519..bd7bdacbd 100644 --- a/weed/storage/store_ec.go +++ b/weed/storage/store_ec.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "io" + "os" "sort" "sync" "time" @@ -59,6 +60,8 @@ func (s *Store) MountEcShards(collection string, vid needle.VolumeId, shardId er EcIndexBits: uint32(shardBits.AddShardId(shardId)), } return nil + } else if err == os.ErrNotExist { + continue } else { return fmt.Errorf("%s load ec shard %d.%d: %v", location.Directory, vid, shardId, err) } diff --git a/weed/storage/types/needle_types.go b/weed/storage/types/needle_types.go index 7e30d2bd8..137b97d7f 100644 --- a/weed/storage/types/needle_types.go +++ b/weed/storage/types/needle_types.go @@ -18,7 +18,7 @@ func (s Size) IsDeleted() bool { return s < 0 || s == TombstoneFileSize } func (s Size) IsValid() bool { - return s >0 && s != TombstoneFileSize + return s > 0 && s != TombstoneFileSize } type OffsetLower struct { diff --git a/weed/util/bounded_tree/bounded_tree.go b/weed/util/bounded_tree/bounded_tree.go index 40b9c4e47..0e023c0d1 100644 --- a/weed/util/bounded_tree/bounded_tree.go +++ b/weed/util/bounded_tree/bounded_tree.go @@ -15,7 +15,7 @@ type Node struct { type BoundedTree struct { root *Node - sync.Mutex + sync.RWMutex } func NewBoundedTree() *BoundedTree { @@ -131,6 +131,9 @@ func (n *Node) getChild(childName string) *Node { func (t *BoundedTree) HasVisited(p util.FullPath) bool { + t.RLock() + defer t.RUnlock() + if t.root == nil { return true } diff --git a/weed/util/bytes.go b/weed/util/bytes.go index 890d50586..82e4ddeef 100644 --- a/weed/util/bytes.go +++ b/weed/util/bytes.go @@ -2,6 +2,7 @@ package util import ( "crypto/md5" + "crypto/rand" "encoding/base64" "fmt" "io" @@ -135,3 +136,9 @@ func Base64Md5ToBytes(contentMd5 string) []byte { } return data } + +func RandomInt32() int32 { + buf := make([]byte, 4) + rand.Read(buf) + return int32(BytesToUint32(buf)) +} diff --git a/weed/util/chunk_cache/chunk_cache.go b/weed/util/chunk_cache/chunk_cache.go index a1a054215..2b0c635a1 100644 --- a/weed/util/chunk_cache/chunk_cache.go +++ b/weed/util/chunk_cache/chunk_cache.go @@ -94,7 +94,7 @@ func (c *TieredChunkCache) SetChunk(fileId string, data []byte) { c.Lock() defer c.Unlock() - glog.V(5).Infof("SetChunk %s size %d\n", fileId, len(data)) + glog.V(4).Infof("SetChunk %s size %d\n", fileId, len(data)) c.doSetChunk(fileId, data) } diff --git a/weed/util/constants.go b/weed/util/constants.go index e614a5d24..d48b9e32d 100644 --- a/weed/util/constants.go +++ b/weed/util/constants.go @@ -5,7 +5,7 @@ import ( ) var ( - VERSION = fmt.Sprintf("%s %d.%d", sizeLimit, 1, 90) + VERSION = fmt.Sprintf("%s %d.%d", sizeLimit, 1, 92) COMMIT = "" ) diff --git a/weed/util/http_util.go b/weed/util/http_util.go index 5159fcd17..7cc64ea85 100644 --- a/weed/util/http_util.go +++ b/weed/util/http_util.go @@ -368,6 +368,7 @@ func ReadUrlAsReaderCloser(fileUrl string, rangeHeader string) (io.ReadCloser, e if err != nil { return nil, err } + defer CloseResponse(r) if r.StatusCode >= 400 { return nil, fmt.Errorf("%s: %s", fileUrl, r.Status) } diff --git a/weed/util/log_buffer/log_buffer.go b/weed/util/log_buffer/log_buffer.go index cb9565fb2..e4310b5c5 100644 --- a/weed/util/log_buffer/log_buffer.go +++ b/weed/util/log_buffer/log_buffer.go @@ -53,7 +53,7 @@ func NewLogBuffer(flushInterval time.Duration, flushFn func(startTime, stopTime return lb } -func (m *LogBuffer) AddToBuffer(partitionKey, data []byte) { +func (m *LogBuffer) AddToBuffer(partitionKey, data []byte, eventTsNs int64) { m.Lock() defer func() { @@ -64,16 +64,21 @@ func (m *LogBuffer) AddToBuffer(partitionKey, data []byte) { }() // need to put the timestamp inside the lock - ts := time.Now() - tsNs := ts.UnixNano() - if m.lastTsNs >= tsNs { + var ts time.Time + if eventTsNs == 0 { + ts = time.Now() + eventTsNs = ts.UnixNano() + } else { + ts = time.Unix(0, eventTsNs) + } + if m.lastTsNs >= eventTsNs { // this is unlikely to happen, but just in case - tsNs = m.lastTsNs + 1 - ts = time.Unix(0, tsNs) + eventTsNs = m.lastTsNs + 1 + ts = time.Unix(0, eventTsNs) } - m.lastTsNs = tsNs + m.lastTsNs = eventTsNs logEntry := &filer_pb.LogEntry{ - TsNs: tsNs, + TsNs: eventTsNs, PartitionKeyHash: util.HashToInt32(partitionKey), Data: data, } @@ -249,7 +254,7 @@ func (m *LogBuffer) ReadFromBuffer(lastReadTime time.Time) (bufferCopy *bytes.Bu return nil } -func (m *LogBuffer) ReleaseMeory(b *bytes.Buffer) { +func (m *LogBuffer) ReleaseMemory(b *bytes.Buffer) { bufferPool.Put(b) } diff --git a/weed/util/log_buffer/log_buffer_test.go b/weed/util/log_buffer/log_buffer_test.go index f9ccc95c2..3d77afb18 100644 --- a/weed/util/log_buffer/log_buffer_test.go +++ b/weed/util/log_buffer/log_buffer_test.go @@ -23,7 +23,7 @@ func TestNewLogBufferFirstBuffer(t *testing.T) { var buf = make([]byte, messageSize) for i := 0; i < messageCount; i++ { rand.Read(buf) - lb.AddToBuffer(nil, buf) + lb.AddToBuffer(nil, buf, 0) } receivedmessageCount := 0 diff --git a/weed/util/log_buffer/log_read.go b/weed/util/log_buffer/log_read.go index 2b73a8064..f0486ac46 100644 --- a/weed/util/log_buffer/log_read.go +++ b/weed/util/log_buffer/log_read.go @@ -20,14 +20,14 @@ func (logBuffer *LogBuffer) LoopProcessLogData( lastReadTime := startTreadTime defer func() { if bytesBuf != nil { - logBuffer.ReleaseMeory(bytesBuf) + logBuffer.ReleaseMemory(bytesBuf) } }() for { if bytesBuf != nil { - logBuffer.ReleaseMeory(bytesBuf) + logBuffer.ReleaseMemory(bytesBuf) } bytesBuf = logBuffer.ReadFromBuffer(lastReadTime) // fmt.Printf("ReadFromBuffer by %v\n", lastReadTime) |
