aboutsummaryrefslogtreecommitdiff
path: root/weed/command
diff options
context:
space:
mode:
Diffstat (limited to 'weed/command')
-rw-r--r--weed/command/filer_backup.go8
-rw-r--r--weed/command/filer_meta_backup.go6
-rw-r--r--weed/command/filer_meta_tail.go5
-rw-r--r--weed/command/filer_remote_gateway.go2
-rw-r--r--weed/command/filer_remote_gateway_buckets.go2
-rw-r--r--weed/command/filer_remote_sync.go2
-rw-r--r--weed/command/filer_remote_sync_dir.go2
-rw-r--r--weed/command/filer_sync.go10
8 files changed, 24 insertions, 13 deletions
diff --git a/weed/command/filer_backup.go b/weed/command/filer_backup.go
index 9e5041531..0b902f96f 100644
--- a/weed/command/filer_backup.go
+++ b/weed/command/filer_backup.go
@@ -54,8 +54,10 @@ func runFilerBackup(cmd *Command, args []string) bool {
grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
+ clientId := util.RandomInt32()
+
for {
- err := doFilerBackup(grpcDialOption, &filerBackupOptions)
+ err := doFilerBackup(grpcDialOption, &filerBackupOptions, clientId)
if err != nil {
glog.Errorf("backup from %s: %v", *filerBackupOptions.filer, err)
time.Sleep(1747 * time.Millisecond)
@@ -69,7 +71,7 @@ const (
BackupKeyPrefix = "backup."
)
-func doFilerBackup(grpcDialOption grpc.DialOption, backupOption *FilerBackupOptions) error {
+func doFilerBackup(grpcDialOption grpc.DialOption, backupOption *FilerBackupOptions, clientId int32) error {
// find data sink
config := util.GetViper()
@@ -112,7 +114,7 @@ func doFilerBackup(grpcDialOption grpc.DialOption, backupOption *FilerBackupOpti
return setOffset(grpcDialOption, sourceFiler, BackupKeyPrefix, int32(sinkId), lastTsNs)
})
- return pb.FollowMetadata(sourceFiler, grpcDialOption, "backup_"+dataSink.GetName(),
+ return pb.FollowMetadata(sourceFiler, grpcDialOption, "backup_"+dataSink.GetName(), clientId,
sourcePath, nil, startFrom.UnixNano(), 0, processEventFnWithOffset, false)
}
diff --git a/weed/command/filer_meta_backup.go b/weed/command/filer_meta_backup.go
index d52ed3349..56c7f7a8c 100644
--- a/weed/command/filer_meta_backup.go
+++ b/weed/command/filer_meta_backup.go
@@ -27,7 +27,8 @@ type FilerMetaBackupOptions struct {
restart *bool
backupFilerConfig *string
- store filer.FilerStore
+ store filer.FilerStore
+ clientId int32
}
func init() {
@@ -36,6 +37,7 @@ func init() {
metaBackup.filerDirectory = cmdFilerMetaBackup.Flag.String("filerDir", "/", "a folder on the filer")
metaBackup.restart = cmdFilerMetaBackup.Flag.Bool("restart", false, "copy the full metadata before async incremental backup")
metaBackup.backupFilerConfig = cmdFilerMetaBackup.Flag.String("config", "", "path to filer.toml specifying backup filer store")
+ metaBackup.clientId = util.RandomInt32()
}
var cmdFilerMetaBackup = &Command{
@@ -195,7 +197,7 @@ func (metaBackup *FilerMetaBackupOptions) streamMetadataBackup() error {
return metaBackup.setOffset(lastTime)
})
- return pb.FollowMetadata(pb.ServerAddress(*metaBackup.filerAddress), metaBackup.grpcDialOption, "meta_backup",
+ return pb.FollowMetadata(pb.ServerAddress(*metaBackup.filerAddress), metaBackup.grpcDialOption, "meta_backup", metaBackup.clientId,
*metaBackup.filerDirectory, nil, startTime.UnixNano(), 0, processEventFnWithOffset, false)
}
diff --git a/weed/command/filer_meta_tail.go b/weed/command/filer_meta_tail.go
index 26b155440..1158ef1e0 100644
--- a/weed/command/filer_meta_tail.go
+++ b/weed/command/filer_meta_tail.go
@@ -6,7 +6,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/golang/protobuf/jsonpb"
jsoniter "github.com/json-iterator/go"
- "github.com/olivere/elastic/v7"
+ elastic "github.com/olivere/elastic/v7"
"os"
"path/filepath"
"strings"
@@ -48,6 +48,7 @@ func runFilerMetaTail(cmd *Command, args []string) bool {
util.LoadConfiguration("security", false)
grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
+ clientId := util.RandomInt32()
var filterFunc func(dir, fname string) bool
if *tailPattern != "" {
@@ -105,7 +106,7 @@ func runFilerMetaTail(cmd *Command, args []string) bool {
}
}
- tailErr := pb.FollowMetadata(pb.ServerAddress(*tailFiler), grpcDialOption, "tail",
+ tailErr := pb.FollowMetadata(pb.ServerAddress(*tailFiler), grpcDialOption, "tail", clientId,
*tailTarget, nil, time.Now().Add(-*tailStart).UnixNano(), 0,
func(resp *filer_pb.SubscribeMetadataResponse) error {
if !shouldPrint(resp) {
diff --git a/weed/command/filer_remote_gateway.go b/weed/command/filer_remote_gateway.go
index fa0239558..33454f378 100644
--- a/weed/command/filer_remote_gateway.go
+++ b/weed/command/filer_remote_gateway.go
@@ -28,6 +28,7 @@ type RemoteGatewayOptions struct {
mappings *remote_pb.RemoteStorageMapping
remoteConfs map[string]*remote_pb.RemoteConf
bucketsDir string
+ clientId int32
}
var _ = filer_pb.FilerClient(&RemoteGatewayOptions{})
@@ -54,6 +55,7 @@ func init() {
remoteGatewayOptions.timeAgo = cmdFilerRemoteGateway.Flag.Duration("timeAgo", 0, "start time before now. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"")
remoteGatewayOptions.include = cmdFilerRemoteGateway.Flag.String("include", "", "pattens of new bucket names, e.g., s3*")
remoteGatewayOptions.exclude = cmdFilerRemoteGateway.Flag.String("exclude", "", "pattens of new bucket names, e.g., local*")
+ remoteGatewayOptions.clientId = util.RandomInt32()
}
var cmdFilerRemoteGateway = &Command{
diff --git a/weed/command/filer_remote_gateway_buckets.go b/weed/command/filer_remote_gateway_buckets.go
index fc11cdbc5..4f65b5842 100644
--- a/weed/command/filer_remote_gateway_buckets.go
+++ b/weed/command/filer_remote_gateway_buckets.go
@@ -38,7 +38,7 @@ func (option *RemoteGatewayOptions) followBucketUpdatesAndUploadToRemote(filerSo
lastOffsetTs := collectLastSyncOffset(option, option.grpcDialOption, pb.ServerAddress(*option.filerAddress), option.bucketsDir, *option.timeAgo)
- return pb.FollowMetadata(pb.ServerAddress(*option.filerAddress), option.grpcDialOption, "filer.remote.sync",
+ return pb.FollowMetadata(pb.ServerAddress(*option.filerAddress), option.grpcDialOption, "filer.remote.sync", option.clientId,
option.bucketsDir, []string{filer.DirectoryEtcRemote}, lastOffsetTs.UnixNano(), 0, processEventFnWithOffset, false)
}
diff --git a/weed/command/filer_remote_sync.go b/weed/command/filer_remote_sync.go
index 681ea35e9..d6ccf7b79 100644
--- a/weed/command/filer_remote_sync.go
+++ b/weed/command/filer_remote_sync.go
@@ -18,6 +18,7 @@ type RemoteSyncOptions struct {
readChunkFromFiler *bool
timeAgo *time.Duration
dir *string
+ clientId int32
}
var _ = filer_pb.FilerClient(&RemoteSyncOptions{})
@@ -41,6 +42,7 @@ func init() {
remoteSyncOptions.dir = cmdFilerRemoteSynchronize.Flag.String("dir", "", "a mounted directory on filer")
remoteSyncOptions.readChunkFromFiler = cmdFilerRemoteSynchronize.Flag.Bool("filerProxy", false, "read file chunks from filer instead of volume servers")
remoteSyncOptions.timeAgo = cmdFilerRemoteSynchronize.Flag.Duration("timeAgo", 0, "start time before now, skipping previous metadata changes. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"")
+ remoteSyncOptions.clientId = util.RandomInt32()
}
var cmdFilerRemoteSynchronize = &Command{
diff --git a/weed/command/filer_remote_sync_dir.go b/weed/command/filer_remote_sync_dir.go
index 947f526bb..ccedc9d80 100644
--- a/weed/command/filer_remote_sync_dir.go
+++ b/weed/command/filer_remote_sync_dir.go
@@ -40,7 +40,7 @@ func followUpdatesAndUploadToRemote(option *RemoteSyncOptions, filerSource *sour
lastOffsetTs := collectLastSyncOffset(option, option.grpcDialOption, pb.ServerAddress(*option.filerAddress), mountedDir, *option.timeAgo)
- return pb.FollowMetadata(pb.ServerAddress(*option.filerAddress), option.grpcDialOption, "filer.remote.sync",
+ return pb.FollowMetadata(pb.ServerAddress(*option.filerAddress), option.grpcDialOption, "filer.remote.sync", option.clientId,
mountedDir, []string{filer.DirectoryEtcRemote}, lastOffsetTs.UnixNano(), 0, processEventFnWithOffset, false)
}
diff --git a/weed/command/filer_sync.go b/weed/command/filer_sync.go
index 230b24a52..326bd1fbe 100644
--- a/weed/command/filer_sync.go
+++ b/weed/command/filer_sync.go
@@ -37,6 +37,7 @@ type SyncOptions struct {
bDebug *bool
aProxyByFiler *bool
bProxyByFiler *bool
+ clientId int32
}
var (
@@ -66,6 +67,7 @@ func init() {
syncOptions.bDebug = cmdFilerSynchronize.Flag.Bool("b.debug", false, "debug mode to print out filer B received files")
syncCpuProfile = cmdFilerSynchronize.Flag.String("cpuprofile", "", "cpu profile output file")
syncMemProfile = cmdFilerSynchronize.Flag.String("memprofile", "", "memory profile output file")
+ syncOptions.clientId = util.RandomInt32()
}
var cmdFilerSynchronize = &Command{
@@ -97,7 +99,7 @@ func runFilerSynchronize(cmd *Command, args []string) bool {
filerB := pb.ServerAddress(*syncOptions.filerB)
go func() {
for {
- err := doSubscribeFilerMetaChanges(grpcDialOption, filerA, *syncOptions.aPath, *syncOptions.aProxyByFiler, filerB,
+ err := doSubscribeFilerMetaChanges(syncOptions.clientId, grpcDialOption, filerA, *syncOptions.aPath, *syncOptions.aProxyByFiler, filerB,
*syncOptions.bPath, *syncOptions.bReplication, *syncOptions.bCollection, *syncOptions.bTtlSec, *syncOptions.bProxyByFiler, *syncOptions.bDiskType, *syncOptions.bDebug)
if err != nil {
glog.Errorf("sync from %s to %s: %v", *syncOptions.filerA, *syncOptions.filerB, err)
@@ -109,7 +111,7 @@ func runFilerSynchronize(cmd *Command, args []string) bool {
if !*syncOptions.isActivePassive {
go func() {
for {
- err := doSubscribeFilerMetaChanges(grpcDialOption, filerB, *syncOptions.bPath, *syncOptions.bProxyByFiler, filerA,
+ err := doSubscribeFilerMetaChanges(syncOptions.clientId, grpcDialOption, filerB, *syncOptions.bPath, *syncOptions.bProxyByFiler, filerA,
*syncOptions.aPath, *syncOptions.aReplication, *syncOptions.aCollection, *syncOptions.aTtlSec, *syncOptions.aProxyByFiler, *syncOptions.aDiskType, *syncOptions.aDebug)
if err != nil {
glog.Errorf("sync from %s to %s: %v", *syncOptions.filerB, *syncOptions.filerA, err)
@@ -124,7 +126,7 @@ func runFilerSynchronize(cmd *Command, args []string) bool {
return true
}
-func doSubscribeFilerMetaChanges(grpcDialOption grpc.DialOption, sourceFiler pb.ServerAddress, sourcePath string, sourceReadChunkFromFiler bool, targetFiler pb.ServerAddress, targetPath string,
+func doSubscribeFilerMetaChanges(clientId int32, grpcDialOption grpc.DialOption, sourceFiler pb.ServerAddress, sourcePath string, sourceReadChunkFromFiler bool, targetFiler pb.ServerAddress, targetPath string,
replicationStr, collection string, ttlSec int, sinkWriteChunkByFiler bool, diskType string, debug bool) error {
// read source filer signature
@@ -172,7 +174,7 @@ func doSubscribeFilerMetaChanges(grpcDialOption grpc.DialOption, sourceFiler pb.
return setOffset(grpcDialOption, targetFiler, SyncKeyPrefix, sourceFilerSignature, lastTsNs)
})
- return pb.FollowMetadata(sourceFiler, grpcDialOption, "syncTo_"+string(targetFiler),
+ return pb.FollowMetadata(sourceFiler, grpcDialOption, "syncTo_"+string(targetFiler), clientId,
sourcePath, nil, sourceFilerOffsetTsNs, targetFilerSignature, processEventFnWithOffset, false)
}