Diffstat (limited to 'weed/filer/filer.go')
-rw-r--r--  weed/filer/filer.go | 134
1 file changed, 90 insertions(+), 44 deletions(-)
diff --git a/weed/filer/filer.go b/weed/filer/filer.go
index 1a20abefc..86827c50e 100644
--- a/weed/filer/filer.go
+++ b/weed/filer/filer.go
@@ -3,7 +3,11 @@ package filer
import (
"context"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/cluster"
+ "github.com/chrislusf/seaweedfs/weed/pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"os"
+ "sort"
"strings"
"time"
@@ -33,8 +37,6 @@ type Filer struct {
fileIdDeletionQueue *util.UnboundedQueue
GrpcDialOption grpc.DialOption
DirBucketsPath string
- FsyncBuckets []string
- buckets *FilerBuckets
Cipher bool
LocalMetaLogBuffer *log_buffer.LogBuffer
metaLogCollection string
@@ -43,16 +45,18 @@ type Filer struct {
Signature int32
FilerConf *FilerConf
RemoteStorage *FilerRemoteStorage
+ UniqueFileId uint32
}
-func NewFiler(masters []string, grpcDialOption grpc.DialOption,
- filerHost string, filerGrpcPort uint32, collection string, replication string, dataCenter string, notifyFn func()) *Filer {
+func NewFiler(masters map[string]pb.ServerAddress, grpcDialOption grpc.DialOption, filerHost pb.ServerAddress,
+ filerGroup string, collection string, replication string, dataCenter string, notifyFn func()) *Filer {
f := &Filer{
- MasterClient: wdclient.NewMasterClient(grpcDialOption, "filer", filerHost, filerGrpcPort, dataCenter, masters),
+ MasterClient: wdclient.NewMasterClient(grpcDialOption, filerGroup, cluster.FilerType, filerHost, dataCenter, masters),
fileIdDeletionQueue: util.NewUnboundedQueue(),
GrpcDialOption: grpcDialOption,
FilerConf: NewFilerConf(),
RemoteStorage: NewFilerRemoteStorage(),
+ UniqueFileId: uint32(util.RandomInt32()),
}
f.LocalMetaLogBuffer = log_buffer.NewLogBuffer("local", LogFlushInterval, f.logFlushFunc, notifyFn)
f.metaLogCollection = collection
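The NewFiler signature above now takes the master set as a map keyed by address string plus a filerGroup, instead of a []string of masters and a separate gRPC port. A minimal sketch of constructing a Filer with the new signature; the addresses, the empty group/collection/replication/dataCenter values, and the insecure dial option are illustrative assumptions, not part of this diff:

package main

import (
	"github.com/chrislusf/seaweedfs/weed/filer"
	"github.com/chrislusf/seaweedfs/weed/pb"
	"google.golang.org/grpc"
)

func main() {
	// Masters are now keyed by their address string rather than passed as a slice.
	masters := map[string]pb.ServerAddress{
		"localhost:9333": pb.ServerAddress("localhost:9333"),
	}
	self := pb.ServerAddress("localhost:8888")

	// An empty filerGroup selects the default filer cluster; collection,
	// replication, and dataCenter keep their previous meaning.
	f := filer.NewFiler(masters, grpc.WithInsecure(), self,
		"", "", "", "", func() { /* notifyFn: local metadata log has new data */ })
	_ = f
}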
@@ -63,32 +67,69 @@ func NewFiler(masters []string, grpcDialOption grpc.DialOption,
return f
}
-func (f *Filer) AggregateFromPeers(self string, filers []string) {
-
- // set peers
- found := false
- for _, peer := range filers {
- if peer == self {
- found = true
- }
+func (f *Filer) MaybeBootstrapFromPeers(self pb.ServerAddress, existingNodes []*master_pb.ClusterNodeUpdate, snapshotTime time.Time) (err error) {
+ if len(existingNodes) == 0 {
+ return
}
- if !found {
- filers = append(filers, self)
+ sort.Slice(existingNodes, func(i, j int) bool {
+ return existingNodes[i].CreatedAtNs < existingNodes[j].CreatedAtNs
+ })
+ earliestNode := existingNodes[0]
+ if earliestNode.Address == string(self) {
+ return
}
- f.MetaAggregator = NewMetaAggregator(filers, f.GrpcDialOption)
- f.MetaAggregator.StartLoopSubscribe(f, self)
+ glog.V(0).Infof("bootstrap from %v", earliestNode.Address)
+ err = pb.FollowMetadata(pb.ServerAddress(earliestNode.Address), f.GrpcDialOption, "bootstrap", int32(f.UniqueFileId), "/", nil,
+ 0, snapshotTime.UnixNano(), f.Signature, func(resp *filer_pb.SubscribeMetadataResponse) error {
+ return Replay(f.Store, resp)
+ }, pb.FatalOnError)
+ return
+}
+
+func (f *Filer) AggregateFromPeers(self pb.ServerAddress, existingNodes []*master_pb.ClusterNodeUpdate, startFrom time.Time) {
+
+ f.MetaAggregator = NewMetaAggregator(f, self, f.GrpcDialOption)
+ f.MasterClient.OnPeerUpdate = f.MetaAggregator.OnPeerUpdate
+
+ for _, peerUpdate := range existingNodes {
+ f.MetaAggregator.OnPeerUpdate(peerUpdate, startFrom)
+ }
}
-func (f *Filer) SetStore(store FilerStore) {
- f.Store = NewFilerStoreWrapper(store)
+func (f *Filer) ListExistingPeerUpdates() (existingNodes []*master_pb.ClusterNodeUpdate) {
+
+ if grpcErr := pb.WithMasterClient(false, f.MasterClient.GetMaster(), f.GrpcDialOption, func(client master_pb.SeaweedClient) error {
+ resp, err := client.ListClusterNodes(context.Background(), &master_pb.ListClusterNodesRequest{
+ ClientType: cluster.FilerType,
+ FilerGroup: f.MasterClient.FilerGroup,
+ })
+
+ glog.V(0).Infof("the cluster has %d filers\n", len(resp.ClusterNodes))
+ for _, node := range resp.ClusterNodes {
+ existingNodes = append(existingNodes, &master_pb.ClusterNodeUpdate{
+ NodeType: cluster.FilerType,
+ Address: node.Address,
+ IsLeader: node.IsLeader,
+ IsAdd: true,
+ CreatedAtNs: node.CreatedAtNs,
+ })
+ }
+ return err
+ }); grpcErr != nil {
+ glog.V(0).Infof("connect to %s: %v", f.MasterClient.GetMaster(), grpcErr)
+ }
+ return
+}
- f.setOrLoadFilerStoreSignature(store)
+func (f *Filer) SetStore(store FilerStore) (isFresh bool) {
+ f.Store = NewFilerStoreWrapper(store)
+ return f.setOrLoadFilerStoreSignature(store)
}
-func (f *Filer) setOrLoadFilerStoreSignature(store FilerStore) {
+func (f *Filer) setOrLoadFilerStoreSignature(store FilerStore) (isFresh bool) {
storeIdBytes, err := store.KvGet(context.Background(), []byte(FilerStoreId))
if err == ErrKvNotFound || err == nil && len(storeIdBytes) == 0 {
f.Signature = util.RandomInt32()
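This hunk replaces the static peer list with cluster-node updates learned from the master, and SetStore now reports whether the backing store is brand new. A sketch of how a filer server might wire the new calls together at startup; the helper name, the LogFlushInterval look-back, and the fatal error handling are assumptions for illustration, not part of this diff:

package main

import (
	"time"

	"github.com/chrislusf/seaweedfs/weed/filer"
	"github.com/chrislusf/seaweedfs/weed/glog"
	"github.com/chrislusf/seaweedfs/weed/pb"
)

// startMetaSync is a hypothetical helper showing the intended call order.
func startMetaSync(f *filer.Filer, store filer.FilerStore, self pb.ServerAddress) {
	// SetStore now reports whether the store had no prior signature.
	isFresh := f.SetStore(store)

	// Ask the master which filer peers already exist in this filer group.
	existingNodes := f.ListExistingPeerUpdates()

	// Start slightly in the past so in-flight metadata updates are not missed.
	startFromTime := time.Now().Add(-filer.LogFlushInterval)

	if isFresh {
		// A brand-new store first replays metadata from the longest-running peer.
		if err := f.MaybeBootstrapFromPeers(self, existingNodes, startFromTime); err != nil {
			glog.Fatalf("bootstrap from %d peer(s): %v", len(existingNodes), err)
		}
	}

	// Then keep following every known peer for incremental metadata updates.
	f.AggregateFromPeers(self, existingNodes, startFromTime)
}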
@@ -98,23 +139,25 @@ func (f *Filer) setOrLoadFilerStoreSignature(store FilerStore) {
glog.Fatalf("set %s=%d : %v", FilerStoreId, f.Signature, err)
}
glog.V(0).Infof("create %s to %d", FilerStoreId, f.Signature)
+ return true
} else if err == nil && len(storeIdBytes) == 4 {
f.Signature = int32(util.BytesToUint32(storeIdBytes))
glog.V(0).Infof("existing %s = %d", FilerStoreId, f.Signature)
} else {
glog.Fatalf("read %v=%v : %v", FilerStoreId, string(storeIdBytes), err)
}
+ return false
}
func (f *Filer) GetStore() (store FilerStore) {
return f.Store
}
-func (fs *Filer) GetMaster() string {
+func (fs *Filer) GetMaster() pb.ServerAddress {
return fs.MasterClient.GetMaster()
}
-func (fs *Filer) KeepConnectedToMaster() {
+func (fs *Filer) KeepMasterClientConnected() {
fs.MasterClient.KeepConnectedToMaster()
}
@@ -130,7 +173,7 @@ func (f *Filer) RollbackTransaction(ctx context.Context) error {
return f.Store.RollbackTransaction(ctx)
}
-func (f *Filer) CreateEntry(ctx context.Context, entry *Entry, o_excl bool, isFromOtherCluster bool, signatures []int32) error {
+func (f *Filer) CreateEntry(ctx context.Context, entry *Entry, o_excl bool, isFromOtherCluster bool, signatures []int32, skipCreateParentDir bool) error {
if string(entry.FullPath) == "/" {
return nil
@@ -148,9 +191,11 @@ func (f *Filer) CreateEntry(ctx context.Context, entry *Entry, o_excl bool, isFr
if oldEntry == nil {
- dirParts := strings.Split(string(entry.FullPath), "/")
- if err := f.ensureParentDirecotryEntry(ctx, entry, dirParts, len(dirParts)-1, isFromOtherCluster); err != nil {
- return err
+ if !skipCreateParentDir {
+ dirParts := strings.Split(string(entry.FullPath), "/")
+ if err := f.ensureParentDirecotryEntry(ctx, entry, dirParts, len(dirParts)-1, isFromOtherCluster); err != nil {
+ return err
+ }
}
glog.V(4).Infof("InsertEntry %s: new entry: %v", entry.FullPath, entry.Name())
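CreateEntry gains a skipCreateParentDir flag so callers that already know the parent directory exists can avoid the per-entry directory walk. A hypothetical wrapper illustrating the call; the function and constant names are assumptions:

import (
	"context"

	"github.com/chrislusf/seaweedfs/weed/filer"
)

// createUnderExistingDir assumes the caller guarantees the parent directory of
// entry.FullPath already exists, so the ensureParentDirecotryEntry walk is skipped.
func createUnderExistingDir(ctx context.Context, f *filer.Filer, entry *filer.Entry) error {
	const (
		oExcl               = false // do not fail if the entry already exists
		isFromOtherCluster  = false // a local write, not replicated metadata
		skipCreateParentDir = true  // parent is known to exist
	)
	return f.CreateEntry(ctx, entry, oExcl, isFromOtherCluster, nil /*signatures*/, skipCreateParentDir)
}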
@@ -170,7 +215,6 @@ func (f *Filer) CreateEntry(ctx context.Context, entry *Entry, o_excl bool, isFr
}
}
- f.maybeAddBucket(entry)
f.NotifyUpdateEvent(ctx, oldEntry, entry, true, isFromOtherCluster, signatures)
f.deleteChunksIfNotNew(oldEntry, entry)
@@ -207,15 +251,13 @@ func (f *Filer) ensureParentDirecotryEntry(ctx context.Context, entry *Entry, di
dirEntry = &Entry{
FullPath: util.FullPath(dirPath),
Attr: Attr{
- Mtime: now,
- Crtime: now,
- Mode: os.ModeDir | entry.Mode | 0111,
- Uid: entry.Uid,
- Gid: entry.Gid,
- Collection: entry.Collection,
- Replication: entry.Replication,
- UserName: entry.UserName,
- GroupNames: entry.GroupNames,
+ Mtime: now,
+ Crtime: now,
+ Mode: os.ModeDir | entry.Mode | 0111,
+ Uid: entry.Uid,
+ Gid: entry.Gid,
+ UserName: entry.UserName,
+ GroupNames: entry.GroupNames,
},
}
@@ -227,7 +269,6 @@ func (f *Filer) ensureParentDirecotryEntry(ctx context.Context, entry *Entry, di
return fmt.Errorf("mkdir %s: %v", dirPath, mkdirErr)
}
} else {
- f.maybeAddBucket(dirEntry)
f.NotifyUpdateEvent(ctx, nil, dirEntry, false, isFromOtherCluster, nil)
}
@@ -285,14 +326,19 @@ func (f *Filer) FindEntry(ctx context.Context, p util.FullPath) (entry *Entry, e
func (f *Filer) doListDirectoryEntries(ctx context.Context, p util.FullPath, startFileName string, inclusive bool, limit int64, prefix string, eachEntryFunc ListEachEntryFunc) (expiredCount int64, lastFileName string, err error) {
lastFileName, err = f.Store.ListDirectoryPrefixedEntries(ctx, p, startFileName, inclusive, limit, prefix, func(entry *Entry) bool {
- if entry.TtlSec > 0 {
- if entry.Crtime.Add(time.Duration(entry.TtlSec) * time.Second).Before(time.Now()) {
- f.Store.DeleteOneEntry(ctx, entry)
- expiredCount++
- return true
+ select {
+ case <-ctx.Done():
+ return false
+ default:
+ if entry.TtlSec > 0 {
+ if entry.Crtime.Add(time.Duration(entry.TtlSec) * time.Second).Before(time.Now()) {
+ f.Store.DeleteOneEntry(ctx, entry)
+ expiredCount++
+ return true
+ }
}
+ return eachEntryFunc(entry)
}
- return eachEntryFunc(entry)
})
if err != nil {
return expiredCount, lastFileName, err
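The listing callback now checks ctx.Done() before handling each entry, so a cancelled request stops the scan instead of walking the whole prefix. A minimal standalone sketch of the same guard pattern; the visitAll helper and the sample data are purely illustrative:

package main

import (
	"context"
	"fmt"
)

// visitAll calls visit for each item but stops as soon as the context is
// cancelled, mirroring the select-on-ctx.Done() guard added above.
func visitAll(ctx context.Context, items []string, visit func(string) bool) {
	for _, item := range items {
		select {
		case <-ctx.Done():
			return // caller went away; stop scanning early
		default:
			if !visit(item) {
				return // visitor asked to stop
			}
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	visited := 0
	visitAll(ctx, []string{"a", "b", "c", "d"}, func(name string) bool {
		fmt.Println("visited", name)
		visited++
		if visited == 2 {
			cancel() // simulate the client disconnecting mid-listing
		}
		return true
	})
	// Only "a" and "b" are visited; "c" and "d" are skipped after cancellation.
}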