aboutsummaryrefslogtreecommitdiff
path: root/weed/filer/filer.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/filer/filer.go')
-rw-r--r--weed/filer/filer.go55
1 files changed, 20 insertions, 35 deletions
diff --git a/weed/filer/filer.go b/weed/filer/filer.go
index 15fe69116..993175112 100644
--- a/weed/filer/filer.go
+++ b/weed/filer/filer.go
@@ -3,9 +3,9 @@ package filer
import (
"context"
"fmt"
- "github.com/chrislusf/seaweedfs/weed/cluster"
- "github.com/chrislusf/seaweedfs/weed/pb"
- "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
+ "github.com/seaweedfs/seaweedfs/weed/cluster"
+ "github.com/seaweedfs/seaweedfs/weed/pb"
+ "github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
"os"
"sort"
"strings"
@@ -13,11 +13,11 @@ import (
"google.golang.org/grpc"
- "github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
- "github.com/chrislusf/seaweedfs/weed/util"
- "github.com/chrislusf/seaweedfs/weed/util/log_buffer"
- "github.com/chrislusf/seaweedfs/weed/wdclient"
+ "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
+ "github.com/seaweedfs/seaweedfs/weed/util"
+ "github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
+ "github.com/seaweedfs/seaweedfs/weed/wdclient"
)
const (
@@ -45,19 +45,24 @@ type Filer struct {
Signature int32
FilerConf *FilerConf
RemoteStorage *FilerRemoteStorage
- UniqueFileId uint32
+ UniqueFilerId int32
+ UniqueFilerEpoch int32
}
func NewFiler(masters map[string]pb.ServerAddress, grpcDialOption grpc.DialOption, filerHost pb.ServerAddress,
filerGroup string, collection string, replication string, dataCenter string, notifyFn func()) *Filer {
f := &Filer{
- MasterClient: wdclient.NewMasterClient(grpcDialOption, filerGroup, cluster.FilerType, filerHost, dataCenter, masters),
+ MasterClient: wdclient.NewMasterClient(grpcDialOption, filerGroup, cluster.FilerType, filerHost, dataCenter, "", masters),
fileIdDeletionQueue: util.NewUnboundedQueue(),
GrpcDialOption: grpcDialOption,
FilerConf: NewFilerConf(),
RemoteStorage: NewFilerRemoteStorage(),
- UniqueFileId: uint32(util.RandomInt32()),
+ UniqueFilerId: util.RandomInt32(),
}
+ if f.UniqueFilerId < 0 {
+ f.UniqueFilerId = -f.UniqueFilerId
+ }
+
f.LocalMetaLogBuffer = log_buffer.NewLogBuffer("local", LogFlushInterval, f.logFlushFunc, notifyFn)
f.metaLogCollection = collection
f.metaLogReplication = replication
@@ -79,8 +84,9 @@ func (f *Filer) MaybeBootstrapFromPeers(self pb.ServerAddress, existingNodes []*
return
}
- glog.V(0).Infof("bootstrap from %v clientId:%d", earliestNode.Address, f.UniqueFileId)
- err = pb.FollowMetadata(pb.ServerAddress(earliestNode.Address), f.GrpcDialOption, "bootstrap", int32(f.UniqueFileId), "/", nil,
+ glog.V(0).Infof("bootstrap from %v clientId:%d", earliestNode.Address, f.UniqueFilerId)
+ f.UniqueFilerEpoch++
+ err = pb.FollowMetadata(pb.ServerAddress(earliestNode.Address), f.GrpcDialOption, "bootstrap", f.UniqueFilerId, f.UniqueFilerEpoch, "/", nil,
0, snapshotTime.UnixNano(), f.Signature, func(resp *filer_pb.SubscribeMetadataResponse) error {
return Replay(f.Store, resp)
}, pb.FatalOnError)
@@ -99,28 +105,7 @@ func (f *Filer) AggregateFromPeers(self pb.ServerAddress, existingNodes []*maste
}
func (f *Filer) ListExistingPeerUpdates() (existingNodes []*master_pb.ClusterNodeUpdate) {
-
- if grpcErr := pb.WithMasterClient(false, f.MasterClient.GetMaster(), f.GrpcDialOption, func(client master_pb.SeaweedClient) error {
- resp, err := client.ListClusterNodes(context.Background(), &master_pb.ListClusterNodesRequest{
- ClientType: cluster.FilerType,
- FilerGroup: f.MasterClient.FilerGroup,
- })
-
- glog.V(0).Infof("the cluster has %d filers\n", len(resp.ClusterNodes))
- for _, node := range resp.ClusterNodes {
- existingNodes = append(existingNodes, &master_pb.ClusterNodeUpdate{
- NodeType: cluster.FilerType,
- Address: node.Address,
- IsLeader: node.IsLeader,
- IsAdd: true,
- CreatedAtNs: node.CreatedAtNs,
- })
- }
- return err
- }); grpcErr != nil {
- glog.V(0).Infof("connect to %s: %v", f.MasterClient.GetMaster(), grpcErr)
- }
- return
+ return cluster.ListExistingPeerUpdates(f.GetMaster(), f.GrpcDialOption, f.MasterClient.FilerGroup, cluster.FilerType)
}
func (f *Filer) SetStore(store FilerStore) (isFresh bool) {