diff options
Diffstat (limited to 'weed/command/filer_remote_gateway_buckets.go')
| -rw-r--r-- | weed/command/filer_remote_gateway_buckets.go | 387 |
1 files changed, 387 insertions, 0 deletions
diff --git a/weed/command/filer_remote_gateway_buckets.go b/weed/command/filer_remote_gateway_buckets.go new file mode 100644 index 000000000..e16e4f731 --- /dev/null +++ b/weed/command/filer_remote_gateway_buckets.go @@ -0,0 +1,387 @@ +package command + +import ( + "fmt" + "github.com/chrislusf/seaweedfs/weed/filer" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/pb/remote_pb" + "github.com/chrislusf/seaweedfs/weed/remote_storage" + "github.com/chrislusf/seaweedfs/weed/replication/source" + "github.com/chrislusf/seaweedfs/weed/util" + "github.com/golang/protobuf/proto" + "math" + "math/rand" + "strings" + "time" +) + +func (option *RemoteGatewayOptions) followBucketUpdatesAndUploadToRemote(filerSource *source.FilerSource) error { + + // read filer remote storage mount mappings + if detectErr := option.collectRemoteStorageConf(); detectErr != nil { + return fmt.Errorf("read mount info: %v", detectErr) + } + + eachEntryFunc, err := option.makeBucketedEventProcessor(filerSource) + if err != nil { + return err + } + + processEventFnWithOffset := pb.AddOffsetFunc(eachEntryFunc, 3*time.Second, func(counter int64, lastTsNs int64) error { + lastTime := time.Unix(0, lastTsNs) + glog.V(0).Infof("remote sync %s progressed to %v %0.2f/sec", *option.filerAddress, lastTime, float64(counter)/float64(3)) + return remote_storage.SetSyncOffset(option.grpcDialOption, pb.ServerAddress(*option.filerAddress), option.bucketsDir, lastTsNs) + }) + + lastOffsetTs := collectLastSyncOffset(option, option.grpcDialOption, pb.ServerAddress(*option.filerAddress), option.bucketsDir, *option.timeAgo) + + return pb.FollowMetadata(pb.ServerAddress(*option.filerAddress), option.grpcDialOption, "filer.remote.sync", + option.bucketsDir, []string{filer.DirectoryEtcRemote}, lastOffsetTs.UnixNano(), 0, processEventFnWithOffset, false) +} + +func (option *RemoteGatewayOptions) makeBucketedEventProcessor(filerSource *source.FilerSource) (pb.ProcessMetadataFunc, error) { + + handleCreateBucket := func(entry *filer_pb.Entry) error { + if !entry.IsDirectory { + return nil + } + if entry.RemoteEntry != nil { + // this directory is imported from "remote.mount.buckets" or "remote.mount" + return nil + } + if option.mappings.PrimaryBucketStorageName != "" && *option.createBucketAt == "" { + *option.createBucketAt = option.mappings.PrimaryBucketStorageName + glog.V(0).Infof("%s is set as the primary remote storage", *option.createBucketAt) + } + if len(option.mappings.Mappings) == 1 && *option.createBucketAt == "" { + for k := range option.mappings.Mappings { + *option.createBucketAt = k + glog.V(0).Infof("%s is set as the only remote storage", *option.createBucketAt) + } + } + if *option.createBucketAt == "" { + return nil + } + remoteConf, found := option.remoteConfs[*option.createBucketAt] + if !found { + return fmt.Errorf("un-configured remote storage %s", *option.createBucketAt) + } + + client, err := remote_storage.GetRemoteStorage(remoteConf) + if err != nil { + return err + } + + bucketName := strings.ToLower(entry.Name) + if *option.createBucketRandomSuffix { + // https://docs.aws.amazon.com/AmazonS3/latest/userguide/bucketnamingrules.html + if len(bucketName)+5 > 63 { + bucketName = bucketName[:58] + } + bucketName = fmt.Sprintf("%s-%4d", bucketName, rand.Uint32()%10000) + } + + glog.V(0).Infof("create bucket %s", bucketName) + if err := client.CreateBucket(bucketName); err != nil { + return fmt.Errorf("create bucket %s in %s: %v", bucketName, remoteConf.Name, err) + } + + bucketPath := util.FullPath(option.bucketsDir).Child(entry.Name) + remoteLocation := &remote_pb.RemoteStorageLocation{ + Name: *option.createBucketAt, + Bucket: bucketName, + Path: "/", + } + + // need to add new mapping here before getting upates from metadata tailing + option.mappings.Mappings[string(bucketPath)] = remoteLocation + + return filer.InsertMountMapping(option, string(bucketPath), remoteLocation) + + } + handleDeleteBucket := func(entry *filer_pb.Entry) error { + if !entry.IsDirectory { + return nil + } + + client, remoteStorageMountLocation, err := option.findRemoteStorageClient(entry.Name) + if err != nil { + return fmt.Errorf("findRemoteStorageClient %s: %v", entry.Name, err) + } + + glog.V(0).Infof("delete remote bucket %s", remoteStorageMountLocation.Bucket) + if err := client.DeleteBucket(remoteStorageMountLocation.Bucket); err != nil { + return fmt.Errorf("delete remote bucket %s: %v", remoteStorageMountLocation.Bucket, err) + } + + bucketPath := util.FullPath(option.bucketsDir).Child(entry.Name) + + return filer.DeleteMountMapping(option, string(bucketPath)) + } + + handleEtcRemoteChanges := func(resp *filer_pb.SubscribeMetadataResponse) error { + message := resp.EventNotification + if message.NewEntry != nil { + // update + if message.NewEntry.Name == filer.REMOTE_STORAGE_MOUNT_FILE { + newMappings, readErr := filer.UnmarshalRemoteStorageMappings(message.NewEntry.Content) + if readErr != nil { + return fmt.Errorf("unmarshal mappings: %v", readErr) + } + option.mappings = newMappings + } + if strings.HasSuffix(message.NewEntry.Name, filer.REMOTE_STORAGE_CONF_SUFFIX) { + conf := &remote_pb.RemoteConf{} + if err := proto.Unmarshal(message.NewEntry.Content, conf); err != nil { + return fmt.Errorf("unmarshal %s/%s: %v", filer.DirectoryEtcRemote, message.NewEntry.Name, err) + } + option.remoteConfs[conf.Name] = conf + } + } else if message.OldEntry != nil { + // deletion + if strings.HasSuffix(message.OldEntry.Name, filer.REMOTE_STORAGE_CONF_SUFFIX) { + conf := &remote_pb.RemoteConf{} + if err := proto.Unmarshal(message.OldEntry.Content, conf); err != nil { + return fmt.Errorf("unmarshal %s/%s: %v", filer.DirectoryEtcRemote, message.OldEntry.Name, err) + } + delete(option.remoteConfs, conf.Name) + } + } + + return nil + } + + eachEntryFunc := func(resp *filer_pb.SubscribeMetadataResponse) error { + message := resp.EventNotification + if strings.HasPrefix(resp.Directory, filer.DirectoryEtcRemote) { + return handleEtcRemoteChanges(resp) + } + + if message.OldEntry == nil && message.NewEntry == nil { + return nil + } + if message.OldEntry == nil && message.NewEntry != nil { + if message.NewParentPath == option.bucketsDir { + return handleCreateBucket(message.NewEntry) + } + if !filer.HasData(message.NewEntry) { + return nil + } + bucket, remoteStorageMountLocation, remoteStorage, ok := option.detectBucketInfo(message.NewParentPath) + if !ok { + return nil + } + client, err := remote_storage.GetRemoteStorage(remoteStorage) + if err != nil { + return err + } + glog.V(2).Infof("create: %+v", resp) + if !shouldSendToRemote(message.NewEntry) { + glog.V(2).Infof("skipping creating: %+v", resp) + return nil + } + dest := toRemoteStorageLocation(bucket, util.NewFullPath(message.NewParentPath, message.NewEntry.Name), remoteStorageMountLocation) + if message.NewEntry.IsDirectory { + glog.V(0).Infof("mkdir %s", remote_storage.FormatLocation(dest)) + return client.WriteDirectory(dest, message.NewEntry) + } + glog.V(0).Infof("create %s", remote_storage.FormatLocation(dest)) + reader := filer.NewFileReader(filerSource, message.NewEntry) + remoteEntry, writeErr := client.WriteFile(dest, message.NewEntry, reader) + if writeErr != nil { + return writeErr + } + return updateLocalEntry(&remoteSyncOptions, message.NewParentPath, message.NewEntry, remoteEntry) + } + if message.OldEntry != nil && message.NewEntry == nil { + if resp.Directory == option.bucketsDir { + return handleDeleteBucket(message.OldEntry) + } + bucket, remoteStorageMountLocation, remoteStorage, ok := option.detectBucketInfo(resp.Directory) + if !ok { + return nil + } + client, err := remote_storage.GetRemoteStorage(remoteStorage) + if err != nil { + return err + } + glog.V(2).Infof("delete: %+v", resp) + dest := toRemoteStorageLocation(bucket, util.NewFullPath(resp.Directory, message.OldEntry.Name), remoteStorageMountLocation) + if message.OldEntry.IsDirectory { + glog.V(0).Infof("rmdir %s", remote_storage.FormatLocation(dest)) + return client.RemoveDirectory(dest) + } + glog.V(0).Infof("delete %s", remote_storage.FormatLocation(dest)) + return client.DeleteFile(dest) + } + if message.OldEntry != nil && message.NewEntry != nil { + if resp.Directory == option.bucketsDir { + if message.NewParentPath == option.bucketsDir { + if message.OldEntry.Name == message.NewEntry.Name { + return nil + } + if err := handleCreateBucket(message.NewEntry); err != nil { + return err + } + if err := handleDeleteBucket(message.OldEntry); err != nil { + return err + } + } + } + oldBucket, oldRemoteStorageMountLocation, oldRemoteStorage, oldOk := option.detectBucketInfo(resp.Directory) + newBucket, newRemoteStorageMountLocation, newRemoteStorage, newOk := option.detectBucketInfo(message.NewParentPath) + if oldOk && newOk { + if !shouldSendToRemote(message.NewEntry) { + glog.V(2).Infof("skipping updating: %+v", resp) + return nil + } + client, err := remote_storage.GetRemoteStorage(oldRemoteStorage) + if err != nil { + return err + } + if resp.Directory == message.NewParentPath && message.OldEntry.Name == message.NewEntry.Name { + // update the same entry + if message.NewEntry.IsDirectory { + // update directory property + return nil + } + if filer.IsSameData(message.OldEntry, message.NewEntry) { + glog.V(2).Infof("update meta: %+v", resp) + oldDest := toRemoteStorageLocation(oldBucket, util.NewFullPath(resp.Directory, message.OldEntry.Name), oldRemoteStorageMountLocation) + return client.UpdateFileMetadata(oldDest, message.OldEntry, message.NewEntry) + } else { + newDest := toRemoteStorageLocation(newBucket, util.NewFullPath(message.NewParentPath, message.NewEntry.Name), newRemoteStorageMountLocation) + reader := filer.NewFileReader(filerSource, message.NewEntry) + glog.V(0).Infof("create %s", remote_storage.FormatLocation(newDest)) + remoteEntry, writeErr := client.WriteFile(newDest, message.NewEntry, reader) + if writeErr != nil { + return writeErr + } + return updateLocalEntry(&remoteSyncOptions, message.NewParentPath, message.NewEntry, remoteEntry) + } + } + } + + // the following is entry rename + if oldOk { + client, err := remote_storage.GetRemoteStorage(oldRemoteStorage) + if err != nil { + return err + } + oldDest := toRemoteStorageLocation(oldBucket, util.NewFullPath(resp.Directory, message.OldEntry.Name), oldRemoteStorageMountLocation) + if message.OldEntry.IsDirectory { + return client.RemoveDirectory(oldDest) + } + glog.V(0).Infof("delete %s", remote_storage.FormatLocation(oldDest)) + if err := client.DeleteFile(oldDest); err != nil { + return err + } + } + if newOk { + if !shouldSendToRemote(message.NewEntry) { + glog.V(2).Infof("skipping updating: %+v", resp) + return nil + } + client, err := remote_storage.GetRemoteStorage(newRemoteStorage) + if err != nil { + return err + } + newDest := toRemoteStorageLocation(newBucket, util.NewFullPath(message.NewParentPath, message.NewEntry.Name), newRemoteStorageMountLocation) + if message.NewEntry.IsDirectory { + return client.WriteDirectory(newDest, message.NewEntry) + } + reader := filer.NewFileReader(filerSource, message.NewEntry) + glog.V(0).Infof("create %s", remote_storage.FormatLocation(newDest)) + remoteEntry, writeErr := client.WriteFile(newDest, message.NewEntry, reader) + if writeErr != nil { + return writeErr + } + return updateLocalEntry(&remoteSyncOptions, message.NewParentPath, message.NewEntry, remoteEntry) + } + } + + return nil + } + return eachEntryFunc, nil +} + +func (option *RemoteGatewayOptions) findRemoteStorageClient(bucketName string) (client remote_storage.RemoteStorageClient, remoteStorageMountLocation *remote_pb.RemoteStorageLocation, err error) { + bucket := util.FullPath(option.bucketsDir).Child(bucketName) + + var isMounted bool + remoteStorageMountLocation, isMounted = option.mappings.Mappings[string(bucket)] + if !isMounted { + return nil, remoteStorageMountLocation, fmt.Errorf("%s is not mounted", bucket) + } + remoteConf, hasClient := option.remoteConfs[remoteStorageMountLocation.Name] + if !hasClient { + return nil, remoteStorageMountLocation, fmt.Errorf("%s mounted to un-configured %+v", bucket, remoteStorageMountLocation) + } + + client, err = remote_storage.GetRemoteStorage(remoteConf) + if err != nil { + return nil, remoteStorageMountLocation, err + } + return client, remoteStorageMountLocation, nil +} + +func (option *RemoteGatewayOptions) detectBucketInfo(actualDir string) (bucket util.FullPath, remoteStorageMountLocation *remote_pb.RemoteStorageLocation, remoteConf *remote_pb.RemoteConf, ok bool) { + bucket, ok = extractBucketPath(option.bucketsDir, actualDir) + if !ok { + return "", nil, nil, false + } + var isMounted bool + remoteStorageMountLocation, isMounted = option.mappings.Mappings[string(bucket)] + if !isMounted { + glog.Warningf("%s is not mounted", bucket) + return "", nil, nil, false + } + var hasClient bool + remoteConf, hasClient = option.remoteConfs[remoteStorageMountLocation.Name] + if !hasClient { + glog.Warningf("%s mounted to un-configured %+v", bucket, remoteStorageMountLocation) + return "", nil, nil, false + } + return bucket, remoteStorageMountLocation, remoteConf, true +} + +func extractBucketPath(bucketsDir, dir string) (util.FullPath, bool) { + if !strings.HasPrefix(dir, bucketsDir+"/") { + return "", false + } + parts := strings.SplitN(dir[len(bucketsDir)+1:], "/", 2) + return util.FullPath(bucketsDir).Child(parts[0]), true +} + +func (option *RemoteGatewayOptions) collectRemoteStorageConf() (err error) { + + if mappings, err := filer.ReadMountMappings(option.grpcDialOption, pb.ServerAddress(*option.filerAddress)); err != nil { + return err + } else { + option.mappings = mappings + } + + option.remoteConfs = make(map[string]*remote_pb.RemoteConf) + var lastConfName string + err = filer_pb.List(option, filer.DirectoryEtcRemote, "", func(entry *filer_pb.Entry, isLast bool) error { + if !strings.HasSuffix(entry.Name, filer.REMOTE_STORAGE_CONF_SUFFIX) { + return nil + } + conf := &remote_pb.RemoteConf{} + if err := proto.Unmarshal(entry.Content, conf); err != nil { + return fmt.Errorf("unmarshal %s/%s: %v", filer.DirectoryEtcRemote, entry.Name, err) + } + option.remoteConfs[conf.Name] = conf + lastConfName = conf.Name + return nil + }, "", false, math.MaxUint32) + + if option.mappings.PrimaryBucketStorageName == "" && len(option.remoteConfs) == 1 { + glog.V(0).Infof("%s is set to the default remote storage", lastConfName) + option.mappings.PrimaryBucketStorageName = lastConfName + } + + return +} |
