diff options
Diffstat (limited to 'weed/server')
| -rw-r--r-- | weed/server/filer_grpc_server.go | 15 | ||||
| -rw-r--r-- | weed/server/filer_grpc_server_remote.go | 5 | ||||
| -rw-r--r-- | weed/server/filer_server.go | 20 | ||||
| -rw-r--r-- | weed/server/master_grpc_server.go | 15 | ||||
| -rw-r--r-- | weed/server/master_grpc_server_collection.go | 2 | ||||
| -rw-r--r-- | weed/server/master_grpc_server_volume.go | 12 | ||||
| -rw-r--r-- | weed/server/master_server.go | 15 | ||||
| -rw-r--r-- | weed/server/master_server_handlers_admin.go | 13 | ||||
| -rw-r--r-- | weed/server/raft_server.go | 24 | ||||
| -rw-r--r-- | weed/server/raft_server_handlers.go | 9 | ||||
| -rw-r--r-- | weed/server/volume_grpc_client_to_master.go | 45 | ||||
| -rw-r--r-- | weed/server/volume_grpc_copy.go | 3 | ||||
| -rw-r--r-- | weed/server/volume_grpc_erasure_coding.go | 3 | ||||
| -rw-r--r-- | weed/server/volume_grpc_tail.go | 3 | ||||
| -rw-r--r-- | weed/server/volume_server.go | 11 | ||||
| -rw-r--r-- | weed/server/volume_server_handlers_ui.go | 3 | ||||
| -rw-r--r-- | weed/server/webdav_server.go | 9 |
17 files changed, 105 insertions, 102 deletions
diff --git a/weed/server/filer_grpc_server.go b/weed/server/filer_grpc_server.go index 6a7df0f87..1df15d69f 100644 --- a/weed/server/filer_grpc_server.go +++ b/weed/server/filer_grpc_server.go @@ -3,6 +3,7 @@ package weed_server import ( "context" "fmt" + "github.com/chrislusf/seaweedfs/weed/pb" "os" "path/filepath" "strconv" @@ -107,6 +108,7 @@ func (fs *FilerServer) LookupVolume(ctx context.Context, req *filer_pb.LookupVol locs = append(locs, &filer_pb.Location{ Url: loc.Url, PublicUrl: loc.PublicUrl, + GrpcPort: uint32(loc.GrpcPort), }) } resp.LocationsMap[vidString] = &filer_pb.Locations{ @@ -306,10 +308,13 @@ func (fs *FilerServer) AssignVolume(ctx context.Context, req *filer_pb.AssignVol } return &filer_pb.AssignVolumeResponse{ - FileId: assignResult.Fid, - Count: int32(assignResult.Count), - Url: assignResult.Url, - PublicUrl: assignResult.PublicUrl, + FileId: assignResult.Fid, + Count: int32(assignResult.Count), + Location: &filer_pb.Location{ + Url: assignResult.Url, + PublicUrl: assignResult.PublicUrl, + GrpcPort: uint32(assignResult.GrpcPort), + }, Auth: string(assignResult.Auth), Collection: so.Collection, Replication: so.Replication, @@ -387,7 +392,7 @@ func (fs *FilerServer) GetFilerConfiguration(ctx context.Context, req *filer_pb. clusterId, _ := fs.filer.Store.KvGet(context.Background(), []byte("clusterId")) t := &filer_pb.GetFilerConfigurationResponse{ - Masters: fs.option.Masters, + Masters: pb.ToAddressStrings(fs.option.Masters), Collection: fs.option.Collection, Replication: fs.option.DefaultReplication, MaxMb: uint32(fs.option.MaxMB), diff --git a/weed/server/filer_grpc_server_remote.go b/weed/server/filer_grpc_server_remote.go index c47356a8e..9f986e6aa 100644 --- a/weed/server/filer_grpc_server_remote.go +++ b/weed/server/filer_grpc_server_remote.go @@ -5,6 +5,7 @@ import ( "fmt" "github.com/chrislusf/seaweedfs/weed/filer" "github.com/chrislusf/seaweedfs/weed/operation" + "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/pb/remote_pb" "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" @@ -115,11 +116,13 @@ func (fs *FilerServer) DownloadToLocal(ctx context.Context, req *filer_pb.Downlo replicas = append(replicas, &volume_server_pb.FetchAndWriteNeedleRequest_Replica{ Url: r.Url, PublicUrl: r.PublicUrl, + GrpcPort: int32(r.GrpcPort), }) } // tell filer to tell volume server to download into needles - err = operation.WithVolumeServerClient(assignResult.Url, fs.grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + assignedServerAddress := pb.NewServerAddressWithGrpcPort(assignResult.Url, assignResult.GrpcPort) + err = operation.WithVolumeServerClient(assignedServerAddress, fs.grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { _, fetchAndWriteErr := volumeServerClient.FetchAndWriteNeedle(context.Background(), &volume_server_pb.FetchAndWriteNeedleRequest{ VolumeId: uint32(fileId.VolumeId), NeedleId: uint64(fileId.Key), diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go index 7e5e98660..b886bf641 100644 --- a/weed/server/filer_server.go +++ b/weed/server/filer_server.go @@ -46,7 +46,7 @@ import ( ) type FilerOption struct { - Masters []string + Masters []pb.ServerAddress Collection string DefaultReplication string DisableDirListing bool @@ -56,12 +56,11 @@ type FilerOption struct { Rack string DefaultLevelDbDir string DisableHttp bool - Host string - Port uint32 + Host pb.ServerAddress recursiveDelete bool Cipher bool SaveToFilerLimit int64 - Filers []string + Filers []pb.ServerAddress ConcurrentUploadLimit int64 } @@ -100,14 +99,14 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) glog.Fatal("master list is required!") } - fs.filer = filer.NewFiler(option.Masters, fs.grpcDialOption, option.Host, option.Port, option.Collection, option.DefaultReplication, option.DataCenter, func() { + fs.filer = filer.NewFiler(option.Masters, fs.grpcDialOption, option.Host, option.Collection, option.DefaultReplication, option.DataCenter, func() { fs.listenersCond.Broadcast() }) fs.filer.Cipher = option.Cipher fs.checkWithMaster() - go stats.LoopPushingMetric("filer", stats.SourceName(fs.option.Port), fs.metricsAddress, fs.metricsIntervalSec) + go stats.LoopPushingMetric("filer", string(fs.option.Host), fs.metricsAddress, fs.metricsIntervalSec) go fs.filer.KeepConnectedToMaster() v := util.GetViper() @@ -143,7 +142,7 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) readonlyMux.HandleFunc("/", fs.readonlyFilerHandler) } - fs.filer.AggregateFromPeers(util.JoinHostPort(option.Host, int(option.Port)), option.Filers) + fs.filer.AggregateFromPeers(option.Host, option.Filers) fs.filer.LoadBuckets() @@ -160,13 +159,6 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) func (fs *FilerServer) checkWithMaster() { - for _, master := range fs.option.Masters { - _, err := pb.ParseServerToGrpcAddress(master) - if err != nil { - glog.Fatalf("invalid master address %s: %v", master, err) - } - } - isConnected := false for !isConnected { for _, master := range fs.option.Masters { diff --git a/weed/server/master_grpc_server.go b/weed/server/master_grpc_server.go index 94e050259..194520f49 100644 --- a/weed/server/master_grpc_server.go +++ b/weed/server/master_grpc_server.go @@ -2,6 +2,7 @@ package weed_server import ( "context" + "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/storage/backend" "github.com/chrislusf/seaweedfs/weed/util" "net" @@ -70,7 +71,7 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ dcName, rackName := ms.Topo.Configuration.Locate(heartbeat.Ip, heartbeat.DataCenter, heartbeat.Rack) dc := ms.Topo.GetOrCreateDataCenter(dcName) rack := dc.GetOrCreateRack(rackName) - dn = rack.GetOrCreateDataNode(heartbeat.Ip, int(heartbeat.Port), heartbeat.PublicUrl, heartbeat.MaxVolumeCounts) + dn = rack.GetOrCreateDataNode(heartbeat.Ip, int(heartbeat.Port), int(heartbeat.GrpcPort), heartbeat.PublicUrl, heartbeat.MaxVolumeCounts) glog.V(0).Infof("added volume server %d: %v:%d", dn.Counter, heartbeat.GetIp(), heartbeat.GetPort()) if err := stream.Send(&master_pb.HeartbeatResponse{ VolumeSizeLimit: uint64(ms.option.VolumeSizeLimitMB) * 1024 * 1024, @@ -168,7 +169,7 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ return err } if err := stream.Send(&master_pb.HeartbeatResponse{ - Leader: newLeader, + Leader: string(newLeader), }); err != nil { glog.Warningf("SendHeartbeat.Send response to to %s:%d %v", dn.Ip, dn.Port, err) return err @@ -189,7 +190,7 @@ func (ms *MasterServer) KeepConnected(stream master_pb.Seaweed_KeepConnectedServ return ms.informNewLeader(stream) } - peerAddress := findClientAddress(stream.Context(), req.GrpcPort) + peerAddress := pb.ServerAddress(req.ClientAddress) // buffer by 1 so we don't end up getting stuck writing to stopChan forever stopChan := make(chan bool, 1) @@ -241,15 +242,15 @@ func (ms *MasterServer) informNewLeader(stream master_pb.Seaweed_KeepConnectedSe return raft.NotLeaderError } if err := stream.Send(&master_pb.VolumeLocation{ - Leader: leader, + Leader: string(leader), }); err != nil { return err } return nil } -func (ms *MasterServer) addClient(clientType string, clientAddress string) (clientName string, messageChan chan *master_pb.VolumeLocation) { - clientName = clientType + "@" + clientAddress +func (ms *MasterServer) addClient(clientType string, clientAddress pb.ServerAddress) (clientName string, messageChan chan *master_pb.VolumeLocation) { + clientName = clientType + "@" + string(clientAddress) glog.V(0).Infof("+ client %v", clientName) // we buffer this because otherwise we end up in a potential deadlock where @@ -319,7 +320,7 @@ func (ms *MasterServer) GetMasterConfiguration(ctx context.Context, req *master_ DefaultReplication: ms.option.DefaultReplicaPlacement, VolumeSizeLimitMB: uint32(ms.option.VolumeSizeLimitMB), VolumePreallocate: ms.option.VolumePreallocate, - Leader: leader, + Leader: string(leader), } return resp, nil diff --git a/weed/server/master_grpc_server_collection.go b/weed/server/master_grpc_server_collection.go index b92d6bcbe..55f3faf8c 100644 --- a/weed/server/master_grpc_server_collection.go +++ b/weed/server/master_grpc_server_collection.go @@ -58,7 +58,7 @@ func (ms *MasterServer) doDeleteNormalCollection(collectionName string) error { } for _, server := range collection.ListVolumeServers() { - err := operation.WithVolumeServerClient(server.Url(), ms.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { + err := operation.WithVolumeServerClient(server.ServerAddress(), ms.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { _, deleteErr := client.DeleteCollection(context.Background(), &volume_server_pb.DeleteCollectionRequest{ Collection: collectionName, }) diff --git a/weed/server/master_grpc_server_volume.go b/weed/server/master_grpc_server_volume.go index 3a92889d2..49ac455fe 100644 --- a/weed/server/master_grpc_server_volume.go +++ b/weed/server/master_grpc_server_volume.go @@ -150,17 +150,21 @@ func (ms *MasterServer) Assign(ctx context.Context, req *master_pb.AssignRequest fid, count, dnList, err := ms.Topo.PickForWrite(req.Count, option) if err == nil { dn := dnList.Head() - var replicas []*master_pb.AssignResponse_Replica + var replicas []*master_pb.Location for _, r := range dnList.Rest() { - replicas = append(replicas, &master_pb.AssignResponse_Replica{ + replicas = append(replicas, &master_pb.Location{ Url: r.Url(), PublicUrl: r.PublicUrl, + GrpcPort: uint32(r.GrpcPort), }) } return &master_pb.AssignResponse{ Fid: fid, - Url: dn.Url(), - PublicUrl: dn.PublicUrl, + Location: &master_pb.Location{ + Url: dn.Url(), + PublicUrl: dn.PublicUrl, + GrpcPort: uint32(dn.GrpcPort), + }, Count: count, Auth: string(security.GenJwt(ms.guard.SigningKey, ms.guard.ExpiresAfterSec, fid)), Replicas: replicas, diff --git a/weed/server/master_server.go b/weed/server/master_server.go index 7c78be379..8de01abf7 100644 --- a/weed/server/master_server.go +++ b/weed/server/master_server.go @@ -2,6 +2,7 @@ package weed_server import ( "fmt" + "github.com/chrislusf/seaweedfs/weed/pb" "net/http" "net/http/httputil" "net/url" @@ -32,8 +33,7 @@ const ( ) type MasterOption struct { - Host string - Port int + Master pb.ServerAddress MetaFolder string VolumeSizeLimitMB uint32 VolumePreallocate bool @@ -70,7 +70,7 @@ type MasterServer struct { adminLocks *AdminLocks } -func NewMasterServer(r *mux.Router, option *MasterOption, peers []string) *MasterServer { +func NewMasterServer(r *mux.Router, option *MasterOption, peers []pb.ServerAddress) *MasterServer { v := util.GetViper() signingKey := v.GetString("jwt.signing.key") @@ -102,7 +102,7 @@ func NewMasterServer(r *mux.Router, option *MasterOption, peers []string) *Maste vgCh: make(chan *topology.VolumeGrowRequest, 1<<6), clientChans: make(map[string]chan *master_pb.VolumeLocation), grpcDialOption: grpcDialOption, - MasterClient: wdclient.NewMasterClient(grpcDialOption, "master", option.Host, 0, "", peers), + MasterClient: wdclient.NewMasterClient(grpcDialOption, "master", option.Master, "", peers), adminLocks: NewAdminLocks(), } ms.boundedLeaderChan = make(chan int, 16) @@ -224,14 +224,13 @@ func (ms *MasterServer) startAdminScripts() { scriptLines = append(scriptLines, "unlock") } - masterAddress := util.JoinHostPort(ms.option.Host, ms.option.Port) + masterAddress := string(ms.option.Master) var shellOptions shell.ShellOptions shellOptions.GrpcDialOption = security.LoadClientTLS(v, "grpc.master") shellOptions.Masters = &masterAddress - shellOptions.FilerHost, shellOptions.FilerPort, err = util.ParseHostPort(filerHostPort) - shellOptions.FilerAddress = filerHostPort + shellOptions.FilerAddress = pb.ServerAddress(filerHostPort) shellOptions.Directory = "/" if err != nil { glog.V(0).Infof("failed to parse master.filer.default = %s : %v\n", filerHostPort, err) @@ -299,7 +298,7 @@ func (ms *MasterServer) createSequencer(option *MasterOption) sequence.Sequencer case "snowflake": var err error snowflakeId := v.GetInt(SequencerSnowflakeId) - seq, err = sequence.NewSnowflakeSequencer(util.JoinHostPort(option.Host, option.Port), snowflakeId) + seq, err = sequence.NewSnowflakeSequencer(string(option.Master), snowflakeId) if err != nil { glog.Error(err) seq = nil diff --git a/weed/server/master_server_handlers_admin.go b/weed/server/master_server_handlers_admin.go index 4a86348d9..549ea86dc 100644 --- a/weed/server/master_server_handlers_admin.go +++ b/weed/server/master_server_handlers_admin.go @@ -3,6 +3,7 @@ package weed_server import ( "context" "fmt" + "github.com/chrislusf/seaweedfs/weed/pb" "math/rand" "net/http" "strconv" @@ -26,7 +27,7 @@ func (ms *MasterServer) collectionDeleteHandler(w http.ResponseWriter, r *http.R return } for _, server := range collection.ListVolumeServers() { - err := operation.WithVolumeServerClient(server.Url(), ms.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { + err := operation.WithVolumeServerClient(server.ServerAddress(), ms.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { _, deleteErr := client.DeleteCollection(context.Background(), &volume_server_pb.DeleteCollectionRequest{ Collection: collection.Name, }) @@ -118,21 +119,15 @@ func (ms *MasterServer) redirectHandler(w http.ResponseWriter, r *http.Request) } } -func (ms *MasterServer) selfUrl(r *http.Request) string { - if r.Host != "" { - return r.Host - } - return "localhost:" + strconv.Itoa(ms.option.Port) -} func (ms *MasterServer) submitFromMasterServerHandler(w http.ResponseWriter, r *http.Request) { if ms.Topo.IsLeader() { - submitForClientHandler(w, r, func() string { return ms.selfUrl(r) }, ms.grpcDialOption) + submitForClientHandler(w, r, func() pb.ServerAddress { return ms.option.Master }, ms.grpcDialOption) } else { masterUrl, err := ms.Topo.Leader() if err != nil { writeJsonError(w, r, http.StatusInternalServerError, err) } else { - submitForClientHandler(w, r, func() string { return masterUrl }, ms.grpcDialOption) + submitForClientHandler(w, r, func() pb.ServerAddress { return masterUrl }, ms.grpcDialOption) } } } diff --git a/weed/server/raft_server.go b/weed/server/raft_server.go index 85841e409..568bfc7b5 100644 --- a/weed/server/raft_server.go +++ b/weed/server/raft_server.go @@ -6,6 +6,7 @@ import ( "os" "path" "sort" + "strings" "time" "google.golang.org/grpc" @@ -19,10 +20,10 @@ import ( ) type RaftServer struct { - peers []string // initial peers to join with + peers []pb.ServerAddress // initial peers to join with raftServer raft.Server dataDir string - serverAddr string + serverAddr pb.ServerAddress topo *topology.Topology *raft.GrpcServer } @@ -51,7 +52,7 @@ func (s StateMachine) Recovery(data []byte) error { return nil } -func NewRaftServer(grpcDialOption grpc.DialOption, peers []string, serverAddr, dataDir string, topo *topology.Topology, raftResumeState bool) (*RaftServer, error) { +func NewRaftServer(grpcDialOption grpc.DialOption, peers []pb.ServerAddress, serverAddr pb.ServerAddress, dataDir string, topo *topology.Topology, raftResumeState bool) (*RaftServer, error) { s := &RaftServer{ peers: peers, serverAddr: serverAddr, @@ -80,7 +81,7 @@ func NewRaftServer(grpcDialOption grpc.DialOption, peers []string, serverAddr, d } stateMachine := StateMachine{topo: topo} - s.raftServer, err = raft.NewServer(s.serverAddr, s.dataDir, transporter, stateMachine, topo, "") + s.raftServer, err = raft.NewServer(string(s.serverAddr), s.dataDir, transporter, stateMachine, topo, "") if err != nil { glog.V(0).Infoln(err) return nil, err @@ -95,16 +96,17 @@ func NewRaftServer(grpcDialOption grpc.DialOption, peers []string, serverAddr, d } for _, peer := range s.peers { - if err := s.raftServer.AddPeer(peer, pb.ServerToGrpcAddress(peer)); err != nil { + if err := s.raftServer.AddPeer(string(peer), peer.ToGrpcAddress()); err != nil { return nil, err } } // Remove deleted peers for existsPeerName := range s.raftServer.Peers() { - exists, existingPeer := false, "" + exists := false + var existingPeer pb.ServerAddress for _, peer := range s.peers { - if pb.ServerToGrpcAddress(peer) == existsPeerName { + if peer.ToGrpcAddress() == existsPeerName { exists, existingPeer = true, peer break } @@ -141,8 +143,10 @@ func (s *RaftServer) Peers() (members []string) { return } -func isTheFirstOne(self string, peers []string) bool { - sort.Strings(peers) +func isTheFirstOne(self pb.ServerAddress, peers []pb.ServerAddress) bool { + sort.Slice(peers, func(i, j int) bool { + return strings.Compare(string(peers[i]), string(peers[j])) < 0 + }) if len(peers) <= 0 { return true } @@ -155,7 +159,7 @@ func (s *RaftServer) DoJoinCommand() { if _, err := s.raftServer.Do(&raft.DefaultJoinCommand{ Name: s.raftServer.Name(), - ConnectionString: pb.ServerToGrpcAddress(s.serverAddr), + ConnectionString: s.serverAddr.ToGrpcAddress(), }); err != nil { glog.Errorf("fail to send join command: %v", err) } diff --git a/weed/server/raft_server_handlers.go b/weed/server/raft_server_handlers.go index 252570eab..7e58f1e92 100644 --- a/weed/server/raft_server_handlers.go +++ b/weed/server/raft_server_handlers.go @@ -1,15 +1,16 @@ package weed_server import ( + "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/storage/needle" "net/http" ) type ClusterStatusResult struct { - IsLeader bool `json:"IsLeader,omitempty"` - Leader string `json:"Leader,omitempty"` - Peers []string `json:"Peers,omitempty"` - MaxVolumeId needle.VolumeId `json:"MaxVolumeId,omitempty"` + IsLeader bool `json:"IsLeader,omitempty"` + Leader pb.ServerAddress `json:"Leader,omitempty"` + Peers []string `json:"Peers,omitempty"` + MaxVolumeId needle.VolumeId `json:"MaxVolumeId,omitempty"` } func (s *RaftServer) StatusHandler(w http.ResponseWriter, r *http.Request) { diff --git a/weed/server/volume_grpc_client_to_master.go b/weed/server/volume_grpc_client_to_master.go index 770abdab7..2659307fc 100644 --- a/weed/server/volume_grpc_client_to_master.go +++ b/weed/server/volume_grpc_client_to_master.go @@ -19,7 +19,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/util" ) -func (vs *VolumeServer) GetMaster() string { +func (vs *VolumeServer) GetMaster() pb.ServerAddress { return vs.currentMaster } @@ -54,7 +54,7 @@ func (vs *VolumeServer) heartbeat() { grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.volume") var err error - var newLeader string + var newLeader pb.ServerAddress for vs.isHeartbeating { for _, master := range vs.SeedMasterNodes { if newLeader != "" { @@ -63,13 +63,8 @@ func (vs *VolumeServer) heartbeat() { time.Sleep(3 * time.Second) master = newLeader } - masterGrpcAddress, parseErr := pb.ParseServerToGrpcAddress(master) - if parseErr != nil { - glog.V(0).Infof("failed to parse master grpc %v: %v", masterGrpcAddress, parseErr) - continue - } vs.store.MasterAddress = master - newLeader, err = vs.doHeartbeat(master, masterGrpcAddress, grpcDialOption, time.Duration(vs.pulseSeconds)*time.Second) + newLeader, err = vs.doHeartbeat(master, grpcDialOption, time.Duration(vs.pulseSeconds)*time.Second) if err != nil { glog.V(0).Infof("heartbeat error: %v", err) time.Sleep(time.Duration(vs.pulseSeconds) * time.Second) @@ -92,25 +87,25 @@ func (vs *VolumeServer) StopHeartbeat() (isAlreadyStopping bool) { return false } -func (vs *VolumeServer) doHeartbeat(masterNode, masterGrpcAddress string, grpcDialOption grpc.DialOption, sleepInterval time.Duration) (newLeader string, err error) { +func (vs *VolumeServer) doHeartbeat(masterAddress pb.ServerAddress, grpcDialOption grpc.DialOption, sleepInterval time.Duration) (newLeader pb.ServerAddress, err error) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - grpcConection, err := pb.GrpcDial(ctx, masterGrpcAddress, grpcDialOption) + grpcConection, err := pb.GrpcDial(ctx, masterAddress.ToGrpcAddress(), grpcDialOption) if err != nil { - return "", fmt.Errorf("fail to dial %s : %v", masterNode, err) + return "", fmt.Errorf("fail to dial %s : %v", masterAddress, err) } defer grpcConection.Close() client := master_pb.NewSeaweedClient(grpcConection) stream, err := client.SendHeartbeat(ctx) if err != nil { - glog.V(0).Infof("SendHeartbeat to %s: %v", masterNode, err) + glog.V(0).Infof("SendHeartbeat to %s: %v", masterAddress, err) return "", err } - glog.V(0).Infof("Heartbeat to: %v", masterNode) - vs.currentMaster = masterNode + glog.V(0).Infof("Heartbeat to: %v", masterAddress) + vs.currentMaster = masterAddress doneChan := make(chan error, 1) @@ -130,9 +125,9 @@ func (vs *VolumeServer) doHeartbeat(masterNode, masterGrpcAddress string, grpcDi } } } - if in.GetLeader() != "" && vs.currentMaster != in.GetLeader() { + if in.GetLeader() != "" && string(vs.currentMaster) != in.GetLeader() { glog.V(0).Infof("Volume Server found a new master newLeader: %v instead of %v", in.GetLeader(), vs.currentMaster) - newLeader = in.GetLeader() + newLeader = pb.ServerAddress(in.GetLeader()) doneChan <- nil return } @@ -140,12 +135,12 @@ func (vs *VolumeServer) doHeartbeat(masterNode, masterGrpcAddress string, grpcDi }() if err = stream.Send(vs.store.CollectHeartbeat()); err != nil { - glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", masterNode, err) + glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", masterAddress, err) return "", err } if err = stream.Send(vs.store.CollectErasureCodingHeartbeat()); err != nil { - glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", masterNode, err) + glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", masterAddress, err) return "", err } @@ -162,7 +157,7 @@ func (vs *VolumeServer) doHeartbeat(masterNode, masterGrpcAddress string, grpcDi } glog.V(0).Infof("volume server %s:%d adds volume %d", vs.store.Ip, vs.store.Port, volumeMessage.Id) if err = stream.Send(deltaBeat); err != nil { - glog.V(0).Infof("Volume Server Failed to update to master %s: %v", masterNode, err) + glog.V(0).Infof("Volume Server Failed to update to master %s: %v", masterAddress, err) return "", err } case ecShardMessage := <-vs.store.NewEcShardsChan: @@ -174,7 +169,7 @@ func (vs *VolumeServer) doHeartbeat(masterNode, masterGrpcAddress string, grpcDi glog.V(0).Infof("volume server %s:%d adds ec shard %d:%d", vs.store.Ip, vs.store.Port, ecShardMessage.Id, erasure_coding.ShardBits(ecShardMessage.EcIndexBits).ShardIds()) if err = stream.Send(deltaBeat); err != nil { - glog.V(0).Infof("Volume Server Failed to update to master %s: %v", masterNode, err) + glog.V(0).Infof("Volume Server Failed to update to master %s: %v", masterAddress, err) return "", err } case volumeMessage := <-vs.store.DeletedVolumesChan: @@ -185,7 +180,7 @@ func (vs *VolumeServer) doHeartbeat(masterNode, masterGrpcAddress string, grpcDi } glog.V(0).Infof("volume server %s:%d deletes volume %d", vs.store.Ip, vs.store.Port, volumeMessage.Id) if err = stream.Send(deltaBeat); err != nil { - glog.V(0).Infof("Volume Server Failed to update to master %s: %v", masterNode, err) + glog.V(0).Infof("Volume Server Failed to update to master %s: %v", masterAddress, err) return "", err } case ecShardMessage := <-vs.store.DeletedEcShardsChan: @@ -197,20 +192,20 @@ func (vs *VolumeServer) doHeartbeat(masterNode, masterGrpcAddress string, grpcDi glog.V(0).Infof("volume server %s:%d deletes ec shard %d:%d", vs.store.Ip, vs.store.Port, ecShardMessage.Id, erasure_coding.ShardBits(ecShardMessage.EcIndexBits).ShardIds()) if err = stream.Send(deltaBeat); err != nil { - glog.V(0).Infof("Volume Server Failed to update to master %s: %v", masterNode, err) + glog.V(0).Infof("Volume Server Failed to update to master %s: %v", masterAddress, err) return "", err } case <-volumeTickChan: glog.V(4).Infof("volume server %s:%d heartbeat", vs.store.Ip, vs.store.Port) vs.store.MaybeAdjustVolumeMax() if err = stream.Send(vs.store.CollectHeartbeat()); err != nil { - glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", masterNode, err) + glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", masterAddress, err) return "", err } case <-ecShardTickChan: glog.V(4).Infof("volume server %s:%d ec heartbeat", vs.store.Ip, vs.store.Port) if err = stream.Send(vs.store.CollectErasureCodingHeartbeat()); err != nil { - glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", masterNode, err) + glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", masterAddress, err) return "", err } case err = <-doneChan: @@ -229,7 +224,7 @@ func (vs *VolumeServer) doHeartbeat(masterNode, masterGrpcAddress string, grpcDi } glog.V(1).Infof("volume server %s:%d stops and deletes all volumes", vs.store.Ip, vs.store.Port) if err = stream.Send(emptyBeat); err != nil { - glog.V(0).Infof("Volume Server Failed to update to master %s: %v", masterNode, err) + glog.V(0).Infof("Volume Server Failed to update to master %s: %v", masterAddress, err) return "", err } return diff --git a/weed/server/volume_grpc_copy.go b/weed/server/volume_grpc_copy.go index 53ee3df0a..e046481fb 100644 --- a/weed/server/volume_grpc_copy.go +++ b/weed/server/volume_grpc_copy.go @@ -3,6 +3,7 @@ package weed_server import ( "context" "fmt" + "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/storage/types" "io" "io/ioutil" @@ -45,7 +46,7 @@ func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.Vo // confirm size and timestamp var volFileInfoResp *volume_server_pb.ReadVolumeFileStatusResponse var dataBaseFileName, indexBaseFileName, idxFileName, datFileName string - err := operation.WithVolumeServerClient(req.SourceDataNode, vs.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { + err := operation.WithVolumeServerClient(pb.ServerAddress(req.SourceDataNode), vs.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { var err error volFileInfoResp, err = client.ReadVolumeFileStatus(context.Background(), &volume_server_pb.ReadVolumeFileStatusRequest{ diff --git a/weed/server/volume_grpc_erasure_coding.go b/weed/server/volume_grpc_erasure_coding.go index 364045d9b..653883c8e 100644 --- a/weed/server/volume_grpc_erasure_coding.go +++ b/weed/server/volume_grpc_erasure_coding.go @@ -3,6 +3,7 @@ package weed_server import ( "context" "fmt" + "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/storage/volume_info" "io" "io/ioutil" @@ -126,7 +127,7 @@ func (vs *VolumeServer) VolumeEcShardsCopy(ctx context.Context, req *volume_serv dataBaseFileName := storage.VolumeFileName(location.Directory, req.Collection, int(req.VolumeId)) indexBaseFileName := storage.VolumeFileName(location.IdxDirectory, req.Collection, int(req.VolumeId)) - err := operation.WithVolumeServerClient(req.SourceDataNode, vs.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { + err := operation.WithVolumeServerClient(pb.ServerAddress(req.SourceDataNode), vs.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { // copy ec data slices for _, shardId := range req.ShardIds { diff --git a/weed/server/volume_grpc_tail.go b/weed/server/volume_grpc_tail.go index 3ea902ed3..4022da44a 100644 --- a/weed/server/volume_grpc_tail.go +++ b/weed/server/volume_grpc_tail.go @@ -3,6 +3,7 @@ package weed_server import ( "context" "fmt" + "github.com/chrislusf/seaweedfs/weed/pb" "time" "github.com/chrislusf/seaweedfs/weed/glog" @@ -89,7 +90,7 @@ func (vs *VolumeServer) VolumeTailReceiver(ctx context.Context, req *volume_serv defer glog.V(1).Infof("receive tailing volume %d finished", v.Id) - return resp, operation.TailVolumeFromSource(req.SourceVolumeServer, vs.grpcDialOption, v.Id, req.SinceNs, int(req.IdleTimeoutSeconds), func(n *needle.Needle) error { + return resp, operation.TailVolumeFromSource(pb.ServerAddress(req.SourceVolumeServer), vs.grpcDialOption, v.Id, req.SinceNs, int(req.IdleTimeoutSeconds), func(n *needle.Needle) error { _, err := vs.store.WriteVolumeNeedle(v.Id, n, false, false) return err }) diff --git a/weed/server/volume_server.go b/weed/server/volume_server.go index 9406b5601..6b6692146 100644 --- a/weed/server/volume_server.go +++ b/weed/server/volume_server.go @@ -1,6 +1,7 @@ package weed_server import ( + "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/storage/types" "net/http" "sync" @@ -23,8 +24,8 @@ type VolumeServer struct { inFlightUploadDataLimitCond *sync.Cond inFlightDownloadDataLimitCond *sync.Cond - SeedMasterNodes []string - currentMaster string + SeedMasterNodes []pb.ServerAddress + currentMaster pb.ServerAddress pulseSeconds int dataCenter string rack string @@ -44,11 +45,11 @@ type VolumeServer struct { } func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string, - port int, publicUrl string, + port int, grpcPort int, publicUrl string, folders []string, maxCounts []int, minFreeSpaces []util.MinFreeSpace, diskTypes []types.DiskType, idxFolder string, needleMapKind storage.NeedleMapKind, - masterNodes []string, pulseSeconds int, + masterNodes []pb.ServerAddress, pulseSeconds int, dataCenter string, rack string, whiteList []string, fixJpgOrientation bool, @@ -90,7 +91,7 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string, vs.checkWithMaster() - vs.store = storage.NewStore(vs.grpcDialOption, port, ip, publicUrl, folders, maxCounts, minFreeSpaces, idxFolder, vs.needleMapKind, diskTypes) + vs.store = storage.NewStore(vs.grpcDialOption, ip, port, grpcPort, publicUrl, folders, maxCounts, minFreeSpaces, idxFolder, vs.needleMapKind, diskTypes) vs.guard = security.NewGuard(whiteList, signingKey, expiresAfterSec, readSigningKey, readExpiresAfterSec) handleStaticResources(adminMux) diff --git a/weed/server/volume_server_handlers_ui.go b/weed/server/volume_server_handlers_ui.go index 437e5c45d..2c420c2d6 100644 --- a/weed/server/volume_server_handlers_ui.go +++ b/weed/server/volume_server_handlers_ui.go @@ -1,6 +1,7 @@ package weed_server import ( + "github.com/chrislusf/seaweedfs/weed/pb" "net/http" "path/filepath" "time" @@ -35,7 +36,7 @@ func (vs *VolumeServer) uiStatusHandler(w http.ResponseWriter, r *http.Request) } args := struct { Version string - Masters []string + Masters []pb.ServerAddress Volumes interface{} EcVolumes interface{} RemoteVolumes interface{} diff --git a/weed/server/webdav_server.go b/weed/server/webdav_server.go index e99d4a358..239de69f8 100644 --- a/weed/server/webdav_server.go +++ b/weed/server/webdav_server.go @@ -27,8 +27,7 @@ import ( ) type WebDavOption struct { - Filer string - FilerGrpcAddress string + Filer pb.ServerAddress DomainName string BucketsPath string GrpcDialOption grpc.DialOption @@ -107,7 +106,7 @@ type WebDavFile struct { func NewWebDavFileSystem(option *WebDavOption) (webdav.FileSystem, error) { - cacheUniqueId := util.Md5String([]byte("webdav" + option.FilerGrpcAddress + util.Version()))[0:8] + cacheUniqueId := util.Md5String([]byte("webdav" + string(option.Filer) + util.Version()))[0:8] cacheDir := path.Join(option.CacheDir, cacheUniqueId) os.MkdirAll(cacheDir, os.FileMode(0755)) @@ -126,7 +125,7 @@ func (fs *WebDavFileSystem) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) return pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error { client := filer_pb.NewSeaweedFilerClient(grpcConnection) return fn(client) - }, fs.option.FilerGrpcAddress, fs.option.GrpcDialOption) + }, fs.option.Filer.ToGrpcAddress(), fs.option.GrpcDialOption) } func (fs *WebDavFileSystem) AdjustedUrl(location *filer_pb.Location) string { @@ -398,7 +397,7 @@ func (f *WebDavFile) saveDataAsChunk(reader io.Reader, name string, offset int64 return fmt.Errorf("assign volume failure %v: %v", request, resp.Error) } - fileId, host, auth = resp.FileId, resp.Url, security.EncodedJwt(resp.Auth) + fileId, host, auth = resp.FileId, resp.Location.Url, security.EncodedJwt(resp.Auth) f.collection, f.replication = resp.Collection, resp.Replication return nil |
