diff options
Diffstat (limited to 'weed/replication/sink/filer_sink.go')
| -rw-r--r-- | weed/replication/sink/filer_sink.go | 138 |
1 files changed, 82 insertions, 56 deletions
diff --git a/weed/replication/sink/filer_sink.go b/weed/replication/sink/filer_sink.go index 387bffb58..a39402aeb 100644 --- a/weed/replication/sink/filer_sink.go +++ b/weed/replication/sink/filer_sink.go @@ -1,14 +1,14 @@ package sink import ( + "fmt" + "context" + "github.com/chrislusf/seaweedfs/weed/util" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "fmt" - "strings" "github.com/chrislusf/seaweedfs/weed/filer2" "github.com/chrislusf/seaweedfs/weed/glog" - "context" - "sync" + "github.com/chrislusf/seaweedfs/weed/replication/source" ) type ReplicationSink interface { @@ -16,11 +16,17 @@ type ReplicationSink interface { CreateEntry(key string, entry *filer_pb.Entry) error UpdateEntry(key string, oldEntry, newEntry *filer_pb.Entry, deleteIncludeChunks bool) error GetDirectory() string + SetSourceFiler(s *source.FilerSource) } type FilerSink struct { grpcAddress string dir string + filerSource *source.FilerSource + replication string + collection string + ttlSec int32 + dataCenter string } func (fs *FilerSink) GetDirectory() string { @@ -34,6 +40,10 @@ func (fs *FilerSink) Initialize(configuration util.Configuration) error { ) } +func (fs *FilerSink) SetSourceFiler(s *source.FilerSource) { + fs.filerSource = s +} + func (fs *FilerSink) initialize(grpcAddress string, dir string) (err error) { fs.grpcAddress = grpcAddress fs.dir = dir @@ -65,13 +75,15 @@ func (fs *FilerSink) DeleteEntry(key string, entry *filer_pb.Entry, deleteInclud func (fs *FilerSink) CreateEntry(key string, entry *filer_pb.Entry) error { - replicatedChunks, err := replicateChunks(entry.Chunks) + replicatedChunks, err := fs.replicateChunks(entry.Chunks) if err != nil { glog.V(0).Infof("replicate entry chunks %s: %v", key, err) return fmt.Errorf("replicate entry chunks %s: %v", key, err) } + glog.V(0).Infof("replicated %s %+v ===> %+v", key, entry.Chunks, replicatedChunks) + return fs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error { dir, name := filer2.FullPath(key).DirAndName() @@ -96,70 +108,84 @@ func (fs *FilerSink) CreateEntry(key string, entry *filer_pb.Entry) error { }) } -func (fs *FilerSink) UpdateEntry(key string, oldEntry, newEntry *filer_pb.Entry, deleteIncludeChunks bool) error { - return nil -} +func (fs *FilerSink) UpdateEntry(key string, oldEntry, newEntry *filer_pb.Entry, deleteIncludeChunks bool) (err error) { -func (fs *FilerSink) withFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error { + ctx := context.Background() - grpcConnection, err := util.GrpcDial(fs.grpcAddress) - if err != nil { - return fmt.Errorf("fail to dial %s: %v", fs.grpcAddress, err) - } - defer grpcConnection.Close() + dir, name := filer2.FullPath(key).DirAndName() - client := filer_pb.NewSeaweedFilerClient(grpcConnection) + // find out what changed + deletedChunks, newChunks := compareChunks(oldEntry, newEntry) - return fn(client) -} + // read existing entry + var entry *filer_pb.Entry + err = fs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error { -func volumeId(fileId string) string { - lastCommaIndex := strings.LastIndex(fileId, ",") - if lastCommaIndex > 0 { - return fileId[:lastCommaIndex] - } - return fileId -} + request := &filer_pb.LookupDirectoryEntryRequest{ + Directory: dir, + Name: name, + } -func replicateChunks(sourceChunks []*filer_pb.FileChunk) (replicatedChunks []*filer_pb.FileChunk, err error) { - if len(sourceChunks) == 0 { - return - } - var wg sync.WaitGroup - for _, s := range sourceChunks { - wg.Add(1) - go func(chunk *filer_pb.FileChunk) { - defer wg.Done() - replicatedChunk, e := replicateOneChunk(chunk) - if e != nil { - err = e - } - replicatedChunks = append(replicatedChunks, replicatedChunk) - }(s) - } - wg.Wait() + glog.V(4).Infof("lookup directory entry: %v", request) + resp, err := client.LookupDirectoryEntry(ctx, request) + if err != nil { + glog.V(0).Infof("lookup %s: %v", key, err) + return err + } - return -} + entry = resp.Entry -func replicateOneChunk(sourceChunk *filer_pb.FileChunk) (*filer_pb.FileChunk, error) { + return nil + }) - fileId, err := fetchAndWrite(sourceChunk) if err != nil { - return nil, fmt.Errorf("copy %s: %v", sourceChunk.FileId, err) + return err } - return &filer_pb.FileChunk{ - FileId: fileId, - Offset: sourceChunk.Offset, - Size: sourceChunk.Size, - Mtime: sourceChunk.Mtime, - ETag: sourceChunk.ETag, - SourceFileId: sourceChunk.FileId, - }, nil -} + // delete the chunks that are deleted from the source + if deleteIncludeChunks { + // remove the deleted chunks. Actual data deletion happens in filer UpdateEntry FindUnusedFileChunks + entry.Chunks = minusChunks(entry.Chunks, deletedChunks) + } + + // replicate the chunks that are new in the source + replicatedChunks, err := fs.replicateChunks(newChunks) + entry.Chunks = append(entry.Chunks, replicatedChunks...) -func fetchAndWrite(sourceChunk *filer_pb.FileChunk) (fileId string, err error) { + // save updated meta data + return fs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error { + + request := &filer_pb.UpdateEntryRequest{ + Directory: dir, + Entry: entry, + } + + if _, err := client.UpdateEntry(ctx, request); err != nil { + return fmt.Errorf("update entry %s: %v", key, err) + } + return nil + }) + +} +func compareChunks(oldEntry, newEntry *filer_pb.Entry) (deletedChunks, newChunks []*filer_pb.FileChunk) { + deletedChunks = minusChunks(oldEntry.Chunks, newEntry.Chunks) + newChunks = minusChunks(newEntry.Chunks, oldEntry.Chunks) + return +} + +func minusChunks(as, bs []*filer_pb.FileChunk) (delta []*filer_pb.FileChunk) { + for _, a := range as { + found := false + for _, b := range bs { + if a.FileId == b.FileId { + found = true + break + } + } + if !found { + delta = append(delta, a) + } + } return } |
