aboutsummaryrefslogtreecommitdiff
path: root/weed/replication/replicator.go
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2020-09-09 11:21:23 -0700
committerChris Lu <chris.lu@gmail.com>2020-09-09 11:21:23 -0700
commit387ab6796f274151f802ccdab8756b959b5fb1cb (patch)
treea3b95f5bdba66f12c609b5e53b262b011a47a450 /weed/replication/replicator.go
parent4fc0bd1a8173e284ff919edb5214f5adf7a90f06 (diff)
downloadseaweedfs-387ab6796f274151f802ccdab8756b959b5fb1cb.tar.xz
seaweedfs-387ab6796f274151f802ccdab8756b959b5fb1cb.zip
filer: cross cluster synchronization
Diffstat (limited to 'weed/replication/replicator.go')
-rw-r--r--weed/replication/replicator.go27
1 files changed, 22 insertions, 5 deletions
diff --git a/weed/replication/replicator.go b/weed/replication/replicator.go
index 051199adb..e0535175e 100644
--- a/weed/replication/replicator.go
+++ b/weed/replication/replicator.go
@@ -3,6 +3,8 @@ package replication
import (
"context"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/pb"
+ "google.golang.org/grpc"
"strings"
"github.com/chrislusf/seaweedfs/weed/glog"
@@ -43,28 +45,43 @@ func (r *Replicator) Replicate(ctx context.Context, key string, message *filer_p
key = newKey
if message.OldEntry != nil && message.NewEntry == nil {
glog.V(4).Infof("deleting %v", key)
- return r.sink.DeleteEntry(key, message.OldEntry.IsDirectory, message.DeleteChunks)
+ return r.sink.DeleteEntry(key, message.OldEntry.IsDirectory, message.DeleteChunks, message.Signatures)
}
if message.OldEntry == nil && message.NewEntry != nil {
glog.V(4).Infof("creating %v", key)
- return r.sink.CreateEntry(key, message.NewEntry)
+ return r.sink.CreateEntry(key, message.NewEntry, message.Signatures)
}
if message.OldEntry == nil && message.NewEntry == nil {
glog.V(0).Infof("weird message %+v", message)
return nil
}
- foundExisting, err := r.sink.UpdateEntry(key, message.OldEntry, message.NewParentPath, message.NewEntry, message.DeleteChunks)
+ foundExisting, err := r.sink.UpdateEntry(key, message.OldEntry, message.NewParentPath, message.NewEntry, message.DeleteChunks, message.Signatures)
if foundExisting {
glog.V(4).Infof("updated %v", key)
return err
}
- err = r.sink.DeleteEntry(key, message.OldEntry.IsDirectory, false)
+ err = r.sink.DeleteEntry(key, message.OldEntry.IsDirectory, false, message.Signatures)
if err != nil {
return fmt.Errorf("delete old entry %v: %v", key, err)
}
glog.V(4).Infof("creating missing %v", key)
- return r.sink.CreateEntry(key, message.NewEntry)
+ return r.sink.CreateEntry(key, message.NewEntry, message.Signatures)
}
+
+func ReadFilerSignature(grpcDialOption grpc.DialOption, filer string) (filerSignature int32, readErr error) {
+ if readErr = pb.WithFilerClient(filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ if resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}); err != nil {
+ return fmt.Errorf("GetFilerConfiguration %s: %v", filer, err)
+ } else {
+ filerSignature = resp.Signature
+ }
+ return nil
+ }); readErr != nil {
+ return 0, readErr
+ }
+ return filerSignature, nil
+}
+