aboutsummaryrefslogtreecommitdiff
path: root/weed/server
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2019-11-11 21:30:14 -0800
committerChris Lu <chris.lu@gmail.com>2019-11-11 21:30:14 -0800
commitee90236a972acea8722dccfa1b04926d551fc82d (patch)
tree7890d2482177caad2fa72b741435bcefff140cf3 /weed/server
parent85f8649320033a46e71b20a588930013c00b3fdf (diff)
parentd7852ebb3b8900baeff3d806f8fe2404c511aa5d (diff)
downloadseaweedfs-ee90236a972acea8722dccfa1b04926d551fc82d.tar.xz
seaweedfs-ee90236a972acea8722dccfa1b04926d551fc82d.zip
Merge branch 'master' into refactoring_dat_backend
Diffstat (limited to 'weed/server')
-rw-r--r--weed/server/master_grpc_server_volume.go2
-rw-r--r--weed/server/master_server.go38
-rw-r--r--weed/server/master_server_handlers.go7
3 files changed, 42 insertions, 5 deletions
diff --git a/weed/server/master_grpc_server_volume.go b/weed/server/master_grpc_server_volume.go
index 0580acf76..8fc56e9b8 100644
--- a/weed/server/master_grpc_server_volume.go
+++ b/weed/server/master_grpc_server_volume.go
@@ -78,7 +78,7 @@ func (ms *MasterServer) Assign(ctx context.Context, req *master_pb.AssignRequest
}
ms.vgLock.Lock()
if !ms.Topo.HasWritableVolume(option) {
- if _, err = ms.vg.AutomaticGrowByType(option, ms.grpcDialOption, ms.Topo); err != nil {
+ if _, err = ms.vg.AutomaticGrowByType(option, ms.grpcDialOption, ms.Topo, int(req.WritableVolumeCount)); err != nil {
ms.vgLock.Unlock()
return nil, fmt.Errorf("Cannot grow volume group! %v", err)
}
diff --git a/weed/server/master_server.go b/weed/server/master_server.go
index cde583560..15e6ee51c 100644
--- a/weed/server/master_server.go
+++ b/weed/server/master_server.go
@@ -27,6 +27,12 @@ import (
"google.golang.org/grpc"
)
+const (
+ MasterPrefix = "master.maintenance"
+ SequencerType = MasterPrefix + ".sequencer_type"
+ SequencerEtcdUrls = MasterPrefix + ".sequencer_etcd_urls"
+)
+
type MasterOption struct {
Port int
MetaFolder string
@@ -87,7 +93,11 @@ func NewMasterServer(r *mux.Router, option *MasterOption, peers []string) *Maste
MasterClient: wdclient.NewMasterClient(context.Background(), grpcDialOption, "master", peers),
}
ms.bounedLeaderChan = make(chan int, 16)
- seq := sequence.NewMemorySequencer()
+
+ seq := ms.createSequencer(option)
+ if nil == seq {
+ glog.Fatalf("create sequencer failed.")
+ }
ms.Topo = topology.NewTopology("topo", seq, uint64(ms.option.VolumeSizeLimitMB)*1024*1024, ms.option.PulseSeconds)
ms.vg = topology.NewDefaultVolumeGrowth()
glog.V(0).Infoln("Volume Size Limit is", ms.option.VolumeSizeLimitMB, "MB")
@@ -165,8 +175,8 @@ func (ms *MasterServer) proxyToLeader(f func(w http.ResponseWriter, r *http.Requ
proxy.Transport = util.Transport
proxy.ServeHTTP(w, r)
} else {
- //drop it to the floor
- //writeJsonError(w, r, errors.New(ms.Topo.RaftServer.Name()+" does not know Leader yet:"+ms.Topo.RaftServer.Leader()))
+ // drop it to the floor
+ // writeJsonError(w, r, errors.New(ms.Topo.RaftServer.Name()+" does not know Leader yet:"+ms.Topo.RaftServer.Leader()))
}
}
}
@@ -230,3 +240,25 @@ func (ms *MasterServer) startAdminScripts() {
}
}()
}
+
+func (ms *MasterServer) createSequencer(option *MasterOption) sequence.Sequencer {
+ var seq sequence.Sequencer
+ seqType := strings.ToLower(util.Config().GetString(SequencerType))
+ glog.V(0).Infof("[%s] : [%s]", SequencerType, seqType)
+ switch strings.ToLower(seqType) {
+ case "memory":
+ seq = sequence.NewMemorySequencer()
+ case "etcd":
+ var err error
+ urls := util.Config().GetString(SequencerEtcdUrls)
+ glog.V(0).Infof("[%s] : [%s]", SequencerEtcdUrls, urls)
+ seq, err = sequence.NewEtcdSequencer(urls, option.MetaFolder)
+ if err != nil {
+ glog.Error(err)
+ seq = nil
+ }
+ default:
+ seq = sequence.NewMemorySequencer()
+ }
+ return seq
+}
diff --git a/weed/server/master_server_handlers.go b/weed/server/master_server_handlers.go
index 9bcd35ced..514d86800 100644
--- a/weed/server/master_server_handlers.go
+++ b/weed/server/master_server_handlers.go
@@ -100,6 +100,11 @@ func (ms *MasterServer) dirAssignHandler(w http.ResponseWriter, r *http.Request)
requestedCount = 1
}
+ writableVolumeCount, e := strconv.Atoi(r.FormValue("writableVolumeCount"))
+ if e != nil {
+ writableVolumeCount = 0
+ }
+
option, err := ms.getVolumeGrowOption(r)
if err != nil {
writeJsonQuiet(w, r, http.StatusNotAcceptable, operation.AssignResult{Error: err.Error()})
@@ -114,7 +119,7 @@ func (ms *MasterServer) dirAssignHandler(w http.ResponseWriter, r *http.Request)
ms.vgLock.Lock()
defer ms.vgLock.Unlock()
if !ms.Topo.HasWritableVolume(option) {
- if _, err = ms.vg.AutomaticGrowByType(option, ms.grpcDialOption, ms.Topo); err != nil {
+ if _, err = ms.vg.AutomaticGrowByType(option, ms.grpcDialOption, ms.Topo, writableVolumeCount); err != nil {
writeJsonError(w, r, http.StatusInternalServerError,
fmt.Errorf("Cannot grow volume group! %v", err))
return