diff options
Diffstat (limited to 'weed/filer/filer.go')
| -rw-r--r-- | weed/filer/filer.go | 134 |
1 files changed, 90 insertions, 44 deletions
diff --git a/weed/filer/filer.go b/weed/filer/filer.go index 1a20abefc..86827c50e 100644 --- a/weed/filer/filer.go +++ b/weed/filer/filer.go @@ -3,7 +3,11 @@ package filer import ( "context" "fmt" + "github.com/chrislusf/seaweedfs/weed/cluster" + "github.com/chrislusf/seaweedfs/weed/pb" + "github.com/chrislusf/seaweedfs/weed/pb/master_pb" "os" + "sort" "strings" "time" @@ -33,8 +37,6 @@ type Filer struct { fileIdDeletionQueue *util.UnboundedQueue GrpcDialOption grpc.DialOption DirBucketsPath string - FsyncBuckets []string - buckets *FilerBuckets Cipher bool LocalMetaLogBuffer *log_buffer.LogBuffer metaLogCollection string @@ -43,16 +45,18 @@ type Filer struct { Signature int32 FilerConf *FilerConf RemoteStorage *FilerRemoteStorage + UniqueFileId uint32 } -func NewFiler(masters []string, grpcDialOption grpc.DialOption, - filerHost string, filerGrpcPort uint32, collection string, replication string, dataCenter string, notifyFn func()) *Filer { +func NewFiler(masters map[string]pb.ServerAddress, grpcDialOption grpc.DialOption, filerHost pb.ServerAddress, + filerGroup string, collection string, replication string, dataCenter string, notifyFn func()) *Filer { f := &Filer{ - MasterClient: wdclient.NewMasterClient(grpcDialOption, "filer", filerHost, filerGrpcPort, dataCenter, masters), + MasterClient: wdclient.NewMasterClient(grpcDialOption, filerGroup, cluster.FilerType, filerHost, dataCenter, masters), fileIdDeletionQueue: util.NewUnboundedQueue(), GrpcDialOption: grpcDialOption, FilerConf: NewFilerConf(), RemoteStorage: NewFilerRemoteStorage(), + UniqueFileId: uint32(util.RandomInt32()), } f.LocalMetaLogBuffer = log_buffer.NewLogBuffer("local", LogFlushInterval, f.logFlushFunc, notifyFn) f.metaLogCollection = collection @@ -63,32 +67,69 @@ func NewFiler(masters []string, grpcDialOption grpc.DialOption, return f } -func (f *Filer) AggregateFromPeers(self string, filers []string) { - - // set peers - found := false - for _, peer := range filers { - if peer == self { - found = true - } +func (f *Filer) MaybeBootstrapFromPeers(self pb.ServerAddress, existingNodes []*master_pb.ClusterNodeUpdate, snapshotTime time.Time) (err error) { + if len(existingNodes) == 0 { + return } - if !found { - filers = append(filers, self) + sort.Slice(existingNodes, func(i, j int) bool { + return existingNodes[i].CreatedAtNs < existingNodes[j].CreatedAtNs + }) + earliestNode := existingNodes[0] + if earliestNode.Address == string(self) { + return } - f.MetaAggregator = NewMetaAggregator(filers, f.GrpcDialOption) - f.MetaAggregator.StartLoopSubscribe(f, self) + glog.V(0).Infof("bootstrap from %v", earliestNode.Address) + err = pb.FollowMetadata(pb.ServerAddress(earliestNode.Address), f.GrpcDialOption, "bootstrap", int32(f.UniqueFileId), "/", nil, + 0, snapshotTime.UnixNano(), f.Signature, func(resp *filer_pb.SubscribeMetadataResponse) error { + return Replay(f.Store, resp) + }, pb.FatalOnError) + return +} + +func (f *Filer) AggregateFromPeers(self pb.ServerAddress, existingNodes []*master_pb.ClusterNodeUpdate, startFrom time.Time) { + + f.MetaAggregator = NewMetaAggregator(f, self, f.GrpcDialOption) + f.MasterClient.OnPeerUpdate = f.MetaAggregator.OnPeerUpdate + + for _, peerUpdate := range existingNodes { + f.MetaAggregator.OnPeerUpdate(peerUpdate, startFrom) + } } -func (f *Filer) SetStore(store FilerStore) { - f.Store = NewFilerStoreWrapper(store) +func (f *Filer) ListExistingPeerUpdates() (existingNodes []*master_pb.ClusterNodeUpdate) { + + if grpcErr := pb.WithMasterClient(false, f.MasterClient.GetMaster(), f.GrpcDialOption, func(client master_pb.SeaweedClient) error { + resp, err := client.ListClusterNodes(context.Background(), &master_pb.ListClusterNodesRequest{ + ClientType: cluster.FilerType, + FilerGroup: f.MasterClient.FilerGroup, + }) + + glog.V(0).Infof("the cluster has %d filers\n", len(resp.ClusterNodes)) + for _, node := range resp.ClusterNodes { + existingNodes = append(existingNodes, &master_pb.ClusterNodeUpdate{ + NodeType: cluster.FilerType, + Address: node.Address, + IsLeader: node.IsLeader, + IsAdd: true, + CreatedAtNs: node.CreatedAtNs, + }) + } + return err + }); grpcErr != nil { + glog.V(0).Infof("connect to %s: %v", f.MasterClient.GetMaster(), grpcErr) + } + return +} - f.setOrLoadFilerStoreSignature(store) +func (f *Filer) SetStore(store FilerStore) (isFresh bool) { + f.Store = NewFilerStoreWrapper(store) + return f.setOrLoadFilerStoreSignature(store) } -func (f *Filer) setOrLoadFilerStoreSignature(store FilerStore) { +func (f *Filer) setOrLoadFilerStoreSignature(store FilerStore) (isFresh bool) { storeIdBytes, err := store.KvGet(context.Background(), []byte(FilerStoreId)) if err == ErrKvNotFound || err == nil && len(storeIdBytes) == 0 { f.Signature = util.RandomInt32() @@ -98,23 +139,25 @@ func (f *Filer) setOrLoadFilerStoreSignature(store FilerStore) { glog.Fatalf("set %s=%d : %v", FilerStoreId, f.Signature, err) } glog.V(0).Infof("create %s to %d", FilerStoreId, f.Signature) + return true } else if err == nil && len(storeIdBytes) == 4 { f.Signature = int32(util.BytesToUint32(storeIdBytes)) glog.V(0).Infof("existing %s = %d", FilerStoreId, f.Signature) } else { glog.Fatalf("read %v=%v : %v", FilerStoreId, string(storeIdBytes), err) } + return false } func (f *Filer) GetStore() (store FilerStore) { return f.Store } -func (fs *Filer) GetMaster() string { +func (fs *Filer) GetMaster() pb.ServerAddress { return fs.MasterClient.GetMaster() } -func (fs *Filer) KeepConnectedToMaster() { +func (fs *Filer) KeepMasterClientConnected() { fs.MasterClient.KeepConnectedToMaster() } @@ -130,7 +173,7 @@ func (f *Filer) RollbackTransaction(ctx context.Context) error { return f.Store.RollbackTransaction(ctx) } -func (f *Filer) CreateEntry(ctx context.Context, entry *Entry, o_excl bool, isFromOtherCluster bool, signatures []int32) error { +func (f *Filer) CreateEntry(ctx context.Context, entry *Entry, o_excl bool, isFromOtherCluster bool, signatures []int32, skipCreateParentDir bool) error { if string(entry.FullPath) == "/" { return nil @@ -148,9 +191,11 @@ func (f *Filer) CreateEntry(ctx context.Context, entry *Entry, o_excl bool, isFr if oldEntry == nil { - dirParts := strings.Split(string(entry.FullPath), "/") - if err := f.ensureParentDirecotryEntry(ctx, entry, dirParts, len(dirParts)-1, isFromOtherCluster); err != nil { - return err + if !skipCreateParentDir { + dirParts := strings.Split(string(entry.FullPath), "/") + if err := f.ensureParentDirecotryEntry(ctx, entry, dirParts, len(dirParts)-1, isFromOtherCluster); err != nil { + return err + } } glog.V(4).Infof("InsertEntry %s: new entry: %v", entry.FullPath, entry.Name()) @@ -170,7 +215,6 @@ func (f *Filer) CreateEntry(ctx context.Context, entry *Entry, o_excl bool, isFr } } - f.maybeAddBucket(entry) f.NotifyUpdateEvent(ctx, oldEntry, entry, true, isFromOtherCluster, signatures) f.deleteChunksIfNotNew(oldEntry, entry) @@ -207,15 +251,13 @@ func (f *Filer) ensureParentDirecotryEntry(ctx context.Context, entry *Entry, di dirEntry = &Entry{ FullPath: util.FullPath(dirPath), Attr: Attr{ - Mtime: now, - Crtime: now, - Mode: os.ModeDir | entry.Mode | 0111, - Uid: entry.Uid, - Gid: entry.Gid, - Collection: entry.Collection, - Replication: entry.Replication, - UserName: entry.UserName, - GroupNames: entry.GroupNames, + Mtime: now, + Crtime: now, + Mode: os.ModeDir | entry.Mode | 0111, + Uid: entry.Uid, + Gid: entry.Gid, + UserName: entry.UserName, + GroupNames: entry.GroupNames, }, } @@ -227,7 +269,6 @@ func (f *Filer) ensureParentDirecotryEntry(ctx context.Context, entry *Entry, di return fmt.Errorf("mkdir %s: %v", dirPath, mkdirErr) } } else { - f.maybeAddBucket(dirEntry) f.NotifyUpdateEvent(ctx, nil, dirEntry, false, isFromOtherCluster, nil) } @@ -285,14 +326,19 @@ func (f *Filer) FindEntry(ctx context.Context, p util.FullPath) (entry *Entry, e func (f *Filer) doListDirectoryEntries(ctx context.Context, p util.FullPath, startFileName string, inclusive bool, limit int64, prefix string, eachEntryFunc ListEachEntryFunc) (expiredCount int64, lastFileName string, err error) { lastFileName, err = f.Store.ListDirectoryPrefixedEntries(ctx, p, startFileName, inclusive, limit, prefix, func(entry *Entry) bool { - if entry.TtlSec > 0 { - if entry.Crtime.Add(time.Duration(entry.TtlSec) * time.Second).Before(time.Now()) { - f.Store.DeleteOneEntry(ctx, entry) - expiredCount++ - return true + select { + case <-ctx.Done(): + return false + default: + if entry.TtlSec > 0 { + if entry.Crtime.Add(time.Duration(entry.TtlSec) * time.Second).Before(time.Now()) { + f.Store.DeleteOneEntry(ctx, entry) + expiredCount++ + return true + } } + return eachEntryFunc(entry) } - return eachEntryFunc(entry) }) if err != nil { return expiredCount, lastFileName, err |
