aboutsummaryrefslogtreecommitdiff
path: root/weed/command
diff options
context:
space:
mode:
authorhilimd <68371223+hilimd@users.noreply.github.com>2021-09-13 10:34:33 +0800
committerGitHub <noreply@github.com>2021-09-13 10:34:33 +0800
commit1de733fda507e1da94b2e4741c74ba7e5e2c5f76 (patch)
treeaed7ac29e27e0f8def942154603375396fae9489 /weed/command
parent27c05f8c0b5c7bda43babeb61d79684d11851111 (diff)
parent7591336a2269c1ad92266280634bcaea34f7a5d1 (diff)
downloadseaweedfs-1de733fda507e1da94b2e4741c74ba7e5e2c5f76.tar.xz
seaweedfs-1de733fda507e1da94b2e4741c74ba7e5e2c5f76.zip
Merge pull request #81 from chrislusf/master
sync
Diffstat (limited to 'weed/command')
-rw-r--r--weed/command/filer.go9
-rw-r--r--weed/command/filer_backup.go2
-rw-r--r--weed/command/filer_cat.go5
-rw-r--r--weed/command/filer_copy.go92
-rw-r--r--weed/command/filer_meta_backup.go2
-rw-r--r--weed/command/filer_meta_tail.go2
-rw-r--r--weed/command/filer_remote_sync.go254
-rw-r--r--weed/command/filer_remote_sync_buckets.go387
-rw-r--r--weed/command/filer_remote_sync_dir.go221
-rw-r--r--weed/command/filer_sync.go2
-rw-r--r--weed/command/filer_sync_std.go1
-rw-r--r--weed/command/imports.go4
-rw-r--r--weed/command/master.go7
-rw-r--r--weed/command/master_follower.go5
-rw-r--r--weed/command/mount_notsupported.go5
-rw-r--r--weed/command/mount_std.go1
-rw-r--r--weed/command/msg_broker.go3
-rw-r--r--weed/command/scaffold/filer.toml12
-rw-r--r--weed/command/server.go2
-rw-r--r--weed/command/volume.go10
20 files changed, 809 insertions, 217 deletions
diff --git a/weed/command/filer.go b/weed/command/filer.go
index ddee0852c..9a27978be 100644
--- a/weed/command/filer.go
+++ b/weed/command/filer.go
@@ -4,7 +4,6 @@ import (
"fmt"
"net/http"
"os"
- "strconv"
"strings"
"time"
@@ -134,7 +133,7 @@ func runFiler(cmd *Command, args []string) bool {
go stats_collect.StartMetricsServer(*f.metricsHttpPort)
- filerAddress := fmt.Sprintf("%s:%d", *f.ip, *f.port)
+ filerAddress := util.JoinHostPort(*f.ip, *f.port)
startDelay := time.Duration(2)
if *filerStartS3 {
filerS3Options.filer = &filerAddress
@@ -207,7 +206,7 @@ func (fo *FilerOptions) startFiler() {
}
if *fo.publicPort != 0 {
- publicListeningAddress := *fo.bindIp + ":" + strconv.Itoa(*fo.publicPort)
+ publicListeningAddress := util.JoinHostPort(*fo.bindIp, *fo.publicPort)
glog.V(0).Infoln("Start Seaweed filer server", util.Version(), "public at", publicListeningAddress)
publicListener, e := util.NewListener(publicListeningAddress, 0)
if e != nil {
@@ -222,7 +221,7 @@ func (fo *FilerOptions) startFiler() {
glog.V(0).Infof("Start Seaweed Filer %s at %s:%d", util.Version(), *fo.ip, *fo.port)
filerListener, e := util.NewListener(
- *fo.bindIp+":"+strconv.Itoa(*fo.port),
+ util.JoinHostPort(*fo.bindIp, *fo.port),
time.Duration(10)*time.Second,
)
if e != nil {
@@ -231,7 +230,7 @@ func (fo *FilerOptions) startFiler() {
// starting grpc server
grpcPort := *fo.port + 10000
- grpcL, err := util.NewListener(*fo.bindIp+":"+strconv.Itoa(grpcPort), 0)
+ grpcL, err := util.NewListener(util.JoinHostPort(*fo.bindIp, grpcPort), 0)
if err != nil {
glog.Fatalf("failed to listen on grpc port %d: %v", grpcPort, err)
}
diff --git a/weed/command/filer_backup.go b/weed/command/filer_backup.go
index 0c450181b..5b6409187 100644
--- a/weed/command/filer_backup.go
+++ b/weed/command/filer_backup.go
@@ -113,6 +113,6 @@ func doFilerBackup(grpcDialOption grpc.DialOption, backupOption *FilerBackupOpti
})
return pb.FollowMetadata(sourceFiler, grpcDialOption, "backup_"+dataSink.GetName(),
- sourcePath, startFrom.UnixNano(), 0, processEventFnWithOffset, false)
+ sourcePath, nil, startFrom.UnixNano(), 0, processEventFnWithOffset, false)
}
diff --git a/weed/command/filer_cat.go b/weed/command/filer_cat.go
index a46098b04..09f5e97fe 100644
--- a/weed/command/filer_cat.go
+++ b/weed/command/filer_cat.go
@@ -108,6 +108,11 @@ func runFilerCat(cmd *Command, args []string) bool {
return err
}
+ if len(respLookupEntry.Entry.Content) > 0 {
+ _, err = writer.Write(respLookupEntry.Entry.Content)
+ return err
+ }
+
filerCat.filerClient = client
return filer.StreamContent(&filerCat, writer, respLookupEntry.Entry.Chunks, 0, math.MaxInt64)
diff --git a/weed/command/filer_copy.go b/weed/command/filer_copy.go
index 722f64679..05aa96292 100644
--- a/weed/command/filer_copy.go
+++ b/weed/command/filer_copy.go
@@ -115,7 +115,7 @@ func runCopy(cmd *Command, args []string) bool {
}
filerGrpcPort := filerPort + 10000
- filerGrpcAddress := fmt.Sprintf("%s:%d", filerUrl.Hostname(), filerGrpcPort)
+ filerGrpcAddress := util.JoinHostPort(filerUrl.Hostname(), int(filerGrpcPort))
copy.grpcDialOption = security.LoadClientTLS(util.GetViper(), "grpc.client")
masters, collection, replication, dirBuckets, maxMB, cipher, err := readFilerConfiguration(copy.grpcDialOption, filerGrpcAddress)
@@ -392,8 +392,16 @@ func (worker *FileCopyWorker) uploadFileAsOne(task FileCopyTask, f *os.File) err
}
targetUrl := "http://" + assignResult.Url + "/" + assignResult.FileId
-
- uploadResult, err := operation.UploadData(targetUrl, fileName, worker.options.cipher, data, false, mimeType, nil, security.EncodedJwt(assignResult.Auth))
+ uploadOption := &operation.UploadOption{
+ UploadUrl: targetUrl,
+ Filename: fileName,
+ Cipher: worker.options.cipher,
+ IsInputCompressed: false,
+ MimeType: mimeType,
+ PairMap: nil,
+ Jwt: security.EncodedJwt(assignResult.Auth),
+ }
+ uploadResult, err := operation.UploadData(data, uploadOption)
if err != nil {
return fmt.Errorf("upload data %v to %s: %v\n", fileName, targetUrl, err)
}
@@ -498,7 +506,16 @@ func (worker *FileCopyWorker) uploadFileInChunks(task FileCopyTask, f *os.File,
replication = assignResult.Replication
}
- uploadResult, err, _ := operation.Upload(targetUrl, fileName+"-"+strconv.FormatInt(i+1, 10), worker.options.cipher, io.NewSectionReader(f, i*chunkSize, chunkSize), false, "", nil, security.EncodedJwt(assignResult.Auth))
+ uploadOption := &operation.UploadOption{
+ UploadUrl: targetUrl,
+ Filename: fileName+"-"+strconv.FormatInt(i+1, 10),
+ Cipher: worker.options.cipher,
+ IsInputCompressed: false,
+ MimeType: "",
+ PairMap: nil,
+ Jwt: security.EncodedJwt(assignResult.Auth),
+ }
+ uploadResult, err, _ := operation.Upload(io.NewSectionReader(f, i*chunkSize, chunkSize), uploadOption)
if err != nil {
uploadError = fmt.Errorf("upload data %v to %s: %v\n", fileName, targetUrl, err)
return
@@ -531,6 +548,11 @@ func (worker *FileCopyWorker) uploadFileInChunks(task FileCopyTask, f *os.File,
return uploadError
}
+ manifestedChunks, manifestErr := filer.MaybeManifestize(worker.saveDataAsChunk, chunks)
+ if manifestErr != nil {
+ return fmt.Errorf("create manifest: %v", manifestErr)
+ }
+
if err := pb.WithGrpcFilerClient(worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.CreateEntryRequest{
Directory: task.destinationUrlPath,
@@ -548,7 +570,7 @@ func (worker *FileCopyWorker) uploadFileInChunks(task FileCopyTask, f *os.File,
Collection: collection,
TtlSec: worker.options.ttlSec,
},
- Chunks: chunks,
+ Chunks: manifestedChunks,
},
}
@@ -583,3 +605,63 @@ func detectMimeType(f *os.File) string {
}
return mimeType
}
+
+func (worker *FileCopyWorker) saveDataAsChunk(reader io.Reader, name string, offset int64) (chunk *filer_pb.FileChunk, collection, replication string, err error) {
+
+ var fileId, host string
+ var auth security.EncodedJwt
+
+ if flushErr := pb.WithGrpcFilerClient(worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+
+ ctx := context.Background()
+
+ assignErr := util.Retry("assignVolume", func() error {
+ request := &filer_pb.AssignVolumeRequest{
+ Count: 1,
+ Replication: *worker.options.replication,
+ Collection: *worker.options.collection,
+ TtlSec: worker.options.ttlSec,
+ DiskType: *worker.options.diskType,
+ Path: name,
+ }
+
+ resp, err := client.AssignVolume(ctx, request)
+ if err != nil {
+ return fmt.Errorf("assign volume failure %v: %v", request, err)
+ }
+ if resp.Error != "" {
+ return fmt.Errorf("assign volume failure %v: %v", request, resp.Error)
+ }
+
+ fileId, host, auth = resp.FileId, resp.Url, security.EncodedJwt(resp.Auth)
+ collection, replication = resp.Collection, resp.Replication
+
+ return nil
+ })
+ if assignErr != nil {
+ return assignErr
+ }
+
+ return nil
+ }); flushErr != nil {
+ return nil, collection, replication, fmt.Errorf("filerGrpcAddress assign volume: %v", flushErr)
+ }
+
+ uploadOption := &operation.UploadOption{
+ UploadUrl: fmt.Sprintf("http://%s/%s", host, fileId),
+ Filename: name,
+ Cipher: worker.options.cipher,
+ IsInputCompressed: false,
+ MimeType: "",
+ PairMap: nil,
+ Jwt: auth,
+ }
+ uploadResult, flushErr, _ := operation.Upload(reader, uploadOption)
+ if flushErr != nil {
+ return nil, collection, replication, fmt.Errorf("upload data: %v", flushErr)
+ }
+ if uploadResult.Error != "" {
+ return nil, collection, replication, fmt.Errorf("upload result: %v", uploadResult.Error)
+ }
+ return uploadResult.ToPbFileChunk(fileId, offset), collection, replication, nil
+}
diff --git a/weed/command/filer_meta_backup.go b/weed/command/filer_meta_backup.go
index 6fe323fba..3757f63f1 100644
--- a/weed/command/filer_meta_backup.go
+++ b/weed/command/filer_meta_backup.go
@@ -196,7 +196,7 @@ func (metaBackup *FilerMetaBackupOptions) streamMetadataBackup() error {
})
return pb.FollowMetadata(*metaBackup.filerAddress, metaBackup.grpcDialOption, "meta_backup",
- *metaBackup.filerDirectory, startTime.UnixNano(), 0, processEventFnWithOffset, false)
+ *metaBackup.filerDirectory, nil, startTime.UnixNano(), 0, processEventFnWithOffset, false)
}
diff --git a/weed/command/filer_meta_tail.go b/weed/command/filer_meta_tail.go
index 28c0db99b..0363ae8d1 100644
--- a/weed/command/filer_meta_tail.go
+++ b/weed/command/filer_meta_tail.go
@@ -104,7 +104,7 @@ func runFilerMetaTail(cmd *Command, args []string) bool {
}
tailErr := pb.FollowMetadata(*tailFiler, grpcDialOption, "tail",
- *tailTarget, time.Now().Add(-*tailStart).UnixNano(), 0,
+ *tailTarget, nil, time.Now().Add(-*tailStart).UnixNano(), 0,
func(resp *filer_pb.SubscribeMetadataResponse) error {
if !shouldPrint(resp) {
return nil
diff --git a/weed/command/filer_remote_sync.go b/weed/command/filer_remote_sync.go
index 8b20957e4..3776ee4d9 100644
--- a/weed/command/filer_remote_sync.go
+++ b/weed/command/filer_remote_sync.go
@@ -3,32 +3,33 @@ package command
import (
"context"
"fmt"
- "github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
- "github.com/chrislusf/seaweedfs/weed/remote_storage"
+ "github.com/chrislusf/seaweedfs/weed/pb/remote_pb"
"github.com/chrislusf/seaweedfs/weed/replication/source"
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/util"
- "github.com/golang/protobuf/proto"
"google.golang.org/grpc"
+ "os"
"time"
)
type RemoteSyncOptions struct {
- filerAddress *string
- grpcDialOption grpc.DialOption
- readChunkFromFiler *bool
- debug *bool
- timeAgo *time.Duration
- dir *string
+ filerAddress *string
+ grpcDialOption grpc.DialOption
+ readChunkFromFiler *bool
+ debug *bool
+ timeAgo *time.Duration
+ dir *string
+ createBucketAt *string
+ createBucketRandomSuffix *bool
+
+ mappings *remote_pb.RemoteStorageMapping
+ remoteConfs map[string]*remote_pb.RemoteConf
+ bucketsDir string
}
-const (
- RemoteSyncKeyPrefix = "remote.sync."
-)
-
var _ = filer_pb.FilerClient(&RemoteSyncOptions{})
func (option *RemoteSyncOptions) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error {
@@ -47,20 +48,38 @@ var (
func init() {
cmdFilerRemoteSynchronize.Run = runFilerRemoteSynchronize // break init cycle
remoteSyncOptions.filerAddress = cmdFilerRemoteSynchronize.Flag.String("filer", "localhost:8888", "filer of the SeaweedFS cluster")
- remoteSyncOptions.dir = cmdFilerRemoteSynchronize.Flag.String("dir", "/", "a mounted directory on filer")
+ remoteSyncOptions.dir = cmdFilerRemoteSynchronize.Flag.String("dir", "", "a mounted directory on filer")
+ remoteSyncOptions.createBucketAt = cmdFilerRemoteSynchronize.Flag.String("createBucketAt", "", "one remote storage name to create new buckets in")
+ remoteSyncOptions.createBucketRandomSuffix = cmdFilerRemoteSynchronize.Flag.Bool("createBucketWithRandomSuffix", true, "add randomized suffix to bucket name to avoid conflicts")
remoteSyncOptions.readChunkFromFiler = cmdFilerRemoteSynchronize.Flag.Bool("filerProxy", false, "read file chunks from filer instead of volume servers")
remoteSyncOptions.debug = cmdFilerRemoteSynchronize.Flag.Bool("debug", false, "debug mode to print out filer updated remote files")
remoteSyncOptions.timeAgo = cmdFilerRemoteSynchronize.Flag.Duration("timeAgo", 0, "start time before now. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"")
}
var cmdFilerRemoteSynchronize = &Command{
- UsageLine: "filer.remote.sync -filer=<filerHost>:<filerPort> -dir=/mount/s3_on_cloud",
- Short: "resumable continuously write back updates to remote storage if the directory is mounted to the remote storage",
- Long: `resumable continuously write back updates to remote storage if the directory is mounted to the remote storage
+ UsageLine: "filer.remote.sync",
+ Short: "resumable continuously write back updates to remote storage",
+ Long: `resumable continuously write back updates to remote storage
filer.remote.sync listens on filer update events.
If any mounted remote file is updated, it will fetch the updated content,
and write to the remote storage.
+
+ There are two modes:
+
+ 1)By default, watch /buckets folder and write back all changes.
+
+ # if there is only one remote storage configured
+ weed filer.remote.sync
+ # if there are multiple remote storages configured
+ # specify a remote storage to create new buckets.
+ weed filer.remote.sync -createBucketAt=cloud1
+
+ 2)Write back one mounted folder to remote storage
+
+ weed filer.remote.sync -dir=/mount/s3_on_cloud
+
+
`,
}
@@ -70,188 +89,57 @@ func runFilerRemoteSynchronize(cmd *Command, args []string) bool {
grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
remoteSyncOptions.grpcDialOption = grpcDialOption
- // read filer remote storage mount mappings
- mappings, readErr := filer.ReadMountMappings(grpcDialOption, *remoteSyncOptions.filerAddress)
- if readErr != nil {
- fmt.Printf("read mount mapping: %v", readErr)
- return false
- }
+ dir := *remoteSyncOptions.dir
+ filerAddress := *remoteSyncOptions.filerAddress
filerSource := &source.FilerSource{}
filerSource.DoInitialize(
- *remoteSyncOptions.filerAddress,
- pb.ServerToGrpcAddress(*remoteSyncOptions.filerAddress),
+ filerAddress,
+ pb.ServerToGrpcAddress(filerAddress),
"/", // does not matter
*remoteSyncOptions.readChunkFromFiler,
)
- var found bool
- for dir, remoteStorageMountLocation := range mappings.Mappings {
- if *remoteSyncOptions.dir == dir {
- found = true
- storageConf, readErr := filer.ReadRemoteStorageConf(grpcDialOption, *remoteSyncOptions.filerAddress, remoteStorageMountLocation.Name)
- if readErr != nil {
- fmt.Printf("read remote storage configuration for %s: %v", dir, readErr)
- continue
- }
- fmt.Printf("synchronize %s to remote storage...\n", *remoteSyncOptions.dir)
- if err := util.Retry("filer.remote.sync "+dir, func() error {
- return followUpdatesAndUploadToRemote(&remoteSyncOptions, filerSource, dir, storageConf, remoteStorageMountLocation)
- }); err != nil {
- fmt.Printf("synchronize %s: %v\n", *remoteSyncOptions.dir, err)
- }
- break
- }
- }
- if !found {
- fmt.Printf("directory %s is not mounted to any remote storage\n", *remoteSyncOptions.dir)
- return false
- }
-
- return true
-}
-
-func followUpdatesAndUploadToRemote(option *RemoteSyncOptions, filerSource *source.FilerSource, mountedDir string, remoteStorage *filer_pb.RemoteConf, remoteStorageMountLocation *filer_pb.RemoteStorageLocation) error {
-
- dirHash := util.HashStringToLong(mountedDir)
-
- // 1. specified by timeAgo
- // 2. last offset timestamp for this directory
- // 3. directory creation time
- var lastOffsetTs time.Time
- if *option.timeAgo == 0 {
- mountedDirEntry, err := filer_pb.GetEntry(option, util.FullPath(mountedDir))
+ remoteSyncOptions.bucketsDir = "/buckets"
+ // check buckets again
+ remoteSyncOptions.WithFilerClient(func(filerClient filer_pb.SeaweedFilerClient) error {
+ resp, err := filerClient.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
if err != nil {
- return fmt.Errorf("lookup %s: %v", mountedDir, err)
- }
-
- lastOffsetTsNs, err := getOffset(option.grpcDialOption, *option.filerAddress, RemoteSyncKeyPrefix, int32(dirHash))
- if err == nil && mountedDirEntry.Attributes.Crtime < lastOffsetTsNs/1000000 {
- lastOffsetTs = time.Unix(0, lastOffsetTsNs)
- glog.V(0).Infof("resume from %v", lastOffsetTs)
- } else {
- lastOffsetTs = time.Unix(mountedDirEntry.Attributes.Crtime, 0)
- }
- } else {
- lastOffsetTs = time.Now().Add(-*option.timeAgo)
- }
-
- client, err := remote_storage.GetRemoteStorage(remoteStorage)
- if err != nil {
- return err
- }
-
- eachEntryFunc := func(resp *filer_pb.SubscribeMetadataResponse) error {
- message := resp.EventNotification
- if message.OldEntry == nil && message.NewEntry == nil {
- return nil
- }
- if message.OldEntry == nil && message.NewEntry != nil {
- if len(message.NewEntry.Chunks) == 0 {
- return nil
- }
- fmt.Printf("create: %+v\n", resp)
- if !shouldSendToRemote(message.NewEntry) {
- fmt.Printf("skipping creating: %+v\n", resp)
- return nil
- }
- dest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(message.NewParentPath, message.NewEntry.Name), remoteStorageMountLocation)
- if message.NewEntry.IsDirectory {
- return client.WriteDirectory(dest, message.NewEntry)
- }
- reader := filer.NewChunkStreamReader(filerSource, message.NewEntry.Chunks)
- remoteEntry, writeErr := client.WriteFile(dest, message.NewEntry, reader)
- if writeErr != nil {
- return writeErr
- }
- return updateLocalEntry(&remoteSyncOptions, message.NewParentPath, message.NewEntry, remoteEntry)
- }
- if message.OldEntry != nil && message.NewEntry == nil {
- fmt.Printf("delete: %+v\n", resp)
- dest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(resp.Directory, message.OldEntry.Name), remoteStorageMountLocation)
- return client.DeleteFile(dest)
+ return err
}
- if message.OldEntry != nil && message.NewEntry != nil {
- oldDest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(resp.Directory, message.OldEntry.Name), remoteStorageMountLocation)
- dest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(message.NewParentPath, message.NewEntry.Name), remoteStorageMountLocation)
- if !shouldSendToRemote(message.NewEntry) {
- fmt.Printf("skipping updating: %+v\n", resp)
- return nil
- }
- if message.NewEntry.IsDirectory {
- return client.WriteDirectory(dest, message.NewEntry)
- }
- if resp.Directory == message.NewParentPath && message.OldEntry.Name == message.NewEntry.Name {
- if isSameChunks(message.OldEntry.Chunks, message.NewEntry.Chunks) {
- fmt.Printf("update meta: %+v\n", resp)
- return client.UpdateFileMetadata(dest, message.NewEntry)
- }
- }
- fmt.Printf("update: %+v\n", resp)
- if err := client.DeleteFile(oldDest); err != nil {
- return err
- }
- reader := filer.NewChunkStreamReader(filerSource, message.NewEntry.Chunks)
- remoteEntry, writeErr := client.WriteFile(dest, message.NewEntry, reader)
- if writeErr != nil {
- return writeErr
- }
- return updateLocalEntry(&remoteSyncOptions, message.NewParentPath, message.NewEntry, remoteEntry)
- }
-
+ remoteSyncOptions.bucketsDir = resp.DirBuckets
return nil
- }
-
- processEventFnWithOffset := pb.AddOffsetFunc(eachEntryFunc, 3*time.Second, func(counter int64, lastTsNs int64) error {
- lastTime := time.Unix(0, lastTsNs)
- glog.V(0).Infof("remote sync %s progressed to %v %0.2f/sec", *option.filerAddress, lastTime, float64(counter)/float64(3))
- return setOffset(option.grpcDialOption, *option.filerAddress, RemoteSyncKeyPrefix, int32(dirHash), lastTsNs)
})
- return pb.FollowMetadata(*option.filerAddress, option.grpcDialOption,
- "filer.remote.sync", mountedDir, lastOffsetTs.UnixNano(), 0, processEventFnWithOffset, false)
-}
-
-func toRemoteStorageLocation(mountDir, sourcePath util.FullPath, remoteMountLocation *filer_pb.RemoteStorageLocation) *filer_pb.RemoteStorageLocation {
- source := string(sourcePath[len(mountDir):])
- dest := util.FullPath(remoteMountLocation.Path).Child(source)
- return &filer_pb.RemoteStorageLocation{
- Name: remoteMountLocation.Name,
- Bucket: remoteMountLocation.Bucket,
- Path: string(dest),
- }
-}
-
-func isSameChunks(a, b []*filer_pb.FileChunk) bool {
- if len(a) != len(b) {
- return false
- }
- for i := 0; i < len(a); i++ {
- x, y := a[i], b[i]
- if !proto.Equal(x, y) {
- return false
- }
- }
- return true
-}
-
-func shouldSendToRemote(entry *filer_pb.Entry) bool {
- if entry.RemoteEntry == nil {
+ if dir != "" && dir != remoteSyncOptions.bucketsDir {
+ fmt.Printf("synchronize %s to remote storage...\n", dir)
+ util.RetryForever("filer.remote.sync "+dir, func() error {
+ return followUpdatesAndUploadToRemote(&remoteSyncOptions, filerSource, dir)
+ }, func(err error) bool {
+ if err != nil {
+ glog.Errorf("synchronize %s: %v", dir, err)
+ }
+ return true
+ })
return true
}
- if entry.RemoteEntry.LastLocalSyncTsNs/1e9 < entry.Attributes.Mtime {
+
+ // read filer remote storage mount mappings
+ if detectErr := remoteSyncOptions.collectRemoteStorageConf(); detectErr != nil {
+ fmt.Fprintf(os.Stderr, "read mount info: %v\n", detectErr)
return true
}
- return false
-}
-func updateLocalEntry(filerClient filer_pb.FilerClient, dir string, entry *filer_pb.Entry, remoteEntry *filer_pb.RemoteEntry) error {
- entry.RemoteEntry = remoteEntry
- return filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
- _, err := client.UpdateEntry(context.Background(), &filer_pb.UpdateEntryRequest{
- Directory: dir,
- Entry: entry,
- })
- return err
+ // synchronize /buckets folder
+ fmt.Printf("synchronize buckets in %s ...\n", remoteSyncOptions.bucketsDir)
+ util.RetryForever("filer.remote.sync buckets", func() error {
+ return remoteSyncOptions.followBucketUpdatesAndUploadToRemote(filerSource)
+ }, func(err error) bool {
+ if err != nil {
+ glog.Errorf("synchronize %s: %v", remoteSyncOptions.bucketsDir, err)
+ }
+ return true
})
+ return true
+
}
diff --git a/weed/command/filer_remote_sync_buckets.go b/weed/command/filer_remote_sync_buckets.go
new file mode 100644
index 000000000..70f9f49c1
--- /dev/null
+++ b/weed/command/filer_remote_sync_buckets.go
@@ -0,0 +1,387 @@
+package command
+
+import (
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/remote_pb"
+ "github.com/chrislusf/seaweedfs/weed/remote_storage"
+ "github.com/chrislusf/seaweedfs/weed/replication/source"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "github.com/golang/protobuf/proto"
+ "math"
+ "math/rand"
+ "strings"
+ "time"
+)
+
+func (option *RemoteSyncOptions) followBucketUpdatesAndUploadToRemote(filerSource *source.FilerSource) error {
+
+ // read filer remote storage mount mappings
+ if detectErr := option.collectRemoteStorageConf(); detectErr != nil {
+ return fmt.Errorf("read mount info: %v", detectErr)
+ }
+
+ eachEntryFunc, err := option.makeBucketedEventProcessor(filerSource)
+ if err != nil {
+ return err
+ }
+
+ processEventFnWithOffset := pb.AddOffsetFunc(eachEntryFunc, 3*time.Second, func(counter int64, lastTsNs int64) error {
+ lastTime := time.Unix(0, lastTsNs)
+ glog.V(0).Infof("remote sync %s progressed to %v %0.2f/sec", *option.filerAddress, lastTime, float64(counter)/float64(3))
+ return remote_storage.SetSyncOffset(option.grpcDialOption, *option.filerAddress, option.bucketsDir, lastTsNs)
+ })
+
+ lastOffsetTs := collectLastSyncOffset(option, option.bucketsDir)
+
+ return pb.FollowMetadata(*option.filerAddress, option.grpcDialOption, "filer.remote.sync",
+ option.bucketsDir, []string{filer.DirectoryEtcRemote}, lastOffsetTs.UnixNano(), 0, processEventFnWithOffset, false)
+}
+
+func (option *RemoteSyncOptions) makeBucketedEventProcessor(filerSource *source.FilerSource) (pb.ProcessMetadataFunc, error) {
+
+ handleCreateBucket := func(entry *filer_pb.Entry) error {
+ if !entry.IsDirectory {
+ return nil
+ }
+ if entry.RemoteEntry != nil {
+ // this directory is imported from "remote.mount.buckets" or "remote.mount"
+ return nil
+ }
+ if option.mappings.PrimaryBucketStorageName != "" && *option.createBucketAt == "" {
+ *option.createBucketAt = option.mappings.PrimaryBucketStorageName
+ glog.V(0).Infof("%s is set as the primary remote storage", *option.createBucketAt)
+ }
+ if len(option.mappings.Mappings) == 1 && *option.createBucketAt == "" {
+ for k := range option.mappings.Mappings {
+ *option.createBucketAt = k
+ glog.V(0).Infof("%s is set as the only remote storage", *option.createBucketAt)
+ }
+ }
+ if *option.createBucketAt == "" {
+ return nil
+ }
+ remoteConf, found := option.remoteConfs[*option.createBucketAt]
+ if !found {
+ return fmt.Errorf("un-configured remote storage %s", *option.createBucketAt)
+ }
+
+ client, err := remote_storage.GetRemoteStorage(remoteConf)
+ if err != nil {
+ return err
+ }
+
+ bucketName := strings.ToLower(entry.Name)
+ if *option.createBucketRandomSuffix {
+ // https://docs.aws.amazon.com/AmazonS3/latest/userguide/bucketnamingrules.html
+ if len(bucketName)+5 > 63 {
+ bucketName = bucketName[:58]
+ }
+ bucketName = fmt.Sprintf("%s-%4d", bucketName, rand.Uint32()%10000)
+ }
+
+ glog.V(0).Infof("create bucket %s", bucketName)
+ if err := client.CreateBucket(bucketName); err != nil {
+ return fmt.Errorf("create bucket %s in %s: %v", bucketName, remoteConf.Name, err)
+ }
+
+ bucketPath := util.FullPath(option.bucketsDir).Child(entry.Name)
+ remoteLocation := &remote_pb.RemoteStorageLocation{
+ Name: *option.createBucketAt,
+ Bucket: bucketName,
+ Path: "/",
+ }
+
+ // need to add new mapping here before getting upates from metadata tailing
+ option.mappings.Mappings[string(bucketPath)] = remoteLocation
+
+ return filer.InsertMountMapping(option, string(bucketPath), remoteLocation)
+
+ }
+ handleDeleteBucket := func(entry *filer_pb.Entry) error {
+ if !entry.IsDirectory {
+ return nil
+ }
+
+ client, remoteStorageMountLocation, err := option.findRemoteStorageClient(entry.Name)
+ if err != nil {
+ return fmt.Errorf("findRemoteStorageClient %s: %v", entry.Name, err)
+ }
+
+ glog.V(0).Infof("delete remote bucket %s", remoteStorageMountLocation.Bucket)
+ if err := client.DeleteBucket(remoteStorageMountLocation.Bucket); err != nil {
+ return fmt.Errorf("delete remote bucket %s: %v", remoteStorageMountLocation.Bucket, err)
+ }
+
+ bucketPath := util.FullPath(option.bucketsDir).Child(entry.Name)
+
+ return filer.DeleteMountMapping(option, string(bucketPath))
+ }
+
+ handleEtcRemoteChanges := func(resp *filer_pb.SubscribeMetadataResponse) error {
+ message := resp.EventNotification
+ if message.NewEntry != nil {
+ // update
+ if message.NewEntry.Name == filer.REMOTE_STORAGE_MOUNT_FILE {
+ newMappings, readErr := filer.UnmarshalRemoteStorageMappings(message.NewEntry.Content)
+ if readErr != nil {
+ return fmt.Errorf("unmarshal mappings: %v", readErr)
+ }
+ option.mappings = newMappings
+ }
+ if strings.HasSuffix(message.NewEntry.Name, filer.REMOTE_STORAGE_CONF_SUFFIX) {
+ conf := &remote_pb.RemoteConf{}
+ if err := proto.Unmarshal(message.NewEntry.Content, conf); err != nil {
+ return fmt.Errorf("unmarshal %s/%s: %v", filer.DirectoryEtcRemote, message.NewEntry.Name, err)
+ }
+ option.remoteConfs[conf.Name] = conf
+ }
+ } else if message.OldEntry != nil {
+ // deletion
+ if strings.HasSuffix(message.OldEntry.Name, filer.REMOTE_STORAGE_CONF_SUFFIX) {
+ conf := &remote_pb.RemoteConf{}
+ if err := proto.Unmarshal(message.OldEntry.Content, conf); err != nil {
+ return fmt.Errorf("unmarshal %s/%s: %v", filer.DirectoryEtcRemote, message.OldEntry.Name, err)
+ }
+ delete(option.remoteConfs, conf.Name)
+ }
+ }
+
+ return nil
+ }
+
+ eachEntryFunc := func(resp *filer_pb.SubscribeMetadataResponse) error {
+ message := resp.EventNotification
+ if strings.HasPrefix(resp.Directory, filer.DirectoryEtcRemote) {
+ return handleEtcRemoteChanges(resp)
+ }
+
+ if message.OldEntry == nil && message.NewEntry == nil {
+ return nil
+ }
+ if message.OldEntry == nil && message.NewEntry != nil {
+ if message.NewParentPath == option.bucketsDir {
+ return handleCreateBucket(message.NewEntry)
+ }
+ if !filer.HasData(message.NewEntry) {
+ return nil
+ }
+ bucket, remoteStorageMountLocation, remoteStorage, ok := option.detectBucketInfo(message.NewParentPath)
+ if !ok {
+ return nil
+ }
+ client, err := remote_storage.GetRemoteStorage(remoteStorage)
+ if err != nil {
+ return err
+ }
+ glog.V(2).Infof("create: %+v", resp)
+ if !shouldSendToRemote(message.NewEntry) {
+ glog.V(2).Infof("skipping creating: %+v", resp)
+ return nil
+ }
+ dest := toRemoteStorageLocation(bucket, util.NewFullPath(message.NewParentPath, message.NewEntry.Name), remoteStorageMountLocation)
+ if message.NewEntry.IsDirectory {
+ glog.V(0).Infof("mkdir %s", remote_storage.FormatLocation(dest))
+ return client.WriteDirectory(dest, message.NewEntry)
+ }
+ glog.V(0).Infof("create %s", remote_storage.FormatLocation(dest))
+ reader := filer.NewFileReader(filerSource, message.NewEntry)
+ remoteEntry, writeErr := client.WriteFile(dest, message.NewEntry, reader)
+ if writeErr != nil {
+ return writeErr
+ }
+ return updateLocalEntry(&remoteSyncOptions, message.NewParentPath, message.NewEntry, remoteEntry)
+ }
+ if message.OldEntry != nil && message.NewEntry == nil {
+ if resp.Directory == option.bucketsDir {
+ return handleDeleteBucket(message.OldEntry)
+ }
+ bucket, remoteStorageMountLocation, remoteStorage, ok := option.detectBucketInfo(resp.Directory)
+ if !ok {
+ return nil
+ }
+ client, err := remote_storage.GetRemoteStorage(remoteStorage)
+ if err != nil {
+ return err
+ }
+ glog.V(2).Infof("delete: %+v", resp)
+ dest := toRemoteStorageLocation(bucket, util.NewFullPath(resp.Directory, message.OldEntry.Name), remoteStorageMountLocation)
+ if message.OldEntry.IsDirectory {
+ glog.V(0).Infof("rmdir %s", remote_storage.FormatLocation(dest))
+ return client.RemoveDirectory(dest)
+ }
+ glog.V(0).Infof("delete %s", remote_storage.FormatLocation(dest))
+ return client.DeleteFile(dest)
+ }
+ if message.OldEntry != nil && message.NewEntry != nil {
+ if resp.Directory == option.bucketsDir {
+ if message.NewParentPath == option.bucketsDir {
+ if message.OldEntry.Name == message.NewEntry.Name {
+ return nil
+ }
+ if err := handleCreateBucket(message.NewEntry); err != nil {
+ return err
+ }
+ if err := handleDeleteBucket(message.OldEntry); err != nil {
+ return err
+ }
+ }
+ }
+ oldBucket, oldRemoteStorageMountLocation, oldRemoteStorage, oldOk := option.detectBucketInfo(resp.Directory)
+ newBucket, newRemoteStorageMountLocation, newRemoteStorage, newOk := option.detectBucketInfo(message.NewParentPath)
+ if oldOk && newOk {
+ if !shouldSendToRemote(message.NewEntry) {
+ glog.V(2).Infof("skipping updating: %+v", resp)
+ return nil
+ }
+ client, err := remote_storage.GetRemoteStorage(oldRemoteStorage)
+ if err != nil {
+ return err
+ }
+ if resp.Directory == message.NewParentPath && message.OldEntry.Name == message.NewEntry.Name {
+ // update the same entry
+ if message.NewEntry.IsDirectory {
+ // update directory property
+ return nil
+ }
+ if filer.IsSameData(message.OldEntry, message.NewEntry) {
+ glog.V(2).Infof("update meta: %+v", resp)
+ oldDest := toRemoteStorageLocation(oldBucket, util.NewFullPath(resp.Directory, message.OldEntry.Name), oldRemoteStorageMountLocation)
+ return client.UpdateFileMetadata(oldDest, message.OldEntry, message.NewEntry)
+ } else {
+ newDest := toRemoteStorageLocation(newBucket, util.NewFullPath(message.NewParentPath, message.NewEntry.Name), newRemoteStorageMountLocation)
+ reader := filer.NewFileReader(filerSource, message.NewEntry)
+ glog.V(0).Infof("create %s", remote_storage.FormatLocation(newDest))
+ remoteEntry, writeErr := client.WriteFile(newDest, message.NewEntry, reader)
+ if writeErr != nil {
+ return writeErr
+ }
+ return updateLocalEntry(&remoteSyncOptions, message.NewParentPath, message.NewEntry, remoteEntry)
+ }
+ }
+ }
+
+ // the following is entry rename
+ if oldOk {
+ client, err := remote_storage.GetRemoteStorage(oldRemoteStorage)
+ if err != nil {
+ return err
+ }
+ oldDest := toRemoteStorageLocation(oldBucket, util.NewFullPath(resp.Directory, message.OldEntry.Name), oldRemoteStorageMountLocation)
+ if message.OldEntry.IsDirectory {
+ return client.RemoveDirectory(oldDest)
+ }
+ glog.V(0).Infof("delete %s", remote_storage.FormatLocation(oldDest))
+ if err := client.DeleteFile(oldDest); err != nil {
+ return err
+ }
+ }
+ if newOk {
+ if !shouldSendToRemote(message.NewEntry) {
+ glog.V(2).Infof("skipping updating: %+v", resp)
+ return nil
+ }
+ client, err := remote_storage.GetRemoteStorage(newRemoteStorage)
+ if err != nil {
+ return err
+ }
+ newDest := toRemoteStorageLocation(newBucket, util.NewFullPath(message.NewParentPath, message.NewEntry.Name), newRemoteStorageMountLocation)
+ if message.NewEntry.IsDirectory {
+ return client.WriteDirectory(newDest, message.NewEntry)
+ }
+ reader := filer.NewFileReader(filerSource, message.NewEntry)
+ glog.V(0).Infof("create %s", remote_storage.FormatLocation(newDest))
+ remoteEntry, writeErr := client.WriteFile(newDest, message.NewEntry, reader)
+ if writeErr != nil {
+ return writeErr
+ }
+ return updateLocalEntry(&remoteSyncOptions, message.NewParentPath, message.NewEntry, remoteEntry)
+ }
+ }
+
+ return nil
+ }
+ return eachEntryFunc, nil
+}
+
+func (option *RemoteSyncOptions) findRemoteStorageClient(bucketName string) (client remote_storage.RemoteStorageClient, remoteStorageMountLocation *remote_pb.RemoteStorageLocation, err error) {
+ bucket := util.FullPath(option.bucketsDir).Child(bucketName)
+
+ var isMounted bool
+ remoteStorageMountLocation, isMounted = option.mappings.Mappings[string(bucket)]
+ if !isMounted {
+ return nil, remoteStorageMountLocation, fmt.Errorf("%s is not mounted", bucket)
+ }
+ remoteConf, hasClient := option.remoteConfs[remoteStorageMountLocation.Name]
+ if !hasClient {
+ return nil, remoteStorageMountLocation, fmt.Errorf("%s mounted to un-configured %+v", bucket, remoteStorageMountLocation)
+ }
+
+ client, err = remote_storage.GetRemoteStorage(remoteConf)
+ if err != nil {
+ return nil, remoteStorageMountLocation, err
+ }
+ return client, remoteStorageMountLocation, nil
+}
+
+func (option *RemoteSyncOptions) detectBucketInfo(actualDir string) (bucket util.FullPath, remoteStorageMountLocation *remote_pb.RemoteStorageLocation, remoteConf *remote_pb.RemoteConf, ok bool) {
+ bucket, ok = extractBucketPath(option.bucketsDir, actualDir)
+ if !ok {
+ return "", nil, nil, false
+ }
+ var isMounted bool
+ remoteStorageMountLocation, isMounted = option.mappings.Mappings[string(bucket)]
+ if !isMounted {
+ glog.Warningf("%s is not mounted", bucket)
+ return "", nil, nil, false
+ }
+ var hasClient bool
+ remoteConf, hasClient = option.remoteConfs[remoteStorageMountLocation.Name]
+ if !hasClient {
+ glog.Warningf("%s mounted to un-configured %+v", bucket, remoteStorageMountLocation)
+ return "", nil, nil, false
+ }
+ return bucket, remoteStorageMountLocation, remoteConf, true
+}
+
+func extractBucketPath(bucketsDir, dir string) (util.FullPath, bool) {
+ if !strings.HasPrefix(dir, bucketsDir+"/") {
+ return "", false
+ }
+ parts := strings.SplitN(dir[len(bucketsDir)+1:], "/", 2)
+ return util.FullPath(bucketsDir).Child(parts[0]), true
+}
+
+func (option *RemoteSyncOptions) collectRemoteStorageConf() (err error) {
+
+ if mappings, err := filer.ReadMountMappings(option.grpcDialOption, *option.filerAddress); err != nil {
+ return err
+ } else {
+ option.mappings = mappings
+ }
+
+ option.remoteConfs = make(map[string]*remote_pb.RemoteConf)
+ var lastConfName string
+ err = filer_pb.List(option, filer.DirectoryEtcRemote, "", func(entry *filer_pb.Entry, isLast bool) error {
+ if !strings.HasSuffix(entry.Name, filer.REMOTE_STORAGE_CONF_SUFFIX) {
+ return nil
+ }
+ conf := &remote_pb.RemoteConf{}
+ if err := proto.Unmarshal(entry.Content, conf); err != nil {
+ return fmt.Errorf("unmarshal %s/%s: %v", filer.DirectoryEtcRemote, entry.Name, err)
+ }
+ option.remoteConfs[conf.Name] = conf
+ lastConfName = conf.Name
+ return nil
+ }, "", false, math.MaxUint32)
+
+ if option.mappings.PrimaryBucketStorageName == "" && len(option.remoteConfs) == 1 {
+ glog.V(0).Infof("%s is set to the default remote storage", lastConfName)
+ option.mappings.PrimaryBucketStorageName = lastConfName
+ }
+
+ return
+}
diff --git a/weed/command/filer_remote_sync_dir.go b/weed/command/filer_remote_sync_dir.go
new file mode 100644
index 000000000..dc2e9a1fb
--- /dev/null
+++ b/weed/command/filer_remote_sync_dir.go
@@ -0,0 +1,221 @@
+package command
+
+import (
+ "context"
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/remote_pb"
+ "github.com/chrislusf/seaweedfs/weed/remote_storage"
+ "github.com/chrislusf/seaweedfs/weed/replication/source"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "github.com/golang/protobuf/proto"
+ "os"
+ "strings"
+ "time"
+)
+
+func followUpdatesAndUploadToRemote(option *RemoteSyncOptions, filerSource *source.FilerSource, mountedDir string) error {
+
+ // read filer remote storage mount mappings
+ _, _, remoteStorageMountLocation, remoteStorage, detectErr := filer.DetectMountInfo(option.grpcDialOption, *option.filerAddress, mountedDir)
+ if detectErr != nil {
+ return fmt.Errorf("read mount info: %v", detectErr)
+ }
+
+ eachEntryFunc, err := makeEventProcessor(remoteStorage, mountedDir, remoteStorageMountLocation, filerSource)
+ if err != nil {
+ return err
+ }
+
+ processEventFnWithOffset := pb.AddOffsetFunc(eachEntryFunc, 3*time.Second, func(counter int64, lastTsNs int64) error {
+ lastTime := time.Unix(0, lastTsNs)
+ glog.V(0).Infof("remote sync %s progressed to %v %0.2f/sec", *option.filerAddress, lastTime, float64(counter)/float64(3))
+ return remote_storage.SetSyncOffset(option.grpcDialOption, *option.filerAddress, mountedDir, lastTsNs)
+ })
+
+ lastOffsetTs := collectLastSyncOffset(option, mountedDir)
+
+ return pb.FollowMetadata(*option.filerAddress, option.grpcDialOption, "filer.remote.sync",
+ mountedDir, []string{filer.DirectoryEtcRemote}, lastOffsetTs.UnixNano(), 0, processEventFnWithOffset, false)
+}
+
+func makeEventProcessor(remoteStorage *remote_pb.RemoteConf, mountedDir string, remoteStorageMountLocation *remote_pb.RemoteStorageLocation, filerSource *source.FilerSource) (pb.ProcessMetadataFunc, error) {
+ client, err := remote_storage.GetRemoteStorage(remoteStorage)
+ if err != nil {
+ return nil, err
+ }
+
+ handleEtcRemoteChanges := func(resp *filer_pb.SubscribeMetadataResponse) error {
+ message := resp.EventNotification
+ if message.NewEntry == nil {
+ return nil
+ }
+ if message.NewEntry.Name == filer.REMOTE_STORAGE_MOUNT_FILE {
+ mappings, readErr := filer.UnmarshalRemoteStorageMappings(message.NewEntry.Content)
+ if readErr != nil {
+ return fmt.Errorf("unmarshal mappings: %v", readErr)
+ }
+ if remoteLoc, found := mappings.Mappings[mountedDir]; found {
+ if remoteStorageMountLocation.Bucket != remoteLoc.Bucket || remoteStorageMountLocation.Path != remoteLoc.Path {
+ glog.Fatalf("Unexpected mount changes %+v => %+v", remoteStorageMountLocation, remoteLoc)
+ }
+ } else {
+ glog.V(0).Infof("unmounted %s exiting ...", mountedDir)
+ os.Exit(0)
+ }
+ }
+ if message.NewEntry.Name == remoteStorage.Name+filer.REMOTE_STORAGE_CONF_SUFFIX {
+ conf := &remote_pb.RemoteConf{}
+ if err := proto.Unmarshal(message.NewEntry.Content, conf); err != nil {
+ return fmt.Errorf("unmarshal %s/%s: %v", filer.DirectoryEtcRemote, message.NewEntry.Name, err)
+ }
+ remoteStorage = conf
+ if newClient, err := remote_storage.GetRemoteStorage(remoteStorage); err == nil {
+ client = newClient
+ } else {
+ return err
+ }
+ }
+
+ return nil
+ }
+
+ eachEntryFunc := func(resp *filer_pb.SubscribeMetadataResponse) error {
+ message := resp.EventNotification
+ if strings.HasPrefix(resp.Directory, filer.DirectoryEtcRemote) {
+ return handleEtcRemoteChanges(resp)
+ }
+
+ if message.OldEntry == nil && message.NewEntry == nil {
+ return nil
+ }
+ if message.OldEntry == nil && message.NewEntry != nil {
+ if !filer.HasData(message.NewEntry) {
+ return nil
+ }
+ glog.V(2).Infof("create: %+v", resp)
+ if !shouldSendToRemote(message.NewEntry) {
+ glog.V(2).Infof("skipping creating: %+v", resp)
+ return nil
+ }
+ dest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(message.NewParentPath, message.NewEntry.Name), remoteStorageMountLocation)
+ if message.NewEntry.IsDirectory {
+ glog.V(0).Infof("mkdir %s", remote_storage.FormatLocation(dest))
+ return client.WriteDirectory(dest, message.NewEntry)
+ }
+ glog.V(0).Infof("create %s", remote_storage.FormatLocation(dest))
+ reader := filer.NewFileReader(filerSource, message.NewEntry)
+ remoteEntry, writeErr := client.WriteFile(dest, message.NewEntry, reader)
+ if writeErr != nil {
+ return writeErr
+ }
+ return updateLocalEntry(&remoteSyncOptions, message.NewParentPath, message.NewEntry, remoteEntry)
+ }
+ if message.OldEntry != nil && message.NewEntry == nil {
+ glog.V(2).Infof("delete: %+v", resp)
+ dest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(resp.Directory, message.OldEntry.Name), remoteStorageMountLocation)
+ if message.OldEntry.IsDirectory {
+ glog.V(0).Infof("rmdir %s", remote_storage.FormatLocation(dest))
+ return client.RemoveDirectory(dest)
+ }
+ glog.V(0).Infof("delete %s", remote_storage.FormatLocation(dest))
+ return client.DeleteFile(dest)
+ }
+ if message.OldEntry != nil && message.NewEntry != nil {
+ oldDest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(resp.Directory, message.OldEntry.Name), remoteStorageMountLocation)
+ dest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(message.NewParentPath, message.NewEntry.Name), remoteStorageMountLocation)
+ if !shouldSendToRemote(message.NewEntry) {
+ glog.V(2).Infof("skipping updating: %+v", resp)
+ return nil
+ }
+ if message.NewEntry.IsDirectory {
+ return client.WriteDirectory(dest, message.NewEntry)
+ }
+ if resp.Directory == message.NewParentPath && message.OldEntry.Name == message.NewEntry.Name {
+ if filer.IsSameData(message.OldEntry, message.NewEntry) {
+ glog.V(2).Infof("update meta: %+v", resp)
+ return client.UpdateFileMetadata(dest, message.OldEntry, message.NewEntry)
+ }
+ }
+ glog.V(2).Infof("update: %+v", resp)
+ glog.V(0).Infof("delete %s", remote_storage.FormatLocation(oldDest))
+ if err := client.DeleteFile(oldDest); err != nil {
+ return err
+ }
+ reader := filer.NewFileReader(filerSource, message.NewEntry)
+ glog.V(0).Infof("create %s", remote_storage.FormatLocation(dest))
+ remoteEntry, writeErr := client.WriteFile(dest, message.NewEntry, reader)
+ if writeErr != nil {
+ return writeErr
+ }
+ return updateLocalEntry(&remoteSyncOptions, message.NewParentPath, message.NewEntry, remoteEntry)
+ }
+
+ return nil
+ }
+ return eachEntryFunc, nil
+}
+
+func collectLastSyncOffset(option *RemoteSyncOptions, mountedDir string) time.Time {
+ // 1. specified by timeAgo
+ // 2. last offset timestamp for this directory
+ // 3. directory creation time
+ var lastOffsetTs time.Time
+ if *option.timeAgo == 0 {
+ mountedDirEntry, err := filer_pb.GetEntry(option, util.FullPath(mountedDir))
+ if err != nil {
+ glog.V(0).Infof("get mounted directory %s: %v", mountedDir, err)
+ return time.Now()
+ }
+
+ lastOffsetTsNs, err := remote_storage.GetSyncOffset(option.grpcDialOption, *option.filerAddress, mountedDir)
+ if mountedDirEntry != nil {
+ if err == nil && mountedDirEntry.Attributes.Crtime < lastOffsetTsNs/1000000 {
+ lastOffsetTs = time.Unix(0, lastOffsetTsNs)
+ glog.V(0).Infof("resume from %v", lastOffsetTs)
+ } else {
+ lastOffsetTs = time.Unix(mountedDirEntry.Attributes.Crtime, 0)
+ }
+ } else {
+ lastOffsetTs = time.Now()
+ }
+ } else {
+ lastOffsetTs = time.Now().Add(-*option.timeAgo)
+ }
+ return lastOffsetTs
+}
+
+func toRemoteStorageLocation(mountDir, sourcePath util.FullPath, remoteMountLocation *remote_pb.RemoteStorageLocation) *remote_pb.RemoteStorageLocation {
+ source := string(sourcePath[len(mountDir):])
+ dest := util.FullPath(remoteMountLocation.Path).Child(source)
+ return &remote_pb.RemoteStorageLocation{
+ Name: remoteMountLocation.Name,
+ Bucket: remoteMountLocation.Bucket,
+ Path: string(dest),
+ }
+}
+
+func shouldSendToRemote(entry *filer_pb.Entry) bool {
+ if entry.RemoteEntry == nil {
+ return true
+ }
+ if entry.RemoteEntry.RemoteMtime < entry.Attributes.Mtime {
+ return true
+ }
+ return false
+}
+
+func updateLocalEntry(filerClient filer_pb.FilerClient, dir string, entry *filer_pb.Entry, remoteEntry *filer_pb.RemoteEntry) error {
+ remoteEntry.LastLocalSyncTsNs = time.Now().UnixNano()
+ entry.RemoteEntry = remoteEntry
+ return filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ _, err := client.UpdateEntry(context.Background(), &filer_pb.UpdateEntryRequest{
+ Directory: dir,
+ Entry: entry,
+ })
+ return err
+ })
+}
diff --git a/weed/command/filer_sync.go b/weed/command/filer_sync.go
index 5440811dd..33efdb2b7 100644
--- a/weed/command/filer_sync.go
+++ b/weed/command/filer_sync.go
@@ -171,7 +171,7 @@ func doSubscribeFilerMetaChanges(grpcDialOption grpc.DialOption, sourceFiler, so
})
return pb.FollowMetadata(sourceFiler, grpcDialOption, "syncTo_"+targetFiler,
- sourcePath, sourceFilerOffsetTsNs, targetFilerSignature, processEventFnWithOffset, false)
+ sourcePath, nil, sourceFilerOffsetTsNs, targetFilerSignature, processEventFnWithOffset, false)
}
diff --git a/weed/command/filer_sync_std.go b/weed/command/filer_sync_std.go
index 63851eaf8..1f9b6fa14 100644
--- a/weed/command/filer_sync_std.go
+++ b/weed/command/filer_sync_std.go
@@ -1,3 +1,4 @@
+//go:build !windows
// +build !windows
package command
diff --git a/weed/command/imports.go b/weed/command/imports.go
index ce0bf0e10..a2f59189f 100644
--- a/weed/command/imports.go
+++ b/weed/command/imports.go
@@ -3,6 +3,9 @@ package command
import (
_ "net/http/pprof"
+ _ "github.com/chrislusf/seaweedfs/weed/remote_storage/azure"
+ _ "github.com/chrislusf/seaweedfs/weed/remote_storage/gcs"
+ _ "github.com/chrislusf/seaweedfs/weed/remote_storage/hdfs"
_ "github.com/chrislusf/seaweedfs/weed/remote_storage/s3"
_ "github.com/chrislusf/seaweedfs/weed/replication/sink/azuresink"
@@ -27,4 +30,5 @@ import (
_ "github.com/chrislusf/seaweedfs/weed/filer/redis"
_ "github.com/chrislusf/seaweedfs/weed/filer/redis2"
_ "github.com/chrislusf/seaweedfs/weed/filer/sqlite"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/tikv"
)
diff --git a/weed/command/master.go b/weed/command/master.go
index 4eb43ee09..2605f6f4b 100644
--- a/weed/command/master.go
+++ b/weed/command/master.go
@@ -7,7 +7,6 @@ import (
"net/http"
"os"
"sort"
- "strconv"
"strings"
"time"
@@ -116,7 +115,7 @@ func startMaster(masterOption MasterOptions, masterWhiteList []string) {
r := mux.NewRouter()
ms := weed_server.NewMasterServer(r, masterOption.toMasterOption(masterWhiteList), peers)
- listeningAddress := *masterOption.ipBind + ":" + strconv.Itoa(*masterOption.port)
+ listeningAddress := util.JoinHostPort(*masterOption.ipBind, *masterOption.port)
glog.V(0).Infof("Start Seaweed Master %s at %s", util.Version(), listeningAddress)
masterListener, e := util.NewListener(listeningAddress, 0)
if e != nil {
@@ -132,7 +131,7 @@ func startMaster(masterOption MasterOptions, masterWhiteList []string) {
r.HandleFunc("/cluster/status", raftServer.StatusHandler).Methods("GET")
// starting grpc server
grpcPort := *masterOption.port + 10000
- grpcL, err := util.NewListener(*masterOption.ipBind+":"+strconv.Itoa(grpcPort), 0)
+ grpcL, err := util.NewListener(util.JoinHostPort(*masterOption.ipBind, grpcPort), 0)
if err != nil {
glog.Fatalf("master failed to listen on grpc port %d: %v", grpcPort, err)
}
@@ -163,7 +162,7 @@ func startMaster(masterOption MasterOptions, masterWhiteList []string) {
func checkPeers(masterIp string, masterPort int, peers string) (masterAddress string, cleanedPeers []string) {
glog.V(0).Infof("current: %s:%d peers:%s", masterIp, masterPort, peers)
- masterAddress = masterIp + ":" + strconv.Itoa(masterPort)
+ masterAddress = util.JoinHostPort(masterIp, masterPort)
if peers != "" {
cleanedPeers = strings.Split(peers, ",")
}
diff --git a/weed/command/master_follower.go b/weed/command/master_follower.go
index b628f7abf..48548a389 100644
--- a/weed/command/master_follower.go
+++ b/weed/command/master_follower.go
@@ -13,7 +13,6 @@ import (
"github.com/gorilla/mux"
"google.golang.org/grpc/reflection"
"net/http"
- "strconv"
"strings"
"time"
)
@@ -114,7 +113,7 @@ func startMasterFollower(masterOptions MasterOptions) {
r := mux.NewRouter()
ms := weed_server.NewMasterServer(r, option, masters)
- listeningAddress := *masterOptions.ipBind + ":" + strconv.Itoa(*masterOptions.port)
+ listeningAddress := util.JoinHostPort(*masterOptions.ipBind, *masterOptions.port)
glog.V(0).Infof("Start Seaweed Master %s at %s", util.Version(), listeningAddress)
masterListener, e := util.NewListener(listeningAddress, 0)
if e != nil {
@@ -123,7 +122,7 @@ func startMasterFollower(masterOptions MasterOptions) {
// starting grpc server
grpcPort := *masterOptions.port + 10000
- grpcL, err := util.NewListener(*masterOptions.ipBind+":"+strconv.Itoa(grpcPort), 0)
+ grpcL, err := util.NewListener(util.JoinHostPort(*masterOptions.ipBind, grpcPort), 0)
if err != nil {
glog.Fatalf("master failed to listen on grpc port %d: %v", grpcPort, err)
}
diff --git a/weed/command/mount_notsupported.go b/weed/command/mount_notsupported.go
index f3c0de3d6..1e5c9f53d 100644
--- a/weed/command/mount_notsupported.go
+++ b/weed/command/mount_notsupported.go
@@ -1,6 +1,5 @@
-// +build !linux
-// +build !darwin
-// +build !freebsd
+//go:build !linux && !darwin && !freebsd
+// +build !linux,!darwin,!freebsd
package command
diff --git a/weed/command/mount_std.go b/weed/command/mount_std.go
index cdf340067..e393e5894 100644
--- a/weed/command/mount_std.go
+++ b/weed/command/mount_std.go
@@ -1,3 +1,4 @@
+//go:build linux || darwin || freebsd
// +build linux darwin freebsd
package command
diff --git a/weed/command/msg_broker.go b/weed/command/msg_broker.go
index db0b4148d..403bbe317 100644
--- a/weed/command/msg_broker.go
+++ b/weed/command/msg_broker.go
@@ -3,7 +3,6 @@ package command
import (
"context"
"fmt"
- "strconv"
"time"
"google.golang.org/grpc/reflection"
@@ -100,7 +99,7 @@ func (msgBrokerOpt *MessageBrokerOptions) startQueueServer() bool {
}, grpcDialOption)
// start grpc listener
- grpcL, err := util.NewListener(":"+strconv.Itoa(*msgBrokerOpt.port), 0)
+ grpcL, err := util.NewListener(util.JoinHostPort("", *msgBrokerOpt.port), 0)
if err != nil {
glog.Fatalf("failed to listen on grpc port %d: %v", *msgBrokerOpt.port, err)
}
diff --git a/weed/command/scaffold/filer.toml b/weed/command/scaffold/filer.toml
index 9e9258865..caf9d173d 100644
--- a/weed/command/scaffold/filer.toml
+++ b/weed/command/scaffold/filer.toml
@@ -44,7 +44,7 @@ dbFile = "./filer.db" # sqlite db file
# CREATE TABLE IF NOT EXISTS filemeta (
# dirhash BIGINT COMMENT 'first 64 bits of MD5 hash value of directory field',
# name VARCHAR(1000) BINARY COMMENT 'directory or file name',
-# directory TEXT COMMENT 'full path to parent directory',
+# directory TEXT BINARY COMMENT 'full path to parent directory',
# meta LONGBLOB,
# PRIMARY KEY (dirhash, name)
# ) DEFAULT CHARSET=utf8;
@@ -69,7 +69,7 @@ createTable = """
CREATE TABLE IF NOT EXISTS ` + "`%s`" + ` (
dirhash BIGINT,
name VARCHAR(1000) BINARY,
- directory TEXT,
+ directory TEXT BINARY,
meta LONGBLOB,
PRIMARY KEY (dirhash, name)
) DEFAULT CHARSET=utf8;
@@ -230,3 +230,11 @@ location = "/tmp/"
address = "localhost:6379"
password = ""
database = 1
+
+[tikv]
+enabled = false
+# If you have many pd address, use ',' split then:
+# pdaddrs = "pdhost1:2379, pdhost2:2379, pdhost3:2379"
+pdaddrs = "localhost:2379"
+# Concurrency for TiKV delete range
+deleterange_concurrency = 1
diff --git a/weed/command/server.go b/weed/command/server.go
index c784d90b9..b32d9d51e 100644
--- a/weed/command/server.go
+++ b/weed/command/server.go
@@ -194,7 +194,7 @@ func runServer(cmd *Command, args []string) bool {
filerOptions.disableHttp = serverDisableHttp
masterOptions.disableHttp = serverDisableHttp
- filerAddress := fmt.Sprintf("%s:%d", *serverIp, *filerOptions.port)
+ filerAddress := util.JoinHostPort(*serverIp, *filerOptions.port)
s3Options.filer = &filerAddress
webdavOptions.filer = &filerAddress
msgBrokerOptions.filer = &filerAddress
diff --git a/weed/command/volume.go b/weed/command/volume.go
index 235eff11b..3278107f5 100644
--- a/weed/command/volume.go
+++ b/weed/command/volume.go
@@ -194,7 +194,7 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v
*v.publicPort = *v.port
}
if *v.publicUrl == "" {
- *v.publicUrl = *v.ip + ":" + strconv.Itoa(*v.publicPort)
+ *v.publicUrl = util.JoinHostPort(*v.ip, *v.publicPort)
}
volumeMux := http.NewServeMux()
@@ -308,7 +308,7 @@ func (v VolumeServerOptions) isSeparatedPublicPort() bool {
func (v VolumeServerOptions) startGrpcService(vs volume_server_pb.VolumeServerServer) *grpc.Server {
grpcPort := *v.port + 10000
- grpcL, err := util.NewListener(*v.bindIp+":"+strconv.Itoa(grpcPort), 0)
+ grpcL, err := util.NewListener(util.JoinHostPort(*v.bindIp, grpcPort), 0)
if err != nil {
glog.Fatalf("failed to listen on grpc port %d: %v", grpcPort, err)
}
@@ -324,7 +324,7 @@ func (v VolumeServerOptions) startGrpcService(vs volume_server_pb.VolumeServerSe
}
func (v VolumeServerOptions) startPublicHttpService(handler http.Handler) httpdown.Server {
- publicListeningAddress := *v.bindIp + ":" + strconv.Itoa(*v.publicPort)
+ publicListeningAddress := util.JoinHostPort(*v.bindIp, *v.publicPort)
glog.V(0).Infoln("Start Seaweed volume server", util.Version(), "public at", publicListeningAddress)
publicListener, e := util.NewListener(publicListeningAddress, time.Duration(*v.idleConnectionTimeout)*time.Second)
if e != nil {
@@ -351,7 +351,7 @@ func (v VolumeServerOptions) startClusterHttpService(handler http.Handler) httpd
keyFile = viper.GetString("https.volume.key")
}
- listeningAddress := *v.bindIp + ":" + strconv.Itoa(*v.port)
+ listeningAddress := util.JoinHostPort(*v.bindIp, *v.port)
glog.V(0).Infof("Start Seaweed volume server %s at %s", util.Version(), listeningAddress)
listener, e := util.NewListener(listeningAddress, time.Duration(*v.idleConnectionTimeout)*time.Second)
if e != nil {
@@ -373,7 +373,7 @@ func (v VolumeServerOptions) startClusterHttpService(handler http.Handler) httpd
}
func (v VolumeServerOptions) startTcpService(volumeServer *weed_server.VolumeServer) {
- listeningAddress := *v.bindIp + ":" + strconv.Itoa(*v.port+20000)
+ listeningAddress := util.JoinHostPort(*v.bindIp,*v.port+20000)
glog.V(0).Infoln("Start Seaweed volume server", util.Version(), "tcp at", listeningAddress)
listener, e := util.NewListener(listeningAddress, 0)
if e != nil {