diff options
Diffstat (limited to 'weed/server')
| -rw-r--r-- | weed/server/filer_grpc_server_sub_meta.go | 22 | ||||
| -rw-r--r-- | weed/server/filer_server.go | 12 | ||||
| -rw-r--r-- | weed/server/master_grpc_server_cluster.go | 7 | ||||
| -rw-r--r-- | weed/server/master_server.go | 2 |
4 files changed, 33 insertions, 10 deletions
diff --git a/weed/server/filer_grpc_server_sub_meta.go b/weed/server/filer_grpc_server_sub_meta.go index cc5b5cb6b..0540400a3 100644 --- a/weed/server/filer_grpc_server_sub_meta.go +++ b/weed/server/filer_grpc_server_sub_meta.go @@ -39,15 +39,19 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest, var processedTsNs int64 var readPersistedLogErr error var readInMemoryLogErr error + var isDone bool for { glog.V(4).Infof("read on disk %v aggregated subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime) - processedTsNs, readPersistedLogErr = fs.filer.ReadPersistedLogBuffer(lastReadTime, eachLogEntryFn) + processedTsNs, isDone, readPersistedLogErr = fs.filer.ReadPersistedLogBuffer(lastReadTime, req.UntilNs, eachLogEntryFn) if readPersistedLogErr != nil { return fmt.Errorf("reading from persisted logs: %v", readPersistedLogErr) } + if isDone { + return nil + } if processedTsNs != 0 { lastReadTime = time.Unix(0, processedTsNs) @@ -55,7 +59,7 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest, glog.V(4).Infof("read in memory %v aggregated subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime) - lastReadTime, readInMemoryLogErr = fs.filer.MetaAggregator.MetaLogBuffer.LoopProcessLogData("aggMeta:"+clientName, lastReadTime, func() bool { + lastReadTime, isDone, readInMemoryLogErr = fs.filer.MetaAggregator.MetaLogBuffer.LoopProcessLogData("aggMeta:"+clientName, lastReadTime, req.UntilNs, func() bool { fs.filer.MetaAggregator.ListenersLock.Lock() fs.filer.MetaAggregator.ListenersCond.Wait() fs.filer.MetaAggregator.ListenersLock.Unlock() @@ -70,6 +74,9 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest, break } } + if isDone { + return nil + } time.Sleep(1127 * time.Millisecond) } @@ -98,15 +105,19 @@ func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataReq var processedTsNs int64 var readPersistedLogErr error var readInMemoryLogErr error + var isDone bool for { // println("reading from persisted logs ...") glog.V(0).Infof("read on disk %v local subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime) - processedTsNs, readPersistedLogErr = fs.filer.ReadPersistedLogBuffer(lastReadTime, eachLogEntryFn) + processedTsNs, isDone, readPersistedLogErr = fs.filer.ReadPersistedLogBuffer(lastReadTime, req.UntilNs, eachLogEntryFn) if readPersistedLogErr != nil { glog.V(0).Infof("read on disk %v local subscribe %s from %+v: %v", clientName, req.PathPrefix, lastReadTime, readPersistedLogErr) return fmt.Errorf("reading from persisted logs: %v", readPersistedLogErr) } + if isDone { + return nil + } if processedTsNs != 0 { lastReadTime = time.Unix(0, processedTsNs) @@ -119,7 +130,7 @@ func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataReq glog.V(0).Infof("read in memory %v local subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime) - lastReadTime, readInMemoryLogErr = fs.filer.LocalMetaLogBuffer.LoopProcessLogData("localMeta:"+clientName, lastReadTime, func() bool { + lastReadTime, isDone, readInMemoryLogErr = fs.filer.LocalMetaLogBuffer.LoopProcessLogData("localMeta:"+clientName, lastReadTime, req.UntilNs, func() bool { fs.listenersLock.Lock() fs.listenersCond.Wait() fs.listenersLock.Unlock() @@ -134,6 +145,9 @@ func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataReq break } } + if isDone { + return nil + } } return readInMemoryLogErr diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go index 82b15084d..d2c1d67f5 100644 --- a/weed/server/filer_server.go +++ b/weed/server/filer_server.go @@ -151,7 +151,7 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) // TODO deprecated, will be be removed after 2020-12-31 // replaced by https://github.com/chrislusf/seaweedfs/wiki/Path-Specific-Configuration // fs.filer.FsyncBuckets = v.GetStringSlice("filer.options.buckets_fsync") - fs.filer.LoadConfiguration(v) + isFresh := fs.filer.LoadConfiguration(v) notification.LoadConfiguration(v, "notification.") @@ -164,7 +164,15 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) readonlyMux.HandleFunc("/", fs.readonlyFilerHandler) } - fs.filer.AggregateFromPeers(option.Host) + existingNodes := fs.filer.ListExistingPeerUpdates() + startFromTime := time.Now().Add(-filer.LogFlushInterval) + if isFresh { + glog.V(0).Infof("%s bootstrap from peers %+v", option.Host, existingNodes) + if err := fs.filer.MaybeBootstrapFromPeers(option.Host, existingNodes, startFromTime); err == nil { + glog.Fatalf("%s bootstrap from %+v", option.Host, existingNodes) + } + } + fs.filer.AggregateFromPeers(option.Host, existingNodes, startFromTime) fs.filer.LoadBuckets() diff --git a/weed/server/master_grpc_server_cluster.go b/weed/server/master_grpc_server_cluster.go index 220398c6a..fea4a66aa 100644 --- a/weed/server/master_grpc_server_cluster.go +++ b/weed/server/master_grpc_server_cluster.go @@ -15,9 +15,10 @@ func (ms *MasterServer) ListClusterNodes(ctx context.Context, req *master_pb.Lis for _, node := range clusterNodes { resp.ClusterNodes = append(resp.ClusterNodes, &master_pb.ListClusterNodesResponse_ClusterNode{ - Address: string(node.Address), - Version: node.Version, - IsLeader: ms.Cluster.IsOneLeader(filerGroup, node.Address), + Address: string(node.Address), + Version: node.Version, + IsLeader: ms.Cluster.IsOneLeader(filerGroup, node.Address), + CreatedAtNs: node.CreatedTs.UnixNano(), }) } return resp, nil diff --git a/weed/server/master_server.go b/weed/server/master_server.go index e02552730..9bf840f08 100644 --- a/weed/server/master_server.go +++ b/weed/server/master_server.go @@ -342,7 +342,7 @@ func (ms *MasterServer) createSequencer(option *MasterOption) sequence.Sequencer return seq } -func (ms *MasterServer) OnPeerUpdate(update *master_pb.ClusterNodeUpdate) { +func (ms *MasterServer) OnPeerUpdate(update *master_pb.ClusterNodeUpdate, startFrom time.Time) { if update.NodeType != cluster.MasterType || ms.Topo.HashicorpRaft == nil { return } |
