aboutsummaryrefslogtreecommitdiff
path: root/weed/server
diff options
context:
space:
mode:
Diffstat (limited to 'weed/server')
-rw-r--r--weed/server/filer_grpc_server_sub_meta.go22
-rw-r--r--weed/server/filer_server.go12
-rw-r--r--weed/server/master_grpc_server_cluster.go7
-rw-r--r--weed/server/master_server.go2
4 files changed, 33 insertions, 10 deletions
diff --git a/weed/server/filer_grpc_server_sub_meta.go b/weed/server/filer_grpc_server_sub_meta.go
index cc5b5cb6b..0540400a3 100644
--- a/weed/server/filer_grpc_server_sub_meta.go
+++ b/weed/server/filer_grpc_server_sub_meta.go
@@ -39,15 +39,19 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest,
var processedTsNs int64
var readPersistedLogErr error
var readInMemoryLogErr error
+ var isDone bool
for {
glog.V(4).Infof("read on disk %v aggregated subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime)
- processedTsNs, readPersistedLogErr = fs.filer.ReadPersistedLogBuffer(lastReadTime, eachLogEntryFn)
+ processedTsNs, isDone, readPersistedLogErr = fs.filer.ReadPersistedLogBuffer(lastReadTime, req.UntilNs, eachLogEntryFn)
if readPersistedLogErr != nil {
return fmt.Errorf("reading from persisted logs: %v", readPersistedLogErr)
}
+ if isDone {
+ return nil
+ }
if processedTsNs != 0 {
lastReadTime = time.Unix(0, processedTsNs)
@@ -55,7 +59,7 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest,
glog.V(4).Infof("read in memory %v aggregated subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime)
- lastReadTime, readInMemoryLogErr = fs.filer.MetaAggregator.MetaLogBuffer.LoopProcessLogData("aggMeta:"+clientName, lastReadTime, func() bool {
+ lastReadTime, isDone, readInMemoryLogErr = fs.filer.MetaAggregator.MetaLogBuffer.LoopProcessLogData("aggMeta:"+clientName, lastReadTime, req.UntilNs, func() bool {
fs.filer.MetaAggregator.ListenersLock.Lock()
fs.filer.MetaAggregator.ListenersCond.Wait()
fs.filer.MetaAggregator.ListenersLock.Unlock()
@@ -70,6 +74,9 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest,
break
}
}
+ if isDone {
+ return nil
+ }
time.Sleep(1127 * time.Millisecond)
}
@@ -98,15 +105,19 @@ func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataReq
var processedTsNs int64
var readPersistedLogErr error
var readInMemoryLogErr error
+ var isDone bool
for {
// println("reading from persisted logs ...")
glog.V(0).Infof("read on disk %v local subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime)
- processedTsNs, readPersistedLogErr = fs.filer.ReadPersistedLogBuffer(lastReadTime, eachLogEntryFn)
+ processedTsNs, isDone, readPersistedLogErr = fs.filer.ReadPersistedLogBuffer(lastReadTime, req.UntilNs, eachLogEntryFn)
if readPersistedLogErr != nil {
glog.V(0).Infof("read on disk %v local subscribe %s from %+v: %v", clientName, req.PathPrefix, lastReadTime, readPersistedLogErr)
return fmt.Errorf("reading from persisted logs: %v", readPersistedLogErr)
}
+ if isDone {
+ return nil
+ }
if processedTsNs != 0 {
lastReadTime = time.Unix(0, processedTsNs)
@@ -119,7 +130,7 @@ func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataReq
glog.V(0).Infof("read in memory %v local subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime)
- lastReadTime, readInMemoryLogErr = fs.filer.LocalMetaLogBuffer.LoopProcessLogData("localMeta:"+clientName, lastReadTime, func() bool {
+ lastReadTime, isDone, readInMemoryLogErr = fs.filer.LocalMetaLogBuffer.LoopProcessLogData("localMeta:"+clientName, lastReadTime, req.UntilNs, func() bool {
fs.listenersLock.Lock()
fs.listenersCond.Wait()
fs.listenersLock.Unlock()
@@ -134,6 +145,9 @@ func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataReq
break
}
}
+ if isDone {
+ return nil
+ }
}
return readInMemoryLogErr
diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go
index 82b15084d..d2c1d67f5 100644
--- a/weed/server/filer_server.go
+++ b/weed/server/filer_server.go
@@ -151,7 +151,7 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption)
// TODO deprecated, will be be removed after 2020-12-31
// replaced by https://github.com/chrislusf/seaweedfs/wiki/Path-Specific-Configuration
// fs.filer.FsyncBuckets = v.GetStringSlice("filer.options.buckets_fsync")
- fs.filer.LoadConfiguration(v)
+ isFresh := fs.filer.LoadConfiguration(v)
notification.LoadConfiguration(v, "notification.")
@@ -164,7 +164,15 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption)
readonlyMux.HandleFunc("/", fs.readonlyFilerHandler)
}
- fs.filer.AggregateFromPeers(option.Host)
+ existingNodes := fs.filer.ListExistingPeerUpdates()
+ startFromTime := time.Now().Add(-filer.LogFlushInterval)
+ if isFresh {
+ glog.V(0).Infof("%s bootstrap from peers %+v", option.Host, existingNodes)
+ if err := fs.filer.MaybeBootstrapFromPeers(option.Host, existingNodes, startFromTime); err == nil {
+ glog.Fatalf("%s bootstrap from %+v", option.Host, existingNodes)
+ }
+ }
+ fs.filer.AggregateFromPeers(option.Host, existingNodes, startFromTime)
fs.filer.LoadBuckets()
diff --git a/weed/server/master_grpc_server_cluster.go b/weed/server/master_grpc_server_cluster.go
index 220398c6a..fea4a66aa 100644
--- a/weed/server/master_grpc_server_cluster.go
+++ b/weed/server/master_grpc_server_cluster.go
@@ -15,9 +15,10 @@ func (ms *MasterServer) ListClusterNodes(ctx context.Context, req *master_pb.Lis
for _, node := range clusterNodes {
resp.ClusterNodes = append(resp.ClusterNodes, &master_pb.ListClusterNodesResponse_ClusterNode{
- Address: string(node.Address),
- Version: node.Version,
- IsLeader: ms.Cluster.IsOneLeader(filerGroup, node.Address),
+ Address: string(node.Address),
+ Version: node.Version,
+ IsLeader: ms.Cluster.IsOneLeader(filerGroup, node.Address),
+ CreatedAtNs: node.CreatedTs.UnixNano(),
})
}
return resp, nil
diff --git a/weed/server/master_server.go b/weed/server/master_server.go
index e02552730..9bf840f08 100644
--- a/weed/server/master_server.go
+++ b/weed/server/master_server.go
@@ -342,7 +342,7 @@ func (ms *MasterServer) createSequencer(option *MasterOption) sequence.Sequencer
return seq
}
-func (ms *MasterServer) OnPeerUpdate(update *master_pb.ClusterNodeUpdate) {
+func (ms *MasterServer) OnPeerUpdate(update *master_pb.ClusterNodeUpdate, startFrom time.Time) {
if update.NodeType != cluster.MasterType || ms.Topo.HashicorpRaft == nil {
return
}