diff options
Diffstat (limited to 'weed/filer/filer.go')
| -rw-r--r-- | weed/filer/filer.go | 55 |
1 files changed, 20 insertions, 35 deletions
diff --git a/weed/filer/filer.go b/weed/filer/filer.go index 15fe69116..993175112 100644 --- a/weed/filer/filer.go +++ b/weed/filer/filer.go @@ -3,9 +3,9 @@ package filer import ( "context" "fmt" - "github.com/chrislusf/seaweedfs/weed/cluster" - "github.com/chrislusf/seaweedfs/weed/pb" - "github.com/chrislusf/seaweedfs/weed/pb/master_pb" + "github.com/seaweedfs/seaweedfs/weed/cluster" + "github.com/seaweedfs/seaweedfs/weed/pb" + "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" "os" "sort" "strings" @@ -13,11 +13,11 @@ import ( "google.golang.org/grpc" - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "github.com/chrislusf/seaweedfs/weed/util" - "github.com/chrislusf/seaweedfs/weed/util/log_buffer" - "github.com/chrislusf/seaweedfs/weed/wdclient" + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/util" + "github.com/seaweedfs/seaweedfs/weed/util/log_buffer" + "github.com/seaweedfs/seaweedfs/weed/wdclient" ) const ( @@ -45,19 +45,24 @@ type Filer struct { Signature int32 FilerConf *FilerConf RemoteStorage *FilerRemoteStorage - UniqueFileId uint32 + UniqueFilerId int32 + UniqueFilerEpoch int32 } func NewFiler(masters map[string]pb.ServerAddress, grpcDialOption grpc.DialOption, filerHost pb.ServerAddress, filerGroup string, collection string, replication string, dataCenter string, notifyFn func()) *Filer { f := &Filer{ - MasterClient: wdclient.NewMasterClient(grpcDialOption, filerGroup, cluster.FilerType, filerHost, dataCenter, masters), + MasterClient: wdclient.NewMasterClient(grpcDialOption, filerGroup, cluster.FilerType, filerHost, dataCenter, "", masters), fileIdDeletionQueue: util.NewUnboundedQueue(), GrpcDialOption: grpcDialOption, FilerConf: NewFilerConf(), RemoteStorage: NewFilerRemoteStorage(), - UniqueFileId: uint32(util.RandomInt32()), + UniqueFilerId: util.RandomInt32(), } + if f.UniqueFilerId < 0 { + f.UniqueFilerId = -f.UniqueFilerId + } + f.LocalMetaLogBuffer = log_buffer.NewLogBuffer("local", LogFlushInterval, f.logFlushFunc, notifyFn) f.metaLogCollection = collection f.metaLogReplication = replication @@ -79,8 +84,9 @@ func (f *Filer) MaybeBootstrapFromPeers(self pb.ServerAddress, existingNodes []* return } - glog.V(0).Infof("bootstrap from %v clientId:%d", earliestNode.Address, f.UniqueFileId) - err = pb.FollowMetadata(pb.ServerAddress(earliestNode.Address), f.GrpcDialOption, "bootstrap", int32(f.UniqueFileId), "/", nil, + glog.V(0).Infof("bootstrap from %v clientId:%d", earliestNode.Address, f.UniqueFilerId) + f.UniqueFilerEpoch++ + err = pb.FollowMetadata(pb.ServerAddress(earliestNode.Address), f.GrpcDialOption, "bootstrap", f.UniqueFilerId, f.UniqueFilerEpoch, "/", nil, 0, snapshotTime.UnixNano(), f.Signature, func(resp *filer_pb.SubscribeMetadataResponse) error { return Replay(f.Store, resp) }, pb.FatalOnError) @@ -99,28 +105,7 @@ func (f *Filer) AggregateFromPeers(self pb.ServerAddress, existingNodes []*maste } func (f *Filer) ListExistingPeerUpdates() (existingNodes []*master_pb.ClusterNodeUpdate) { - - if grpcErr := pb.WithMasterClient(false, f.MasterClient.GetMaster(), f.GrpcDialOption, func(client master_pb.SeaweedClient) error { - resp, err := client.ListClusterNodes(context.Background(), &master_pb.ListClusterNodesRequest{ - ClientType: cluster.FilerType, - FilerGroup: f.MasterClient.FilerGroup, - }) - - glog.V(0).Infof("the cluster has %d filers\n", len(resp.ClusterNodes)) - for _, node := range resp.ClusterNodes { - existingNodes = append(existingNodes, &master_pb.ClusterNodeUpdate{ - NodeType: cluster.FilerType, - Address: node.Address, - IsLeader: node.IsLeader, - IsAdd: true, - CreatedAtNs: node.CreatedAtNs, - }) - } - return err - }); grpcErr != nil { - glog.V(0).Infof("connect to %s: %v", f.MasterClient.GetMaster(), grpcErr) - } - return + return cluster.ListExistingPeerUpdates(f.GetMaster(), f.GrpcDialOption, f.MasterClient.FilerGroup, cluster.FilerType) } func (f *Filer) SetStore(store FilerStore) (isFresh bool) { |
