aboutsummaryrefslogtreecommitdiff
path: root/weed/server
diff options
context:
space:
mode:
Diffstat (limited to 'weed/server')
-rw-r--r--weed/server/filer_grpc_server.go15
-rw-r--r--weed/server/filer_grpc_server_remote.go5
-rw-r--r--weed/server/filer_server.go20
-rw-r--r--weed/server/master_grpc_server.go15
-rw-r--r--weed/server/master_grpc_server_collection.go2
-rw-r--r--weed/server/master_grpc_server_volume.go12
-rw-r--r--weed/server/master_server.go15
-rw-r--r--weed/server/master_server_handlers_admin.go13
-rw-r--r--weed/server/raft_server.go24
-rw-r--r--weed/server/raft_server_handlers.go9
-rw-r--r--weed/server/volume_grpc_client_to_master.go45
-rw-r--r--weed/server/volume_grpc_copy.go3
-rw-r--r--weed/server/volume_grpc_erasure_coding.go3
-rw-r--r--weed/server/volume_grpc_tail.go3
-rw-r--r--weed/server/volume_server.go11
-rw-r--r--weed/server/volume_server_handlers_ui.go3
-rw-r--r--weed/server/webdav_server.go9
17 files changed, 105 insertions, 102 deletions
diff --git a/weed/server/filer_grpc_server.go b/weed/server/filer_grpc_server.go
index 6a7df0f87..1df15d69f 100644
--- a/weed/server/filer_grpc_server.go
+++ b/weed/server/filer_grpc_server.go
@@ -3,6 +3,7 @@ package weed_server
import (
"context"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/pb"
"os"
"path/filepath"
"strconv"
@@ -107,6 +108,7 @@ func (fs *FilerServer) LookupVolume(ctx context.Context, req *filer_pb.LookupVol
locs = append(locs, &filer_pb.Location{
Url: loc.Url,
PublicUrl: loc.PublicUrl,
+ GrpcPort: uint32(loc.GrpcPort),
})
}
resp.LocationsMap[vidString] = &filer_pb.Locations{
@@ -306,10 +308,13 @@ func (fs *FilerServer) AssignVolume(ctx context.Context, req *filer_pb.AssignVol
}
return &filer_pb.AssignVolumeResponse{
- FileId: assignResult.Fid,
- Count: int32(assignResult.Count),
- Url: assignResult.Url,
- PublicUrl: assignResult.PublicUrl,
+ FileId: assignResult.Fid,
+ Count: int32(assignResult.Count),
+ Location: &filer_pb.Location{
+ Url: assignResult.Url,
+ PublicUrl: assignResult.PublicUrl,
+ GrpcPort: uint32(assignResult.GrpcPort),
+ },
Auth: string(assignResult.Auth),
Collection: so.Collection,
Replication: so.Replication,
@@ -387,7 +392,7 @@ func (fs *FilerServer) GetFilerConfiguration(ctx context.Context, req *filer_pb.
clusterId, _ := fs.filer.Store.KvGet(context.Background(), []byte("clusterId"))
t := &filer_pb.GetFilerConfigurationResponse{
- Masters: fs.option.Masters,
+ Masters: pb.ToAddressStrings(fs.option.Masters),
Collection: fs.option.Collection,
Replication: fs.option.DefaultReplication,
MaxMb: uint32(fs.option.MaxMB),
diff --git a/weed/server/filer_grpc_server_remote.go b/weed/server/filer_grpc_server_remote.go
index c47356a8e..9f986e6aa 100644
--- a/weed/server/filer_grpc_server_remote.go
+++ b/weed/server/filer_grpc_server_remote.go
@@ -5,6 +5,7 @@ import (
"fmt"
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/operation"
+ "github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/pb/remote_pb"
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
@@ -115,11 +116,13 @@ func (fs *FilerServer) DownloadToLocal(ctx context.Context, req *filer_pb.Downlo
replicas = append(replicas, &volume_server_pb.FetchAndWriteNeedleRequest_Replica{
Url: r.Url,
PublicUrl: r.PublicUrl,
+ GrpcPort: int32(r.GrpcPort),
})
}
// tell filer to tell volume server to download into needles
- err = operation.WithVolumeServerClient(assignResult.Url, fs.grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ assignedServerAddress := pb.NewServerAddressWithGrpcPort(assignResult.Url, assignResult.GrpcPort)
+ err = operation.WithVolumeServerClient(assignedServerAddress, fs.grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
_, fetchAndWriteErr := volumeServerClient.FetchAndWriteNeedle(context.Background(), &volume_server_pb.FetchAndWriteNeedleRequest{
VolumeId: uint32(fileId.VolumeId),
NeedleId: uint64(fileId.Key),
diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go
index 7e5e98660..b886bf641 100644
--- a/weed/server/filer_server.go
+++ b/weed/server/filer_server.go
@@ -46,7 +46,7 @@ import (
)
type FilerOption struct {
- Masters []string
+ Masters []pb.ServerAddress
Collection string
DefaultReplication string
DisableDirListing bool
@@ -56,12 +56,11 @@ type FilerOption struct {
Rack string
DefaultLevelDbDir string
DisableHttp bool
- Host string
- Port uint32
+ Host pb.ServerAddress
recursiveDelete bool
Cipher bool
SaveToFilerLimit int64
- Filers []string
+ Filers []pb.ServerAddress
ConcurrentUploadLimit int64
}
@@ -100,14 +99,14 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption)
glog.Fatal("master list is required!")
}
- fs.filer = filer.NewFiler(option.Masters, fs.grpcDialOption, option.Host, option.Port, option.Collection, option.DefaultReplication, option.DataCenter, func() {
+ fs.filer = filer.NewFiler(option.Masters, fs.grpcDialOption, option.Host, option.Collection, option.DefaultReplication, option.DataCenter, func() {
fs.listenersCond.Broadcast()
})
fs.filer.Cipher = option.Cipher
fs.checkWithMaster()
- go stats.LoopPushingMetric("filer", stats.SourceName(fs.option.Port), fs.metricsAddress, fs.metricsIntervalSec)
+ go stats.LoopPushingMetric("filer", string(fs.option.Host), fs.metricsAddress, fs.metricsIntervalSec)
go fs.filer.KeepConnectedToMaster()
v := util.GetViper()
@@ -143,7 +142,7 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption)
readonlyMux.HandleFunc("/", fs.readonlyFilerHandler)
}
- fs.filer.AggregateFromPeers(util.JoinHostPort(option.Host, int(option.Port)), option.Filers)
+ fs.filer.AggregateFromPeers(option.Host, option.Filers)
fs.filer.LoadBuckets()
@@ -160,13 +159,6 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption)
func (fs *FilerServer) checkWithMaster() {
- for _, master := range fs.option.Masters {
- _, err := pb.ParseServerToGrpcAddress(master)
- if err != nil {
- glog.Fatalf("invalid master address %s: %v", master, err)
- }
- }
-
isConnected := false
for !isConnected {
for _, master := range fs.option.Masters {
diff --git a/weed/server/master_grpc_server.go b/weed/server/master_grpc_server.go
index 94e050259..194520f49 100644
--- a/weed/server/master_grpc_server.go
+++ b/weed/server/master_grpc_server.go
@@ -2,6 +2,7 @@ package weed_server
import (
"context"
+ "github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/storage/backend"
"github.com/chrislusf/seaweedfs/weed/util"
"net"
@@ -70,7 +71,7 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
dcName, rackName := ms.Topo.Configuration.Locate(heartbeat.Ip, heartbeat.DataCenter, heartbeat.Rack)
dc := ms.Topo.GetOrCreateDataCenter(dcName)
rack := dc.GetOrCreateRack(rackName)
- dn = rack.GetOrCreateDataNode(heartbeat.Ip, int(heartbeat.Port), heartbeat.PublicUrl, heartbeat.MaxVolumeCounts)
+ dn = rack.GetOrCreateDataNode(heartbeat.Ip, int(heartbeat.Port), int(heartbeat.GrpcPort), heartbeat.PublicUrl, heartbeat.MaxVolumeCounts)
glog.V(0).Infof("added volume server %d: %v:%d", dn.Counter, heartbeat.GetIp(), heartbeat.GetPort())
if err := stream.Send(&master_pb.HeartbeatResponse{
VolumeSizeLimit: uint64(ms.option.VolumeSizeLimitMB) * 1024 * 1024,
@@ -168,7 +169,7 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
return err
}
if err := stream.Send(&master_pb.HeartbeatResponse{
- Leader: newLeader,
+ Leader: string(newLeader),
}); err != nil {
glog.Warningf("SendHeartbeat.Send response to to %s:%d %v", dn.Ip, dn.Port, err)
return err
@@ -189,7 +190,7 @@ func (ms *MasterServer) KeepConnected(stream master_pb.Seaweed_KeepConnectedServ
return ms.informNewLeader(stream)
}
- peerAddress := findClientAddress(stream.Context(), req.GrpcPort)
+ peerAddress := pb.ServerAddress(req.ClientAddress)
// buffer by 1 so we don't end up getting stuck writing to stopChan forever
stopChan := make(chan bool, 1)
@@ -241,15 +242,15 @@ func (ms *MasterServer) informNewLeader(stream master_pb.Seaweed_KeepConnectedSe
return raft.NotLeaderError
}
if err := stream.Send(&master_pb.VolumeLocation{
- Leader: leader,
+ Leader: string(leader),
}); err != nil {
return err
}
return nil
}
-func (ms *MasterServer) addClient(clientType string, clientAddress string) (clientName string, messageChan chan *master_pb.VolumeLocation) {
- clientName = clientType + "@" + clientAddress
+func (ms *MasterServer) addClient(clientType string, clientAddress pb.ServerAddress) (clientName string, messageChan chan *master_pb.VolumeLocation) {
+ clientName = clientType + "@" + string(clientAddress)
glog.V(0).Infof("+ client %v", clientName)
// we buffer this because otherwise we end up in a potential deadlock where
@@ -319,7 +320,7 @@ func (ms *MasterServer) GetMasterConfiguration(ctx context.Context, req *master_
DefaultReplication: ms.option.DefaultReplicaPlacement,
VolumeSizeLimitMB: uint32(ms.option.VolumeSizeLimitMB),
VolumePreallocate: ms.option.VolumePreallocate,
- Leader: leader,
+ Leader: string(leader),
}
return resp, nil
diff --git a/weed/server/master_grpc_server_collection.go b/weed/server/master_grpc_server_collection.go
index b92d6bcbe..55f3faf8c 100644
--- a/weed/server/master_grpc_server_collection.go
+++ b/weed/server/master_grpc_server_collection.go
@@ -58,7 +58,7 @@ func (ms *MasterServer) doDeleteNormalCollection(collectionName string) error {
}
for _, server := range collection.ListVolumeServers() {
- err := operation.WithVolumeServerClient(server.Url(), ms.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
+ err := operation.WithVolumeServerClient(server.ServerAddress(), ms.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
_, deleteErr := client.DeleteCollection(context.Background(), &volume_server_pb.DeleteCollectionRequest{
Collection: collectionName,
})
diff --git a/weed/server/master_grpc_server_volume.go b/weed/server/master_grpc_server_volume.go
index 3a92889d2..49ac455fe 100644
--- a/weed/server/master_grpc_server_volume.go
+++ b/weed/server/master_grpc_server_volume.go
@@ -150,17 +150,21 @@ func (ms *MasterServer) Assign(ctx context.Context, req *master_pb.AssignRequest
fid, count, dnList, err := ms.Topo.PickForWrite(req.Count, option)
if err == nil {
dn := dnList.Head()
- var replicas []*master_pb.AssignResponse_Replica
+ var replicas []*master_pb.Location
for _, r := range dnList.Rest() {
- replicas = append(replicas, &master_pb.AssignResponse_Replica{
+ replicas = append(replicas, &master_pb.Location{
Url: r.Url(),
PublicUrl: r.PublicUrl,
+ GrpcPort: uint32(r.GrpcPort),
})
}
return &master_pb.AssignResponse{
Fid: fid,
- Url: dn.Url(),
- PublicUrl: dn.PublicUrl,
+ Location: &master_pb.Location{
+ Url: dn.Url(),
+ PublicUrl: dn.PublicUrl,
+ GrpcPort: uint32(dn.GrpcPort),
+ },
Count: count,
Auth: string(security.GenJwt(ms.guard.SigningKey, ms.guard.ExpiresAfterSec, fid)),
Replicas: replicas,
diff --git a/weed/server/master_server.go b/weed/server/master_server.go
index 7c78be379..8de01abf7 100644
--- a/weed/server/master_server.go
+++ b/weed/server/master_server.go
@@ -2,6 +2,7 @@ package weed_server
import (
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/pb"
"net/http"
"net/http/httputil"
"net/url"
@@ -32,8 +33,7 @@ const (
)
type MasterOption struct {
- Host string
- Port int
+ Master pb.ServerAddress
MetaFolder string
VolumeSizeLimitMB uint32
VolumePreallocate bool
@@ -70,7 +70,7 @@ type MasterServer struct {
adminLocks *AdminLocks
}
-func NewMasterServer(r *mux.Router, option *MasterOption, peers []string) *MasterServer {
+func NewMasterServer(r *mux.Router, option *MasterOption, peers []pb.ServerAddress) *MasterServer {
v := util.GetViper()
signingKey := v.GetString("jwt.signing.key")
@@ -102,7 +102,7 @@ func NewMasterServer(r *mux.Router, option *MasterOption, peers []string) *Maste
vgCh: make(chan *topology.VolumeGrowRequest, 1<<6),
clientChans: make(map[string]chan *master_pb.VolumeLocation),
grpcDialOption: grpcDialOption,
- MasterClient: wdclient.NewMasterClient(grpcDialOption, "master", option.Host, 0, "", peers),
+ MasterClient: wdclient.NewMasterClient(grpcDialOption, "master", option.Master, "", peers),
adminLocks: NewAdminLocks(),
}
ms.boundedLeaderChan = make(chan int, 16)
@@ -224,14 +224,13 @@ func (ms *MasterServer) startAdminScripts() {
scriptLines = append(scriptLines, "unlock")
}
- masterAddress := util.JoinHostPort(ms.option.Host, ms.option.Port)
+ masterAddress := string(ms.option.Master)
var shellOptions shell.ShellOptions
shellOptions.GrpcDialOption = security.LoadClientTLS(v, "grpc.master")
shellOptions.Masters = &masterAddress
- shellOptions.FilerHost, shellOptions.FilerPort, err = util.ParseHostPort(filerHostPort)
- shellOptions.FilerAddress = filerHostPort
+ shellOptions.FilerAddress = pb.ServerAddress(filerHostPort)
shellOptions.Directory = "/"
if err != nil {
glog.V(0).Infof("failed to parse master.filer.default = %s : %v\n", filerHostPort, err)
@@ -299,7 +298,7 @@ func (ms *MasterServer) createSequencer(option *MasterOption) sequence.Sequencer
case "snowflake":
var err error
snowflakeId := v.GetInt(SequencerSnowflakeId)
- seq, err = sequence.NewSnowflakeSequencer(util.JoinHostPort(option.Host, option.Port), snowflakeId)
+ seq, err = sequence.NewSnowflakeSequencer(string(option.Master), snowflakeId)
if err != nil {
glog.Error(err)
seq = nil
diff --git a/weed/server/master_server_handlers_admin.go b/weed/server/master_server_handlers_admin.go
index 4a86348d9..549ea86dc 100644
--- a/weed/server/master_server_handlers_admin.go
+++ b/weed/server/master_server_handlers_admin.go
@@ -3,6 +3,7 @@ package weed_server
import (
"context"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/pb"
"math/rand"
"net/http"
"strconv"
@@ -26,7 +27,7 @@ func (ms *MasterServer) collectionDeleteHandler(w http.ResponseWriter, r *http.R
return
}
for _, server := range collection.ListVolumeServers() {
- err := operation.WithVolumeServerClient(server.Url(), ms.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
+ err := operation.WithVolumeServerClient(server.ServerAddress(), ms.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
_, deleteErr := client.DeleteCollection(context.Background(), &volume_server_pb.DeleteCollectionRequest{
Collection: collection.Name,
})
@@ -118,21 +119,15 @@ func (ms *MasterServer) redirectHandler(w http.ResponseWriter, r *http.Request)
}
}
-func (ms *MasterServer) selfUrl(r *http.Request) string {
- if r.Host != "" {
- return r.Host
- }
- return "localhost:" + strconv.Itoa(ms.option.Port)
-}
func (ms *MasterServer) submitFromMasterServerHandler(w http.ResponseWriter, r *http.Request) {
if ms.Topo.IsLeader() {
- submitForClientHandler(w, r, func() string { return ms.selfUrl(r) }, ms.grpcDialOption)
+ submitForClientHandler(w, r, func() pb.ServerAddress { return ms.option.Master }, ms.grpcDialOption)
} else {
masterUrl, err := ms.Topo.Leader()
if err != nil {
writeJsonError(w, r, http.StatusInternalServerError, err)
} else {
- submitForClientHandler(w, r, func() string { return masterUrl }, ms.grpcDialOption)
+ submitForClientHandler(w, r, func() pb.ServerAddress { return masterUrl }, ms.grpcDialOption)
}
}
}
diff --git a/weed/server/raft_server.go b/weed/server/raft_server.go
index 85841e409..568bfc7b5 100644
--- a/weed/server/raft_server.go
+++ b/weed/server/raft_server.go
@@ -6,6 +6,7 @@ import (
"os"
"path"
"sort"
+ "strings"
"time"
"google.golang.org/grpc"
@@ -19,10 +20,10 @@ import (
)
type RaftServer struct {
- peers []string // initial peers to join with
+ peers []pb.ServerAddress // initial peers to join with
raftServer raft.Server
dataDir string
- serverAddr string
+ serverAddr pb.ServerAddress
topo *topology.Topology
*raft.GrpcServer
}
@@ -51,7 +52,7 @@ func (s StateMachine) Recovery(data []byte) error {
return nil
}
-func NewRaftServer(grpcDialOption grpc.DialOption, peers []string, serverAddr, dataDir string, topo *topology.Topology, raftResumeState bool) (*RaftServer, error) {
+func NewRaftServer(grpcDialOption grpc.DialOption, peers []pb.ServerAddress, serverAddr pb.ServerAddress, dataDir string, topo *topology.Topology, raftResumeState bool) (*RaftServer, error) {
s := &RaftServer{
peers: peers,
serverAddr: serverAddr,
@@ -80,7 +81,7 @@ func NewRaftServer(grpcDialOption grpc.DialOption, peers []string, serverAddr, d
}
stateMachine := StateMachine{topo: topo}
- s.raftServer, err = raft.NewServer(s.serverAddr, s.dataDir, transporter, stateMachine, topo, "")
+ s.raftServer, err = raft.NewServer(string(s.serverAddr), s.dataDir, transporter, stateMachine, topo, "")
if err != nil {
glog.V(0).Infoln(err)
return nil, err
@@ -95,16 +96,17 @@ func NewRaftServer(grpcDialOption grpc.DialOption, peers []string, serverAddr, d
}
for _, peer := range s.peers {
- if err := s.raftServer.AddPeer(peer, pb.ServerToGrpcAddress(peer)); err != nil {
+ if err := s.raftServer.AddPeer(string(peer), peer.ToGrpcAddress()); err != nil {
return nil, err
}
}
// Remove deleted peers
for existsPeerName := range s.raftServer.Peers() {
- exists, existingPeer := false, ""
+ exists := false
+ var existingPeer pb.ServerAddress
for _, peer := range s.peers {
- if pb.ServerToGrpcAddress(peer) == existsPeerName {
+ if peer.ToGrpcAddress() == existsPeerName {
exists, existingPeer = true, peer
break
}
@@ -141,8 +143,10 @@ func (s *RaftServer) Peers() (members []string) {
return
}
-func isTheFirstOne(self string, peers []string) bool {
- sort.Strings(peers)
+func isTheFirstOne(self pb.ServerAddress, peers []pb.ServerAddress) bool {
+ sort.Slice(peers, func(i, j int) bool {
+ return strings.Compare(string(peers[i]), string(peers[j])) < 0
+ })
if len(peers) <= 0 {
return true
}
@@ -155,7 +159,7 @@ func (s *RaftServer) DoJoinCommand() {
if _, err := s.raftServer.Do(&raft.DefaultJoinCommand{
Name: s.raftServer.Name(),
- ConnectionString: pb.ServerToGrpcAddress(s.serverAddr),
+ ConnectionString: s.serverAddr.ToGrpcAddress(),
}); err != nil {
glog.Errorf("fail to send join command: %v", err)
}
diff --git a/weed/server/raft_server_handlers.go b/weed/server/raft_server_handlers.go
index 252570eab..7e58f1e92 100644
--- a/weed/server/raft_server_handlers.go
+++ b/weed/server/raft_server_handlers.go
@@ -1,15 +1,16 @@
package weed_server
import (
+ "github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
"net/http"
)
type ClusterStatusResult struct {
- IsLeader bool `json:"IsLeader,omitempty"`
- Leader string `json:"Leader,omitempty"`
- Peers []string `json:"Peers,omitempty"`
- MaxVolumeId needle.VolumeId `json:"MaxVolumeId,omitempty"`
+ IsLeader bool `json:"IsLeader,omitempty"`
+ Leader pb.ServerAddress `json:"Leader,omitempty"`
+ Peers []string `json:"Peers,omitempty"`
+ MaxVolumeId needle.VolumeId `json:"MaxVolumeId,omitempty"`
}
func (s *RaftServer) StatusHandler(w http.ResponseWriter, r *http.Request) {
diff --git a/weed/server/volume_grpc_client_to_master.go b/weed/server/volume_grpc_client_to_master.go
index 770abdab7..2659307fc 100644
--- a/weed/server/volume_grpc_client_to_master.go
+++ b/weed/server/volume_grpc_client_to_master.go
@@ -19,7 +19,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/util"
)
-func (vs *VolumeServer) GetMaster() string {
+func (vs *VolumeServer) GetMaster() pb.ServerAddress {
return vs.currentMaster
}
@@ -54,7 +54,7 @@ func (vs *VolumeServer) heartbeat() {
grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.volume")
var err error
- var newLeader string
+ var newLeader pb.ServerAddress
for vs.isHeartbeating {
for _, master := range vs.SeedMasterNodes {
if newLeader != "" {
@@ -63,13 +63,8 @@ func (vs *VolumeServer) heartbeat() {
time.Sleep(3 * time.Second)
master = newLeader
}
- masterGrpcAddress, parseErr := pb.ParseServerToGrpcAddress(master)
- if parseErr != nil {
- glog.V(0).Infof("failed to parse master grpc %v: %v", masterGrpcAddress, parseErr)
- continue
- }
vs.store.MasterAddress = master
- newLeader, err = vs.doHeartbeat(master, masterGrpcAddress, grpcDialOption, time.Duration(vs.pulseSeconds)*time.Second)
+ newLeader, err = vs.doHeartbeat(master, grpcDialOption, time.Duration(vs.pulseSeconds)*time.Second)
if err != nil {
glog.V(0).Infof("heartbeat error: %v", err)
time.Sleep(time.Duration(vs.pulseSeconds) * time.Second)
@@ -92,25 +87,25 @@ func (vs *VolumeServer) StopHeartbeat() (isAlreadyStopping bool) {
return false
}
-func (vs *VolumeServer) doHeartbeat(masterNode, masterGrpcAddress string, grpcDialOption grpc.DialOption, sleepInterval time.Duration) (newLeader string, err error) {
+func (vs *VolumeServer) doHeartbeat(masterAddress pb.ServerAddress, grpcDialOption grpc.DialOption, sleepInterval time.Duration) (newLeader pb.ServerAddress, err error) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
- grpcConection, err := pb.GrpcDial(ctx, masterGrpcAddress, grpcDialOption)
+ grpcConection, err := pb.GrpcDial(ctx, masterAddress.ToGrpcAddress(), grpcDialOption)
if err != nil {
- return "", fmt.Errorf("fail to dial %s : %v", masterNode, err)
+ return "", fmt.Errorf("fail to dial %s : %v", masterAddress, err)
}
defer grpcConection.Close()
client := master_pb.NewSeaweedClient(grpcConection)
stream, err := client.SendHeartbeat(ctx)
if err != nil {
- glog.V(0).Infof("SendHeartbeat to %s: %v", masterNode, err)
+ glog.V(0).Infof("SendHeartbeat to %s: %v", masterAddress, err)
return "", err
}
- glog.V(0).Infof("Heartbeat to: %v", masterNode)
- vs.currentMaster = masterNode
+ glog.V(0).Infof("Heartbeat to: %v", masterAddress)
+ vs.currentMaster = masterAddress
doneChan := make(chan error, 1)
@@ -130,9 +125,9 @@ func (vs *VolumeServer) doHeartbeat(masterNode, masterGrpcAddress string, grpcDi
}
}
}
- if in.GetLeader() != "" && vs.currentMaster != in.GetLeader() {
+ if in.GetLeader() != "" && string(vs.currentMaster) != in.GetLeader() {
glog.V(0).Infof("Volume Server found a new master newLeader: %v instead of %v", in.GetLeader(), vs.currentMaster)
- newLeader = in.GetLeader()
+ newLeader = pb.ServerAddress(in.GetLeader())
doneChan <- nil
return
}
@@ -140,12 +135,12 @@ func (vs *VolumeServer) doHeartbeat(masterNode, masterGrpcAddress string, grpcDi
}()
if err = stream.Send(vs.store.CollectHeartbeat()); err != nil {
- glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", masterNode, err)
+ glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", masterAddress, err)
return "", err
}
if err = stream.Send(vs.store.CollectErasureCodingHeartbeat()); err != nil {
- glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", masterNode, err)
+ glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", masterAddress, err)
return "", err
}
@@ -162,7 +157,7 @@ func (vs *VolumeServer) doHeartbeat(masterNode, masterGrpcAddress string, grpcDi
}
glog.V(0).Infof("volume server %s:%d adds volume %d", vs.store.Ip, vs.store.Port, volumeMessage.Id)
if err = stream.Send(deltaBeat); err != nil {
- glog.V(0).Infof("Volume Server Failed to update to master %s: %v", masterNode, err)
+ glog.V(0).Infof("Volume Server Failed to update to master %s: %v", masterAddress, err)
return "", err
}
case ecShardMessage := <-vs.store.NewEcShardsChan:
@@ -174,7 +169,7 @@ func (vs *VolumeServer) doHeartbeat(masterNode, masterGrpcAddress string, grpcDi
glog.V(0).Infof("volume server %s:%d adds ec shard %d:%d", vs.store.Ip, vs.store.Port, ecShardMessage.Id,
erasure_coding.ShardBits(ecShardMessage.EcIndexBits).ShardIds())
if err = stream.Send(deltaBeat); err != nil {
- glog.V(0).Infof("Volume Server Failed to update to master %s: %v", masterNode, err)
+ glog.V(0).Infof("Volume Server Failed to update to master %s: %v", masterAddress, err)
return "", err
}
case volumeMessage := <-vs.store.DeletedVolumesChan:
@@ -185,7 +180,7 @@ func (vs *VolumeServer) doHeartbeat(masterNode, masterGrpcAddress string, grpcDi
}
glog.V(0).Infof("volume server %s:%d deletes volume %d", vs.store.Ip, vs.store.Port, volumeMessage.Id)
if err = stream.Send(deltaBeat); err != nil {
- glog.V(0).Infof("Volume Server Failed to update to master %s: %v", masterNode, err)
+ glog.V(0).Infof("Volume Server Failed to update to master %s: %v", masterAddress, err)
return "", err
}
case ecShardMessage := <-vs.store.DeletedEcShardsChan:
@@ -197,20 +192,20 @@ func (vs *VolumeServer) doHeartbeat(masterNode, masterGrpcAddress string, grpcDi
glog.V(0).Infof("volume server %s:%d deletes ec shard %d:%d", vs.store.Ip, vs.store.Port, ecShardMessage.Id,
erasure_coding.ShardBits(ecShardMessage.EcIndexBits).ShardIds())
if err = stream.Send(deltaBeat); err != nil {
- glog.V(0).Infof("Volume Server Failed to update to master %s: %v", masterNode, err)
+ glog.V(0).Infof("Volume Server Failed to update to master %s: %v", masterAddress, err)
return "", err
}
case <-volumeTickChan:
glog.V(4).Infof("volume server %s:%d heartbeat", vs.store.Ip, vs.store.Port)
vs.store.MaybeAdjustVolumeMax()
if err = stream.Send(vs.store.CollectHeartbeat()); err != nil {
- glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", masterNode, err)
+ glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", masterAddress, err)
return "", err
}
case <-ecShardTickChan:
glog.V(4).Infof("volume server %s:%d ec heartbeat", vs.store.Ip, vs.store.Port)
if err = stream.Send(vs.store.CollectErasureCodingHeartbeat()); err != nil {
- glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", masterNode, err)
+ glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", masterAddress, err)
return "", err
}
case err = <-doneChan:
@@ -229,7 +224,7 @@ func (vs *VolumeServer) doHeartbeat(masterNode, masterGrpcAddress string, grpcDi
}
glog.V(1).Infof("volume server %s:%d stops and deletes all volumes", vs.store.Ip, vs.store.Port)
if err = stream.Send(emptyBeat); err != nil {
- glog.V(0).Infof("Volume Server Failed to update to master %s: %v", masterNode, err)
+ glog.V(0).Infof("Volume Server Failed to update to master %s: %v", masterAddress, err)
return "", err
}
return
diff --git a/weed/server/volume_grpc_copy.go b/weed/server/volume_grpc_copy.go
index 53ee3df0a..e046481fb 100644
--- a/weed/server/volume_grpc_copy.go
+++ b/weed/server/volume_grpc_copy.go
@@ -3,6 +3,7 @@ package weed_server
import (
"context"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/storage/types"
"io"
"io/ioutil"
@@ -45,7 +46,7 @@ func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.Vo
// confirm size and timestamp
var volFileInfoResp *volume_server_pb.ReadVolumeFileStatusResponse
var dataBaseFileName, indexBaseFileName, idxFileName, datFileName string
- err := operation.WithVolumeServerClient(req.SourceDataNode, vs.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
+ err := operation.WithVolumeServerClient(pb.ServerAddress(req.SourceDataNode), vs.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
var err error
volFileInfoResp, err = client.ReadVolumeFileStatus(context.Background(),
&volume_server_pb.ReadVolumeFileStatusRequest{
diff --git a/weed/server/volume_grpc_erasure_coding.go b/weed/server/volume_grpc_erasure_coding.go
index 364045d9b..653883c8e 100644
--- a/weed/server/volume_grpc_erasure_coding.go
+++ b/weed/server/volume_grpc_erasure_coding.go
@@ -3,6 +3,7 @@ package weed_server
import (
"context"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/storage/volume_info"
"io"
"io/ioutil"
@@ -126,7 +127,7 @@ func (vs *VolumeServer) VolumeEcShardsCopy(ctx context.Context, req *volume_serv
dataBaseFileName := storage.VolumeFileName(location.Directory, req.Collection, int(req.VolumeId))
indexBaseFileName := storage.VolumeFileName(location.IdxDirectory, req.Collection, int(req.VolumeId))
- err := operation.WithVolumeServerClient(req.SourceDataNode, vs.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
+ err := operation.WithVolumeServerClient(pb.ServerAddress(req.SourceDataNode), vs.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
// copy ec data slices
for _, shardId := range req.ShardIds {
diff --git a/weed/server/volume_grpc_tail.go b/weed/server/volume_grpc_tail.go
index 3ea902ed3..4022da44a 100644
--- a/weed/server/volume_grpc_tail.go
+++ b/weed/server/volume_grpc_tail.go
@@ -3,6 +3,7 @@ package weed_server
import (
"context"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/pb"
"time"
"github.com/chrislusf/seaweedfs/weed/glog"
@@ -89,7 +90,7 @@ func (vs *VolumeServer) VolumeTailReceiver(ctx context.Context, req *volume_serv
defer glog.V(1).Infof("receive tailing volume %d finished", v.Id)
- return resp, operation.TailVolumeFromSource(req.SourceVolumeServer, vs.grpcDialOption, v.Id, req.SinceNs, int(req.IdleTimeoutSeconds), func(n *needle.Needle) error {
+ return resp, operation.TailVolumeFromSource(pb.ServerAddress(req.SourceVolumeServer), vs.grpcDialOption, v.Id, req.SinceNs, int(req.IdleTimeoutSeconds), func(n *needle.Needle) error {
_, err := vs.store.WriteVolumeNeedle(v.Id, n, false, false)
return err
})
diff --git a/weed/server/volume_server.go b/weed/server/volume_server.go
index 9406b5601..6b6692146 100644
--- a/weed/server/volume_server.go
+++ b/weed/server/volume_server.go
@@ -1,6 +1,7 @@
package weed_server
import (
+ "github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/storage/types"
"net/http"
"sync"
@@ -23,8 +24,8 @@ type VolumeServer struct {
inFlightUploadDataLimitCond *sync.Cond
inFlightDownloadDataLimitCond *sync.Cond
- SeedMasterNodes []string
- currentMaster string
+ SeedMasterNodes []pb.ServerAddress
+ currentMaster pb.ServerAddress
pulseSeconds int
dataCenter string
rack string
@@ -44,11 +45,11 @@ type VolumeServer struct {
}
func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
- port int, publicUrl string,
+ port int, grpcPort int, publicUrl string,
folders []string, maxCounts []int, minFreeSpaces []util.MinFreeSpace, diskTypes []types.DiskType,
idxFolder string,
needleMapKind storage.NeedleMapKind,
- masterNodes []string, pulseSeconds int,
+ masterNodes []pb.ServerAddress, pulseSeconds int,
dataCenter string, rack string,
whiteList []string,
fixJpgOrientation bool,
@@ -90,7 +91,7 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
vs.checkWithMaster()
- vs.store = storage.NewStore(vs.grpcDialOption, port, ip, publicUrl, folders, maxCounts, minFreeSpaces, idxFolder, vs.needleMapKind, diskTypes)
+ vs.store = storage.NewStore(vs.grpcDialOption, ip, port, grpcPort, publicUrl, folders, maxCounts, minFreeSpaces, idxFolder, vs.needleMapKind, diskTypes)
vs.guard = security.NewGuard(whiteList, signingKey, expiresAfterSec, readSigningKey, readExpiresAfterSec)
handleStaticResources(adminMux)
diff --git a/weed/server/volume_server_handlers_ui.go b/weed/server/volume_server_handlers_ui.go
index 437e5c45d..2c420c2d6 100644
--- a/weed/server/volume_server_handlers_ui.go
+++ b/weed/server/volume_server_handlers_ui.go
@@ -1,6 +1,7 @@
package weed_server
import (
+ "github.com/chrislusf/seaweedfs/weed/pb"
"net/http"
"path/filepath"
"time"
@@ -35,7 +36,7 @@ func (vs *VolumeServer) uiStatusHandler(w http.ResponseWriter, r *http.Request)
}
args := struct {
Version string
- Masters []string
+ Masters []pb.ServerAddress
Volumes interface{}
EcVolumes interface{}
RemoteVolumes interface{}
diff --git a/weed/server/webdav_server.go b/weed/server/webdav_server.go
index e99d4a358..239de69f8 100644
--- a/weed/server/webdav_server.go
+++ b/weed/server/webdav_server.go
@@ -27,8 +27,7 @@ import (
)
type WebDavOption struct {
- Filer string
- FilerGrpcAddress string
+ Filer pb.ServerAddress
DomainName string
BucketsPath string
GrpcDialOption grpc.DialOption
@@ -107,7 +106,7 @@ type WebDavFile struct {
func NewWebDavFileSystem(option *WebDavOption) (webdav.FileSystem, error) {
- cacheUniqueId := util.Md5String([]byte("webdav" + option.FilerGrpcAddress + util.Version()))[0:8]
+ cacheUniqueId := util.Md5String([]byte("webdav" + string(option.Filer) + util.Version()))[0:8]
cacheDir := path.Join(option.CacheDir, cacheUniqueId)
os.MkdirAll(cacheDir, os.FileMode(0755))
@@ -126,7 +125,7 @@ func (fs *WebDavFileSystem) WithFilerClient(fn func(filer_pb.SeaweedFilerClient)
return pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
client := filer_pb.NewSeaweedFilerClient(grpcConnection)
return fn(client)
- }, fs.option.FilerGrpcAddress, fs.option.GrpcDialOption)
+ }, fs.option.Filer.ToGrpcAddress(), fs.option.GrpcDialOption)
}
func (fs *WebDavFileSystem) AdjustedUrl(location *filer_pb.Location) string {
@@ -398,7 +397,7 @@ func (f *WebDavFile) saveDataAsChunk(reader io.Reader, name string, offset int64
return fmt.Errorf("assign volume failure %v: %v", request, resp.Error)
}
- fileId, host, auth = resp.FileId, resp.Url, security.EncodedJwt(resp.Auth)
+ fileId, host, auth = resp.FileId, resp.Location.Url, security.EncodedJwt(resp.Auth)
f.collection, f.replication = resp.Collection, resp.Replication
return nil