aboutsummaryrefslogtreecommitdiff
path: root/weed/filer
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2022-07-23 10:50:28 -0700
committerchrislu <chris.lu@gmail.com>2022-07-23 10:50:28 -0700
commit64f3d6fb6e1acb007b3e4726962c7ea35bacc4c4 (patch)
tree30903fa25af0bcf797f93e354f68fb41216dc602 /weed/filer
parent2c8818351f418e3584a6c5410c396f747aebd725 (diff)
downloadseaweedfs-64f3d6fb6e1acb007b3e4726962c7ea35bacc4c4.tar.xz
seaweedfs-64f3d6fb6e1acb007b3e4726962c7ea35bacc4c4.zip
metadata subscription uses client epoch
Diffstat (limited to 'weed/filer')
-rw-r--r--weed/filer/filer.go14
-rw-r--r--weed/filer/filer_notify.go2
-rw-r--r--weed/filer/meta_aggregator.go12
3 files changed, 18 insertions, 10 deletions
diff --git a/weed/filer/filer.go b/weed/filer/filer.go
index 15fe69116..e953d014f 100644
--- a/weed/filer/filer.go
+++ b/weed/filer/filer.go
@@ -45,7 +45,8 @@ type Filer struct {
Signature int32
FilerConf *FilerConf
RemoteStorage *FilerRemoteStorage
- UniqueFileId uint32
+ UniqueFilerId int32
+ UniqueFilerEpoch int32
}
func NewFiler(masters map[string]pb.ServerAddress, grpcDialOption grpc.DialOption, filerHost pb.ServerAddress,
@@ -56,8 +57,12 @@ func NewFiler(masters map[string]pb.ServerAddress, grpcDialOption grpc.DialOptio
GrpcDialOption: grpcDialOption,
FilerConf: NewFilerConf(),
RemoteStorage: NewFilerRemoteStorage(),
- UniqueFileId: uint32(util.RandomInt32()),
+ UniqueFilerId: util.RandomInt32(),
}
+ if f.UniqueFilerId < 0 {
+ f.UniqueFilerId = -f.UniqueFilerId
+ }
+
f.LocalMetaLogBuffer = log_buffer.NewLogBuffer("local", LogFlushInterval, f.logFlushFunc, notifyFn)
f.metaLogCollection = collection
f.metaLogReplication = replication
@@ -79,8 +84,9 @@ func (f *Filer) MaybeBootstrapFromPeers(self pb.ServerAddress, existingNodes []*
return
}
- glog.V(0).Infof("bootstrap from %v clientId:%d", earliestNode.Address, f.UniqueFileId)
- err = pb.FollowMetadata(pb.ServerAddress(earliestNode.Address), f.GrpcDialOption, "bootstrap", int32(f.UniqueFileId), "/", nil,
+ glog.V(0).Infof("bootstrap from %v clientId:%d", earliestNode.Address, f.UniqueFilerId)
+ f.UniqueFilerEpoch++
+ err = pb.FollowMetadata(pb.ServerAddress(earliestNode.Address), f.GrpcDialOption, "bootstrap", f.UniqueFilerId, f.UniqueFilerEpoch, "/", nil,
0, snapshotTime.UnixNano(), f.Signature, func(resp *filer_pb.SubscribeMetadataResponse) error {
return Replay(f.Store, resp)
}, pb.FatalOnError)
diff --git a/weed/filer/filer_notify.go b/weed/filer/filer_notify.go
index 4d26a695c..66c77631e 100644
--- a/weed/filer/filer_notify.go
+++ b/weed/filer/filer_notify.go
@@ -94,7 +94,7 @@ func (f *Filer) logFlushFunc(startTime, stopTime time.Time, buf []byte) {
startTime, stopTime = startTime.UTC(), stopTime.UTC()
targetFile := fmt.Sprintf("%s/%04d-%02d-%02d/%02d-%02d.%08x", SystemLogDir,
- startTime.Year(), startTime.Month(), startTime.Day(), startTime.Hour(), startTime.Minute(), f.UniqueFileId,
+ startTime.Year(), startTime.Month(), startTime.Day(), startTime.Hour(), startTime.Minute(), f.UniqueFilerId,
// startTime.Second(), startTime.Nanosecond(),
)
diff --git a/weed/filer/meta_aggregator.go b/weed/filer/meta_aggregator.go
index 5799e247e..a6c32428d 100644
--- a/weed/filer/meta_aggregator.go
+++ b/weed/filer/meta_aggregator.go
@@ -190,15 +190,17 @@ func (ma *MetaAggregator) doSubscribeToOneFiler(f *Filer, self pb.ServerAddress,
return nil
}
- glog.V(0).Infof("subscribing remote %s meta change: %v, clientId:%d", peer, time.Unix(0, lastTsNs), ma.filer.UniqueFileId)
+ glog.V(0).Infof("subscribing remote %s meta change: %v, clientId:%d", peer, time.Unix(0, lastTsNs), ma.filer.UniqueFilerId)
err = pb.WithFilerClient(true, peer, ma.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
+ ma.filer.UniqueFilerEpoch++
stream, err := client.SubscribeLocalMetadata(ctx, &filer_pb.SubscribeMetadataRequest{
- ClientName: "filer:" + string(self),
- PathPrefix: "/",
- SinceNs: lastTsNs,
- ClientId: int32(ma.filer.UniqueFileId),
+ ClientName: "filer:" + string(self),
+ PathPrefix: "/",
+ SinceNs: lastTsNs,
+ ClientId: ma.filer.UniqueFilerId,
+ ClientEpoch: ma.filer.UniqueFilerEpoch,
})
if err != nil {
return fmt.Errorf("subscribe: %v", err)