diff options
| author | Chris Lu <chris.lu@gmail.com> | 2021-09-12 22:47:52 -0700 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2021-09-12 22:47:52 -0700 |
| commit | e5fc35ed0c970fea060a5b3b7a3f5efb5af425d6 (patch) | |
| tree | 3ad0436940263a24ac46d38a60dd1e35b2c1cdfe /weed/command | |
| parent | 2c9d4c8f43c1e95c75fc332ca83d19e33e5da3ac (diff) | |
| download | seaweedfs-e5fc35ed0c970fea060a5b3b7a3f5efb5af425d6.tar.xz seaweedfs-e5fc35ed0c970fea060a5b3b7a3f5efb5af425d6.zip | |
change server address from string to a type
Diffstat (limited to 'weed/command')
| -rw-r--r-- | weed/command/backup.go | 5 | ||||
| -rw-r--r-- | weed/command/benchmark.go | 4 | ||||
| -rw-r--r-- | weed/command/download.go | 3 | ||||
| -rw-r--r-- | weed/command/filer.go | 18 | ||||
| -rw-r--r-- | weed/command/filer_backup.go | 4 | ||||
| -rw-r--r-- | weed/command/filer_cat.go | 4 | ||||
| -rw-r--r-- | weed/command/filer_copy.go | 67 | ||||
| -rw-r--r-- | weed/command/filer_meta_backup.go | 4 | ||||
| -rw-r--r-- | weed/command/filer_meta_tail.go | 2 | ||||
| -rw-r--r-- | weed/command/filer_remote_sync.go | 8 | ||||
| -rw-r--r-- | weed/command/filer_remote_sync_buckets.go | 6 | ||||
| -rw-r--r-- | weed/command/filer_remote_sync_dir.go | 8 | ||||
| -rw-r--r-- | weed/command/filer_sync.go | 18 | ||||
| -rw-r--r-- | weed/command/iam.go | 23 | ||||
| -rw-r--r-- | weed/command/master.go | 24 | ||||
| -rw-r--r-- | weed/command/master_follower.go | 17 | ||||
| -rw-r--r-- | weed/command/mount_std.go | 20 | ||||
| -rw-r--r-- | weed/command/msg_broker.go | 16 | ||||
| -rw-r--r-- | weed/command/s3.go | 17 | ||||
| -rw-r--r-- | weed/command/server.go | 14 | ||||
| -rw-r--r-- | weed/command/shell.go | 9 | ||||
| -rw-r--r-- | weed/command/upload.go | 8 | ||||
| -rw-r--r-- | weed/command/volume.go | 14 | ||||
| -rw-r--r-- | weed/command/webdav.go | 17 |
24 files changed, 142 insertions, 188 deletions
diff --git a/weed/command/backup.go b/weed/command/backup.go index 4c5a2d820..2279d0d1a 100644 --- a/weed/command/backup.go +++ b/weed/command/backup.go @@ -2,6 +2,7 @@ package command import ( "fmt" + "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/storage/needle" @@ -72,12 +73,12 @@ func runBackup(cmd *Command, args []string) bool { vid := needle.VolumeId(*s.volumeId) // find volume location, replication, ttl info - lookup, err := operation.LookupVolumeId(func() string { return *s.master }, grpcDialOption, vid.String()) + lookup, err := operation.LookupVolumeId(func() pb.ServerAddress { return pb.ServerAddress(*s.master) }, grpcDialOption, vid.String()) if err != nil { fmt.Printf("Error looking up volume %d: %v\n", vid, err) return true } - volumeServer := lookup.Locations[0].Url + volumeServer := lookup.Locations[0].ServerAddress() stats, err := operation.GetVolumeSyncStatus(volumeServer, grpcDialOption, uint32(vid)) if err != nil { diff --git a/weed/command/benchmark.go b/weed/command/benchmark.go index f0c8f6139..af5919adf 100644 --- a/weed/command/benchmark.go +++ b/weed/command/benchmark.go @@ -3,6 +3,7 @@ package command import ( "bufio" "fmt" + "github.com/chrislusf/seaweedfs/weed/pb" "io" "math" "math/rand" @@ -10,7 +11,6 @@ import ( "runtime" "runtime/pprof" "sort" - "strings" "sync" "time" @@ -129,7 +129,7 @@ func runBenchmark(cmd *Command, args []string) bool { defer pprof.StopCPUProfile() } - b.masterClient = wdclient.NewMasterClient(b.grpcDialOption, "client", "", 0, "", strings.Split(*b.masters, ",")) + b.masterClient = wdclient.NewMasterClient(b.grpcDialOption, "client", "", "", pb.ServerAddresses(*b.masters).ToAddresses()) go b.masterClient.KeepConnectedToMaster() b.masterClient.WaitUntilConnected() diff --git a/weed/command/download.go b/weed/command/download.go index a64d3f237..1d8a72d31 100644 --- a/weed/command/download.go +++ b/weed/command/download.go @@ -2,6 +2,7 @@ package command import ( "fmt" + "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/security" "google.golang.org/grpc" "io" @@ -49,7 +50,7 @@ func runDownload(cmd *Command, args []string) bool { grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client") for _, fid := range args { - if e := downloadToFile(func() string { return *d.server }, grpcDialOption, fid, util.ResolvePath(*d.dir)); e != nil { + if e := downloadToFile(func() pb.ServerAddress { return pb.ServerAddress(*d.server) }, grpcDialOption, fid, util.ResolvePath(*d.dir)); e != nil { fmt.Println("Download Error: ", fid, e) } } diff --git a/weed/command/filer.go b/weed/command/filer.go index 63dd53f9e..96802a1cb 100644 --- a/weed/command/filer.go +++ b/weed/command/filer.go @@ -29,7 +29,8 @@ var ( ) type FilerOptions struct { - masters *string + masters []pb.ServerAddress + mastersString *string ip *string bindIp *string port *int @@ -56,7 +57,7 @@ type FilerOptions struct { func init() { cmdFiler.Run = runFiler // break init cycle - f.masters = cmdFiler.Flag.String("master", "localhost:9333", "comma-separated master servers") + f.mastersString = cmdFiler.Flag.String("master", "localhost:9333", "comma-separated master servers") f.collection = cmdFiler.Flag.String("collection", "", "all data will be stored in this default collection") f.ip = cmdFiler.Flag.String("ip", util.DetectedHostAddress(), "filer server http listen ip address") f.bindIp = cmdFiler.Flag.String("ip.bind", "", "ip address to bind to") @@ -157,13 +158,15 @@ func runFiler(cmd *Command, args []string) bool { if *filerStartIam { filerIamOptions.filer = &filerAddress - filerIamOptions.masters = f.masters + filerIamOptions.masters = f.mastersString go func() { time.Sleep(startDelay * time.Second) filerIamOptions.startIamServer() }() } + f.masters = pb.ServerAddresses(*f.mastersString).ToAddresses() + f.startFiler() return true @@ -185,8 +188,10 @@ func (fo *FilerOptions) startFiler() { peers = strings.Split(*fo.peers, ",") } + filerAddress := pb.NewServerAddress(*fo.ip, *fo.port, *fo.portGrpc) + fs, nfs_err := weed_server.NewFilerServer(defaultMux, publicVolumeMux, &weed_server.FilerOption{ - Masters: strings.Split(*fo.masters, ","), + Masters: fo.masters, Collection: *fo.collection, DefaultReplication: *fo.defaultReplicaPlacement, DisableDirListing: *fo.disableDirListing, @@ -196,11 +201,10 @@ func (fo *FilerOptions) startFiler() { Rack: *fo.rack, DefaultLevelDbDir: defaultLevelDbDirectory, DisableHttp: *fo.disableHttp, - Host: *fo.ip, - Port: uint32(*fo.port), + Host: filerAddress, Cipher: *fo.cipher, SaveToFilerLimit: int64(*fo.saveToFilerLimit), - Filers: peers, + Filers: pb.FromAddressStrings(peers), ConcurrentUploadLimit: int64(*fo.concurrentUploadLimitMB) * 1024 * 1024, }) if nfs_err != nil { diff --git a/weed/command/filer_backup.go b/weed/command/filer_backup.go index 5b6409187..9e5041531 100644 --- a/weed/command/filer_backup.go +++ b/weed/command/filer_backup.go @@ -78,7 +78,7 @@ func doFilerBackup(grpcDialOption grpc.DialOption, backupOption *FilerBackupOpti return fmt.Errorf("no data sink configured in replication.toml") } - sourceFiler := *backupOption.filer + sourceFiler := pb.ServerAddress(*backupOption.filer) sourcePath := *backupOption.path timeAgo := *backupOption.timeAgo targetPath := dataSink.GetSinkToDirectory() @@ -102,7 +102,7 @@ func doFilerBackup(grpcDialOption grpc.DialOption, backupOption *FilerBackupOpti // create filer sink filerSource := &source.FilerSource{} - filerSource.DoInitialize(sourceFiler, pb.ServerToGrpcAddress(sourceFiler), sourcePath, *backupOption.proxyByFiler) + filerSource.DoInitialize(sourceFiler.ToHttpAddress(), sourceFiler.ToGrpcAddress(), sourcePath, *backupOption.proxyByFiler) dataSink.SetSourceFiler(filerSource) processEventFn := genProcessFunction(sourcePath, targetPath, dataSink, debug) diff --git a/weed/command/filer_cat.go b/weed/command/filer_cat.go index 09f5e97fe..71c3a48d6 100644 --- a/weed/command/filer_cat.go +++ b/weed/command/filer_cat.go @@ -23,7 +23,7 @@ var ( type FilerCatOptions struct { grpcDialOption grpc.DialOption - filerAddress string + filerAddress pb.ServerAddress filerClient filer_pb.SeaweedFilerClient output *string } @@ -78,7 +78,7 @@ func runFilerCat(cmd *Command, args []string) bool { return false } - filerCat.filerAddress = filerUrl.Host + filerCat.filerAddress = pb.ServerAddress(filerUrl.Host) filerCat.grpcDialOption = security.LoadClientTLS(util.GetViper(), "grpc.client") dir, name := util.FullPath(urlPath).DirAndName() diff --git a/weed/command/filer_copy.go b/weed/command/filer_copy.go index 05aa96292..0feae63b3 100644 --- a/weed/command/filer_copy.go +++ b/weed/command/filer_copy.go @@ -7,7 +7,6 @@ import ( "io" "io/ioutil" "net/http" - "net/url" "os" "path/filepath" "strconv" @@ -92,35 +91,21 @@ func runCopy(cmd *Command, args []string) bool { filerDestination := args[len(args)-1] fileOrDirs := args[0 : len(args)-1] - filerUrl, err := url.Parse(filerDestination) + filerAddress, urlPath, err := pb.ParseUrl(filerDestination) if err != nil { fmt.Printf("The last argument should be a URL on filer: %v\n", err) return false } - urlPath := filerUrl.Path if !strings.HasSuffix(urlPath, "/") { fmt.Printf("The last argument should be a folder and end with \"/\"\n") return false } - if filerUrl.Port() == "" { - fmt.Printf("The filer port should be specified.\n") - return false - } - - filerPort, parseErr := strconv.ParseUint(filerUrl.Port(), 10, 64) - if parseErr != nil { - fmt.Printf("The filer port parse error: %v\n", parseErr) - return false - } - - filerGrpcPort := filerPort + 10000 - filerGrpcAddress := util.JoinHostPort(filerUrl.Hostname(), int(filerGrpcPort)) copy.grpcDialOption = security.LoadClientTLS(util.GetViper(), "grpc.client") - masters, collection, replication, dirBuckets, maxMB, cipher, err := readFilerConfiguration(copy.grpcDialOption, filerGrpcAddress) + masters, collection, replication, dirBuckets, maxMB, cipher, err := readFilerConfiguration(copy.grpcDialOption, filerAddress) if err != nil { - fmt.Printf("read from filer %s: %v\n", filerGrpcAddress, err) + fmt.Printf("read from filer %s: %v\n", filerAddress, err) return false } if strings.HasPrefix(urlPath, dirBuckets+"/") { @@ -174,9 +159,8 @@ func runCopy(cmd *Command, args []string) bool { go func() { defer waitGroup.Done() worker := FileCopyWorker{ - options: ©, - filerHost: filerUrl.Host, - filerGrpcAddress: filerGrpcAddress, + options: ©, + filerAddress: filerAddress, } if err := worker.copyFiles(fileCopyTaskChan); err != nil { fmt.Fprintf(os.Stderr, "copy file error: %v\n", err) @@ -189,7 +173,7 @@ func runCopy(cmd *Command, args []string) bool { return true } -func readFilerConfiguration(grpcDialOption grpc.DialOption, filerGrpcAddress string) (masters []string, collection, replication string, dirBuckets string, maxMB uint32, cipher bool, err error) { +func readFilerConfiguration(grpcDialOption grpc.DialOption, filerGrpcAddress pb.ServerAddress) (masters []string, collection, replication string, dirBuckets string, maxMB uint32, cipher bool, err error) { err = pb.WithGrpcFilerClient(filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) if err != nil { @@ -241,9 +225,8 @@ func genFileCopyTask(fileOrDir string, destPath string, fileCopyTaskChan chan Fi } type FileCopyWorker struct { - options *CopyOptions - filerHost string - filerGrpcAddress string + options *CopyOptions + filerAddress pb.ServerAddress } func (worker *FileCopyWorker) copyFiles(fileCopyTaskChan chan FileCopyTask) error { @@ -321,7 +304,7 @@ func (worker *FileCopyWorker) checkExistingFileFirst(task FileCopyTask, f *os.Fi return } - err = pb.WithGrpcFilerClient(worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + err = pb.WithGrpcFilerClient(worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.LookupDirectoryEntryRequest{ Directory: task.destinationUrlPath, @@ -363,7 +346,7 @@ func (worker *FileCopyWorker) uploadFileAsOne(task FileCopyTask, f *os.File) err // assign a volume err = util.Retry("assignVolume", func() error { - return pb.WithGrpcFilerClient(worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + return pb.WithGrpcFilerClient(worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.AssignVolumeRequest{ Count: 1, @@ -381,7 +364,7 @@ func (worker *FileCopyWorker) uploadFileAsOne(task FileCopyTask, f *os.File) err if assignResult.Error != "" { return fmt.Errorf("assign volume failure %v: %v", request, assignResult.Error) } - if assignResult.Url == "" { + if assignResult.Location.Url == "" { return fmt.Errorf("assign volume failure %v: %v", request, assignResult) } return nil @@ -391,7 +374,7 @@ func (worker *FileCopyWorker) uploadFileAsOne(task FileCopyTask, f *os.File) err return fmt.Errorf("Failed to assign from %v: %v\n", worker.options.masters, err) } - targetUrl := "http://" + assignResult.Url + "/" + assignResult.FileId + targetUrl := "http://" + assignResult.Location.Url + "/" + assignResult.FileId uploadOption := &operation.UploadOption{ UploadUrl: targetUrl, Filename: fileName, @@ -414,10 +397,10 @@ func (worker *FileCopyWorker) uploadFileAsOne(task FileCopyTask, f *os.File) err chunks = append(chunks, uploadResult.ToPbFileChunk(assignResult.FileId, 0)) - fmt.Printf("copied %s => http://%s%s%s\n", f.Name(), worker.filerHost, task.destinationUrlPath, fileName) + fmt.Printf("copied %s => http://%s%s%s\n", f.Name(), worker.filerAddress.ToHttpAddress(), task.destinationUrlPath, fileName) } - if err := pb.WithGrpcFilerClient(worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + if err := pb.WithGrpcFilerClient(worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.CreateEntryRequest{ Directory: task.destinationUrlPath, Entry: &filer_pb.Entry{ @@ -443,7 +426,7 @@ func (worker *FileCopyWorker) uploadFileAsOne(task FileCopyTask, f *os.File) err } return nil }); err != nil { - return fmt.Errorf("upload data %v to http://%s%s%s: %v\n", fileName, worker.filerHost, task.destinationUrlPath, fileName, err) + return fmt.Errorf("upload data %v to http://%s%s%s: %v\n", fileName, worker.filerAddress.ToHttpAddress(), task.destinationUrlPath, fileName, err) } return nil @@ -474,7 +457,7 @@ func (worker *FileCopyWorker) uploadFileInChunks(task FileCopyTask, f *os.File, var assignResult *filer_pb.AssignVolumeResponse var assignError error err := util.Retry("assignVolume", func() error { - return pb.WithGrpcFilerClient(worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + return pb.WithGrpcFilerClient(worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.AssignVolumeRequest{ Count: 1, Replication: *worker.options.replication, @@ -498,7 +481,7 @@ func (worker *FileCopyWorker) uploadFileInChunks(task FileCopyTask, f *os.File, fmt.Printf("Failed to assign from %v: %v\n", worker.options.masters, err) } - targetUrl := "http://" + assignResult.Url + "/" + assignResult.FileId + targetUrl := "http://" + assignResult.Location.Url + "/" + assignResult.FileId if collection == "" { collection = assignResult.Collection } @@ -508,7 +491,7 @@ func (worker *FileCopyWorker) uploadFileInChunks(task FileCopyTask, f *os.File, uploadOption := &operation.UploadOption{ UploadUrl: targetUrl, - Filename: fileName+"-"+strconv.FormatInt(i+1, 10), + Filename: fileName + "-" + strconv.FormatInt(i+1, 10), Cipher: worker.options.cipher, IsInputCompressed: false, MimeType: "", @@ -542,8 +525,8 @@ func (worker *FileCopyWorker) uploadFileInChunks(task FileCopyTask, f *os.File, for _, chunk := range chunks { fileIds = append(fileIds, chunk.FileId) } - operation.DeleteFiles(func() string { - return copy.masters[0] + operation.DeleteFiles(func() pb.ServerAddress { + return pb.ServerAddress(copy.masters[0]) }, false, worker.options.grpcDialOption, fileIds) return uploadError } @@ -553,7 +536,7 @@ func (worker *FileCopyWorker) uploadFileInChunks(task FileCopyTask, f *os.File, return fmt.Errorf("create manifest: %v", manifestErr) } - if err := pb.WithGrpcFilerClient(worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + if err := pb.WithGrpcFilerClient(worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.CreateEntryRequest{ Directory: task.destinationUrlPath, Entry: &filer_pb.Entry{ @@ -579,10 +562,10 @@ func (worker *FileCopyWorker) uploadFileInChunks(task FileCopyTask, f *os.File, } return nil }); err != nil { - return fmt.Errorf("upload data %v to http://%s%s%s: %v\n", fileName, worker.filerHost, task.destinationUrlPath, fileName, err) + return fmt.Errorf("upload data %v to http://%s%s%s: %v\n", fileName, worker.filerAddress.ToHttpAddress(), task.destinationUrlPath, fileName, err) } - fmt.Printf("copied %s => http://%s%s%s\n", f.Name(), worker.filerHost, task.destinationUrlPath, fileName) + fmt.Printf("copied %s => http://%s%s%s\n", f.Name(), worker.filerAddress.ToHttpAddress(), task.destinationUrlPath, fileName) return nil } @@ -611,7 +594,7 @@ func (worker *FileCopyWorker) saveDataAsChunk(reader io.Reader, name string, off var fileId, host string var auth security.EncodedJwt - if flushErr := pb.WithGrpcFilerClient(worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + if flushErr := pb.WithGrpcFilerClient(worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { ctx := context.Background() @@ -633,7 +616,7 @@ func (worker *FileCopyWorker) saveDataAsChunk(reader io.Reader, name string, off return fmt.Errorf("assign volume failure %v: %v", request, resp.Error) } - fileId, host, auth = resp.FileId, resp.Url, security.EncodedJwt(resp.Auth) + fileId, host, auth = resp.FileId, resp.Location.Url, security.EncodedJwt(resp.Auth) collection, replication = resp.Collection, resp.Replication return nil diff --git a/weed/command/filer_meta_backup.go b/weed/command/filer_meta_backup.go index 3757f63f1..0b8fa76c6 100644 --- a/weed/command/filer_meta_backup.go +++ b/weed/command/filer_meta_backup.go @@ -195,7 +195,7 @@ func (metaBackup *FilerMetaBackupOptions) streamMetadataBackup() error { return metaBackup.setOffset(lastTime) }) - return pb.FollowMetadata(*metaBackup.filerAddress, metaBackup.grpcDialOption, "meta_backup", + return pb.FollowMetadata(pb.ServerAddress(*metaBackup.filerAddress), metaBackup.grpcDialOption, "meta_backup", *metaBackup.filerDirectory, nil, startTime.UnixNano(), 0, processEventFnWithOffset, false) } @@ -224,7 +224,7 @@ var _ = filer_pb.FilerClient(&FilerMetaBackupOptions{}) func (metaBackup *FilerMetaBackupOptions) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error { - return pb.WithFilerClient(*metaBackup.filerAddress, metaBackup.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + return pb.WithFilerClient(pb.ServerAddress(*metaBackup.filerAddress), metaBackup.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { return fn(client) }) diff --git a/weed/command/filer_meta_tail.go b/weed/command/filer_meta_tail.go index 0363ae8d1..85a3eaf84 100644 --- a/weed/command/filer_meta_tail.go +++ b/weed/command/filer_meta_tail.go @@ -103,7 +103,7 @@ func runFilerMetaTail(cmd *Command, args []string) bool { } } - tailErr := pb.FollowMetadata(*tailFiler, grpcDialOption, "tail", + tailErr := pb.FollowMetadata(pb.ServerAddress(*tailFiler), grpcDialOption, "tail", *tailTarget, nil, time.Now().Add(-*tailStart).UnixNano(), 0, func(resp *filer_pb.SubscribeMetadataResponse) error { if !shouldPrint(resp) { diff --git a/weed/command/filer_remote_sync.go b/weed/command/filer_remote_sync.go index 3776ee4d9..857fbb0eb 100644 --- a/weed/command/filer_remote_sync.go +++ b/weed/command/filer_remote_sync.go @@ -33,7 +33,7 @@ type RemoteSyncOptions struct { var _ = filer_pb.FilerClient(&RemoteSyncOptions{}) func (option *RemoteSyncOptions) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error { - return pb.WithFilerClient(*option.filerAddress, option.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + return pb.WithFilerClient(pb.ServerAddress(*option.filerAddress), option.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { return fn(client) }) } @@ -90,12 +90,12 @@ func runFilerRemoteSynchronize(cmd *Command, args []string) bool { remoteSyncOptions.grpcDialOption = grpcDialOption dir := *remoteSyncOptions.dir - filerAddress := *remoteSyncOptions.filerAddress + filerAddress := pb.ServerAddress(*remoteSyncOptions.filerAddress) filerSource := &source.FilerSource{} filerSource.DoInitialize( - filerAddress, - pb.ServerToGrpcAddress(filerAddress), + filerAddress.ToHttpAddress(), + filerAddress.ToGrpcAddress(), "/", // does not matter *remoteSyncOptions.readChunkFromFiler, ) diff --git a/weed/command/filer_remote_sync_buckets.go b/weed/command/filer_remote_sync_buckets.go index 70f9f49c1..73c8de1a9 100644 --- a/weed/command/filer_remote_sync_buckets.go +++ b/weed/command/filer_remote_sync_buckets.go @@ -32,12 +32,12 @@ func (option *RemoteSyncOptions) followBucketUpdatesAndUploadToRemote(filerSourc processEventFnWithOffset := pb.AddOffsetFunc(eachEntryFunc, 3*time.Second, func(counter int64, lastTsNs int64) error { lastTime := time.Unix(0, lastTsNs) glog.V(0).Infof("remote sync %s progressed to %v %0.2f/sec", *option.filerAddress, lastTime, float64(counter)/float64(3)) - return remote_storage.SetSyncOffset(option.grpcDialOption, *option.filerAddress, option.bucketsDir, lastTsNs) + return remote_storage.SetSyncOffset(option.grpcDialOption, pb.ServerAddress(*option.filerAddress), option.bucketsDir, lastTsNs) }) lastOffsetTs := collectLastSyncOffset(option, option.bucketsDir) - return pb.FollowMetadata(*option.filerAddress, option.grpcDialOption, "filer.remote.sync", + return pb.FollowMetadata(pb.ServerAddress(*option.filerAddress), option.grpcDialOption, "filer.remote.sync", option.bucketsDir, []string{filer.DirectoryEtcRemote}, lastOffsetTs.UnixNano(), 0, processEventFnWithOffset, false) } @@ -357,7 +357,7 @@ func extractBucketPath(bucketsDir, dir string) (util.FullPath, bool) { func (option *RemoteSyncOptions) collectRemoteStorageConf() (err error) { - if mappings, err := filer.ReadMountMappings(option.grpcDialOption, *option.filerAddress); err != nil { + if mappings, err := filer.ReadMountMappings(option.grpcDialOption, pb.ServerAddress(*option.filerAddress)); err != nil { return err } else { option.mappings = mappings diff --git a/weed/command/filer_remote_sync_dir.go b/weed/command/filer_remote_sync_dir.go index dc2e9a1fb..50f1e35cf 100644 --- a/weed/command/filer_remote_sync_dir.go +++ b/weed/command/filer_remote_sync_dir.go @@ -20,7 +20,7 @@ import ( func followUpdatesAndUploadToRemote(option *RemoteSyncOptions, filerSource *source.FilerSource, mountedDir string) error { // read filer remote storage mount mappings - _, _, remoteStorageMountLocation, remoteStorage, detectErr := filer.DetectMountInfo(option.grpcDialOption, *option.filerAddress, mountedDir) + _, _, remoteStorageMountLocation, remoteStorage, detectErr := filer.DetectMountInfo(option.grpcDialOption, pb.ServerAddress(*option.filerAddress), mountedDir) if detectErr != nil { return fmt.Errorf("read mount info: %v", detectErr) } @@ -33,12 +33,12 @@ func followUpdatesAndUploadToRemote(option *RemoteSyncOptions, filerSource *sour processEventFnWithOffset := pb.AddOffsetFunc(eachEntryFunc, 3*time.Second, func(counter int64, lastTsNs int64) error { lastTime := time.Unix(0, lastTsNs) glog.V(0).Infof("remote sync %s progressed to %v %0.2f/sec", *option.filerAddress, lastTime, float64(counter)/float64(3)) - return remote_storage.SetSyncOffset(option.grpcDialOption, *option.filerAddress, mountedDir, lastTsNs) + return remote_storage.SetSyncOffset(option.grpcDialOption, pb.ServerAddress(*option.filerAddress), mountedDir, lastTsNs) }) lastOffsetTs := collectLastSyncOffset(option, mountedDir) - return pb.FollowMetadata(*option.filerAddress, option.grpcDialOption, "filer.remote.sync", + return pb.FollowMetadata(pb.ServerAddress(*option.filerAddress), option.grpcDialOption, "filer.remote.sync", mountedDir, []string{filer.DirectoryEtcRemote}, lastOffsetTs.UnixNano(), 0, processEventFnWithOffset, false) } @@ -171,7 +171,7 @@ func collectLastSyncOffset(option *RemoteSyncOptions, mountedDir string) time.Ti return time.Now() } - lastOffsetTsNs, err := remote_storage.GetSyncOffset(option.grpcDialOption, *option.filerAddress, mountedDir) + lastOffsetTsNs, err := remote_storage.GetSyncOffset(option.grpcDialOption, pb.ServerAddress(*option.filerAddress), mountedDir) if mountedDirEntry != nil { if err == nil && mountedDirEntry.Attributes.Crtime < lastOffsetTsNs/1000000 { lastOffsetTs = time.Unix(0, lastOffsetTsNs) diff --git a/weed/command/filer_sync.go b/weed/command/filer_sync.go index 33efdb2b7..20755dbe5 100644 --- a/weed/command/filer_sync.go +++ b/weed/command/filer_sync.go @@ -93,9 +93,11 @@ func runFilerSynchronize(cmd *Command, args []string) bool { grace.SetupProfiling(*syncCpuProfile, *syncMemProfile) + filerA := pb.ServerAddress(*syncOptions.filerA) + filerB := pb.ServerAddress(*syncOptions.filerB) go func() { for { - err := doSubscribeFilerMetaChanges(grpcDialOption, *syncOptions.filerA, *syncOptions.aPath, *syncOptions.aProxyByFiler, *syncOptions.filerB, + err := doSubscribeFilerMetaChanges(grpcDialOption, filerA, *syncOptions.aPath, *syncOptions.aProxyByFiler, filerB, *syncOptions.bPath, *syncOptions.bReplication, *syncOptions.bCollection, *syncOptions.bTtlSec, *syncOptions.bProxyByFiler, *syncOptions.bDiskType, *syncOptions.bDebug) if err != nil { glog.Errorf("sync from %s to %s: %v", *syncOptions.filerA, *syncOptions.filerB, err) @@ -107,7 +109,7 @@ func runFilerSynchronize(cmd *Command, args []string) bool { if !*syncOptions.isActivePassive { go func() { for { - err := doSubscribeFilerMetaChanges(grpcDialOption, *syncOptions.filerB, *syncOptions.bPath, *syncOptions.bProxyByFiler, *syncOptions.filerA, + err := doSubscribeFilerMetaChanges(grpcDialOption, filerB, *syncOptions.bPath, *syncOptions.bProxyByFiler, filerA, *syncOptions.aPath, *syncOptions.aReplication, *syncOptions.aCollection, *syncOptions.aTtlSec, *syncOptions.aProxyByFiler, *syncOptions.aDiskType, *syncOptions.aDebug) if err != nil { glog.Errorf("sync from %s to %s: %v", *syncOptions.filerB, *syncOptions.filerA, err) @@ -122,7 +124,7 @@ func runFilerSynchronize(cmd *Command, args []string) bool { return true } -func doSubscribeFilerMetaChanges(grpcDialOption grpc.DialOption, sourceFiler, sourcePath string, sourceReadChunkFromFiler bool, targetFiler, targetPath string, +func doSubscribeFilerMetaChanges(grpcDialOption grpc.DialOption, sourceFiler pb.ServerAddress, sourcePath string, sourceReadChunkFromFiler bool, targetFiler pb.ServerAddress, targetPath string, replicationStr, collection string, ttlSec int, sinkWriteChunkByFiler bool, diskType string, debug bool) error { // read source filer signature @@ -147,9 +149,9 @@ func doSubscribeFilerMetaChanges(grpcDialOption grpc.DialOption, sourceFiler, so // create filer sink filerSource := &source.FilerSource{} - filerSource.DoInitialize(sourceFiler, pb.ServerToGrpcAddress(sourceFiler), sourcePath, sourceReadChunkFromFiler) + filerSource.DoInitialize(sourceFiler.ToHttpAddress(), sourceFiler.ToGrpcAddress(), sourcePath, sourceReadChunkFromFiler) filerSink := &filersink.FilerSink{} - filerSink.DoInitialize(targetFiler, pb.ServerToGrpcAddress(targetFiler), targetPath, replicationStr, collection, ttlSec, diskType, grpcDialOption, sinkWriteChunkByFiler) + filerSink.DoInitialize(targetFiler.ToHttpAddress(), targetFiler.ToGrpcAddress(), targetPath, replicationStr, collection, ttlSec, diskType, grpcDialOption, sinkWriteChunkByFiler) filerSink.SetSourceFiler(filerSource) persistEventFn := genProcessFunction(sourcePath, targetPath, filerSink, debug) @@ -170,7 +172,7 @@ func doSubscribeFilerMetaChanges(grpcDialOption grpc.DialOption, sourceFiler, so return setOffset(grpcDialOption, targetFiler, SyncKeyPrefix, sourceFilerSignature, lastTsNs) }) - return pb.FollowMetadata(sourceFiler, grpcDialOption, "syncTo_"+targetFiler, + return pb.FollowMetadata(sourceFiler, grpcDialOption, "syncTo_"+string(targetFiler), sourcePath, nil, sourceFilerOffsetTsNs, targetFilerSignature, processEventFnWithOffset, false) } @@ -179,7 +181,7 @@ const ( SyncKeyPrefix = "sync." ) -func getOffset(grpcDialOption grpc.DialOption, filer string, signaturePrefix string, signature int32) (lastOffsetTsNs int64, readErr error) { +func getOffset(grpcDialOption grpc.DialOption, filer pb.ServerAddress, signaturePrefix string, signature int32) (lastOffsetTsNs int64, readErr error) { readErr = pb.WithFilerClient(filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { syncKey := []byte(signaturePrefix + "____") @@ -206,7 +208,7 @@ func getOffset(grpcDialOption grpc.DialOption, filer string, signaturePrefix str } -func setOffset(grpcDialOption grpc.DialOption, filer string, signaturePrefix string, signature int32, offsetTsNs int64) error { +func setOffset(grpcDialOption grpc.DialOption, filer pb.ServerAddress, signaturePrefix string, signature int32, offsetTsNs int64) error { return pb.WithFilerClient(filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { syncKey := []byte(signaturePrefix + "____") diff --git a/weed/command/iam.go b/weed/command/iam.go index ed4eea543..ebe9657f2 100644 --- a/weed/command/iam.go +++ b/weed/command/iam.go @@ -43,38 +43,35 @@ func runIam(cmd *Command, args []string) bool { } func (iamopt *IamOptions) startIamServer() bool { - filerGrpcAddress, err := pb.ParseServerToGrpcAddress(*iamopt.filer) - if err != nil { - glog.Fatal(err) - return false - } + filerAddress := pb.ServerAddress(*iamopt.filer) util.LoadConfiguration("security", false) grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client") for { - err = pb.WithGrpcFilerClient(filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + err := pb.WithGrpcFilerClient(filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) if err != nil { - return fmt.Errorf("get filer %s configuration: %v", filerGrpcAddress, err) + return fmt.Errorf("get filer %s configuration: %v", filerAddress, err) } glog.V(0).Infof("IAM read filer configuration: %s", resp) return nil }) if err != nil { - glog.V(0).Infof("wait to connect to filer %s grpc address %s", *iamopt.filer, filerGrpcAddress) + glog.V(0).Infof("wait to connect to filer %s grpc address %s", *iamopt.filer, filerAddress.ToGrpcAddress()) time.Sleep(time.Second) } else { - glog.V(0).Infof("connected to filer %s grpc address %s", *iamopt.filer, filerGrpcAddress) + glog.V(0).Infof("connected to filer %s grpc address %s", *iamopt.filer, filerAddress.ToGrpcAddress()) break } } + masters := pb.ServerAddresses(*iamopt.masters).ToAddresses() router := mux.NewRouter().SkipClean(true) _, iamApiServer_err := iamapi.NewIamApiServer(router, &iamapi.IamServerOption{ - Filer: *iamopt.filer, - Port: *iamopt.port, - FilerGrpcAddress: filerGrpcAddress, - GrpcDialOption: grpcDialOption, + Masters: masters, + Filer: filerAddress, + Port: *iamopt.port, + GrpcDialOption: grpcDialOption, }) glog.V(0).Info("NewIamApiServer created") if iamApiServer_err != nil { diff --git a/weed/command/master.go b/weed/command/master.go index bf7a5d420..adc9055ea 100644 --- a/weed/command/master.go +++ b/weed/command/master.go @@ -113,7 +113,7 @@ func startMaster(masterOption MasterOptions, masterWhiteList []string) { backend.LoadConfiguration(util.GetViper()) - myMasterAddress, peers := checkPeers(*masterOption.ip, *masterOption.port, *masterOption.peers) + myMasterAddress, peers := checkPeers(*masterOption.ip, *masterOption.port, *masterOption.portGrpc, *masterOption.peers) r := mux.NewRouter() ms := weed_server.NewMasterServer(r, masterOption.toMasterOption(masterWhiteList), peers) @@ -162,16 +162,14 @@ func startMaster(masterOption MasterOptions, masterWhiteList []string) { select {} } -func checkPeers(masterIp string, masterPort int, peers string) (masterAddress string, cleanedPeers []string) { +func checkPeers(masterIp string, masterPort int, masterGrpcPort int, peers string) (masterAddress pb.ServerAddress, cleanedPeers []pb.ServerAddress) { glog.V(0).Infof("current: %s:%d peers:%s", masterIp, masterPort, peers) - masterAddress = util.JoinHostPort(masterIp, masterPort) - if peers != "" { - cleanedPeers = strings.Split(peers, ",") - } + masterAddress = pb.NewServerAddress(masterIp, masterPort, masterGrpcPort) + cleanedPeers = pb.ServerAddresses(peers).ToAddresses() hasSelf := false for _, peer := range cleanedPeers { - if peer == masterAddress { + if peer.ToHttpAddress() == masterAddress.ToHttpAddress() { hasSelf = true break } @@ -181,13 +179,15 @@ func checkPeers(masterIp string, masterPort int, peers string) (masterAddress st cleanedPeers = append(cleanedPeers, masterAddress) } if len(cleanedPeers)%2 == 0 { - glog.Fatalf("Only odd number of masters are supported!") + glog.Fatalf("Only odd number of masters are supported: %+v", cleanedPeers) } return } -func isTheFirstOne(self string, peers []string) bool { - sort.Strings(peers) +func isTheFirstOne(self pb.ServerAddress, peers []pb.ServerAddress) bool { + sort.Slice(peers, func(i, j int) bool { + return strings.Compare(string(peers[i]), string(peers[j])) < 0 + }) if len(peers) <= 0 { return true } @@ -195,9 +195,9 @@ func isTheFirstOne(self string, peers []string) bool { } func (m *MasterOptions) toMasterOption(whiteList []string) *weed_server.MasterOption { + masterAddress := pb.NewServerAddress(*m.ip, *m.port, *m.portGrpc) return &weed_server.MasterOption{ - Host: *m.ip, - Port: *m.port, + Master: masterAddress, MetaFolder: *m.metaFolder, VolumeSizeLimitMB: uint32(*m.volumeSizeLimitMB), VolumePreallocate: *m.volumePreallocate, diff --git a/weed/command/master_follower.go b/weed/command/master_follower.go index cf7e45253..2bb9ff6d4 100644 --- a/weed/command/master_follower.go +++ b/weed/command/master_follower.go @@ -13,7 +13,6 @@ import ( "github.com/gorilla/mux" "google.golang.org/grpc/reflection" "net/http" - "strings" "time" ) @@ -79,19 +78,15 @@ func runMasterFollower(cmd *Command, args []string) bool { func startMasterFollower(masterOptions MasterOptions) { // collect settings from main masters - masters := strings.Split(*mf.peers, ",") - masterGrpcAddresses, err := pb.ParseServersToGrpcAddresses(masters) - if err != nil { - glog.V(0).Infof("ParseFilerGrpcAddress: %v", err) - return - } + masters := pb.ServerAddresses(*mf.peers).ToAddresses() + var err error grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.master") for i := 0; i < 10; i++ { - err = pb.WithOneOfGrpcMasterClients(masterGrpcAddresses, grpcDialOption, func(client master_pb.SeaweedClient) error { + err = pb.WithOneOfGrpcMasterClients(masters, grpcDialOption, func(client master_pb.SeaweedClient) error { resp, err := client.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{}) if err != nil { - return fmt.Errorf("get master grpc address %v configuration: %v", masterGrpcAddresses, err) + return fmt.Errorf("get master grpc address %v configuration: %v", masters, err) } masterOptions.defaultReplication = &resp.DefaultReplication masterOptions.volumeSizeLimitMB = aws.Uint(uint(resp.VolumeSizeLimitMB)) @@ -99,13 +94,13 @@ func startMasterFollower(masterOptions MasterOptions) { return nil }) if err != nil { - glog.V(0).Infof("failed to talk to filer %v: %v", masterGrpcAddresses, err) + glog.V(0).Infof("failed to talk to filer %v: %v", masters, err) glog.V(0).Infof("wait for %d seconds ...", i+1) time.Sleep(time.Duration(i+1) * time.Second) } } if err != nil { - glog.Errorf("failed to talk to filer %v: %v", masterGrpcAddresses, err) + glog.Errorf("failed to talk to filer %v: %v", masters, err) return } diff --git a/weed/command/mount_std.go b/weed/command/mount_std.go index e393e5894..2603260a2 100644 --- a/weed/command/mount_std.go +++ b/weed/command/mount_std.go @@ -70,35 +70,30 @@ func getParentInode(mountDir string) (uint64, error) { func RunMount(option *MountOptions, umask os.FileMode) bool { - filers := strings.Split(*option.filer, ",") - // parse filer grpc address - filerGrpcAddresses, err := pb.ParseServersToGrpcAddresses(filers) - if err != nil { - glog.V(0).Infof("ParseFilerGrpcAddress: %v", err) - return true - } + filerAddresses := pb.ServerAddresses(*option.filer).ToAddresses() util.LoadConfiguration("security", false) // try to connect to filer, filerBucketsPath may be useful later grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client") var cipher bool + var err error for i := 0; i < 10; i++ { - err = pb.WithOneOfGrpcFilerClients(filerGrpcAddresses, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + err = pb.WithOneOfGrpcFilerClients(filerAddresses, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) if err != nil { - return fmt.Errorf("get filer grpc address %v configuration: %v", filerGrpcAddresses, err) + return fmt.Errorf("get filer grpc address %v configuration: %v", filerAddresses, err) } cipher = resp.Cipher return nil }) if err != nil { - glog.V(0).Infof("failed to talk to filer %v: %v", filerGrpcAddresses, err) + glog.V(0).Infof("failed to talk to filer %v: %v", filerAddresses, err) glog.V(0).Infof("wait for %d seconds ...", i+1) time.Sleep(time.Duration(i+1) * time.Second) } } if err != nil { - glog.Errorf("failed to talk to filer %v: %v", filerGrpcAddresses, err) + glog.Errorf("failed to talk to filer %v: %v", filerAddresses, err) return true } @@ -206,8 +201,7 @@ func RunMount(option *MountOptions, umask os.FileMode) bool { seaweedFileSystem := filesys.NewSeaweedFileSystem(&filesys.Option{ MountDirectory: dir, - FilerAddresses: filers, - FilerGrpcAddresses: filerGrpcAddresses, + FilerAddresses: filerAddresses, GrpcDialOption: grpcDialOption, FilerMountRootPath: mountRoot, Collection: *option.collection, diff --git a/weed/command/msg_broker.go b/weed/command/msg_broker.go index 403bbe317..61517ab39 100644 --- a/weed/command/msg_broker.go +++ b/weed/command/msg_broker.go @@ -62,35 +62,31 @@ func (msgBrokerOpt *MessageBrokerOptions) startQueueServer() bool { grace.SetupProfiling(*messageBrokerStandaloneOptions.cpuprofile, *messageBrokerStandaloneOptions.memprofile) - filerGrpcAddress, err := pb.ParseServerToGrpcAddress(*msgBrokerOpt.filer) - if err != nil { - glog.Fatal(err) - return false - } + filerAddress := pb.ServerAddress(*msgBrokerOpt.filer) grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.msg_broker") cipher := false for { - err = pb.WithGrpcFilerClient(filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + err := pb.WithGrpcFilerClient(filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) if err != nil { - return fmt.Errorf("get filer %s configuration: %v", filerGrpcAddress, err) + return fmt.Errorf("get filer %s configuration: %v", filerAddress, err) } cipher = resp.Cipher return nil }) if err != nil { - glog.V(0).Infof("wait to connect to filer %s grpc address %s", *msgBrokerOpt.filer, filerGrpcAddress) + glog.V(0).Infof("wait to connect to filer %s grpc address %s", *msgBrokerOpt.filer, filerAddress.ToGrpcAddress()) time.Sleep(time.Second) } else { - glog.V(0).Infof("connected to filer %s grpc address %s", *msgBrokerOpt.filer, filerGrpcAddress) + glog.V(0).Infof("connected to filer %s grpc address %s", *msgBrokerOpt.filer, filerAddress.ToGrpcAddress()) break } } qs, err := broker.NewMessageBroker(&broker.MessageBrokerOption{ - Filers: []string{*msgBrokerOpt.filer}, + Filers: []pb.ServerAddress{filerAddress}, DefaultReplication: "", MaxMB: 0, Ip: *msgBrokerOpt.ip, diff --git a/weed/command/s3.go b/weed/command/s3.go index c8292a7d5..f2c6c0769 100644 --- a/weed/command/s3.go +++ b/weed/command/s3.go @@ -137,11 +137,7 @@ func runS3(cmd *Command, args []string) bool { func (s3opt *S3Options) startS3Server() bool { - filerGrpcAddress, err := pb.ParseServerToGrpcAddress(*s3opt.filer) - if err != nil { - glog.Fatal(err) - return false - } + filerAddress := pb.ServerAddress(*s3opt.filer) filerBucketsPath := "/buckets" @@ -152,10 +148,10 @@ func (s3opt *S3Options) startS3Server() bool { var metricsIntervalSec int for { - err = pb.WithGrpcFilerClient(filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + err := pb.WithGrpcFilerClient(filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) if err != nil { - return fmt.Errorf("get filer %s configuration: %v", filerGrpcAddress, err) + return fmt.Errorf("get filer %s configuration: %v", filerAddress, err) } filerBucketsPath = resp.DirBuckets metricsAddress, metricsIntervalSec = resp.MetricsAddress, int(resp.MetricsIntervalSec) @@ -163,10 +159,10 @@ func (s3opt *S3Options) startS3Server() bool { return nil }) if err != nil { - glog.V(0).Infof("wait to connect to filer %s grpc address %s", *s3opt.filer, filerGrpcAddress) + glog.V(0).Infof("wait to connect to filer %s grpc address %s", *s3opt.filer, filerAddress.ToGrpcAddress()) time.Sleep(time.Second) } else { - glog.V(0).Infof("connected to filer %s grpc address %s", *s3opt.filer, filerGrpcAddress) + glog.V(0).Infof("connected to filer %s grpc address %s", *s3opt.filer, filerAddress.ToGrpcAddress()) break } } @@ -176,9 +172,8 @@ func (s3opt *S3Options) startS3Server() bool { router := mux.NewRouter().SkipClean(true) _, s3ApiServer_err := s3api.NewS3ApiServer(router, &s3api.S3ApiServerOption{ - Filer: *s3opt.filer, + Filer: filerAddress, Port: *s3opt.port, - FilerGrpcAddress: filerGrpcAddress, Config: *s3opt.config, DomainName: *s3opt.domainName, BucketsPath: filerBucketsPath, diff --git a/weed/command/server.go b/weed/command/server.go index b45ea8f4a..e498c39b4 100644 --- a/weed/command/server.go +++ b/weed/command/server.go @@ -2,6 +2,7 @@ package command import ( "fmt" + "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/util/grace" "net/http" "os" @@ -167,21 +168,16 @@ func runServer(cmd *Command, args []string) bool { *isStartingFiler = true } - if *isStartingMasterServer { - _, peerList := checkPeers(*serverIp, *masterOptions.port, *masterOptions.peers) - peers := strings.Join(peerList, ",") - masterOptions.peers = &peers - } - // ip address masterOptions.ip = serverIp masterOptions.ipBind = serverBindIp - filerOptions.masters = masterOptions.peers + _, masters := checkPeers(*masterOptions.ip, *masterOptions.port, *masterOptions.portGrpc, *masterOptions.peers) + filerOptions.masters = masters filerOptions.ip = serverIp filerOptions.bindIp = serverBindIp serverOptions.v.ip = serverIp serverOptions.v.bindIp = serverBindIp - serverOptions.v.masters = masterOptions.peers + serverOptions.v.masters = masters serverOptions.v.idleConnectionTimeout = serverTimeout serverOptions.v.dataCenter = serverDataCenter serverOptions.v.rack = serverRack @@ -197,7 +193,7 @@ func runServer(cmd *Command, args []string) bool { filerOptions.disableHttp = serverDisableHttp masterOptions.disableHttp = serverDisableHttp - filerAddress := util.JoinHostPort(*serverIp, *filerOptions.port) + filerAddress := string(pb.NewServerAddress(*serverIp, *filerOptions.port, *filerOptions.portGrpc)) s3Options.filer = &filerAddress webdavOptions.filer = &filerAddress msgBrokerOptions.filer = &filerAddress diff --git a/weed/command/shell.go b/weed/command/shell.go index 4a9f4b027..93bb69522 100644 --- a/weed/command/shell.go +++ b/weed/command/shell.go @@ -2,6 +2,7 @@ package command import ( "fmt" + "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/shell" @@ -53,13 +54,7 @@ func runShell(command *Command, args []string) bool { fmt.Printf("master: %s filer: %s\n", *shellOptions.Masters, *shellInitialFiler) - var err error - shellOptions.FilerHost, shellOptions.FilerPort, err = util.ParseHostPort(*shellInitialFiler) - shellOptions.FilerAddress = *shellInitialFiler - if err != nil { - fmt.Printf("failed to parse filer %s: %v\n", *shellInitialFiler, err) - return false - } + shellOptions.FilerAddress = pb.ServerAddress(*shellInitialFiler) shellOptions.Directory = "/" shell.RunShell(shellOptions) diff --git a/weed/command/upload.go b/weed/command/upload.go index 9ae1befab..f46e70cb1 100644 --- a/weed/command/upload.go +++ b/weed/command/upload.go @@ -71,7 +71,7 @@ func runUpload(cmd *Command, args []string) bool { util.LoadConfiguration("security", false) grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client") - defaultReplication, err := readMasterConfiguration(grpcDialOption, *upload.master) + defaultReplication, err := readMasterConfiguration(grpcDialOption, pb.ServerAddress(*upload.master)) if err != nil { fmt.Printf("upload: %v", err) return false @@ -96,7 +96,7 @@ func runUpload(cmd *Command, args []string) bool { if e != nil { return e } - results, e := operation.SubmitFiles(func() string { return *upload.master }, grpcDialOption, parts, *upload.replication, *upload.collection, *upload.dataCenter, *upload.ttl, *upload.diskType, *upload.maxMB, *upload.usePublicUrl) + results, e := operation.SubmitFiles(func() pb.ServerAddress { return pb.ServerAddress(*upload.master) }, grpcDialOption, parts, *upload.replication, *upload.collection, *upload.dataCenter, *upload.ttl, *upload.diskType, *upload.maxMB, *upload.usePublicUrl) bytes, _ := json.Marshal(results) fmt.Println(string(bytes)) if e != nil { @@ -118,7 +118,7 @@ func runUpload(cmd *Command, args []string) bool { fmt.Println(e.Error()) return false } - results, err := operation.SubmitFiles(func() string { return *upload.master }, grpcDialOption, parts, *upload.replication, *upload.collection, *upload.dataCenter, *upload.ttl, *upload.diskType, *upload.maxMB, *upload.usePublicUrl) + results, err := operation.SubmitFiles(func() pb.ServerAddress { return pb.ServerAddress(*upload.master) }, grpcDialOption, parts, *upload.replication, *upload.collection, *upload.dataCenter, *upload.ttl, *upload.diskType, *upload.maxMB, *upload.usePublicUrl) if err != nil { fmt.Println(err.Error()) return false @@ -129,7 +129,7 @@ func runUpload(cmd *Command, args []string) bool { return true } -func readMasterConfiguration(grpcDialOption grpc.DialOption, masterAddress string) (replication string, err error) { +func readMasterConfiguration(grpcDialOption grpc.DialOption, masterAddress pb.ServerAddress) (replication string, err error) { err = pb.WithMasterClient(masterAddress, grpcDialOption, func(client master_pb.SeaweedClient) error { resp, err := client.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{}) if err != nil { diff --git a/weed/command/volume.go b/weed/command/volume.go index f5ec11724..15e88d65e 100644 --- a/weed/command/volume.go +++ b/weed/command/volume.go @@ -44,7 +44,8 @@ type VolumeServerOptions struct { ip *string publicUrl *string bindIp *string - masters *string + mastersString *string + masters []pb.ServerAddress idleConnectionTimeout *int dataCenter *string rack *string @@ -74,7 +75,7 @@ func init() { v.ip = cmdVolume.Flag.String("ip", util.DetectedHostAddress(), "ip or server name, also used as identifier") v.publicUrl = cmdVolume.Flag.String("publicUrl", "", "Publicly accessible address") v.bindIp = cmdVolume.Flag.String("ip.bind", "", "ip address to bind to") - v.masters = cmdVolume.Flag.String("mserver", "localhost:9333", "comma-separated master servers") + v.mastersString = cmdVolume.Flag.String("mserver", "localhost:9333", "comma-separated master servers") v.preStopSeconds = cmdVolume.Flag.Int("preStopSeconds", 10, "number of seconds between stop send heartbeats and stop volume server") // v.pulseSeconds = cmdVolume.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats, must be smaller than or equal to the master's setting") v.idleConnectionTimeout = cmdVolume.Flag.Int("idleTimeout", 30, "connection idle seconds") @@ -125,6 +126,7 @@ func runVolume(cmd *Command, args []string) bool { go stats_collect.StartMetricsServer(*v.metricsHttpPort) minFreeSpaces := util.MustParseMinFreeSpace(*minFreeSpace, *minFreeSpacePercent) + v.masters = pb.ServerAddresses(*v.mastersString).ToAddresses() v.startVolumeServer(*volumeFolders, *maxVolumeCounts, *volumeWhiteListOption, minFreeSpaces) return true @@ -223,14 +225,12 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v volumeNeedleMapKind = storage.NeedleMapLevelDbLarge } - masters := *v.masters - volumeServer := weed_server.NewVolumeServer(volumeMux, publicVolumeMux, - *v.ip, *v.port, *v.publicUrl, + *v.ip, *v.port, *v.portGrpc, *v.publicUrl, v.folders, v.folderMaxLimits, minFreeSpaces, diskTypes, *v.idxFolder, volumeNeedleMapKind, - strings.Split(masters, ","), 5, *v.dataCenter, *v.rack, + v.masters, 5, *v.dataCenter, *v.rack, v.whiteList, *v.fixJpgOrientation, *v.readMode, *v.compactionMBPerSecond, @@ -375,7 +375,7 @@ func (v VolumeServerOptions) startClusterHttpService(handler http.Handler) httpd } func (v VolumeServerOptions) startTcpService(volumeServer *weed_server.VolumeServer) { - listeningAddress := util.JoinHostPort(*v.bindIp,*v.port+20000) + listeningAddress := util.JoinHostPort(*v.bindIp, *v.port+20000) glog.V(0).Infoln("Start Seaweed volume server", util.Version(), "tcp at", listeningAddress) listener, e := util.NewListener(listeningAddress, 0) if e != nil { diff --git a/weed/command/webdav.go b/weed/command/webdav.go index 781ea1e36..a7062d8cd 100644 --- a/weed/command/webdav.go +++ b/weed/command/webdav.go @@ -78,37 +78,32 @@ func (wo *WebDavOption) startWebDav() bool { } // parse filer grpc address - filerGrpcAddress, err := pb.ParseServerToGrpcAddress(*wo.filer) - if err != nil { - glog.Fatal(err) - return false - } + filerAddress := pb.ServerAddress(*wo.filer) grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client") var cipher bool // connect to filer for { - err = pb.WithGrpcFilerClient(filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + err := pb.WithGrpcFilerClient(filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) if err != nil { - return fmt.Errorf("get filer %s configuration: %v", filerGrpcAddress, err) + return fmt.Errorf("get filer %s configuration: %v", filerAddress, err) } cipher = resp.Cipher return nil }) if err != nil { - glog.V(0).Infof("wait to connect to filer %s grpc address %s", *wo.filer, filerGrpcAddress) + glog.V(0).Infof("wait to connect to filer %s grpc address %s", *wo.filer, filerAddress.ToGrpcAddress()) time.Sleep(time.Second) } else { - glog.V(0).Infof("connected to filer %s grpc address %s", *wo.filer, filerGrpcAddress) + glog.V(0).Infof("connected to filer %s grpc address %s", *wo.filer, filerAddress.ToGrpcAddress()) break } } ws, webdavServer_err := weed_server.NewWebDavServer(&weed_server.WebDavOption{ - Filer: *wo.filer, - FilerGrpcAddress: filerGrpcAddress, + Filer: filerAddress, GrpcDialOption: grpcDialOption, Collection: *wo.collection, Replication: *wo.replication, |
