diff options
| author | chrislu <chris.lu@gmail.com> | 2022-07-23 10:50:28 -0700 |
|---|---|---|
| committer | chrislu <chris.lu@gmail.com> | 2022-07-23 10:50:28 -0700 |
| commit | 64f3d6fb6e1acb007b3e4726962c7ea35bacc4c4 (patch) | |
| tree | 30903fa25af0bcf797f93e354f68fb41216dc602 /weed/filer | |
| parent | 2c8818351f418e3584a6c5410c396f747aebd725 (diff) | |
| download | seaweedfs-64f3d6fb6e1acb007b3e4726962c7ea35bacc4c4.tar.xz seaweedfs-64f3d6fb6e1acb007b3e4726962c7ea35bacc4c4.zip | |
metadata subscription uses client epoch
Diffstat (limited to 'weed/filer')
| -rw-r--r-- | weed/filer/filer.go | 14 | ||||
| -rw-r--r-- | weed/filer/filer_notify.go | 2 | ||||
| -rw-r--r-- | weed/filer/meta_aggregator.go | 12 |
3 files changed, 18 insertions, 10 deletions
diff --git a/weed/filer/filer.go b/weed/filer/filer.go index 15fe69116..e953d014f 100644 --- a/weed/filer/filer.go +++ b/weed/filer/filer.go @@ -45,7 +45,8 @@ type Filer struct { Signature int32 FilerConf *FilerConf RemoteStorage *FilerRemoteStorage - UniqueFileId uint32 + UniqueFilerId int32 + UniqueFilerEpoch int32 } func NewFiler(masters map[string]pb.ServerAddress, grpcDialOption grpc.DialOption, filerHost pb.ServerAddress, @@ -56,8 +57,12 @@ func NewFiler(masters map[string]pb.ServerAddress, grpcDialOption grpc.DialOptio GrpcDialOption: grpcDialOption, FilerConf: NewFilerConf(), RemoteStorage: NewFilerRemoteStorage(), - UniqueFileId: uint32(util.RandomInt32()), + UniqueFilerId: util.RandomInt32(), } + if f.UniqueFilerId < 0 { + f.UniqueFilerId = -f.UniqueFilerId + } + f.LocalMetaLogBuffer = log_buffer.NewLogBuffer("local", LogFlushInterval, f.logFlushFunc, notifyFn) f.metaLogCollection = collection f.metaLogReplication = replication @@ -79,8 +84,9 @@ func (f *Filer) MaybeBootstrapFromPeers(self pb.ServerAddress, existingNodes []* return } - glog.V(0).Infof("bootstrap from %v clientId:%d", earliestNode.Address, f.UniqueFileId) - err = pb.FollowMetadata(pb.ServerAddress(earliestNode.Address), f.GrpcDialOption, "bootstrap", int32(f.UniqueFileId), "/", nil, + glog.V(0).Infof("bootstrap from %v clientId:%d", earliestNode.Address, f.UniqueFilerId) + f.UniqueFilerEpoch++ + err = pb.FollowMetadata(pb.ServerAddress(earliestNode.Address), f.GrpcDialOption, "bootstrap", f.UniqueFilerId, f.UniqueFilerEpoch, "/", nil, 0, snapshotTime.UnixNano(), f.Signature, func(resp *filer_pb.SubscribeMetadataResponse) error { return Replay(f.Store, resp) }, pb.FatalOnError) diff --git a/weed/filer/filer_notify.go b/weed/filer/filer_notify.go index 4d26a695c..66c77631e 100644 --- a/weed/filer/filer_notify.go +++ b/weed/filer/filer_notify.go @@ -94,7 +94,7 @@ func (f *Filer) logFlushFunc(startTime, stopTime time.Time, buf []byte) { startTime, stopTime = startTime.UTC(), stopTime.UTC() targetFile := fmt.Sprintf("%s/%04d-%02d-%02d/%02d-%02d.%08x", SystemLogDir, - startTime.Year(), startTime.Month(), startTime.Day(), startTime.Hour(), startTime.Minute(), f.UniqueFileId, + startTime.Year(), startTime.Month(), startTime.Day(), startTime.Hour(), startTime.Minute(), f.UniqueFilerId, // startTime.Second(), startTime.Nanosecond(), ) diff --git a/weed/filer/meta_aggregator.go b/weed/filer/meta_aggregator.go index 5799e247e..a6c32428d 100644 --- a/weed/filer/meta_aggregator.go +++ b/weed/filer/meta_aggregator.go @@ -190,15 +190,17 @@ func (ma *MetaAggregator) doSubscribeToOneFiler(f *Filer, self pb.ServerAddress, return nil } - glog.V(0).Infof("subscribing remote %s meta change: %v, clientId:%d", peer, time.Unix(0, lastTsNs), ma.filer.UniqueFileId) + glog.V(0).Infof("subscribing remote %s meta change: %v, clientId:%d", peer, time.Unix(0, lastTsNs), ma.filer.UniqueFilerId) err = pb.WithFilerClient(true, peer, ma.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { ctx, cancel := context.WithCancel(context.Background()) defer cancel() + ma.filer.UniqueFilerEpoch++ stream, err := client.SubscribeLocalMetadata(ctx, &filer_pb.SubscribeMetadataRequest{ - ClientName: "filer:" + string(self), - PathPrefix: "/", - SinceNs: lastTsNs, - ClientId: int32(ma.filer.UniqueFileId), + ClientName: "filer:" + string(self), + PathPrefix: "/", + SinceNs: lastTsNs, + ClientId: ma.filer.UniqueFilerId, + ClientEpoch: ma.filer.UniqueFilerEpoch, }) if err != nil { return fmt.Errorf("subscribe: %v", err) |
