aboutsummaryrefslogtreecommitdiff
path: root/weed/command
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2021-09-12 22:47:52 -0700
committerChris Lu <chris.lu@gmail.com>2021-09-12 22:47:52 -0700
commite5fc35ed0c970fea060a5b3b7a3f5efb5af425d6 (patch)
tree3ad0436940263a24ac46d38a60dd1e35b2c1cdfe /weed/command
parent2c9d4c8f43c1e95c75fc332ca83d19e33e5da3ac (diff)
downloadseaweedfs-e5fc35ed0c970fea060a5b3b7a3f5efb5af425d6.tar.xz
seaweedfs-e5fc35ed0c970fea060a5b3b7a3f5efb5af425d6.zip
change server address from string to a type
Diffstat (limited to 'weed/command')
-rw-r--r--weed/command/backup.go5
-rw-r--r--weed/command/benchmark.go4
-rw-r--r--weed/command/download.go3
-rw-r--r--weed/command/filer.go18
-rw-r--r--weed/command/filer_backup.go4
-rw-r--r--weed/command/filer_cat.go4
-rw-r--r--weed/command/filer_copy.go67
-rw-r--r--weed/command/filer_meta_backup.go4
-rw-r--r--weed/command/filer_meta_tail.go2
-rw-r--r--weed/command/filer_remote_sync.go8
-rw-r--r--weed/command/filer_remote_sync_buckets.go6
-rw-r--r--weed/command/filer_remote_sync_dir.go8
-rw-r--r--weed/command/filer_sync.go18
-rw-r--r--weed/command/iam.go23
-rw-r--r--weed/command/master.go24
-rw-r--r--weed/command/master_follower.go17
-rw-r--r--weed/command/mount_std.go20
-rw-r--r--weed/command/msg_broker.go16
-rw-r--r--weed/command/s3.go17
-rw-r--r--weed/command/server.go14
-rw-r--r--weed/command/shell.go9
-rw-r--r--weed/command/upload.go8
-rw-r--r--weed/command/volume.go14
-rw-r--r--weed/command/webdav.go17
24 files changed, 142 insertions, 188 deletions
diff --git a/weed/command/backup.go b/weed/command/backup.go
index 4c5a2d820..2279d0d1a 100644
--- a/weed/command/backup.go
+++ b/weed/command/backup.go
@@ -2,6 +2,7 @@ package command
import (
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
@@ -72,12 +73,12 @@ func runBackup(cmd *Command, args []string) bool {
vid := needle.VolumeId(*s.volumeId)
// find volume location, replication, ttl info
- lookup, err := operation.LookupVolumeId(func() string { return *s.master }, grpcDialOption, vid.String())
+ lookup, err := operation.LookupVolumeId(func() pb.ServerAddress { return pb.ServerAddress(*s.master) }, grpcDialOption, vid.String())
if err != nil {
fmt.Printf("Error looking up volume %d: %v\n", vid, err)
return true
}
- volumeServer := lookup.Locations[0].Url
+ volumeServer := lookup.Locations[0].ServerAddress()
stats, err := operation.GetVolumeSyncStatus(volumeServer, grpcDialOption, uint32(vid))
if err != nil {
diff --git a/weed/command/benchmark.go b/weed/command/benchmark.go
index f0c8f6139..af5919adf 100644
--- a/weed/command/benchmark.go
+++ b/weed/command/benchmark.go
@@ -3,6 +3,7 @@ package command
import (
"bufio"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/pb"
"io"
"math"
"math/rand"
@@ -10,7 +11,6 @@ import (
"runtime"
"runtime/pprof"
"sort"
- "strings"
"sync"
"time"
@@ -129,7 +129,7 @@ func runBenchmark(cmd *Command, args []string) bool {
defer pprof.StopCPUProfile()
}
- b.masterClient = wdclient.NewMasterClient(b.grpcDialOption, "client", "", 0, "", strings.Split(*b.masters, ","))
+ b.masterClient = wdclient.NewMasterClient(b.grpcDialOption, "client", "", "", pb.ServerAddresses(*b.masters).ToAddresses())
go b.masterClient.KeepConnectedToMaster()
b.masterClient.WaitUntilConnected()
diff --git a/weed/command/download.go b/weed/command/download.go
index a64d3f237..1d8a72d31 100644
--- a/weed/command/download.go
+++ b/weed/command/download.go
@@ -2,6 +2,7 @@ package command
import (
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/security"
"google.golang.org/grpc"
"io"
@@ -49,7 +50,7 @@ func runDownload(cmd *Command, args []string) bool {
grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
for _, fid := range args {
- if e := downloadToFile(func() string { return *d.server }, grpcDialOption, fid, util.ResolvePath(*d.dir)); e != nil {
+ if e := downloadToFile(func() pb.ServerAddress { return pb.ServerAddress(*d.server) }, grpcDialOption, fid, util.ResolvePath(*d.dir)); e != nil {
fmt.Println("Download Error: ", fid, e)
}
}
diff --git a/weed/command/filer.go b/weed/command/filer.go
index 63dd53f9e..96802a1cb 100644
--- a/weed/command/filer.go
+++ b/weed/command/filer.go
@@ -29,7 +29,8 @@ var (
)
type FilerOptions struct {
- masters *string
+ masters []pb.ServerAddress
+ mastersString *string
ip *string
bindIp *string
port *int
@@ -56,7 +57,7 @@ type FilerOptions struct {
func init() {
cmdFiler.Run = runFiler // break init cycle
- f.masters = cmdFiler.Flag.String("master", "localhost:9333", "comma-separated master servers")
+ f.mastersString = cmdFiler.Flag.String("master", "localhost:9333", "comma-separated master servers")
f.collection = cmdFiler.Flag.String("collection", "", "all data will be stored in this default collection")
f.ip = cmdFiler.Flag.String("ip", util.DetectedHostAddress(), "filer server http listen ip address")
f.bindIp = cmdFiler.Flag.String("ip.bind", "", "ip address to bind to")
@@ -157,13 +158,15 @@ func runFiler(cmd *Command, args []string) bool {
if *filerStartIam {
filerIamOptions.filer = &filerAddress
- filerIamOptions.masters = f.masters
+ filerIamOptions.masters = f.mastersString
go func() {
time.Sleep(startDelay * time.Second)
filerIamOptions.startIamServer()
}()
}
+ f.masters = pb.ServerAddresses(*f.mastersString).ToAddresses()
+
f.startFiler()
return true
@@ -185,8 +188,10 @@ func (fo *FilerOptions) startFiler() {
peers = strings.Split(*fo.peers, ",")
}
+ filerAddress := pb.NewServerAddress(*fo.ip, *fo.port, *fo.portGrpc)
+
fs, nfs_err := weed_server.NewFilerServer(defaultMux, publicVolumeMux, &weed_server.FilerOption{
- Masters: strings.Split(*fo.masters, ","),
+ Masters: fo.masters,
Collection: *fo.collection,
DefaultReplication: *fo.defaultReplicaPlacement,
DisableDirListing: *fo.disableDirListing,
@@ -196,11 +201,10 @@ func (fo *FilerOptions) startFiler() {
Rack: *fo.rack,
DefaultLevelDbDir: defaultLevelDbDirectory,
DisableHttp: *fo.disableHttp,
- Host: *fo.ip,
- Port: uint32(*fo.port),
+ Host: filerAddress,
Cipher: *fo.cipher,
SaveToFilerLimit: int64(*fo.saveToFilerLimit),
- Filers: peers,
+ Filers: pb.FromAddressStrings(peers),
ConcurrentUploadLimit: int64(*fo.concurrentUploadLimitMB) * 1024 * 1024,
})
if nfs_err != nil {
diff --git a/weed/command/filer_backup.go b/weed/command/filer_backup.go
index 5b6409187..9e5041531 100644
--- a/weed/command/filer_backup.go
+++ b/weed/command/filer_backup.go
@@ -78,7 +78,7 @@ func doFilerBackup(grpcDialOption grpc.DialOption, backupOption *FilerBackupOpti
return fmt.Errorf("no data sink configured in replication.toml")
}
- sourceFiler := *backupOption.filer
+ sourceFiler := pb.ServerAddress(*backupOption.filer)
sourcePath := *backupOption.path
timeAgo := *backupOption.timeAgo
targetPath := dataSink.GetSinkToDirectory()
@@ -102,7 +102,7 @@ func doFilerBackup(grpcDialOption grpc.DialOption, backupOption *FilerBackupOpti
// create filer sink
filerSource := &source.FilerSource{}
- filerSource.DoInitialize(sourceFiler, pb.ServerToGrpcAddress(sourceFiler), sourcePath, *backupOption.proxyByFiler)
+ filerSource.DoInitialize(sourceFiler.ToHttpAddress(), sourceFiler.ToGrpcAddress(), sourcePath, *backupOption.proxyByFiler)
dataSink.SetSourceFiler(filerSource)
processEventFn := genProcessFunction(sourcePath, targetPath, dataSink, debug)
diff --git a/weed/command/filer_cat.go b/weed/command/filer_cat.go
index 09f5e97fe..71c3a48d6 100644
--- a/weed/command/filer_cat.go
+++ b/weed/command/filer_cat.go
@@ -23,7 +23,7 @@ var (
type FilerCatOptions struct {
grpcDialOption grpc.DialOption
- filerAddress string
+ filerAddress pb.ServerAddress
filerClient filer_pb.SeaweedFilerClient
output *string
}
@@ -78,7 +78,7 @@ func runFilerCat(cmd *Command, args []string) bool {
return false
}
- filerCat.filerAddress = filerUrl.Host
+ filerCat.filerAddress = pb.ServerAddress(filerUrl.Host)
filerCat.grpcDialOption = security.LoadClientTLS(util.GetViper(), "grpc.client")
dir, name := util.FullPath(urlPath).DirAndName()
diff --git a/weed/command/filer_copy.go b/weed/command/filer_copy.go
index 05aa96292..0feae63b3 100644
--- a/weed/command/filer_copy.go
+++ b/weed/command/filer_copy.go
@@ -7,7 +7,6 @@ import (
"io"
"io/ioutil"
"net/http"
- "net/url"
"os"
"path/filepath"
"strconv"
@@ -92,35 +91,21 @@ func runCopy(cmd *Command, args []string) bool {
filerDestination := args[len(args)-1]
fileOrDirs := args[0 : len(args)-1]
- filerUrl, err := url.Parse(filerDestination)
+ filerAddress, urlPath, err := pb.ParseUrl(filerDestination)
if err != nil {
fmt.Printf("The last argument should be a URL on filer: %v\n", err)
return false
}
- urlPath := filerUrl.Path
if !strings.HasSuffix(urlPath, "/") {
fmt.Printf("The last argument should be a folder and end with \"/\"\n")
return false
}
- if filerUrl.Port() == "" {
- fmt.Printf("The filer port should be specified.\n")
- return false
- }
-
- filerPort, parseErr := strconv.ParseUint(filerUrl.Port(), 10, 64)
- if parseErr != nil {
- fmt.Printf("The filer port parse error: %v\n", parseErr)
- return false
- }
-
- filerGrpcPort := filerPort + 10000
- filerGrpcAddress := util.JoinHostPort(filerUrl.Hostname(), int(filerGrpcPort))
copy.grpcDialOption = security.LoadClientTLS(util.GetViper(), "grpc.client")
- masters, collection, replication, dirBuckets, maxMB, cipher, err := readFilerConfiguration(copy.grpcDialOption, filerGrpcAddress)
+ masters, collection, replication, dirBuckets, maxMB, cipher, err := readFilerConfiguration(copy.grpcDialOption, filerAddress)
if err != nil {
- fmt.Printf("read from filer %s: %v\n", filerGrpcAddress, err)
+ fmt.Printf("read from filer %s: %v\n", filerAddress, err)
return false
}
if strings.HasPrefix(urlPath, dirBuckets+"/") {
@@ -174,9 +159,8 @@ func runCopy(cmd *Command, args []string) bool {
go func() {
defer waitGroup.Done()
worker := FileCopyWorker{
- options: &copy,
- filerHost: filerUrl.Host,
- filerGrpcAddress: filerGrpcAddress,
+ options: &copy,
+ filerAddress: filerAddress,
}
if err := worker.copyFiles(fileCopyTaskChan); err != nil {
fmt.Fprintf(os.Stderr, "copy file error: %v\n", err)
@@ -189,7 +173,7 @@ func runCopy(cmd *Command, args []string) bool {
return true
}
-func readFilerConfiguration(grpcDialOption grpc.DialOption, filerGrpcAddress string) (masters []string, collection, replication string, dirBuckets string, maxMB uint32, cipher bool, err error) {
+func readFilerConfiguration(grpcDialOption grpc.DialOption, filerGrpcAddress pb.ServerAddress) (masters []string, collection, replication string, dirBuckets string, maxMB uint32, cipher bool, err error) {
err = pb.WithGrpcFilerClient(filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
if err != nil {
@@ -241,9 +225,8 @@ func genFileCopyTask(fileOrDir string, destPath string, fileCopyTaskChan chan Fi
}
type FileCopyWorker struct {
- options *CopyOptions
- filerHost string
- filerGrpcAddress string
+ options *CopyOptions
+ filerAddress pb.ServerAddress
}
func (worker *FileCopyWorker) copyFiles(fileCopyTaskChan chan FileCopyTask) error {
@@ -321,7 +304,7 @@ func (worker *FileCopyWorker) checkExistingFileFirst(task FileCopyTask, f *os.Fi
return
}
- err = pb.WithGrpcFilerClient(worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ err = pb.WithGrpcFilerClient(worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.LookupDirectoryEntryRequest{
Directory: task.destinationUrlPath,
@@ -363,7 +346,7 @@ func (worker *FileCopyWorker) uploadFileAsOne(task FileCopyTask, f *os.File) err
// assign a volume
err = util.Retry("assignVolume", func() error {
- return pb.WithGrpcFilerClient(worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ return pb.WithGrpcFilerClient(worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.AssignVolumeRequest{
Count: 1,
@@ -381,7 +364,7 @@ func (worker *FileCopyWorker) uploadFileAsOne(task FileCopyTask, f *os.File) err
if assignResult.Error != "" {
return fmt.Errorf("assign volume failure %v: %v", request, assignResult.Error)
}
- if assignResult.Url == "" {
+ if assignResult.Location.Url == "" {
return fmt.Errorf("assign volume failure %v: %v", request, assignResult)
}
return nil
@@ -391,7 +374,7 @@ func (worker *FileCopyWorker) uploadFileAsOne(task FileCopyTask, f *os.File) err
return fmt.Errorf("Failed to assign from %v: %v\n", worker.options.masters, err)
}
- targetUrl := "http://" + assignResult.Url + "/" + assignResult.FileId
+ targetUrl := "http://" + assignResult.Location.Url + "/" + assignResult.FileId
uploadOption := &operation.UploadOption{
UploadUrl: targetUrl,
Filename: fileName,
@@ -414,10 +397,10 @@ func (worker *FileCopyWorker) uploadFileAsOne(task FileCopyTask, f *os.File) err
chunks = append(chunks, uploadResult.ToPbFileChunk(assignResult.FileId, 0))
- fmt.Printf("copied %s => http://%s%s%s\n", f.Name(), worker.filerHost, task.destinationUrlPath, fileName)
+ fmt.Printf("copied %s => http://%s%s%s\n", f.Name(), worker.filerAddress.ToHttpAddress(), task.destinationUrlPath, fileName)
}
- if err := pb.WithGrpcFilerClient(worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ if err := pb.WithGrpcFilerClient(worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.CreateEntryRequest{
Directory: task.destinationUrlPath,
Entry: &filer_pb.Entry{
@@ -443,7 +426,7 @@ func (worker *FileCopyWorker) uploadFileAsOne(task FileCopyTask, f *os.File) err
}
return nil
}); err != nil {
- return fmt.Errorf("upload data %v to http://%s%s%s: %v\n", fileName, worker.filerHost, task.destinationUrlPath, fileName, err)
+ return fmt.Errorf("upload data %v to http://%s%s%s: %v\n", fileName, worker.filerAddress.ToHttpAddress(), task.destinationUrlPath, fileName, err)
}
return nil
@@ -474,7 +457,7 @@ func (worker *FileCopyWorker) uploadFileInChunks(task FileCopyTask, f *os.File,
var assignResult *filer_pb.AssignVolumeResponse
var assignError error
err := util.Retry("assignVolume", func() error {
- return pb.WithGrpcFilerClient(worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ return pb.WithGrpcFilerClient(worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.AssignVolumeRequest{
Count: 1,
Replication: *worker.options.replication,
@@ -498,7 +481,7 @@ func (worker *FileCopyWorker) uploadFileInChunks(task FileCopyTask, f *os.File,
fmt.Printf("Failed to assign from %v: %v\n", worker.options.masters, err)
}
- targetUrl := "http://" + assignResult.Url + "/" + assignResult.FileId
+ targetUrl := "http://" + assignResult.Location.Url + "/" + assignResult.FileId
if collection == "" {
collection = assignResult.Collection
}
@@ -508,7 +491,7 @@ func (worker *FileCopyWorker) uploadFileInChunks(task FileCopyTask, f *os.File,
uploadOption := &operation.UploadOption{
UploadUrl: targetUrl,
- Filename: fileName+"-"+strconv.FormatInt(i+1, 10),
+ Filename: fileName + "-" + strconv.FormatInt(i+1, 10),
Cipher: worker.options.cipher,
IsInputCompressed: false,
MimeType: "",
@@ -542,8 +525,8 @@ func (worker *FileCopyWorker) uploadFileInChunks(task FileCopyTask, f *os.File,
for _, chunk := range chunks {
fileIds = append(fileIds, chunk.FileId)
}
- operation.DeleteFiles(func() string {
- return copy.masters[0]
+ operation.DeleteFiles(func() pb.ServerAddress {
+ return pb.ServerAddress(copy.masters[0])
}, false, worker.options.grpcDialOption, fileIds)
return uploadError
}
@@ -553,7 +536,7 @@ func (worker *FileCopyWorker) uploadFileInChunks(task FileCopyTask, f *os.File,
return fmt.Errorf("create manifest: %v", manifestErr)
}
- if err := pb.WithGrpcFilerClient(worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ if err := pb.WithGrpcFilerClient(worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.CreateEntryRequest{
Directory: task.destinationUrlPath,
Entry: &filer_pb.Entry{
@@ -579,10 +562,10 @@ func (worker *FileCopyWorker) uploadFileInChunks(task FileCopyTask, f *os.File,
}
return nil
}); err != nil {
- return fmt.Errorf("upload data %v to http://%s%s%s: %v\n", fileName, worker.filerHost, task.destinationUrlPath, fileName, err)
+ return fmt.Errorf("upload data %v to http://%s%s%s: %v\n", fileName, worker.filerAddress.ToHttpAddress(), task.destinationUrlPath, fileName, err)
}
- fmt.Printf("copied %s => http://%s%s%s\n", f.Name(), worker.filerHost, task.destinationUrlPath, fileName)
+ fmt.Printf("copied %s => http://%s%s%s\n", f.Name(), worker.filerAddress.ToHttpAddress(), task.destinationUrlPath, fileName)
return nil
}
@@ -611,7 +594,7 @@ func (worker *FileCopyWorker) saveDataAsChunk(reader io.Reader, name string, off
var fileId, host string
var auth security.EncodedJwt
- if flushErr := pb.WithGrpcFilerClient(worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ if flushErr := pb.WithGrpcFilerClient(worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
ctx := context.Background()
@@ -633,7 +616,7 @@ func (worker *FileCopyWorker) saveDataAsChunk(reader io.Reader, name string, off
return fmt.Errorf("assign volume failure %v: %v", request, resp.Error)
}
- fileId, host, auth = resp.FileId, resp.Url, security.EncodedJwt(resp.Auth)
+ fileId, host, auth = resp.FileId, resp.Location.Url, security.EncodedJwt(resp.Auth)
collection, replication = resp.Collection, resp.Replication
return nil
diff --git a/weed/command/filer_meta_backup.go b/weed/command/filer_meta_backup.go
index 3757f63f1..0b8fa76c6 100644
--- a/weed/command/filer_meta_backup.go
+++ b/weed/command/filer_meta_backup.go
@@ -195,7 +195,7 @@ func (metaBackup *FilerMetaBackupOptions) streamMetadataBackup() error {
return metaBackup.setOffset(lastTime)
})
- return pb.FollowMetadata(*metaBackup.filerAddress, metaBackup.grpcDialOption, "meta_backup",
+ return pb.FollowMetadata(pb.ServerAddress(*metaBackup.filerAddress), metaBackup.grpcDialOption, "meta_backup",
*metaBackup.filerDirectory, nil, startTime.UnixNano(), 0, processEventFnWithOffset, false)
}
@@ -224,7 +224,7 @@ var _ = filer_pb.FilerClient(&FilerMetaBackupOptions{})
func (metaBackup *FilerMetaBackupOptions) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error {
- return pb.WithFilerClient(*metaBackup.filerAddress, metaBackup.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ return pb.WithFilerClient(pb.ServerAddress(*metaBackup.filerAddress), metaBackup.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
return fn(client)
})
diff --git a/weed/command/filer_meta_tail.go b/weed/command/filer_meta_tail.go
index 0363ae8d1..85a3eaf84 100644
--- a/weed/command/filer_meta_tail.go
+++ b/weed/command/filer_meta_tail.go
@@ -103,7 +103,7 @@ func runFilerMetaTail(cmd *Command, args []string) bool {
}
}
- tailErr := pb.FollowMetadata(*tailFiler, grpcDialOption, "tail",
+ tailErr := pb.FollowMetadata(pb.ServerAddress(*tailFiler), grpcDialOption, "tail",
*tailTarget, nil, time.Now().Add(-*tailStart).UnixNano(), 0,
func(resp *filer_pb.SubscribeMetadataResponse) error {
if !shouldPrint(resp) {
diff --git a/weed/command/filer_remote_sync.go b/weed/command/filer_remote_sync.go
index 3776ee4d9..857fbb0eb 100644
--- a/weed/command/filer_remote_sync.go
+++ b/weed/command/filer_remote_sync.go
@@ -33,7 +33,7 @@ type RemoteSyncOptions struct {
var _ = filer_pb.FilerClient(&RemoteSyncOptions{})
func (option *RemoteSyncOptions) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error {
- return pb.WithFilerClient(*option.filerAddress, option.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ return pb.WithFilerClient(pb.ServerAddress(*option.filerAddress), option.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
return fn(client)
})
}
@@ -90,12 +90,12 @@ func runFilerRemoteSynchronize(cmd *Command, args []string) bool {
remoteSyncOptions.grpcDialOption = grpcDialOption
dir := *remoteSyncOptions.dir
- filerAddress := *remoteSyncOptions.filerAddress
+ filerAddress := pb.ServerAddress(*remoteSyncOptions.filerAddress)
filerSource := &source.FilerSource{}
filerSource.DoInitialize(
- filerAddress,
- pb.ServerToGrpcAddress(filerAddress),
+ filerAddress.ToHttpAddress(),
+ filerAddress.ToGrpcAddress(),
"/", // does not matter
*remoteSyncOptions.readChunkFromFiler,
)
diff --git a/weed/command/filer_remote_sync_buckets.go b/weed/command/filer_remote_sync_buckets.go
index 70f9f49c1..73c8de1a9 100644
--- a/weed/command/filer_remote_sync_buckets.go
+++ b/weed/command/filer_remote_sync_buckets.go
@@ -32,12 +32,12 @@ func (option *RemoteSyncOptions) followBucketUpdatesAndUploadToRemote(filerSourc
processEventFnWithOffset := pb.AddOffsetFunc(eachEntryFunc, 3*time.Second, func(counter int64, lastTsNs int64) error {
lastTime := time.Unix(0, lastTsNs)
glog.V(0).Infof("remote sync %s progressed to %v %0.2f/sec", *option.filerAddress, lastTime, float64(counter)/float64(3))
- return remote_storage.SetSyncOffset(option.grpcDialOption, *option.filerAddress, option.bucketsDir, lastTsNs)
+ return remote_storage.SetSyncOffset(option.grpcDialOption, pb.ServerAddress(*option.filerAddress), option.bucketsDir, lastTsNs)
})
lastOffsetTs := collectLastSyncOffset(option, option.bucketsDir)
- return pb.FollowMetadata(*option.filerAddress, option.grpcDialOption, "filer.remote.sync",
+ return pb.FollowMetadata(pb.ServerAddress(*option.filerAddress), option.grpcDialOption, "filer.remote.sync",
option.bucketsDir, []string{filer.DirectoryEtcRemote}, lastOffsetTs.UnixNano(), 0, processEventFnWithOffset, false)
}
@@ -357,7 +357,7 @@ func extractBucketPath(bucketsDir, dir string) (util.FullPath, bool) {
func (option *RemoteSyncOptions) collectRemoteStorageConf() (err error) {
- if mappings, err := filer.ReadMountMappings(option.grpcDialOption, *option.filerAddress); err != nil {
+ if mappings, err := filer.ReadMountMappings(option.grpcDialOption, pb.ServerAddress(*option.filerAddress)); err != nil {
return err
} else {
option.mappings = mappings
diff --git a/weed/command/filer_remote_sync_dir.go b/weed/command/filer_remote_sync_dir.go
index dc2e9a1fb..50f1e35cf 100644
--- a/weed/command/filer_remote_sync_dir.go
+++ b/weed/command/filer_remote_sync_dir.go
@@ -20,7 +20,7 @@ import (
func followUpdatesAndUploadToRemote(option *RemoteSyncOptions, filerSource *source.FilerSource, mountedDir string) error {
// read filer remote storage mount mappings
- _, _, remoteStorageMountLocation, remoteStorage, detectErr := filer.DetectMountInfo(option.grpcDialOption, *option.filerAddress, mountedDir)
+ _, _, remoteStorageMountLocation, remoteStorage, detectErr := filer.DetectMountInfo(option.grpcDialOption, pb.ServerAddress(*option.filerAddress), mountedDir)
if detectErr != nil {
return fmt.Errorf("read mount info: %v", detectErr)
}
@@ -33,12 +33,12 @@ func followUpdatesAndUploadToRemote(option *RemoteSyncOptions, filerSource *sour
processEventFnWithOffset := pb.AddOffsetFunc(eachEntryFunc, 3*time.Second, func(counter int64, lastTsNs int64) error {
lastTime := time.Unix(0, lastTsNs)
glog.V(0).Infof("remote sync %s progressed to %v %0.2f/sec", *option.filerAddress, lastTime, float64(counter)/float64(3))
- return remote_storage.SetSyncOffset(option.grpcDialOption, *option.filerAddress, mountedDir, lastTsNs)
+ return remote_storage.SetSyncOffset(option.grpcDialOption, pb.ServerAddress(*option.filerAddress), mountedDir, lastTsNs)
})
lastOffsetTs := collectLastSyncOffset(option, mountedDir)
- return pb.FollowMetadata(*option.filerAddress, option.grpcDialOption, "filer.remote.sync",
+ return pb.FollowMetadata(pb.ServerAddress(*option.filerAddress), option.grpcDialOption, "filer.remote.sync",
mountedDir, []string{filer.DirectoryEtcRemote}, lastOffsetTs.UnixNano(), 0, processEventFnWithOffset, false)
}
@@ -171,7 +171,7 @@ func collectLastSyncOffset(option *RemoteSyncOptions, mountedDir string) time.Ti
return time.Now()
}
- lastOffsetTsNs, err := remote_storage.GetSyncOffset(option.grpcDialOption, *option.filerAddress, mountedDir)
+ lastOffsetTsNs, err := remote_storage.GetSyncOffset(option.grpcDialOption, pb.ServerAddress(*option.filerAddress), mountedDir)
if mountedDirEntry != nil {
if err == nil && mountedDirEntry.Attributes.Crtime < lastOffsetTsNs/1000000 {
lastOffsetTs = time.Unix(0, lastOffsetTsNs)
diff --git a/weed/command/filer_sync.go b/weed/command/filer_sync.go
index 33efdb2b7..20755dbe5 100644
--- a/weed/command/filer_sync.go
+++ b/weed/command/filer_sync.go
@@ -93,9 +93,11 @@ func runFilerSynchronize(cmd *Command, args []string) bool {
grace.SetupProfiling(*syncCpuProfile, *syncMemProfile)
+ filerA := pb.ServerAddress(*syncOptions.filerA)
+ filerB := pb.ServerAddress(*syncOptions.filerB)
go func() {
for {
- err := doSubscribeFilerMetaChanges(grpcDialOption, *syncOptions.filerA, *syncOptions.aPath, *syncOptions.aProxyByFiler, *syncOptions.filerB,
+ err := doSubscribeFilerMetaChanges(grpcDialOption, filerA, *syncOptions.aPath, *syncOptions.aProxyByFiler, filerB,
*syncOptions.bPath, *syncOptions.bReplication, *syncOptions.bCollection, *syncOptions.bTtlSec, *syncOptions.bProxyByFiler, *syncOptions.bDiskType, *syncOptions.bDebug)
if err != nil {
glog.Errorf("sync from %s to %s: %v", *syncOptions.filerA, *syncOptions.filerB, err)
@@ -107,7 +109,7 @@ func runFilerSynchronize(cmd *Command, args []string) bool {
if !*syncOptions.isActivePassive {
go func() {
for {
- err := doSubscribeFilerMetaChanges(grpcDialOption, *syncOptions.filerB, *syncOptions.bPath, *syncOptions.bProxyByFiler, *syncOptions.filerA,
+ err := doSubscribeFilerMetaChanges(grpcDialOption, filerB, *syncOptions.bPath, *syncOptions.bProxyByFiler, filerA,
*syncOptions.aPath, *syncOptions.aReplication, *syncOptions.aCollection, *syncOptions.aTtlSec, *syncOptions.aProxyByFiler, *syncOptions.aDiskType, *syncOptions.aDebug)
if err != nil {
glog.Errorf("sync from %s to %s: %v", *syncOptions.filerB, *syncOptions.filerA, err)
@@ -122,7 +124,7 @@ func runFilerSynchronize(cmd *Command, args []string) bool {
return true
}
-func doSubscribeFilerMetaChanges(grpcDialOption grpc.DialOption, sourceFiler, sourcePath string, sourceReadChunkFromFiler bool, targetFiler, targetPath string,
+func doSubscribeFilerMetaChanges(grpcDialOption grpc.DialOption, sourceFiler pb.ServerAddress, sourcePath string, sourceReadChunkFromFiler bool, targetFiler pb.ServerAddress, targetPath string,
replicationStr, collection string, ttlSec int, sinkWriteChunkByFiler bool, diskType string, debug bool) error {
// read source filer signature
@@ -147,9 +149,9 @@ func doSubscribeFilerMetaChanges(grpcDialOption grpc.DialOption, sourceFiler, so
// create filer sink
filerSource := &source.FilerSource{}
- filerSource.DoInitialize(sourceFiler, pb.ServerToGrpcAddress(sourceFiler), sourcePath, sourceReadChunkFromFiler)
+ filerSource.DoInitialize(sourceFiler.ToHttpAddress(), sourceFiler.ToGrpcAddress(), sourcePath, sourceReadChunkFromFiler)
filerSink := &filersink.FilerSink{}
- filerSink.DoInitialize(targetFiler, pb.ServerToGrpcAddress(targetFiler), targetPath, replicationStr, collection, ttlSec, diskType, grpcDialOption, sinkWriteChunkByFiler)
+ filerSink.DoInitialize(targetFiler.ToHttpAddress(), targetFiler.ToGrpcAddress(), targetPath, replicationStr, collection, ttlSec, diskType, grpcDialOption, sinkWriteChunkByFiler)
filerSink.SetSourceFiler(filerSource)
persistEventFn := genProcessFunction(sourcePath, targetPath, filerSink, debug)
@@ -170,7 +172,7 @@ func doSubscribeFilerMetaChanges(grpcDialOption grpc.DialOption, sourceFiler, so
return setOffset(grpcDialOption, targetFiler, SyncKeyPrefix, sourceFilerSignature, lastTsNs)
})
- return pb.FollowMetadata(sourceFiler, grpcDialOption, "syncTo_"+targetFiler,
+ return pb.FollowMetadata(sourceFiler, grpcDialOption, "syncTo_"+string(targetFiler),
sourcePath, nil, sourceFilerOffsetTsNs, targetFilerSignature, processEventFnWithOffset, false)
}
@@ -179,7 +181,7 @@ const (
SyncKeyPrefix = "sync."
)
-func getOffset(grpcDialOption grpc.DialOption, filer string, signaturePrefix string, signature int32) (lastOffsetTsNs int64, readErr error) {
+func getOffset(grpcDialOption grpc.DialOption, filer pb.ServerAddress, signaturePrefix string, signature int32) (lastOffsetTsNs int64, readErr error) {
readErr = pb.WithFilerClient(filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
syncKey := []byte(signaturePrefix + "____")
@@ -206,7 +208,7 @@ func getOffset(grpcDialOption grpc.DialOption, filer string, signaturePrefix str
}
-func setOffset(grpcDialOption grpc.DialOption, filer string, signaturePrefix string, signature int32, offsetTsNs int64) error {
+func setOffset(grpcDialOption grpc.DialOption, filer pb.ServerAddress, signaturePrefix string, signature int32, offsetTsNs int64) error {
return pb.WithFilerClient(filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
syncKey := []byte(signaturePrefix + "____")
diff --git a/weed/command/iam.go b/weed/command/iam.go
index ed4eea543..ebe9657f2 100644
--- a/weed/command/iam.go
+++ b/weed/command/iam.go
@@ -43,38 +43,35 @@ func runIam(cmd *Command, args []string) bool {
}
func (iamopt *IamOptions) startIamServer() bool {
- filerGrpcAddress, err := pb.ParseServerToGrpcAddress(*iamopt.filer)
- if err != nil {
- glog.Fatal(err)
- return false
- }
+ filerAddress := pb.ServerAddress(*iamopt.filer)
util.LoadConfiguration("security", false)
grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
for {
- err = pb.WithGrpcFilerClient(filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ err := pb.WithGrpcFilerClient(filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
if err != nil {
- return fmt.Errorf("get filer %s configuration: %v", filerGrpcAddress, err)
+ return fmt.Errorf("get filer %s configuration: %v", filerAddress, err)
}
glog.V(0).Infof("IAM read filer configuration: %s", resp)
return nil
})
if err != nil {
- glog.V(0).Infof("wait to connect to filer %s grpc address %s", *iamopt.filer, filerGrpcAddress)
+ glog.V(0).Infof("wait to connect to filer %s grpc address %s", *iamopt.filer, filerAddress.ToGrpcAddress())
time.Sleep(time.Second)
} else {
- glog.V(0).Infof("connected to filer %s grpc address %s", *iamopt.filer, filerGrpcAddress)
+ glog.V(0).Infof("connected to filer %s grpc address %s", *iamopt.filer, filerAddress.ToGrpcAddress())
break
}
}
+ masters := pb.ServerAddresses(*iamopt.masters).ToAddresses()
router := mux.NewRouter().SkipClean(true)
_, iamApiServer_err := iamapi.NewIamApiServer(router, &iamapi.IamServerOption{
- Filer: *iamopt.filer,
- Port: *iamopt.port,
- FilerGrpcAddress: filerGrpcAddress,
- GrpcDialOption: grpcDialOption,
+ Masters: masters,
+ Filer: filerAddress,
+ Port: *iamopt.port,
+ GrpcDialOption: grpcDialOption,
})
glog.V(0).Info("NewIamApiServer created")
if iamApiServer_err != nil {
diff --git a/weed/command/master.go b/weed/command/master.go
index bf7a5d420..adc9055ea 100644
--- a/weed/command/master.go
+++ b/weed/command/master.go
@@ -113,7 +113,7 @@ func startMaster(masterOption MasterOptions, masterWhiteList []string) {
backend.LoadConfiguration(util.GetViper())
- myMasterAddress, peers := checkPeers(*masterOption.ip, *masterOption.port, *masterOption.peers)
+ myMasterAddress, peers := checkPeers(*masterOption.ip, *masterOption.port, *masterOption.portGrpc, *masterOption.peers)
r := mux.NewRouter()
ms := weed_server.NewMasterServer(r, masterOption.toMasterOption(masterWhiteList), peers)
@@ -162,16 +162,14 @@ func startMaster(masterOption MasterOptions, masterWhiteList []string) {
select {}
}
-func checkPeers(masterIp string, masterPort int, peers string) (masterAddress string, cleanedPeers []string) {
+func checkPeers(masterIp string, masterPort int, masterGrpcPort int, peers string) (masterAddress pb.ServerAddress, cleanedPeers []pb.ServerAddress) {
glog.V(0).Infof("current: %s:%d peers:%s", masterIp, masterPort, peers)
- masterAddress = util.JoinHostPort(masterIp, masterPort)
- if peers != "" {
- cleanedPeers = strings.Split(peers, ",")
- }
+ masterAddress = pb.NewServerAddress(masterIp, masterPort, masterGrpcPort)
+ cleanedPeers = pb.ServerAddresses(peers).ToAddresses()
hasSelf := false
for _, peer := range cleanedPeers {
- if peer == masterAddress {
+ if peer.ToHttpAddress() == masterAddress.ToHttpAddress() {
hasSelf = true
break
}
@@ -181,13 +179,15 @@ func checkPeers(masterIp string, masterPort int, peers string) (masterAddress st
cleanedPeers = append(cleanedPeers, masterAddress)
}
if len(cleanedPeers)%2 == 0 {
- glog.Fatalf("Only odd number of masters are supported!")
+ glog.Fatalf("Only odd number of masters are supported: %+v", cleanedPeers)
}
return
}
-func isTheFirstOne(self string, peers []string) bool {
- sort.Strings(peers)
+func isTheFirstOne(self pb.ServerAddress, peers []pb.ServerAddress) bool {
+ sort.Slice(peers, func(i, j int) bool {
+ return strings.Compare(string(peers[i]), string(peers[j])) < 0
+ })
if len(peers) <= 0 {
return true
}
@@ -195,9 +195,9 @@ func isTheFirstOne(self string, peers []string) bool {
}
func (m *MasterOptions) toMasterOption(whiteList []string) *weed_server.MasterOption {
+ masterAddress := pb.NewServerAddress(*m.ip, *m.port, *m.portGrpc)
return &weed_server.MasterOption{
- Host: *m.ip,
- Port: *m.port,
+ Master: masterAddress,
MetaFolder: *m.metaFolder,
VolumeSizeLimitMB: uint32(*m.volumeSizeLimitMB),
VolumePreallocate: *m.volumePreallocate,
diff --git a/weed/command/master_follower.go b/weed/command/master_follower.go
index cf7e45253..2bb9ff6d4 100644
--- a/weed/command/master_follower.go
+++ b/weed/command/master_follower.go
@@ -13,7 +13,6 @@ import (
"github.com/gorilla/mux"
"google.golang.org/grpc/reflection"
"net/http"
- "strings"
"time"
)
@@ -79,19 +78,15 @@ func runMasterFollower(cmd *Command, args []string) bool {
func startMasterFollower(masterOptions MasterOptions) {
// collect settings from main masters
- masters := strings.Split(*mf.peers, ",")
- masterGrpcAddresses, err := pb.ParseServersToGrpcAddresses(masters)
- if err != nil {
- glog.V(0).Infof("ParseFilerGrpcAddress: %v", err)
- return
- }
+ masters := pb.ServerAddresses(*mf.peers).ToAddresses()
+ var err error
grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.master")
for i := 0; i < 10; i++ {
- err = pb.WithOneOfGrpcMasterClients(masterGrpcAddresses, grpcDialOption, func(client master_pb.SeaweedClient) error {
+ err = pb.WithOneOfGrpcMasterClients(masters, grpcDialOption, func(client master_pb.SeaweedClient) error {
resp, err := client.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{})
if err != nil {
- return fmt.Errorf("get master grpc address %v configuration: %v", masterGrpcAddresses, err)
+ return fmt.Errorf("get master grpc address %v configuration: %v", masters, err)
}
masterOptions.defaultReplication = &resp.DefaultReplication
masterOptions.volumeSizeLimitMB = aws.Uint(uint(resp.VolumeSizeLimitMB))
@@ -99,13 +94,13 @@ func startMasterFollower(masterOptions MasterOptions) {
return nil
})
if err != nil {
- glog.V(0).Infof("failed to talk to filer %v: %v", masterGrpcAddresses, err)
+ glog.V(0).Infof("failed to talk to filer %v: %v", masters, err)
glog.V(0).Infof("wait for %d seconds ...", i+1)
time.Sleep(time.Duration(i+1) * time.Second)
}
}
if err != nil {
- glog.Errorf("failed to talk to filer %v: %v", masterGrpcAddresses, err)
+ glog.Errorf("failed to talk to filer %v: %v", masters, err)
return
}
diff --git a/weed/command/mount_std.go b/weed/command/mount_std.go
index e393e5894..2603260a2 100644
--- a/weed/command/mount_std.go
+++ b/weed/command/mount_std.go
@@ -70,35 +70,30 @@ func getParentInode(mountDir string) (uint64, error) {
func RunMount(option *MountOptions, umask os.FileMode) bool {
- filers := strings.Split(*option.filer, ",")
- // parse filer grpc address
- filerGrpcAddresses, err := pb.ParseServersToGrpcAddresses(filers)
- if err != nil {
- glog.V(0).Infof("ParseFilerGrpcAddress: %v", err)
- return true
- }
+ filerAddresses := pb.ServerAddresses(*option.filer).ToAddresses()
util.LoadConfiguration("security", false)
// try to connect to filer, filerBucketsPath may be useful later
grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
var cipher bool
+ var err error
for i := 0; i < 10; i++ {
- err = pb.WithOneOfGrpcFilerClients(filerGrpcAddresses, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ err = pb.WithOneOfGrpcFilerClients(filerAddresses, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
if err != nil {
- return fmt.Errorf("get filer grpc address %v configuration: %v", filerGrpcAddresses, err)
+ return fmt.Errorf("get filer grpc address %v configuration: %v", filerAddresses, err)
}
cipher = resp.Cipher
return nil
})
if err != nil {
- glog.V(0).Infof("failed to talk to filer %v: %v", filerGrpcAddresses, err)
+ glog.V(0).Infof("failed to talk to filer %v: %v", filerAddresses, err)
glog.V(0).Infof("wait for %d seconds ...", i+1)
time.Sleep(time.Duration(i+1) * time.Second)
}
}
if err != nil {
- glog.Errorf("failed to talk to filer %v: %v", filerGrpcAddresses, err)
+ glog.Errorf("failed to talk to filer %v: %v", filerAddresses, err)
return true
}
@@ -206,8 +201,7 @@ func RunMount(option *MountOptions, umask os.FileMode) bool {
seaweedFileSystem := filesys.NewSeaweedFileSystem(&filesys.Option{
MountDirectory: dir,
- FilerAddresses: filers,
- FilerGrpcAddresses: filerGrpcAddresses,
+ FilerAddresses: filerAddresses,
GrpcDialOption: grpcDialOption,
FilerMountRootPath: mountRoot,
Collection: *option.collection,
diff --git a/weed/command/msg_broker.go b/weed/command/msg_broker.go
index 403bbe317..61517ab39 100644
--- a/weed/command/msg_broker.go
+++ b/weed/command/msg_broker.go
@@ -62,35 +62,31 @@ func (msgBrokerOpt *MessageBrokerOptions) startQueueServer() bool {
grace.SetupProfiling(*messageBrokerStandaloneOptions.cpuprofile, *messageBrokerStandaloneOptions.memprofile)
- filerGrpcAddress, err := pb.ParseServerToGrpcAddress(*msgBrokerOpt.filer)
- if err != nil {
- glog.Fatal(err)
- return false
- }
+ filerAddress := pb.ServerAddress(*msgBrokerOpt.filer)
grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.msg_broker")
cipher := false
for {
- err = pb.WithGrpcFilerClient(filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ err := pb.WithGrpcFilerClient(filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
if err != nil {
- return fmt.Errorf("get filer %s configuration: %v", filerGrpcAddress, err)
+ return fmt.Errorf("get filer %s configuration: %v", filerAddress, err)
}
cipher = resp.Cipher
return nil
})
if err != nil {
- glog.V(0).Infof("wait to connect to filer %s grpc address %s", *msgBrokerOpt.filer, filerGrpcAddress)
+ glog.V(0).Infof("wait to connect to filer %s grpc address %s", *msgBrokerOpt.filer, filerAddress.ToGrpcAddress())
time.Sleep(time.Second)
} else {
- glog.V(0).Infof("connected to filer %s grpc address %s", *msgBrokerOpt.filer, filerGrpcAddress)
+ glog.V(0).Infof("connected to filer %s grpc address %s", *msgBrokerOpt.filer, filerAddress.ToGrpcAddress())
break
}
}
qs, err := broker.NewMessageBroker(&broker.MessageBrokerOption{
- Filers: []string{*msgBrokerOpt.filer},
+ Filers: []pb.ServerAddress{filerAddress},
DefaultReplication: "",
MaxMB: 0,
Ip: *msgBrokerOpt.ip,
diff --git a/weed/command/s3.go b/weed/command/s3.go
index c8292a7d5..f2c6c0769 100644
--- a/weed/command/s3.go
+++ b/weed/command/s3.go
@@ -137,11 +137,7 @@ func runS3(cmd *Command, args []string) bool {
func (s3opt *S3Options) startS3Server() bool {
- filerGrpcAddress, err := pb.ParseServerToGrpcAddress(*s3opt.filer)
- if err != nil {
- glog.Fatal(err)
- return false
- }
+ filerAddress := pb.ServerAddress(*s3opt.filer)
filerBucketsPath := "/buckets"
@@ -152,10 +148,10 @@ func (s3opt *S3Options) startS3Server() bool {
var metricsIntervalSec int
for {
- err = pb.WithGrpcFilerClient(filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ err := pb.WithGrpcFilerClient(filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
if err != nil {
- return fmt.Errorf("get filer %s configuration: %v", filerGrpcAddress, err)
+ return fmt.Errorf("get filer %s configuration: %v", filerAddress, err)
}
filerBucketsPath = resp.DirBuckets
metricsAddress, metricsIntervalSec = resp.MetricsAddress, int(resp.MetricsIntervalSec)
@@ -163,10 +159,10 @@ func (s3opt *S3Options) startS3Server() bool {
return nil
})
if err != nil {
- glog.V(0).Infof("wait to connect to filer %s grpc address %s", *s3opt.filer, filerGrpcAddress)
+ glog.V(0).Infof("wait to connect to filer %s grpc address %s", *s3opt.filer, filerAddress.ToGrpcAddress())
time.Sleep(time.Second)
} else {
- glog.V(0).Infof("connected to filer %s grpc address %s", *s3opt.filer, filerGrpcAddress)
+ glog.V(0).Infof("connected to filer %s grpc address %s", *s3opt.filer, filerAddress.ToGrpcAddress())
break
}
}
@@ -176,9 +172,8 @@ func (s3opt *S3Options) startS3Server() bool {
router := mux.NewRouter().SkipClean(true)
_, s3ApiServer_err := s3api.NewS3ApiServer(router, &s3api.S3ApiServerOption{
- Filer: *s3opt.filer,
+ Filer: filerAddress,
Port: *s3opt.port,
- FilerGrpcAddress: filerGrpcAddress,
Config: *s3opt.config,
DomainName: *s3opt.domainName,
BucketsPath: filerBucketsPath,
diff --git a/weed/command/server.go b/weed/command/server.go
index b45ea8f4a..e498c39b4 100644
--- a/weed/command/server.go
+++ b/weed/command/server.go
@@ -2,6 +2,7 @@ package command
import (
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/util/grace"
"net/http"
"os"
@@ -167,21 +168,16 @@ func runServer(cmd *Command, args []string) bool {
*isStartingFiler = true
}
- if *isStartingMasterServer {
- _, peerList := checkPeers(*serverIp, *masterOptions.port, *masterOptions.peers)
- peers := strings.Join(peerList, ",")
- masterOptions.peers = &peers
- }
-
// ip address
masterOptions.ip = serverIp
masterOptions.ipBind = serverBindIp
- filerOptions.masters = masterOptions.peers
+ _, masters := checkPeers(*masterOptions.ip, *masterOptions.port, *masterOptions.portGrpc, *masterOptions.peers)
+ filerOptions.masters = masters
filerOptions.ip = serverIp
filerOptions.bindIp = serverBindIp
serverOptions.v.ip = serverIp
serverOptions.v.bindIp = serverBindIp
- serverOptions.v.masters = masterOptions.peers
+ serverOptions.v.masters = masters
serverOptions.v.idleConnectionTimeout = serverTimeout
serverOptions.v.dataCenter = serverDataCenter
serverOptions.v.rack = serverRack
@@ -197,7 +193,7 @@ func runServer(cmd *Command, args []string) bool {
filerOptions.disableHttp = serverDisableHttp
masterOptions.disableHttp = serverDisableHttp
- filerAddress := util.JoinHostPort(*serverIp, *filerOptions.port)
+ filerAddress := string(pb.NewServerAddress(*serverIp, *filerOptions.port, *filerOptions.portGrpc))
s3Options.filer = &filerAddress
webdavOptions.filer = &filerAddress
msgBrokerOptions.filer = &filerAddress
diff --git a/weed/command/shell.go b/weed/command/shell.go
index 4a9f4b027..93bb69522 100644
--- a/weed/command/shell.go
+++ b/weed/command/shell.go
@@ -2,6 +2,7 @@ package command
import (
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/shell"
@@ -53,13 +54,7 @@ func runShell(command *Command, args []string) bool {
fmt.Printf("master: %s filer: %s\n", *shellOptions.Masters, *shellInitialFiler)
- var err error
- shellOptions.FilerHost, shellOptions.FilerPort, err = util.ParseHostPort(*shellInitialFiler)
- shellOptions.FilerAddress = *shellInitialFiler
- if err != nil {
- fmt.Printf("failed to parse filer %s: %v\n", *shellInitialFiler, err)
- return false
- }
+ shellOptions.FilerAddress = pb.ServerAddress(*shellInitialFiler)
shellOptions.Directory = "/"
shell.RunShell(shellOptions)
diff --git a/weed/command/upload.go b/weed/command/upload.go
index 9ae1befab..f46e70cb1 100644
--- a/weed/command/upload.go
+++ b/weed/command/upload.go
@@ -71,7 +71,7 @@ func runUpload(cmd *Command, args []string) bool {
util.LoadConfiguration("security", false)
grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
- defaultReplication, err := readMasterConfiguration(grpcDialOption, *upload.master)
+ defaultReplication, err := readMasterConfiguration(grpcDialOption, pb.ServerAddress(*upload.master))
if err != nil {
fmt.Printf("upload: %v", err)
return false
@@ -96,7 +96,7 @@ func runUpload(cmd *Command, args []string) bool {
if e != nil {
return e
}
- results, e := operation.SubmitFiles(func() string { return *upload.master }, grpcDialOption, parts, *upload.replication, *upload.collection, *upload.dataCenter, *upload.ttl, *upload.diskType, *upload.maxMB, *upload.usePublicUrl)
+ results, e := operation.SubmitFiles(func() pb.ServerAddress { return pb.ServerAddress(*upload.master) }, grpcDialOption, parts, *upload.replication, *upload.collection, *upload.dataCenter, *upload.ttl, *upload.diskType, *upload.maxMB, *upload.usePublicUrl)
bytes, _ := json.Marshal(results)
fmt.Println(string(bytes))
if e != nil {
@@ -118,7 +118,7 @@ func runUpload(cmd *Command, args []string) bool {
fmt.Println(e.Error())
return false
}
- results, err := operation.SubmitFiles(func() string { return *upload.master }, grpcDialOption, parts, *upload.replication, *upload.collection, *upload.dataCenter, *upload.ttl, *upload.diskType, *upload.maxMB, *upload.usePublicUrl)
+ results, err := operation.SubmitFiles(func() pb.ServerAddress { return pb.ServerAddress(*upload.master) }, grpcDialOption, parts, *upload.replication, *upload.collection, *upload.dataCenter, *upload.ttl, *upload.diskType, *upload.maxMB, *upload.usePublicUrl)
if err != nil {
fmt.Println(err.Error())
return false
@@ -129,7 +129,7 @@ func runUpload(cmd *Command, args []string) bool {
return true
}
-func readMasterConfiguration(grpcDialOption grpc.DialOption, masterAddress string) (replication string, err error) {
+func readMasterConfiguration(grpcDialOption grpc.DialOption, masterAddress pb.ServerAddress) (replication string, err error) {
err = pb.WithMasterClient(masterAddress, grpcDialOption, func(client master_pb.SeaweedClient) error {
resp, err := client.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{})
if err != nil {
diff --git a/weed/command/volume.go b/weed/command/volume.go
index f5ec11724..15e88d65e 100644
--- a/weed/command/volume.go
+++ b/weed/command/volume.go
@@ -44,7 +44,8 @@ type VolumeServerOptions struct {
ip *string
publicUrl *string
bindIp *string
- masters *string
+ mastersString *string
+ masters []pb.ServerAddress
idleConnectionTimeout *int
dataCenter *string
rack *string
@@ -74,7 +75,7 @@ func init() {
v.ip = cmdVolume.Flag.String("ip", util.DetectedHostAddress(), "ip or server name, also used as identifier")
v.publicUrl = cmdVolume.Flag.String("publicUrl", "", "Publicly accessible address")
v.bindIp = cmdVolume.Flag.String("ip.bind", "", "ip address to bind to")
- v.masters = cmdVolume.Flag.String("mserver", "localhost:9333", "comma-separated master servers")
+ v.mastersString = cmdVolume.Flag.String("mserver", "localhost:9333", "comma-separated master servers")
v.preStopSeconds = cmdVolume.Flag.Int("preStopSeconds", 10, "number of seconds between stop send heartbeats and stop volume server")
// v.pulseSeconds = cmdVolume.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats, must be smaller than or equal to the master's setting")
v.idleConnectionTimeout = cmdVolume.Flag.Int("idleTimeout", 30, "connection idle seconds")
@@ -125,6 +126,7 @@ func runVolume(cmd *Command, args []string) bool {
go stats_collect.StartMetricsServer(*v.metricsHttpPort)
minFreeSpaces := util.MustParseMinFreeSpace(*minFreeSpace, *minFreeSpacePercent)
+ v.masters = pb.ServerAddresses(*v.mastersString).ToAddresses()
v.startVolumeServer(*volumeFolders, *maxVolumeCounts, *volumeWhiteListOption, minFreeSpaces)
return true
@@ -223,14 +225,12 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v
volumeNeedleMapKind = storage.NeedleMapLevelDbLarge
}
- masters := *v.masters
-
volumeServer := weed_server.NewVolumeServer(volumeMux, publicVolumeMux,
- *v.ip, *v.port, *v.publicUrl,
+ *v.ip, *v.port, *v.portGrpc, *v.publicUrl,
v.folders, v.folderMaxLimits, minFreeSpaces, diskTypes,
*v.idxFolder,
volumeNeedleMapKind,
- strings.Split(masters, ","), 5, *v.dataCenter, *v.rack,
+ v.masters, 5, *v.dataCenter, *v.rack,
v.whiteList,
*v.fixJpgOrientation, *v.readMode,
*v.compactionMBPerSecond,
@@ -375,7 +375,7 @@ func (v VolumeServerOptions) startClusterHttpService(handler http.Handler) httpd
}
func (v VolumeServerOptions) startTcpService(volumeServer *weed_server.VolumeServer) {
- listeningAddress := util.JoinHostPort(*v.bindIp,*v.port+20000)
+ listeningAddress := util.JoinHostPort(*v.bindIp, *v.port+20000)
glog.V(0).Infoln("Start Seaweed volume server", util.Version(), "tcp at", listeningAddress)
listener, e := util.NewListener(listeningAddress, 0)
if e != nil {
diff --git a/weed/command/webdav.go b/weed/command/webdav.go
index 781ea1e36..a7062d8cd 100644
--- a/weed/command/webdav.go
+++ b/weed/command/webdav.go
@@ -78,37 +78,32 @@ func (wo *WebDavOption) startWebDav() bool {
}
// parse filer grpc address
- filerGrpcAddress, err := pb.ParseServerToGrpcAddress(*wo.filer)
- if err != nil {
- glog.Fatal(err)
- return false
- }
+ filerAddress := pb.ServerAddress(*wo.filer)
grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
var cipher bool
// connect to filer
for {
- err = pb.WithGrpcFilerClient(filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ err := pb.WithGrpcFilerClient(filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
if err != nil {
- return fmt.Errorf("get filer %s configuration: %v", filerGrpcAddress, err)
+ return fmt.Errorf("get filer %s configuration: %v", filerAddress, err)
}
cipher = resp.Cipher
return nil
})
if err != nil {
- glog.V(0).Infof("wait to connect to filer %s grpc address %s", *wo.filer, filerGrpcAddress)
+ glog.V(0).Infof("wait to connect to filer %s grpc address %s", *wo.filer, filerAddress.ToGrpcAddress())
time.Sleep(time.Second)
} else {
- glog.V(0).Infof("connected to filer %s grpc address %s", *wo.filer, filerGrpcAddress)
+ glog.V(0).Infof("connected to filer %s grpc address %s", *wo.filer, filerAddress.ToGrpcAddress())
break
}
}
ws, webdavServer_err := weed_server.NewWebDavServer(&weed_server.WebDavOption{
- Filer: *wo.filer,
- FilerGrpcAddress: filerGrpcAddress,
+ Filer: filerAddress,
GrpcDialOption: grpcDialOption,
Collection: *wo.collection,
Replication: *wo.replication,