aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--k8s/helm_charts2/Chart.yaml4
-rw-r--r--weed/command/benchmark.go2
-rw-r--r--weed/command/filer.go4
-rw-r--r--weed/command/iam.go2
-rw-r--r--weed/command/master.go9
-rw-r--r--weed/command/master_follower.go2
-rw-r--r--weed/command/server.go2
-rw-r--r--weed/filer/filer.go2
-rw-r--r--weed/filer/meta_aggregator.go3
-rw-r--r--weed/iamapi/iamapi_server.go2
-rw-r--r--weed/pb/grpc_client_server.go2
-rw-r--r--weed/pb/server_address.go15
-rw-r--r--weed/server/filer_grpc_server.go2
-rw-r--r--weed/server/filer_server.go2
-rw-r--r--weed/server/master_server.go2
-rw-r--r--weed/server/raft_server.go35
-rw-r--r--weed/shell/commands.go2
-rw-r--r--weed/storage/erasure_coding/ec_encoder.go2
-rw-r--r--weed/util/constants.go2
-rw-r--r--weed/wdclient/masterclient.go4
20 files changed, 46 insertions, 54 deletions
diff --git a/k8s/helm_charts2/Chart.yaml b/k8s/helm_charts2/Chart.yaml
index cd8a81b54..d0499ef51 100644
--- a/k8s/helm_charts2/Chart.yaml
+++ b/k8s/helm_charts2/Chart.yaml
@@ -1,5 +1,5 @@
apiVersion: v1
description: SeaweedFS
name: seaweedfs
-appVersion: "2.95"
-version: "2.95"
+appVersion: "2.96"
+version: "2.96"
diff --git a/weed/command/benchmark.go b/weed/command/benchmark.go
index af5919adf..7091463cc 100644
--- a/weed/command/benchmark.go
+++ b/weed/command/benchmark.go
@@ -129,7 +129,7 @@ func runBenchmark(cmd *Command, args []string) bool {
defer pprof.StopCPUProfile()
}
- b.masterClient = wdclient.NewMasterClient(b.grpcDialOption, "client", "", "", pb.ServerAddresses(*b.masters).ToAddresses())
+ b.masterClient = wdclient.NewMasterClient(b.grpcDialOption, "client", "", "", pb.ServerAddresses(*b.masters).ToAddressMap())
go b.masterClient.KeepConnectedToMaster()
b.masterClient.WaitUntilConnected()
diff --git a/weed/command/filer.go b/weed/command/filer.go
index 4f8fd947a..0a768944b 100644
--- a/weed/command/filer.go
+++ b/weed/command/filer.go
@@ -29,7 +29,7 @@ var (
)
type FilerOptions struct {
- masters []pb.ServerAddress
+ masters map[string]pb.ServerAddress
mastersString *string
ip *string
bindIp *string
@@ -171,7 +171,7 @@ func runFiler(cmd *Command, args []string) bool {
}()
}
- f.masters = pb.ServerAddresses(*f.mastersString).ToAddresses()
+ f.masters = pb.ServerAddresses(*f.mastersString).ToAddressMap()
f.startFiler()
diff --git a/weed/command/iam.go b/weed/command/iam.go
index 88b17b1a2..968d23095 100644
--- a/weed/command/iam.go
+++ b/weed/command/iam.go
@@ -67,7 +67,7 @@ func (iamopt *IamOptions) startIamServer() bool {
}
}
- masters := pb.ServerAddresses(*iamopt.masters).ToAddresses()
+ masters := pb.ServerAddresses(*iamopt.masters).ToAddressMap()
router := mux.NewRouter().SkipClean(true)
_, iamApiServer_err := iamapi.NewIamApiServer(router, &iamapi.IamServerOption{
Masters: masters,
diff --git a/weed/command/master.go b/weed/command/master.go
index 9e45c5037..e56ee19fe 100644
--- a/weed/command/master.go
+++ b/weed/command/master.go
@@ -132,8 +132,13 @@ func startMaster(masterOption MasterOptions, masterWhiteList []string) {
myMasterAddress, peers := checkPeers(*masterOption.ip, *masterOption.port, *masterOption.portGrpc, *masterOption.peers)
+ masterPeers := make(map[string]pb.ServerAddress)
+ for _, peer := range peers {
+ masterPeers[peer.String()] = peer
+ }
+
r := mux.NewRouter()
- ms := weed_server.NewMasterServer(r, masterOption.toMasterOption(masterWhiteList), peers)
+ ms := weed_server.NewMasterServer(r, masterOption.toMasterOption(masterWhiteList), masterPeers)
listeningAddress := util.JoinHostPort(*masterOption.ipBind, *masterOption.port)
glog.V(0).Infof("Start Seaweed Master %s at %s", util.Version(), listeningAddress)
masterListener, masterLocalListner, e := util.NewIpAndLocalListeners(*masterOption.ipBind, *masterOption.port, 0)
@@ -144,7 +149,7 @@ func startMaster(masterOption MasterOptions, masterWhiteList []string) {
// start raftServer
raftServerOption := &weed_server.RaftServerOption{
GrpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.master"),
- Peers: peers,
+ Peers: masterPeers,
ServerAddr: myMasterAddress,
DataDir: util.ResolvePath(*masterOption.metaFolder),
Topo: ms.Topo,
diff --git a/weed/command/master_follower.go b/weed/command/master_follower.go
index f182d7ce4..ec7d2758f 100644
--- a/weed/command/master_follower.go
+++ b/weed/command/master_follower.go
@@ -83,7 +83,7 @@ func runMasterFollower(cmd *Command, args []string) bool {
func startMasterFollower(masterOptions MasterOptions) {
// collect settings from main masters
- masters := pb.ServerAddresses(*mf.peers).ToAddresses()
+ masters := pb.ServerAddresses(*mf.peers).ToAddressMap()
var err error
grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.master")
diff --git a/weed/command/server.go b/weed/command/server.go
index a1b495c5f..0cc60fd30 100644
--- a/weed/command/server.go
+++ b/weed/command/server.go
@@ -191,7 +191,7 @@ func runServer(cmd *Command, args []string) bool {
// ip address
masterOptions.ip = serverIp
masterOptions.ipBind = serverBindIp
- filerOptions.masters = pb.ServerAddresses(*masterOptions.peers).ToAddresses()
+ filerOptions.masters = pb.ServerAddresses(*masterOptions.peers).ToAddressMap()
filerOptions.ip = serverIp
filerOptions.bindIp = serverBindIp
s3Options.bindIp = serverBindIp
diff --git a/weed/filer/filer.go b/weed/filer/filer.go
index 7b6f1342c..836a0e447 100644
--- a/weed/filer/filer.go
+++ b/weed/filer/filer.go
@@ -49,7 +49,7 @@ type Filer struct {
UniqueFileId uint32
}
-func NewFiler(masters []pb.ServerAddress, grpcDialOption grpc.DialOption,
+func NewFiler(masters map[string]pb.ServerAddress, grpcDialOption grpc.DialOption,
filerHost pb.ServerAddress, collection string, replication string, dataCenter string, notifyFn func()) *Filer {
f := &Filer{
MasterClient: wdclient.NewMasterClient(grpcDialOption, cluster.FilerType, filerHost, dataCenter, masters),
diff --git a/weed/filer/meta_aggregator.go b/weed/filer/meta_aggregator.go
index 1e8b89ad5..83c8a945d 100644
--- a/weed/filer/meta_aggregator.go
+++ b/weed/filer/meta_aggregator.go
@@ -76,9 +76,6 @@ func (ma *MetaAggregator) setActive(address pb.ServerAddress, isActive bool) (no
}
} else {
if _, found := ma.peerStatues[address]; found {
- ma.peerStatues[address] -= 1
- }
- if ma.peerStatues[address] <= 0 {
delete(ma.peerStatues, address)
}
}
diff --git a/weed/iamapi/iamapi_server.go b/weed/iamapi/iamapi_server.go
index fc0e6b700..6f6cc7533 100644
--- a/weed/iamapi/iamapi_server.go
+++ b/weed/iamapi/iamapi_server.go
@@ -33,7 +33,7 @@ type IamS3ApiConfigure struct {
}
type IamServerOption struct {
- Masters []pb.ServerAddress
+ Masters map[string]pb.ServerAddress
Filer pb.ServerAddress
Port int
GrpcDialOption grpc.DialOption
diff --git a/weed/pb/grpc_client_server.go b/weed/pb/grpc_client_server.go
index cedcc2813..50feb2e23 100644
--- a/weed/pb/grpc_client_server.go
+++ b/weed/pb/grpc_client_server.go
@@ -206,7 +206,7 @@ func WithMasterClient(streamingMode bool, master ServerAddress, grpcDialOption g
}
-func WithOneOfGrpcMasterClients(streamingMode bool, masterGrpcAddresses []ServerAddress, grpcDialOption grpc.DialOption, fn func(client master_pb.SeaweedClient) error) (err error) {
+func WithOneOfGrpcMasterClients(streamingMode bool, masterGrpcAddresses map[string]ServerAddress, grpcDialOption grpc.DialOption, fn func(client master_pb.SeaweedClient) error) (err error) {
for _, masterGrpcAddress := range masterGrpcAddresses {
err = WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error {
diff --git a/weed/pb/server_address.go b/weed/pb/server_address.go
index b60551c71..c7faea4bd 100644
--- a/weed/pb/server_address.go
+++ b/weed/pb/server_address.go
@@ -86,6 +86,14 @@ func (sa ServerAddresses) ToAddresses() (addresses []ServerAddress) {
return
}
+func (sa ServerAddresses) ToAddressMap() (addresses map[string]ServerAddress) {
+ addresses = make(map[string]ServerAddress)
+ for _, address := range sa.ToAddresses() {
+ addresses[address.String()] = address
+ }
+ return
+}
+
func (sa ServerAddresses) ToAddressStrings() (addresses []string) {
parts := strings.Split(string(sa), ",")
for _, address := range parts {
@@ -101,6 +109,13 @@ func ToAddressStrings(addresses []ServerAddress) []string {
}
return strings
}
+func ToAddressStringsFromMap(addresses map[string]ServerAddress) []string {
+ var strings []string
+ for _, addr := range addresses {
+ strings = append(strings, string(addr))
+ }
+ return strings
+}
func FromAddressStrings(strings []string) []ServerAddress {
var addresses []ServerAddress
for _, addr := range strings {
diff --git a/weed/server/filer_grpc_server.go b/weed/server/filer_grpc_server.go
index 3f65660ee..5a5714156 100644
--- a/weed/server/filer_grpc_server.go
+++ b/weed/server/filer_grpc_server.go
@@ -393,7 +393,7 @@ func (fs *FilerServer) GetFilerConfiguration(ctx context.Context, req *filer_pb.
clusterId, _ := fs.filer.Store.KvGet(context.Background(), []byte("clusterId"))
t := &filer_pb.GetFilerConfigurationResponse{
- Masters: pb.ToAddressStrings(fs.option.Masters),
+ Masters: pb.ToAddressStringsFromMap(fs.option.Masters),
Collection: fs.option.Collection,
Replication: fs.option.DefaultReplication,
MaxMb: uint32(fs.option.MaxMB),
diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go
index 497f59568..7edd5870f 100644
--- a/weed/server/filer_server.go
+++ b/weed/server/filer_server.go
@@ -48,7 +48,7 @@ import (
)
type FilerOption struct {
- Masters []pb.ServerAddress
+ Masters map[string]pb.ServerAddress
Collection string
DefaultReplication string
DisableDirListing bool
diff --git a/weed/server/master_server.go b/weed/server/master_server.go
index 671432d5c..b63e3a418 100644
--- a/weed/server/master_server.go
+++ b/weed/server/master_server.go
@@ -75,7 +75,7 @@ type MasterServer struct {
Cluster *cluster.Cluster
}
-func NewMasterServer(r *mux.Router, option *MasterOption, peers []pb.ServerAddress) *MasterServer {
+func NewMasterServer(r *mux.Router, option *MasterOption, peers map[string]pb.ServerAddress) *MasterServer {
v := util.GetViper()
signingKey := v.GetString("jwt.signing.key")
diff --git a/weed/server/raft_server.go b/weed/server/raft_server.go
index f22b7c45d..648e20505 100644
--- a/weed/server/raft_server.go
+++ b/weed/server/raft_server.go
@@ -5,8 +5,6 @@ import (
"math/rand"
"os"
"path"
- "sort"
- "strings"
"time"
"google.golang.org/grpc"
@@ -21,7 +19,7 @@ import (
type RaftServerOption struct {
GrpcDialOption grpc.DialOption
- Peers []pb.ServerAddress
+ Peers map[string]pb.ServerAddress
ServerAddr pb.ServerAddress
DataDir string
Topo *topology.Topology
@@ -31,7 +29,7 @@ type RaftServerOption struct {
}
type RaftServer struct {
- peers []pb.ServerAddress // initial peers to join with
+ peers map[string]pb.ServerAddress // initial peers to join with
raftServer raft.Server
dataDir string
serverAddr pb.ServerAddress
@@ -108,23 +106,15 @@ func NewRaftServer(option *RaftServerOption) (*RaftServer, error) {
return nil, err
}
- for _, peer := range s.peers {
- if err := s.raftServer.AddPeer(string(peer), peer.ToGrpcAddress()); err != nil {
+ for name, peer := range s.peers {
+ if err := s.raftServer.AddPeer(name, peer.ToGrpcAddress()); err != nil {
return nil, err
}
}
// Remove deleted peers
for existsPeerName := range s.raftServer.Peers() {
- exists := false
- var existingPeer pb.ServerAddress
- for _, peer := range s.peers {
- if peer.String() == existsPeerName {
- exists, existingPeer = true, peer
- break
- }
- }
- if !exists {
+ if existingPeer, found := s.peers[existsPeerName]; !found {
if err := s.raftServer.RemovePeer(existsPeerName); err != nil {
glog.V(0).Infoln(err)
return nil, err
@@ -136,11 +126,6 @@ func NewRaftServer(option *RaftServerOption) (*RaftServer, error) {
s.GrpcServer = raft.NewGrpcServer(s.raftServer)
- if s.raftServer.IsLogEmpty() && isTheFirstOne(option.ServerAddr, s.peers) {
- // Initialize the server by joining itself.
- // s.DoJoinCommand()
- }
-
glog.V(0).Infof("current cluster leader: %v", s.raftServer.Leader())
return s, nil
@@ -156,16 +141,6 @@ func (s *RaftServer) Peers() (members []string) {
return
}
-func isTheFirstOne(self pb.ServerAddress, peers []pb.ServerAddress) bool {
- sort.Slice(peers, func(i, j int) bool {
- return strings.Compare(string(peers[i]), string(peers[j])) < 0
- })
- if len(peers) <= 0 {
- return true
- }
- return self == peers[0]
-}
-
func (s *RaftServer) DoJoinCommand() {
glog.V(0).Infoln("Initializing new cluster")
diff --git a/weed/shell/commands.go b/weed/shell/commands.go
index ec71edee0..3ff49f1d2 100644
--- a/weed/shell/commands.go
+++ b/weed/shell/commands.go
@@ -46,7 +46,7 @@ var (
func NewCommandEnv(options *ShellOptions) *CommandEnv {
ce := &CommandEnv{
env: make(map[string]string),
- MasterClient: wdclient.NewMasterClient(options.GrpcDialOption, pb.AdminShellClient, "", "", pb.ServerAddresses(*options.Masters).ToAddresses()),
+ MasterClient: wdclient.NewMasterClient(options.GrpcDialOption, pb.AdminShellClient, "", "", pb.ServerAddresses(*options.Masters).ToAddressMap()),
option: options,
}
ce.locker = exclusive_locks.NewExclusiveLocker(ce.MasterClient, "admin")
diff --git a/weed/storage/erasure_coding/ec_encoder.go b/weed/storage/erasure_coding/ec_encoder.go
index 34b639407..157149865 100644
--- a/weed/storage/erasure_coding/ec_encoder.go
+++ b/weed/storage/erasure_coding/ec_encoder.go
@@ -220,7 +220,7 @@ func encodeDatFile(remainingSize int64, err error, baseFileName string, bufferSi
processedSize += largeBlockSize * DataShardsCount
}
for remainingSize > 0 {
- encodeData(file, enc, processedSize, smallBlockSize, buffers, outputs)
+ err = encodeData(file, enc, processedSize, smallBlockSize, buffers, outputs)
if err != nil {
return fmt.Errorf("failed to encode small chunk data: %v", err)
}
diff --git a/weed/util/constants.go b/weed/util/constants.go
index ea10f32f5..db2e1e958 100644
--- a/weed/util/constants.go
+++ b/weed/util/constants.go
@@ -5,7 +5,7 @@ import (
)
var (
- VERSION_NUMBER = fmt.Sprintf("%.02f", 2.95)
+ VERSION_NUMBER = fmt.Sprintf("%.02f", 2.96)
VERSION = sizeLimit + " " + VERSION_NUMBER
COMMIT = ""
)
diff --git a/weed/wdclient/masterclient.go b/weed/wdclient/masterclient.go
index 5280305e2..daf74c1be 100644
--- a/weed/wdclient/masterclient.go
+++ b/weed/wdclient/masterclient.go
@@ -18,7 +18,7 @@ type MasterClient struct {
clientType string
clientHost pb.ServerAddress
currentMaster pb.ServerAddress
- masters []pb.ServerAddress
+ masters map[string]pb.ServerAddress
grpcDialOption grpc.DialOption
vidMap
@@ -26,7 +26,7 @@ type MasterClient struct {
OnPeerUpdate func(update *master_pb.ClusterNodeUpdate)
}
-func NewMasterClient(grpcDialOption grpc.DialOption, clientType string, clientHost pb.ServerAddress, clientDataCenter string, masters []pb.ServerAddress) *MasterClient {
+func NewMasterClient(grpcDialOption grpc.DialOption, clientType string, clientHost pb.ServerAddress, clientDataCenter string, masters map[string]pb.ServerAddress) *MasterClient {
return &MasterClient{
clientType: clientType,
clientHost: clientHost,