diff options
Diffstat (limited to 'weed/replication/sink/filer_sink.go')
| -rw-r--r-- | weed/replication/sink/filer_sink.go | 38 |
1 files changed, 20 insertions, 18 deletions
diff --git a/weed/replication/sink/filer_sink.go b/weed/replication/sink/filer_sink.go index 57ebeddff..387bffb58 100644 --- a/weed/replication/sink/filer_sink.go +++ b/weed/replication/sink/filer_sink.go @@ -12,36 +12,38 @@ import ( ) type ReplicationSink interface { - DeleteEntry(entry *filer_pb.Entry, deleteIncludeChunks bool) error - CreateEntry(entry *filer_pb.Entry) error - UpdateEntry(oldEntry, newEntry *filer_pb.Entry, deleteIncludeChunks bool) error + DeleteEntry(key string, entry *filer_pb.Entry, deleteIncludeChunks bool) error + CreateEntry(key string, entry *filer_pb.Entry) error + UpdateEntry(key string, oldEntry, newEntry *filer_pb.Entry, deleteIncludeChunks bool) error + GetDirectory() string } type FilerSink struct { grpcAddress string - id string dir string } +func (fs *FilerSink) GetDirectory() string { + return fs.dir +} + func (fs *FilerSink) Initialize(configuration util.Configuration) error { return fs.initialize( configuration.GetString("grpcAddress"), - configuration.GetString("id"), configuration.GetString("directory"), ) } -func (fs *FilerSink) initialize(grpcAddress string, id string, dir string) (err error) { +func (fs *FilerSink) initialize(grpcAddress string, dir string) (err error) { fs.grpcAddress = grpcAddress - fs.id = id fs.dir = dir return nil } -func (fs *FilerSink) DeleteEntry(entry *filer_pb.Entry, deleteIncludeChunks bool) error { +func (fs *FilerSink) DeleteEntry(key string, entry *filer_pb.Entry, deleteIncludeChunks bool) error { return fs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error { - dir, name := filer2.FullPath(entry.Name).DirAndName() + dir, name := filer2.FullPath(key).DirAndName() request := &filer_pb.DeleteEntryRequest{ Directory: dir, @@ -53,26 +55,26 @@ func (fs *FilerSink) DeleteEntry(entry *filer_pb.Entry, deleteIncludeChunks bool glog.V(1).Infof("delete entry: %v", request) _, err := client.DeleteEntry(context.Background(), request) if err != nil { - glog.V(0).Infof("delete entry %s: %v", entry.Name, err) - return fmt.Errorf("delete entry %s: %v", entry.Name, err) + glog.V(0).Infof("delete entry %s: %v", key, err) + return fmt.Errorf("delete entry %s: %v", key, err) } return nil }) } -func (fs *FilerSink) CreateEntry(entry *filer_pb.Entry) error { +func (fs *FilerSink) CreateEntry(key string, entry *filer_pb.Entry) error { replicatedChunks, err := replicateChunks(entry.Chunks) if err != nil { - glog.V(0).Infof("replicate entry chunks %s: %v", entry.Name, err) - return fmt.Errorf("replicate entry chunks %s: %v", entry.Name, err) + glog.V(0).Infof("replicate entry chunks %s: %v", key, err) + return fmt.Errorf("replicate entry chunks %s: %v", key, err) } return fs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error { - dir, name := filer2.FullPath(entry.Name).DirAndName() + dir, name := filer2.FullPath(key).DirAndName() request := &filer_pb.CreateEntryRequest{ Directory: dir, @@ -86,15 +88,15 @@ func (fs *FilerSink) CreateEntry(entry *filer_pb.Entry) error { glog.V(1).Infof("create: %v", request) if _, err := client.CreateEntry(context.Background(), request); err != nil { - glog.V(0).Infof("create entry %s: %v", entry.Name, err) - return fmt.Errorf("create entry %s: %v", entry.Name, err) + glog.V(0).Infof("create entry %s: %v", key, err) + return fmt.Errorf("create entry %s: %v", key, err) } return nil }) } -func (fs *FilerSink) UpdateEntry(oldEntry, newEntry *filer_pb.Entry, deleteIncludeChunks bool) error { +func (fs *FilerSink) UpdateEntry(key string, oldEntry, newEntry *filer_pb.Entry, deleteIncludeChunks bool) error { return nil } |
