diff options
Diffstat (limited to 'weed/command')
40 files changed, 435 insertions, 384 deletions
diff --git a/weed/command/autocomplete.go b/weed/command/autocomplete.go index 955ce4006..f63c8df41 100644 --- a/weed/command/autocomplete.go +++ b/weed/command/autocomplete.go @@ -2,9 +2,9 @@ package command import ( "fmt" - flag "github.com/chrislusf/seaweedfs/weed/util/fla9" "github.com/posener/complete" completeinstall "github.com/posener/complete/cmd/install" + flag "github.com/seaweedfs/seaweedfs/weed/util/fla9" "runtime" ) diff --git a/weed/command/backup.go b/weed/command/backup.go index c43b0d351..3b14705d7 100644 --- a/weed/command/backup.go +++ b/weed/command/backup.go @@ -2,15 +2,15 @@ package command import ( "fmt" - "github.com/chrislusf/seaweedfs/weed/pb" + "github.com/seaweedfs/seaweedfs/weed/pb" - "github.com/chrislusf/seaweedfs/weed/security" - "github.com/chrislusf/seaweedfs/weed/storage/needle" - "github.com/chrislusf/seaweedfs/weed/storage/super_block" - "github.com/chrislusf/seaweedfs/weed/util" + "github.com/seaweedfs/seaweedfs/weed/security" + "github.com/seaweedfs/seaweedfs/weed/storage/needle" + "github.com/seaweedfs/seaweedfs/weed/storage/super_block" + "github.com/seaweedfs/seaweedfs/weed/util" - "github.com/chrislusf/seaweedfs/weed/operation" - "github.com/chrislusf/seaweedfs/weed/storage" + "github.com/seaweedfs/seaweedfs/weed/operation" + "github.com/seaweedfs/seaweedfs/weed/storage" ) var ( diff --git a/weed/command/benchmark.go b/weed/command/benchmark.go index 9f18cc5b9..41aadc6db 100644 --- a/weed/command/benchmark.go +++ b/weed/command/benchmark.go @@ -3,7 +3,7 @@ package command import ( "bufio" "fmt" - "github.com/chrislusf/seaweedfs/weed/pb" + "github.com/seaweedfs/seaweedfs/weed/pb" "io" "math" "math/rand" @@ -16,11 +16,11 @@ import ( "google.golang.org/grpc" - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/operation" - "github.com/chrislusf/seaweedfs/weed/security" - "github.com/chrislusf/seaweedfs/weed/util" - "github.com/chrislusf/seaweedfs/weed/wdclient" + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/operation" + "github.com/seaweedfs/seaweedfs/weed/security" + "github.com/seaweedfs/seaweedfs/weed/util" + "github.com/seaweedfs/seaweedfs/weed/wdclient" ) type BenchmarkOptions struct { @@ -129,7 +129,7 @@ func runBenchmark(cmd *Command, args []string) bool { defer pprof.StopCPUProfile() } - b.masterClient = wdclient.NewMasterClient(b.grpcDialOption, "", "client", "", "", pb.ServerAddresses(*b.masters).ToAddressMap()) + b.masterClient = wdclient.NewMasterClient(b.grpcDialOption, "", "client", "", "", "", pb.ServerAddresses(*b.masters).ToAddressMap()) go b.masterClient.KeepConnectedToMaster() b.masterClient.WaitUntilConnected() diff --git a/weed/command/command.go b/weed/command/command.go index 7635405dc..9fdf057e7 100644 --- a/weed/command/command.go +++ b/weed/command/command.go @@ -5,7 +5,7 @@ import ( "os" "strings" - flag "github.com/chrislusf/seaweedfs/weed/util/fla9" + flag "github.com/seaweedfs/seaweedfs/weed/util/fla9" ) var Commands = []*Command{ @@ -28,12 +28,12 @@ var Commands = []*Command{ cmdFilerSynchronize, cmdFix, cmdFuse, + cmdIam, cmdMaster, cmdMasterFollower, cmdMount, + cmdMqBroker, cmdS3, - cmdIam, - cmdMsgBroker, cmdScaffold, cmdServer, cmdShell, diff --git a/weed/command/compact.go b/weed/command/compact.go index 6df28440a..6c390ea1f 100644 --- a/weed/command/compact.go +++ b/weed/command/compact.go @@ -1,10 +1,10 @@ package command import ( - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/storage" - "github.com/chrislusf/seaweedfs/weed/storage/needle" - "github.com/chrislusf/seaweedfs/weed/util" + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/storage" + "github.com/seaweedfs/seaweedfs/weed/storage/needle" + "github.com/seaweedfs/seaweedfs/weed/util" ) func init() { diff --git a/weed/command/download.go b/weed/command/download.go index a3c05b53d..de33643fc 100644 --- a/weed/command/download.go +++ b/weed/command/download.go @@ -10,10 +10,10 @@ import ( "google.golang.org/grpc" - "github.com/chrislusf/seaweedfs/weed/operation" - "github.com/chrislusf/seaweedfs/weed/pb" - "github.com/chrislusf/seaweedfs/weed/security" - "github.com/chrislusf/seaweedfs/weed/util" + "github.com/seaweedfs/seaweedfs/weed/operation" + "github.com/seaweedfs/seaweedfs/weed/pb" + "github.com/seaweedfs/seaweedfs/weed/security" + "github.com/seaweedfs/seaweedfs/weed/util" ) var ( diff --git a/weed/command/export.go b/weed/command/export.go index 1c32e1050..e09d57056 100644 --- a/weed/command/export.go +++ b/weed/command/export.go @@ -13,13 +13,13 @@ import ( "text/template" "time" - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/storage" - "github.com/chrislusf/seaweedfs/weed/storage/needle" - "github.com/chrislusf/seaweedfs/weed/storage/needle_map" - "github.com/chrislusf/seaweedfs/weed/storage/super_block" - "github.com/chrislusf/seaweedfs/weed/storage/types" - "github.com/chrislusf/seaweedfs/weed/util" + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/storage" + "github.com/seaweedfs/seaweedfs/weed/storage/needle" + "github.com/seaweedfs/seaweedfs/weed/storage/needle_map" + "github.com/seaweedfs/seaweedfs/weed/storage/super_block" + "github.com/seaweedfs/seaweedfs/weed/storage/types" + "github.com/seaweedfs/seaweedfs/weed/util" ) const ( diff --git a/weed/command/filer.go b/weed/command/filer.go index 7e0e92d4a..7418a189b 100644 --- a/weed/command/filer.go +++ b/weed/command/filer.go @@ -12,14 +12,14 @@ import ( "google.golang.org/grpc/reflection" - "github.com/chrislusf/seaweedfs/weed/filer" - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/pb" - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "github.com/chrislusf/seaweedfs/weed/security" - weed_server "github.com/chrislusf/seaweedfs/weed/server" - stats_collect "github.com/chrislusf/seaweedfs/weed/stats" - "github.com/chrislusf/seaweedfs/weed/util" + "github.com/seaweedfs/seaweedfs/weed/filer" + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/pb" + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/security" + weed_server "github.com/seaweedfs/seaweedfs/weed/server" + stats_collect "github.com/seaweedfs/seaweedfs/weed/stats" + "github.com/seaweedfs/seaweedfs/weed/util" ) var ( diff --git a/weed/command/filer_backup.go b/weed/command/filer_backup.go index d191c693b..30b6c4962 100644 --- a/weed/command/filer_backup.go +++ b/weed/command/filer_backup.go @@ -2,12 +2,13 @@ package command import ( "fmt" - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/pb" - "github.com/chrislusf/seaweedfs/weed/replication/source" - "github.com/chrislusf/seaweedfs/weed/security" - "github.com/chrislusf/seaweedfs/weed/util" + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/pb" + "github.com/seaweedfs/seaweedfs/weed/replication/source" + "github.com/seaweedfs/seaweedfs/weed/security" + "github.com/seaweedfs/seaweedfs/weed/util" "google.golang.org/grpc" + "strings" "time" ) @@ -15,6 +16,7 @@ type FilerBackupOptions struct { isActivePassive *bool filer *string path *string + excludePaths *string debug *bool proxyByFiler *bool timeAgo *time.Duration @@ -28,6 +30,7 @@ func init() { cmdFilerBackup.Run = runFilerBackup // break init cycle filerBackupOptions.filer = cmdFilerBackup.Flag.String("filer", "localhost:8888", "filer of one SeaweedFS cluster") filerBackupOptions.path = cmdFilerBackup.Flag.String("filerPath", "/", "directory to sync on filer") + filerBackupOptions.excludePaths = cmdFilerBackup.Flag.String("filerExcludePaths", "", "exclude directories to sync on filer") filerBackupOptions.proxyByFiler = cmdFilerBackup.Flag.Bool("filerProxy", false, "read and write file chunks by filer instead of volume servers") filerBackupOptions.debug = cmdFilerBackup.Flag.Bool("debug", false, "debug mode to print out received files") filerBackupOptions.timeAgo = cmdFilerBackup.Flag.Duration("timeAgo", 0, "start time before now. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"") @@ -55,9 +58,11 @@ func runFilerBackup(cmd *Command, args []string) bool { grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client") clientId := util.RandomInt32() + var clientEpoch int32 for { - err := doFilerBackup(grpcDialOption, &filerBackupOptions, clientId) + clientEpoch++ + err := doFilerBackup(grpcDialOption, &filerBackupOptions, clientId, clientEpoch) if err != nil { glog.Errorf("backup from %s: %v", *filerBackupOptions.filer, err) time.Sleep(1747 * time.Millisecond) @@ -71,7 +76,7 @@ const ( BackupKeyPrefix = "backup." ) -func doFilerBackup(grpcDialOption grpc.DialOption, backupOption *FilerBackupOptions, clientId int32) error { +func doFilerBackup(grpcDialOption grpc.DialOption, backupOption *FilerBackupOptions, clientId int32, clientEpoch int32) error { // find data sink config := util.GetViper() @@ -82,6 +87,7 @@ func doFilerBackup(grpcDialOption grpc.DialOption, backupOption *FilerBackupOpti sourceFiler := pb.ServerAddress(*backupOption.filer) sourcePath := *backupOption.path + excludePaths := strings.Split(*backupOption.excludePaths, ",") timeAgo := *backupOption.timeAgo targetPath := dataSink.GetSinkToDirectory() debug := *backupOption.debug @@ -104,16 +110,20 @@ func doFilerBackup(grpcDialOption grpc.DialOption, backupOption *FilerBackupOpti // create filer sink filerSource := &source.FilerSource{} - filerSource.DoInitialize(sourceFiler.ToHttpAddress(), sourceFiler.ToGrpcAddress(), sourcePath, *backupOption.proxyByFiler) + filerSource.DoInitialize( + sourceFiler.ToHttpAddress(), + sourceFiler.ToGrpcAddress(), + sourcePath, + *backupOption.proxyByFiler) dataSink.SetSourceFiler(filerSource) - processEventFn := genProcessFunction(sourcePath, targetPath, dataSink, debug) + processEventFn := genProcessFunction(sourcePath, targetPath, excludePaths, dataSink, debug) processEventFnWithOffset := pb.AddOffsetFunc(processEventFn, 3*time.Second, func(counter int64, lastTsNs int64) error { glog.V(0).Infof("backup %s progressed to %v %0.2f/sec", sourceFiler, time.Unix(0, lastTsNs), float64(counter)/float64(3)) return setOffset(grpcDialOption, sourceFiler, BackupKeyPrefix, int32(sinkId), lastTsNs) }) - return pb.FollowMetadata(sourceFiler, grpcDialOption, "backup_"+dataSink.GetName(), clientId, sourcePath, nil, startFrom.UnixNano(), 0, 0, processEventFnWithOffset, pb.TrivialOnError) + return pb.FollowMetadata(sourceFiler, grpcDialOption, "backup_"+dataSink.GetName(), clientId, clientEpoch, sourcePath, nil, startFrom.UnixNano(), 0, 0, processEventFnWithOffset, pb.TrivialOnError) } diff --git a/weed/command/filer_cat.go b/weed/command/filer_cat.go index ada843dea..c5ae7672e 100644 --- a/weed/command/filer_cat.go +++ b/weed/command/filer_cat.go @@ -3,17 +3,17 @@ package command import ( "context" "fmt" - "github.com/chrislusf/seaweedfs/weed/filer" - "github.com/chrislusf/seaweedfs/weed/pb" - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "github.com/chrislusf/seaweedfs/weed/wdclient" + "github.com/seaweedfs/seaweedfs/weed/filer" + "github.com/seaweedfs/seaweedfs/weed/pb" + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/wdclient" "google.golang.org/grpc" "net/url" "os" "strings" - "github.com/chrislusf/seaweedfs/weed/security" - "github.com/chrislusf/seaweedfs/weed/util" + "github.com/seaweedfs/seaweedfs/weed/security" + "github.com/seaweedfs/seaweedfs/weed/util" ) var ( diff --git a/weed/command/filer_copy.go b/weed/command/filer_copy.go index f20ae99bf..042952f7d 100644 --- a/weed/command/filer_copy.go +++ b/weed/command/filer_copy.go @@ -14,15 +14,15 @@ import ( "google.golang.org/grpc" - "github.com/chrislusf/seaweedfs/weed/filer" - "github.com/chrislusf/seaweedfs/weed/operation" - "github.com/chrislusf/seaweedfs/weed/pb" - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "github.com/chrislusf/seaweedfs/weed/security" - "github.com/chrislusf/seaweedfs/weed/storage/needle" - "github.com/chrislusf/seaweedfs/weed/util" - "github.com/chrislusf/seaweedfs/weed/util/grace" - "github.com/chrislusf/seaweedfs/weed/wdclient" + "github.com/seaweedfs/seaweedfs/weed/filer" + "github.com/seaweedfs/seaweedfs/weed/operation" + "github.com/seaweedfs/seaweedfs/weed/pb" + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/security" + "github.com/seaweedfs/seaweedfs/weed/storage/needle" + "github.com/seaweedfs/seaweedfs/weed/util" + "github.com/seaweedfs/seaweedfs/weed/util/grace" + "github.com/seaweedfs/seaweedfs/weed/wdclient" ) var ( diff --git a/weed/command/filer_meta_backup.go b/weed/command/filer_meta_backup.go index cf679885d..05ad5ec2f 100644 --- a/weed/command/filer_meta_backup.go +++ b/weed/command/filer_meta_backup.go @@ -3,17 +3,17 @@ package command import ( "context" "fmt" - "github.com/chrislusf/seaweedfs/weed/filer" - "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/filer" + "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/spf13/viper" "google.golang.org/grpc" "reflect" "time" - "github.com/chrislusf/seaweedfs/weed/pb" - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "github.com/chrislusf/seaweedfs/weed/security" - "github.com/chrislusf/seaweedfs/weed/util" + "github.com/seaweedfs/seaweedfs/weed/pb" + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/security" + "github.com/seaweedfs/seaweedfs/weed/util" ) var ( @@ -27,8 +27,9 @@ type FilerMetaBackupOptions struct { restart *bool backupFilerConfig *string - store filer.FilerStore - clientId int32 + store filer.FilerStore + clientId int32 + clientEpoch int32 } func init() { @@ -194,7 +195,8 @@ func (metaBackup *FilerMetaBackupOptions) streamMetadataBackup() error { return metaBackup.setOffset(lastTime) }) - return pb.FollowMetadata(pb.ServerAddress(*metaBackup.filerAddress), metaBackup.grpcDialOption, "meta_backup", metaBackup.clientId, + metaBackup.clientEpoch++ + return pb.FollowMetadata(pb.ServerAddress(*metaBackup.filerAddress), metaBackup.grpcDialOption, "meta_backup", metaBackup.clientId, metaBackup.clientEpoch, *metaBackup.filerDirectory, nil, startTime.UnixNano(), 0, 0, processEventFnWithOffset, pb.TrivialOnError) } diff --git a/weed/command/filer_meta_tail.go b/weed/command/filer_meta_tail.go index 66a87c3d9..adacf4978 100644 --- a/weed/command/filer_meta_tail.go +++ b/weed/command/filer_meta_tail.go @@ -2,16 +2,16 @@ package command import ( "fmt" - "github.com/chrislusf/seaweedfs/weed/pb" "github.com/golang/protobuf/jsonpb" + "github.com/seaweedfs/seaweedfs/weed/pb" "os" "path/filepath" "strings" "time" - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "github.com/chrislusf/seaweedfs/weed/security" - "github.com/chrislusf/seaweedfs/weed/util" + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/security" + "github.com/seaweedfs/seaweedfs/weed/util" ) func init() { @@ -110,7 +110,7 @@ func runFilerMetaTail(cmd *Command, args []string) bool { untilTsNs = time.Now().Add(-*tailStop).UnixNano() } - tailErr := pb.FollowMetadata(pb.ServerAddress(*tailFiler), grpcDialOption, "tail", clientId, *tailTarget, nil, + tailErr := pb.FollowMetadata(pb.ServerAddress(*tailFiler), grpcDialOption, "tail", clientId, 0, *tailTarget, nil, time.Now().Add(-*tailStart).UnixNano(), untilTsNs, 0, func(resp *filer_pb.SubscribeMetadataResponse) error { if !shouldPrint(resp) { return nil diff --git a/weed/command/filer_meta_tail_elastic.go b/weed/command/filer_meta_tail_elastic.go index 5776c4f97..a72f88902 100644 --- a/weed/command/filer_meta_tail_elastic.go +++ b/weed/command/filer_meta_tail_elastic.go @@ -5,10 +5,10 @@ package command import ( "context" - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "github.com/chrislusf/seaweedfs/weed/util" jsoniter "github.com/json-iterator/go" elastic "github.com/olivere/elastic/v7" + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/util" "strings" ) diff --git a/weed/command/filer_meta_tail_non_elastic.go b/weed/command/filer_meta_tail_non_elastic.go index f78f3ee09..989e32bec 100644 --- a/weed/command/filer_meta_tail_non_elastic.go +++ b/weed/command/filer_meta_tail_non_elastic.go @@ -4,7 +4,7 @@ package command import ( - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" ) func sendToElasticSearchFunc(servers string, esIndex string) (func(resp *filer_pb.SubscribeMetadataResponse) error, error) { diff --git a/weed/command/filer_remote_gateway.go b/weed/command/filer_remote_gateway.go index 33454f378..8d9421d8e 100644 --- a/weed/command/filer_remote_gateway.go +++ b/weed/command/filer_remote_gateway.go @@ -3,13 +3,13 @@ package command import ( "context" "fmt" - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/pb" - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "github.com/chrislusf/seaweedfs/weed/pb/remote_pb" - "github.com/chrislusf/seaweedfs/weed/replication/source" - "github.com/chrislusf/seaweedfs/weed/security" - "github.com/chrislusf/seaweedfs/weed/util" + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/pb" + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/pb/remote_pb" + "github.com/seaweedfs/seaweedfs/weed/replication/source" + "github.com/seaweedfs/seaweedfs/weed/security" + "github.com/seaweedfs/seaweedfs/weed/util" "google.golang.org/grpc" "os" "time" @@ -29,6 +29,7 @@ type RemoteGatewayOptions struct { remoteConfs map[string]*remote_pb.RemoteConf bucketsDir string clientId int32 + clientEpoch int32 } var _ = filer_pb.FilerClient(&RemoteGatewayOptions{}) diff --git a/weed/command/filer_remote_gateway_buckets.go b/weed/command/filer_remote_gateway_buckets.go index 9fe0e29df..bdba71085 100644 --- a/weed/command/filer_remote_gateway_buckets.go +++ b/weed/command/filer_remote_gateway_buckets.go @@ -2,16 +2,16 @@ package command import ( "fmt" - "github.com/chrislusf/seaweedfs/weed/filer" - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/pb" - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "github.com/chrislusf/seaweedfs/weed/pb/remote_pb" - "github.com/chrislusf/seaweedfs/weed/remote_storage" - "github.com/chrislusf/seaweedfs/weed/replication/source" - "github.com/chrislusf/seaweedfs/weed/s3api/s3_constants" - "github.com/chrislusf/seaweedfs/weed/util" "github.com/golang/protobuf/proto" + "github.com/seaweedfs/seaweedfs/weed/filer" + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/pb" + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/pb/remote_pb" + "github.com/seaweedfs/seaweedfs/weed/remote_storage" + "github.com/seaweedfs/seaweedfs/weed/replication/source" + "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants" + "github.com/seaweedfs/seaweedfs/weed/util" "math" "math/rand" "path/filepath" @@ -39,7 +39,8 @@ func (option *RemoteGatewayOptions) followBucketUpdatesAndUploadToRemote(filerSo lastOffsetTs := collectLastSyncOffset(option, option.grpcDialOption, pb.ServerAddress(*option.filerAddress), option.bucketsDir, *option.timeAgo) - return pb.FollowMetadata(pb.ServerAddress(*option.filerAddress), option.grpcDialOption, "filer.remote.sync", option.clientId, + option.clientEpoch++ + return pb.FollowMetadata(pb.ServerAddress(*option.filerAddress), option.grpcDialOption, "filer.remote.sync", option.clientId, option.clientEpoch, option.bucketsDir, []string{filer.DirectoryEtcRemote}, lastOffsetTs.UnixNano(), 0, 0, processEventFnWithOffset, pb.TrivialOnError) } diff --git a/weed/command/filer_remote_sync.go b/weed/command/filer_remote_sync.go index d6ccf7b79..e7f6f971b 100644 --- a/weed/command/filer_remote_sync.go +++ b/weed/command/filer_remote_sync.go @@ -2,12 +2,12 @@ package command import ( "fmt" - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/pb" - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "github.com/chrislusf/seaweedfs/weed/replication/source" - "github.com/chrislusf/seaweedfs/weed/security" - "github.com/chrislusf/seaweedfs/weed/util" + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/pb" + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/replication/source" + "github.com/seaweedfs/seaweedfs/weed/security" + "github.com/seaweedfs/seaweedfs/weed/util" "google.golang.org/grpc" "time" ) @@ -19,6 +19,7 @@ type RemoteSyncOptions struct { timeAgo *time.Duration dir *string clientId int32 + clientEpoch int32 } var _ = filer_pb.FilerClient(&RemoteSyncOptions{}) diff --git a/weed/command/filer_remote_sync_dir.go b/weed/command/filer_remote_sync_dir.go index 5fc20be9a..14aed2465 100644 --- a/weed/command/filer_remote_sync_dir.go +++ b/weed/command/filer_remote_sync_dir.go @@ -7,15 +7,15 @@ import ( "strings" "time" - "github.com/chrislusf/seaweedfs/weed/filer" - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/pb" - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "github.com/chrislusf/seaweedfs/weed/pb/remote_pb" - "github.com/chrislusf/seaweedfs/weed/remote_storage" - "github.com/chrislusf/seaweedfs/weed/replication/source" - "github.com/chrislusf/seaweedfs/weed/util" "github.com/golang/protobuf/proto" + "github.com/seaweedfs/seaweedfs/weed/filer" + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/pb" + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/pb/remote_pb" + "github.com/seaweedfs/seaweedfs/weed/remote_storage" + "github.com/seaweedfs/seaweedfs/weed/replication/source" + "github.com/seaweedfs/seaweedfs/weed/util" "google.golang.org/grpc" ) @@ -40,7 +40,8 @@ func followUpdatesAndUploadToRemote(option *RemoteSyncOptions, filerSource *sour lastOffsetTs := collectLastSyncOffset(option, option.grpcDialOption, pb.ServerAddress(*option.filerAddress), mountedDir, *option.timeAgo) - return pb.FollowMetadata(pb.ServerAddress(*option.filerAddress), option.grpcDialOption, "filer.remote.sync", option.clientId, + option.clientEpoch++ + return pb.FollowMetadata(pb.ServerAddress(*option.filerAddress), option.grpcDialOption, "filer.remote.sync", option.clientId, option.clientEpoch, mountedDir, []string{filer.DirectoryEtcRemote}, lastOffsetTs.UnixNano(), 0, 0, processEventFnWithOffset, pb.TrivialOnError) } diff --git a/weed/command/filer_replication.go b/weed/command/filer_replication.go index bf0a3e140..ec965a5e3 100644 --- a/weed/command/filer_replication.go +++ b/weed/command/filer_replication.go @@ -4,11 +4,11 @@ import ( "context" "strings" - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/replication" - "github.com/chrislusf/seaweedfs/weed/replication/sink" - "github.com/chrislusf/seaweedfs/weed/replication/sub" - "github.com/chrislusf/seaweedfs/weed/util" + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/replication" + "github.com/seaweedfs/seaweedfs/weed/replication/sink" + "github.com/seaweedfs/seaweedfs/weed/replication/sub" + "github.com/seaweedfs/seaweedfs/weed/util" ) func init() { diff --git a/weed/command/filer_sync.go b/weed/command/filer_sync.go index 1550d155a..fe1606d8c 100644 --- a/weed/command/filer_sync.go +++ b/weed/command/filer_sync.go @@ -4,17 +4,17 @@ import ( "context" "errors" "fmt" - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/pb" - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "github.com/chrislusf/seaweedfs/weed/replication" - "github.com/chrislusf/seaweedfs/weed/replication/sink" - "github.com/chrislusf/seaweedfs/weed/replication/sink/filersink" - "github.com/chrislusf/seaweedfs/weed/replication/source" - "github.com/chrislusf/seaweedfs/weed/security" - statsCollect "github.com/chrislusf/seaweedfs/weed/stats" - "github.com/chrislusf/seaweedfs/weed/util" - "github.com/chrislusf/seaweedfs/weed/util/grace" + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/pb" + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/replication" + "github.com/seaweedfs/seaweedfs/weed/replication/sink" + "github.com/seaweedfs/seaweedfs/weed/replication/sink/filersink" + "github.com/seaweedfs/seaweedfs/weed/replication/source" + "github.com/seaweedfs/seaweedfs/weed/security" + statsCollect "github.com/seaweedfs/seaweedfs/weed/stats" + "github.com/seaweedfs/seaweedfs/weed/util" + "github.com/seaweedfs/seaweedfs/weed/util/grace" "google.golang.org/grpc" "os" "strings" @@ -26,7 +26,9 @@ type SyncOptions struct { filerA *string filerB *string aPath *string + aExcludePaths *string bPath *string + bExcludePaths *string aReplication *string bReplication *string aCollection *string @@ -43,6 +45,7 @@ type SyncOptions struct { bProxyByFiler *bool metricsHttpPort *int clientId int32 + clientEpoch int32 } var ( @@ -57,7 +60,9 @@ func init() { syncOptions.filerA = cmdFilerSynchronize.Flag.String("a", "", "filer A in one SeaweedFS cluster") syncOptions.filerB = cmdFilerSynchronize.Flag.String("b", "", "filer B in the other SeaweedFS cluster") syncOptions.aPath = cmdFilerSynchronize.Flag.String("a.path", "/", "directory to sync on filer A") + syncOptions.aExcludePaths = cmdFilerSynchronize.Flag.String("a.excludePaths", "", "exclude directories to sync on filer A") syncOptions.bPath = cmdFilerSynchronize.Flag.String("b.path", "/", "directory to sync on filer B") + syncOptions.bExcludePaths = cmdFilerSynchronize.Flag.String("b.excludePaths", "", "exclude directories to sync on filer B") syncOptions.aReplication = cmdFilerSynchronize.Flag.String("a.replication", "", "replication on filer A") syncOptions.bReplication = cmdFilerSynchronize.Flag.String("b.replication", "", "replication on filer B") syncOptions.aCollection = cmdFilerSynchronize.Flag.String("a.collection", "", "collection on filer A") @@ -131,9 +136,25 @@ func runFilerSynchronize(cmd *Command, args []string) bool { os.Exit(2) } for { - err := doSubscribeFilerMetaChanges(syncOptions.clientId, grpcDialOption, filerA, *syncOptions.aPath, *syncOptions.aProxyByFiler, filerB, - *syncOptions.bPath, *syncOptions.bReplication, *syncOptions.bCollection, *syncOptions.bTtlSec, *syncOptions.bProxyByFiler, *syncOptions.bDiskType, - *syncOptions.bDebug, aFilerSignature, bFilerSignature) + syncOptions.clientEpoch++ + err := doSubscribeFilerMetaChanges( + syncOptions.clientId, + syncOptions.clientEpoch, + grpcDialOption, + filerA, + *syncOptions.aPath, + strings.Split(*syncOptions.aExcludePaths, ","), + *syncOptions.aProxyByFiler, + filerB, + *syncOptions.bPath, + *syncOptions.bReplication, + *syncOptions.bCollection, + *syncOptions.bTtlSec, + *syncOptions.bProxyByFiler, + *syncOptions.bDiskType, + *syncOptions.bDebug, + aFilerSignature, + bFilerSignature) if err != nil { glog.Errorf("sync from %s to %s: %v", *syncOptions.filerA, *syncOptions.filerB, err) time.Sleep(1747 * time.Millisecond) @@ -151,9 +172,25 @@ func runFilerSynchronize(cmd *Command, args []string) bool { } go func() { for { - err := doSubscribeFilerMetaChanges(syncOptions.clientId, grpcDialOption, filerB, *syncOptions.bPath, *syncOptions.bProxyByFiler, filerA, - *syncOptions.aPath, *syncOptions.aReplication, *syncOptions.aCollection, *syncOptions.aTtlSec, *syncOptions.aProxyByFiler, *syncOptions.aDiskType, - *syncOptions.aDebug, bFilerSignature, aFilerSignature) + syncOptions.clientEpoch++ + err := doSubscribeFilerMetaChanges( + syncOptions.clientId, + syncOptions.clientEpoch, + grpcDialOption, + filerB, + *syncOptions.bPath, + strings.Split(*syncOptions.bExcludePaths, ","), + *syncOptions.bProxyByFiler, + filerA, + *syncOptions.aPath, + *syncOptions.aReplication, + *syncOptions.aCollection, + *syncOptions.aTtlSec, + *syncOptions.aProxyByFiler, + *syncOptions.aDiskType, + *syncOptions.aDebug, + bFilerSignature, + aFilerSignature) if err != nil { glog.Errorf("sync from %s to %s: %v", *syncOptions.filerB, *syncOptions.filerA, err) time.Sleep(2147 * time.Millisecond) @@ -183,7 +220,7 @@ func initOffsetFromTsMs(grpcDialOption grpc.DialOption, targetFiler pb.ServerAdd return nil } -func doSubscribeFilerMetaChanges(clientId int32, grpcDialOption grpc.DialOption, sourceFiler pb.ServerAddress, sourcePath string, sourceReadChunkFromFiler bool, targetFiler pb.ServerAddress, targetPath string, +func doSubscribeFilerMetaChanges(clientId int32, clientEpoch int32, grpcDialOption grpc.DialOption, sourceFiler pb.ServerAddress, sourcePath string, sourceExcludePaths []string, sourceReadChunkFromFiler bool, targetFiler pb.ServerAddress, targetPath string, replicationStr, collection string, ttlSec int, sinkWriteChunkByFiler bool, diskType string, debug bool, sourceFilerSignature int32, targetFilerSignature int32) error { // if first time, start from now @@ -202,7 +239,7 @@ func doSubscribeFilerMetaChanges(clientId int32, grpcDialOption grpc.DialOption, filerSink.DoInitialize(targetFiler.ToHttpAddress(), targetFiler.ToGrpcAddress(), targetPath, replicationStr, collection, ttlSec, diskType, grpcDialOption, sinkWriteChunkByFiler) filerSink.SetSourceFiler(filerSource) - persistEventFn := genProcessFunction(sourcePath, targetPath, filerSink, debug) + persistEventFn := genProcessFunction(sourcePath, targetPath, sourceExcludePaths, filerSink, debug) processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error { message := resp.EventNotification @@ -226,7 +263,7 @@ func doSubscribeFilerMetaChanges(clientId int32, grpcDialOption grpc.DialOption, return setOffset(grpcDialOption, targetFiler, getSignaturePrefixByPath(sourcePath), sourceFilerSignature, lastTsNs) }) - return pb.FollowMetadata(sourceFiler, grpcDialOption, clientName, clientId, + return pb.FollowMetadata(sourceFiler, grpcDialOption, clientName, clientId, clientEpoch, sourcePath, nil, sourceFilerOffsetTsNs, 0, targetFilerSignature, processEventFnWithOffset, pb.RetryForeverOnError) } @@ -299,7 +336,7 @@ func setOffset(grpcDialOption grpc.DialOption, filer pb.ServerAddress, signature } -func genProcessFunction(sourcePath string, targetPath string, dataSink sink.ReplicationSink, debug bool) func(resp *filer_pb.SubscribeMetadataResponse) error { +func genProcessFunction(sourcePath string, targetPath string, excludePaths []string, dataSink sink.ReplicationSink, debug bool) func(resp *filer_pb.SubscribeMetadataResponse) error { // process function processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error { message := resp.EventNotification @@ -319,7 +356,11 @@ func genProcessFunction(sourcePath string, targetPath string, dataSink sink.Repl if !strings.HasPrefix(resp.Directory, sourcePath) { return nil } - + for _, excludePath := range excludePaths { + if strings.HasPrefix(resp.Directory, excludePath) { + return nil + } + } // handle deletions if filer_pb.IsDelete(resp) { if !strings.HasPrefix(string(sourceOldKey), sourcePath) { diff --git a/weed/command/fix.go b/weed/command/fix.go index d19496a79..8ab879d3b 100644 --- a/weed/command/fix.go +++ b/weed/command/fix.go @@ -8,13 +8,13 @@ import ( "strconv" "strings" - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/storage" - "github.com/chrislusf/seaweedfs/weed/storage/needle" - "github.com/chrislusf/seaweedfs/weed/storage/needle_map" - "github.com/chrislusf/seaweedfs/weed/storage/super_block" - "github.com/chrislusf/seaweedfs/weed/storage/types" - "github.com/chrislusf/seaweedfs/weed/util" + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/storage" + "github.com/seaweedfs/seaweedfs/weed/storage/needle" + "github.com/seaweedfs/seaweedfs/weed/storage/needle_map" + "github.com/seaweedfs/seaweedfs/weed/storage/super_block" + "github.com/seaweedfs/seaweedfs/weed/storage/types" + "github.com/seaweedfs/seaweedfs/weed/util" ) func init() { diff --git a/weed/command/iam.go b/weed/command/iam.go index 968d23095..43234aa70 100644 --- a/weed/command/iam.go +++ b/weed/command/iam.go @@ -5,13 +5,13 @@ import ( "fmt" "net/http" - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/iamapi" - "github.com/chrislusf/seaweedfs/weed/pb" - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "github.com/chrislusf/seaweedfs/weed/security" - "github.com/chrislusf/seaweedfs/weed/util" "github.com/gorilla/mux" + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/iamapi" + "github.com/seaweedfs/seaweedfs/weed/pb" + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/security" + "github.com/seaweedfs/seaweedfs/weed/util" "time" ) diff --git a/weed/command/imports.go b/weed/command/imports.go index afdbc5a10..bcc9e173b 100644 --- a/weed/command/imports.go +++ b/weed/command/imports.go @@ -3,34 +3,34 @@ package command import ( _ "net/http/pprof" - _ "github.com/chrislusf/seaweedfs/weed/remote_storage/azure" - _ "github.com/chrislusf/seaweedfs/weed/remote_storage/gcs" - _ "github.com/chrislusf/seaweedfs/weed/remote_storage/s3" + _ "github.com/seaweedfs/seaweedfs/weed/remote_storage/azure" + _ "github.com/seaweedfs/seaweedfs/weed/remote_storage/gcs" + _ "github.com/seaweedfs/seaweedfs/weed/remote_storage/s3" - _ "github.com/chrislusf/seaweedfs/weed/replication/sink/azuresink" - _ "github.com/chrislusf/seaweedfs/weed/replication/sink/b2sink" - _ "github.com/chrislusf/seaweedfs/weed/replication/sink/filersink" - _ "github.com/chrislusf/seaweedfs/weed/replication/sink/gcssink" - _ "github.com/chrislusf/seaweedfs/weed/replication/sink/localsink" - _ "github.com/chrislusf/seaweedfs/weed/replication/sink/s3sink" + _ "github.com/seaweedfs/seaweedfs/weed/replication/sink/azuresink" + _ "github.com/seaweedfs/seaweedfs/weed/replication/sink/b2sink" + _ "github.com/seaweedfs/seaweedfs/weed/replication/sink/filersink" + _ "github.com/seaweedfs/seaweedfs/weed/replication/sink/gcssink" + _ "github.com/seaweedfs/seaweedfs/weed/replication/sink/localsink" + _ "github.com/seaweedfs/seaweedfs/weed/replication/sink/s3sink" - _ "github.com/chrislusf/seaweedfs/weed/filer/arangodb" - _ "github.com/chrislusf/seaweedfs/weed/filer/cassandra" - _ "github.com/chrislusf/seaweedfs/weed/filer/elastic/v7" - _ "github.com/chrislusf/seaweedfs/weed/filer/etcd" - _ "github.com/chrislusf/seaweedfs/weed/filer/hbase" - _ "github.com/chrislusf/seaweedfs/weed/filer/leveldb" - _ "github.com/chrislusf/seaweedfs/weed/filer/leveldb2" - _ "github.com/chrislusf/seaweedfs/weed/filer/leveldb3" - _ "github.com/chrislusf/seaweedfs/weed/filer/mongodb" - _ "github.com/chrislusf/seaweedfs/weed/filer/mysql" - _ "github.com/chrislusf/seaweedfs/weed/filer/mysql2" - _ "github.com/chrislusf/seaweedfs/weed/filer/postgres" - _ "github.com/chrislusf/seaweedfs/weed/filer/postgres2" - _ "github.com/chrislusf/seaweedfs/weed/filer/redis" - _ "github.com/chrislusf/seaweedfs/weed/filer/redis2" - _ "github.com/chrislusf/seaweedfs/weed/filer/redis3" - _ "github.com/chrislusf/seaweedfs/weed/filer/sqlite" - _ "github.com/chrislusf/seaweedfs/weed/filer/tikv" - _ "github.com/chrislusf/seaweedfs/weed/filer/ydb" + _ "github.com/seaweedfs/seaweedfs/weed/filer/arangodb" + _ "github.com/seaweedfs/seaweedfs/weed/filer/cassandra" + _ "github.com/seaweedfs/seaweedfs/weed/filer/elastic/v7" + _ "github.com/seaweedfs/seaweedfs/weed/filer/etcd" + _ "github.com/seaweedfs/seaweedfs/weed/filer/hbase" + _ "github.com/seaweedfs/seaweedfs/weed/filer/leveldb" + _ "github.com/seaweedfs/seaweedfs/weed/filer/leveldb2" + _ "github.com/seaweedfs/seaweedfs/weed/filer/leveldb3" + _ "github.com/seaweedfs/seaweedfs/weed/filer/mongodb" + _ "github.com/seaweedfs/seaweedfs/weed/filer/mysql" + _ "github.com/seaweedfs/seaweedfs/weed/filer/mysql2" + _ "github.com/seaweedfs/seaweedfs/weed/filer/postgres" + _ "github.com/seaweedfs/seaweedfs/weed/filer/postgres2" + _ "github.com/seaweedfs/seaweedfs/weed/filer/redis" + _ "github.com/seaweedfs/seaweedfs/weed/filer/redis2" + _ "github.com/seaweedfs/seaweedfs/weed/filer/redis3" + _ "github.com/seaweedfs/seaweedfs/weed/filer/sqlite" + _ "github.com/seaweedfs/seaweedfs/weed/filer/tikv" + _ "github.com/seaweedfs/seaweedfs/weed/filer/ydb" ) diff --git a/weed/command/master.go b/weed/command/master.go index ab8466d47..bbae29cc2 100644 --- a/weed/command/master.go +++ b/weed/command/master.go @@ -2,28 +2,30 @@ package command import ( "fmt" - "golang.org/x/exp/slices" "net/http" "os" "path" "strings" "time" - "github.com/chrislusf/raft/protobuf" - stats_collect "github.com/chrislusf/seaweedfs/weed/stats" + "golang.org/x/exp/slices" + "github.com/gorilla/mux" + "github.com/seaweedfs/raft/protobuf" "github.com/spf13/viper" "google.golang.org/grpc/reflection" - "github.com/chrislusf/seaweedfs/weed/util/grace" + stats_collect "github.com/seaweedfs/seaweedfs/weed/stats" + + "github.com/seaweedfs/seaweedfs/weed/util/grace" - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/pb" - "github.com/chrislusf/seaweedfs/weed/pb/master_pb" - "github.com/chrislusf/seaweedfs/weed/security" - weed_server "github.com/chrislusf/seaweedfs/weed/server" - "github.com/chrislusf/seaweedfs/weed/storage/backend" - "github.com/chrislusf/seaweedfs/weed/util" + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/pb" + "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" + "github.com/seaweedfs/seaweedfs/weed/security" + weed_server "github.com/seaweedfs/seaweedfs/weed/server" + "github.com/seaweedfs/seaweedfs/weed/storage/backend" + "github.com/seaweedfs/seaweedfs/weed/util" ) var ( @@ -174,11 +176,12 @@ func startMaster(masterOption MasterOptions, masterWhiteList []string) { } else { raftServer, err = weed_server.NewRaftServer(raftServerOption) if raftServer == nil { - glog.Fatalf("please verify %s is writable, see https://github.com/chrislusf/seaweedfs/issues/717: %s", *masterOption.metaFolder, err) + glog.Fatalf("please verify %s is writable, see https://github.com/seaweedfs/seaweedfs/issues/717: %s", *masterOption.metaFolder, err) } } ms.SetRaftServer(raftServer) r.HandleFunc("/cluster/status", raftServer.StatusHandler).Methods("GET") + r.HandleFunc("/cluster/healthz", raftServer.HealthzHandler).Methods("GET", "HEAD") if *m.raftHashicorp { r.HandleFunc("/raft/stats", raftServer.StatsRaftHandler).Methods("GET") } diff --git a/weed/command/master_follower.go b/weed/command/master_follower.go index ec7d2758f..5e5244c05 100644 --- a/weed/command/master_follower.go +++ b/weed/command/master_follower.go @@ -7,13 +7,13 @@ import ( "time" "github.com/aws/aws-sdk-go/aws" - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/pb" - "github.com/chrislusf/seaweedfs/weed/pb/master_pb" - "github.com/chrislusf/seaweedfs/weed/security" - weed_server "github.com/chrislusf/seaweedfs/weed/server" - "github.com/chrislusf/seaweedfs/weed/util" "github.com/gorilla/mux" + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/pb" + "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" + "github.com/seaweedfs/seaweedfs/weed/security" + weed_server "github.com/seaweedfs/seaweedfs/weed/server" + "github.com/seaweedfs/seaweedfs/weed/util" "google.golang.org/grpc/reflection" ) diff --git a/weed/command/mount_std.go b/weed/command/mount_std.go index 1aff3c5bb..db5899109 100644 --- a/weed/command/mount_std.go +++ b/weed/command/mount_std.go @@ -6,16 +6,16 @@ package command import ( "context" "fmt" - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/mount" - "github.com/chrislusf/seaweedfs/weed/mount/meta_cache" - "github.com/chrislusf/seaweedfs/weed/mount/unmount" - "github.com/chrislusf/seaweedfs/weed/pb" - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "github.com/chrislusf/seaweedfs/weed/pb/mount_pb" - "github.com/chrislusf/seaweedfs/weed/security" - "github.com/chrislusf/seaweedfs/weed/storage/types" "github.com/hanwen/go-fuse/v2/fuse" + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/mount" + "github.com/seaweedfs/seaweedfs/weed/mount/meta_cache" + "github.com/seaweedfs/seaweedfs/weed/mount/unmount" + "github.com/seaweedfs/seaweedfs/weed/pb" + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/pb/mount_pb" + "github.com/seaweedfs/seaweedfs/weed/security" + "github.com/seaweedfs/seaweedfs/weed/storage/types" "google.golang.org/grpc/reflection" "net" "net/http" @@ -26,8 +26,8 @@ import ( "strings" "time" - "github.com/chrislusf/seaweedfs/weed/util" - "github.com/chrislusf/seaweedfs/weed/util/grace" + "github.com/seaweedfs/seaweedfs/weed/util" + "github.com/seaweedfs/seaweedfs/weed/util/grace" ) func runMount(cmd *Command, args []string) bool { diff --git a/weed/command/mq_broker.go b/weed/command/mq_broker.go new file mode 100644 index 000000000..aa327cb15 --- /dev/null +++ b/weed/command/mq_broker.go @@ -0,0 +1,94 @@ +package command + +import ( + "google.golang.org/grpc/reflection" + + "github.com/seaweedfs/seaweedfs/weed/util/grace" + + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/mq/broker" + "github.com/seaweedfs/seaweedfs/weed/pb" + "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" + "github.com/seaweedfs/seaweedfs/weed/security" + "github.com/seaweedfs/seaweedfs/weed/util" +) + +var ( + mqBrokerStandaloneOptions MessageQueueBrokerOptions +) + +type MessageQueueBrokerOptions struct { + masters map[string]pb.ServerAddress + mastersString *string + filerGroup *string + ip *string + port *int + dataCenter *string + rack *string + cpuprofile *string + memprofile *string +} + +func init() { + cmdMqBroker.Run = runMqBroker // break init cycle + mqBrokerStandaloneOptions.mastersString = cmdMqBroker.Flag.String("master", "localhost:9333", "comma-separated master servers") + mqBrokerStandaloneOptions.filerGroup = cmdMqBroker.Flag.String("filerGroup", "", "share metadata with other filers in the same filerGroup") + mqBrokerStandaloneOptions.ip = cmdMqBroker.Flag.String("ip", util.DetectedHostAddress(), "broker host address") + mqBrokerStandaloneOptions.port = cmdMqBroker.Flag.Int("port", 17777, "broker gRPC listen port") + mqBrokerStandaloneOptions.dataCenter = cmdMqBroker.Flag.String("dataCenter", "", "prefer to read and write to volumes in this data center") + mqBrokerStandaloneOptions.rack = cmdMqBroker.Flag.String("rack", "", "prefer to write to volumes in this rack") + mqBrokerStandaloneOptions.cpuprofile = cmdMqBroker.Flag.String("cpuprofile", "", "cpu profile output file") + mqBrokerStandaloneOptions.memprofile = cmdMqBroker.Flag.String("memprofile", "", "memory profile output file") +} + +var cmdMqBroker = &Command{ + UsageLine: "mq.broker [-port=17777] [-master=<ip:port>]", + Short: "<WIP> start a message queue broker", + Long: `start a message queue broker + + The broker can accept gRPC calls to write or read messages. The messages are stored via filer. + The brokers are stateless. To scale up, just add more brokers. + +`, +} + +func runMqBroker(cmd *Command, args []string) bool { + + util.LoadConfiguration("security", false) + + mqBrokerStandaloneOptions.masters = pb.ServerAddresses(*mqBrokerStandaloneOptions.mastersString).ToAddressMap() + + return mqBrokerStandaloneOptions.startQueueServer() + +} + +func (mqBrokerOpt *MessageQueueBrokerOptions) startQueueServer() bool { + + grace.SetupProfiling(*mqBrokerStandaloneOptions.cpuprofile, *mqBrokerStandaloneOptions.memprofile) + + grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.msg_broker") + + qs, err := broker.NewMessageBroker(&broker.MessageQueueBrokerOption{ + Masters: mqBrokerOpt.masters, + FilerGroup: *mqBrokerOpt.filerGroup, + DataCenter: *mqBrokerOpt.dataCenter, + Rack: *mqBrokerOpt.rack, + DefaultReplication: "", + MaxMB: 0, + Ip: *mqBrokerOpt.ip, + Port: *mqBrokerOpt.port, + }, grpcDialOption) + + // start grpc listener + grpcL, _, err := util.NewIpAndLocalListeners("", *mqBrokerOpt.port, 0) + if err != nil { + glog.Fatalf("failed to listen on grpc port %d: %v", *mqBrokerOpt.port, err) + } + grpcS := pb.NewGrpcServer(security.LoadServerTLS(util.GetViper(), "grpc.msg_broker")) + mq_pb.RegisterSeaweedMessagingServer(grpcS, qs) + reflection.Register(grpcS) + grpcS.Serve(grpcL) + + return true + +} diff --git a/weed/command/msg_broker.go b/weed/command/msg_broker.go deleted file mode 100644 index 3274f599b..000000000 --- a/weed/command/msg_broker.go +++ /dev/null @@ -1,109 +0,0 @@ -package command - -import ( - "context" - "fmt" - "time" - - "google.golang.org/grpc/reflection" - - "github.com/chrislusf/seaweedfs/weed/util/grace" - - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/messaging/broker" - "github.com/chrislusf/seaweedfs/weed/pb" - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb" - "github.com/chrislusf/seaweedfs/weed/security" - "github.com/chrislusf/seaweedfs/weed/util" -) - -var ( - messageBrokerStandaloneOptions MessageBrokerOptions -) - -type MessageBrokerOptions struct { - filer *string - ip *string - port *int - cpuprofile *string - memprofile *string -} - -func init() { - cmdMsgBroker.Run = runMsgBroker // break init cycle - messageBrokerStandaloneOptions.filer = cmdMsgBroker.Flag.String("filer", "localhost:8888", "filer server address") - messageBrokerStandaloneOptions.ip = cmdMsgBroker.Flag.String("ip", util.DetectedHostAddress(), "broker host address") - messageBrokerStandaloneOptions.port = cmdMsgBroker.Flag.Int("port", 17777, "broker gRPC listen port") - messageBrokerStandaloneOptions.cpuprofile = cmdMsgBroker.Flag.String("cpuprofile", "", "cpu profile output file") - messageBrokerStandaloneOptions.memprofile = cmdMsgBroker.Flag.String("memprofile", "", "memory profile output file") -} - -var cmdMsgBroker = &Command{ - UsageLine: "msgBroker [-port=17777] [-filer=<ip:port>]", - Short: "start a message queue broker", - Long: `start a message queue broker - - The broker can accept gRPC calls to write or read messages. The messages are stored via filer. - The brokers are stateless. To scale up, just add more brokers. - -`, -} - -func runMsgBroker(cmd *Command, args []string) bool { - - util.LoadConfiguration("security", false) - - return messageBrokerStandaloneOptions.startQueueServer() - -} - -func (msgBrokerOpt *MessageBrokerOptions) startQueueServer() bool { - - grace.SetupProfiling(*messageBrokerStandaloneOptions.cpuprofile, *messageBrokerStandaloneOptions.memprofile) - - filerAddress := pb.ServerAddress(*msgBrokerOpt.filer) - - grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.msg_broker") - cipher := false - - for { - err := pb.WithGrpcFilerClient(false, filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { - resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) - if err != nil { - return fmt.Errorf("get filer %s configuration: %v", filerAddress, err) - } - cipher = resp.Cipher - return nil - }) - if err != nil { - glog.V(0).Infof("wait to connect to filer %s grpc address %s", *msgBrokerOpt.filer, filerAddress.ToGrpcAddress()) - time.Sleep(time.Second) - } else { - glog.V(0).Infof("connected to filer %s grpc address %s", *msgBrokerOpt.filer, filerAddress.ToGrpcAddress()) - break - } - } - - qs, err := broker.NewMessageBroker(&broker.MessageBrokerOption{ - Filers: []pb.ServerAddress{filerAddress}, - DefaultReplication: "", - MaxMB: 0, - Ip: *msgBrokerOpt.ip, - Port: *msgBrokerOpt.port, - Cipher: cipher, - }, grpcDialOption) - - // start grpc listener - grpcL, _, err := util.NewIpAndLocalListeners("", *msgBrokerOpt.port, 0) - if err != nil { - glog.Fatalf("failed to listen on grpc port %d: %v", *msgBrokerOpt.port, err) - } - grpcS := pb.NewGrpcServer(security.LoadServerTLS(util.GetViper(), "grpc.msg_broker")) - messaging_pb.RegisterSeaweedMessagingServer(grpcS, qs) - reflection.Register(grpcS) - grpcS.Serve(grpcL) - - return true - -} diff --git a/weed/command/s3.go b/weed/command/s3.go index 42e447d90..8e88bfc2c 100644 --- a/weed/command/s3.go +++ b/weed/command/s3.go @@ -3,22 +3,22 @@ package command import ( "context" "fmt" - "github.com/chrislusf/seaweedfs/weed/s3api/s3err" + "github.com/seaweedfs/seaweedfs/weed/s3api/s3err" "google.golang.org/grpc/reflection" "net/http" "time" - "github.com/chrislusf/seaweedfs/weed/pb" - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "github.com/chrislusf/seaweedfs/weed/pb/s3_pb" - "github.com/chrislusf/seaweedfs/weed/security" + "github.com/seaweedfs/seaweedfs/weed/pb" + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/pb/s3_pb" + "github.com/seaweedfs/seaweedfs/weed/security" "github.com/gorilla/mux" - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/s3api" - stats_collect "github.com/chrislusf/seaweedfs/weed/stats" - "github.com/chrislusf/seaweedfs/weed/util" + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/s3api" + stats_collect "github.com/seaweedfs/seaweedfs/weed/stats" + "github.com/seaweedfs/seaweedfs/weed/util" ) var ( diff --git a/weed/command/scaffold.go b/weed/command/scaffold.go index fb81f9966..2c3a89ada 100644 --- a/weed/command/scaffold.go +++ b/weed/command/scaffold.go @@ -2,10 +2,10 @@ package command import ( "fmt" - "github.com/chrislusf/seaweedfs/weed/util" + "github.com/seaweedfs/seaweedfs/weed/util" "path/filepath" - "github.com/chrislusf/seaweedfs/weed/command/scaffold" + "github.com/seaweedfs/seaweedfs/weed/command/scaffold" ) func init() { diff --git a/weed/command/scaffold/replication.toml b/weed/command/scaffold/replication.toml index c463c8077..cffe1b76f 100644 --- a/weed/command/scaffold/replication.toml +++ b/weed/command/scaffold/replication.toml @@ -13,6 +13,8 @@ grpcAddress = "localhost:18888" # this is not a directory on your hard drive, but on your filer. # i.e., all files with this "prefix" are sent to notification message queue. directory = "/buckets" +# files from the directory separated by space are excluded from sending notifications +excludeDirectories = "/buckets/tmp" [sink.local] enabled = false diff --git a/weed/command/server.go b/weed/command/server.go index b1812bb9b..f7cfa4805 100644 --- a/weed/command/server.go +++ b/weed/command/server.go @@ -7,12 +7,12 @@ import ( "strings" "time" - stats_collect "github.com/chrislusf/seaweedfs/weed/stats" + stats_collect "github.com/seaweedfs/seaweedfs/weed/stats" - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/pb" - "github.com/chrislusf/seaweedfs/weed/util" - "github.com/chrislusf/seaweedfs/weed/util/grace" + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/pb" + "github.com/seaweedfs/seaweedfs/weed/util" + "github.com/seaweedfs/seaweedfs/weed/util/grace" ) type ServerOptions struct { @@ -24,13 +24,13 @@ type ServerOptions struct { } var ( - serverOptions ServerOptions - masterOptions MasterOptions - filerOptions FilerOptions - s3Options S3Options - iamOptions IamOptions - webdavOptions WebDavOption - msgBrokerOptions MessageBrokerOptions + serverOptions ServerOptions + masterOptions MasterOptions + filerOptions FilerOptions + s3Options S3Options + iamOptions IamOptions + webdavOptions WebDavOption + mqBrokerOptions MessageQueueBrokerOptions ) func init() { @@ -74,7 +74,7 @@ var ( isStartingS3 = cmdServer.Flag.Bool("s3", false, "whether to start S3 gateway") isStartingIam = cmdServer.Flag.Bool("iam", false, "whether to start IAM service") isStartingWebDav = cmdServer.Flag.Bool("webdav", false, "whether to start WebDAV gateway") - isStartingMsgBroker = cmdServer.Flag.Bool("msgBroker", false, "whether to start message broker") + isStartingMqBroker = cmdServer.Flag.Bool("mq.broker", false, "whether to start message queue broker") serverWhiteList []string @@ -155,7 +155,7 @@ func init() { webdavOptions.cacheDir = cmdServer.Flag.String("webdav.cacheDir", os.TempDir(), "local cache directory for file chunks") webdavOptions.cacheSizeMB = cmdServer.Flag.Int64("webdav.cacheCapacityMB", 0, "local cache capacity in MB") - msgBrokerOptions.port = cmdServer.Flag.Int("msgBroker.port", 17777, "broker gRPC listen port") + mqBrokerOptions.port = cmdServer.Flag.Int("mq.broker.port", 17777, "message queue broker gRPC listen port") } @@ -179,7 +179,7 @@ func runServer(cmd *Command, args []string) bool { if *isStartingWebDav { *isStartingFiler = true } - if *isStartingMsgBroker { + if *isStartingMqBroker { *isStartingFiler = true } @@ -208,7 +208,9 @@ func runServer(cmd *Command, args []string) bool { serverOptions.v.idleConnectionTimeout = serverTimeout serverOptions.v.dataCenter = serverDataCenter serverOptions.v.rack = serverRack - msgBrokerOptions.ip = serverIp + mqBrokerOptions.ip = serverIp + mqBrokerOptions.masters = filerOptions.masters + mqBrokerOptions.filerGroup = filerOptions.filerGroup // serverOptions.v.pulseSeconds = pulseSeconds // masterOptions.pulseSeconds = pulseSeconds @@ -217,6 +219,8 @@ func runServer(cmd *Command, args []string) bool { filerOptions.dataCenter = serverDataCenter filerOptions.rack = serverRack + mqBrokerOptions.dataCenter = serverDataCenter + mqBrokerOptions.rack = serverRack filerOptions.disableHttp = serverDisableHttp masterOptions.disableHttp = serverDisableHttp @@ -224,7 +228,7 @@ func runServer(cmd *Command, args []string) bool { s3Options.filer = &filerAddress iamOptions.filer = &filerAddress webdavOptions.filer = &filerAddress - msgBrokerOptions.filer = &filerAddress + mqBrokerOptions.filerGroup = filerOptions.filerGroup go stats_collect.StartMetricsServer(*serverMetricsHttpPort) @@ -276,10 +280,10 @@ func runServer(cmd *Command, args []string) bool { }() } - if *isStartingMsgBroker { + if *isStartingMqBroker { go func() { time.Sleep(2 * time.Second) - msgBrokerOptions.startQueueServer() + mqBrokerOptions.startQueueServer() }() } diff --git a/weed/command/shell.go b/weed/command/shell.go index c32a8e614..f562f624e 100644 --- a/weed/command/shell.go +++ b/weed/command/shell.go @@ -2,11 +2,11 @@ package command import ( "fmt" - "github.com/chrislusf/seaweedfs/weed/pb" + "github.com/seaweedfs/seaweedfs/weed/pb" - "github.com/chrislusf/seaweedfs/weed/security" - "github.com/chrislusf/seaweedfs/weed/shell" - "github.com/chrislusf/seaweedfs/weed/util" + "github.com/seaweedfs/seaweedfs/weed/security" + "github.com/seaweedfs/seaweedfs/weed/shell" + "github.com/seaweedfs/seaweedfs/weed/util" ) var ( diff --git a/weed/command/update.go b/weed/command/update.go index 2d0dc42ad..89efae79a 100644 --- a/weed/command/update.go +++ b/weed/command/update.go @@ -19,8 +19,8 @@ import ( "strings" "time" - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/util" + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/util" "golang.org/x/net/context/ctxhttp" ) @@ -76,8 +76,8 @@ func init() { var cmdUpdate = &Command{ UsageLine: "update [-dir=/path/to/dir] [-name=name] [-version=x.xx]", - Short: "get latest or specific version from https://github.com/chrislusf/seaweedfs", - Long: `get latest or specific version from https://github.com/chrislusf/seaweedfs`, + Short: "get latest or specific version from https://github.com/seaweedfs/seaweedfs", + Long: `get latest or specific version from https://github.com/seaweedfs/seaweedfs`, } func runUpdate(cmd *Command, args []string) bool { @@ -118,7 +118,7 @@ func runUpdate(cmd *Command, args []string) bool { func downloadRelease(ctx context.Context, target string, ver string) (version string, err error) { currentVersion := util.VERSION_NUMBER - rel, err := GitHubLatestRelease(ctx, ver, "chrislusf", "seaweedfs") + rel, err := GitHubLatestRelease(ctx, ver, "seaweedfs", "seaweedfs") if err != nil { return "", err } diff --git a/weed/command/upload.go b/weed/command/upload.go index f2b0b7fe4..389a72552 100644 --- a/weed/command/upload.go +++ b/weed/command/upload.go @@ -4,15 +4,15 @@ import ( "context" "encoding/json" "fmt" - "github.com/chrislusf/seaweedfs/weed/pb" - "github.com/chrislusf/seaweedfs/weed/pb/master_pb" + "github.com/seaweedfs/seaweedfs/weed/pb" + "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" "google.golang.org/grpc" "os" "path/filepath" - "github.com/chrislusf/seaweedfs/weed/operation" - "github.com/chrislusf/seaweedfs/weed/security" - "github.com/chrislusf/seaweedfs/weed/util" + "github.com/seaweedfs/seaweedfs/weed/operation" + "github.com/seaweedfs/seaweedfs/weed/security" + "github.com/seaweedfs/seaweedfs/weed/util" ) var ( diff --git a/weed/command/version.go b/weed/command/version.go index 9caf7dc4e..8e9574691 100644 --- a/weed/command/version.go +++ b/weed/command/version.go @@ -4,7 +4,7 @@ import ( "fmt" "runtime" - "github.com/chrislusf/seaweedfs/weed/util" + "github.com/seaweedfs/seaweedfs/weed/util" ) var cmdVersion = &Command{ diff --git a/weed/command/volume.go b/weed/command/volume.go index 158bdf162..45d13ac1f 100644 --- a/weed/command/volume.go +++ b/weed/command/volume.go @@ -10,25 +10,25 @@ import ( "strings" "time" - "github.com/chrislusf/seaweedfs/weed/storage/types" + "github.com/seaweedfs/seaweedfs/weed/storage/types" "github.com/spf13/viper" "google.golang.org/grpc" - "github.com/chrislusf/seaweedfs/weed/util/grace" + "github.com/seaweedfs/seaweedfs/weed/util/grace" - "github.com/chrislusf/seaweedfs/weed/pb" - "github.com/chrislusf/seaweedfs/weed/security" - "github.com/chrislusf/seaweedfs/weed/util/httpdown" + "github.com/seaweedfs/seaweedfs/weed/pb" + "github.com/seaweedfs/seaweedfs/weed/security" + "github.com/seaweedfs/seaweedfs/weed/util/httpdown" "google.golang.org/grpc/reflection" - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" - weed_server "github.com/chrislusf/seaweedfs/weed/server" - stats_collect "github.com/chrislusf/seaweedfs/weed/stats" - "github.com/chrislusf/seaweedfs/weed/storage" - "github.com/chrislusf/seaweedfs/weed/util" + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb" + weed_server "github.com/seaweedfs/seaweedfs/weed/server" + stats_collect "github.com/seaweedfs/seaweedfs/weed/stats" + "github.com/seaweedfs/seaweedfs/weed/storage" + "github.com/seaweedfs/seaweedfs/weed/util" ) var ( diff --git a/weed/command/volume_test.go b/weed/command/volume_test.go index 7399f1248..801041a88 100644 --- a/weed/command/volume_test.go +++ b/weed/command/volume_test.go @@ -5,7 +5,7 @@ import ( "testing" "time" - "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/glog" ) func TestXYZ(t *testing.T) { diff --git a/weed/command/webdav.go b/weed/command/webdav.go index 689bf3c30..ea19ad1a7 100644 --- a/weed/command/webdav.go +++ b/weed/command/webdav.go @@ -9,12 +9,12 @@ import ( "strconv" "time" - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/pb" - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "github.com/chrislusf/seaweedfs/weed/security" - "github.com/chrislusf/seaweedfs/weed/server" - "github.com/chrislusf/seaweedfs/weed/util" + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/pb" + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/security" + "github.com/seaweedfs/seaweedfs/weed/server" + "github.com/seaweedfs/seaweedfs/weed/util" ) var ( |
