diff options
Diffstat (limited to 'weed/server')
| -rw-r--r-- | weed/server/master_grpc_server_volume.go | 102 | ||||
| -rw-r--r-- | weed/server/master_server.go | 22 | ||||
| -rw-r--r-- | weed/server/master_server_handlers.go | 20 | ||||
| -rw-r--r-- | weed/server/master_server_handlers_admin.go | 10 |
4 files changed, 116 insertions, 38 deletions
diff --git a/weed/server/master_grpc_server_volume.go b/weed/server/master_grpc_server_volume.go index 156afd4a1..3a4951cc5 100644 --- a/weed/server/master_grpc_server_volume.go +++ b/weed/server/master_grpc_server_volume.go @@ -4,15 +4,68 @@ import ( "context" "fmt" "github.com/chrislusf/raft" - "github.com/chrislusf/seaweedfs/weed/storage/types" + "reflect" + "sync" + "time" + "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/master_pb" "github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/storage/needle" "github.com/chrislusf/seaweedfs/weed/storage/super_block" + "github.com/chrislusf/seaweedfs/weed/storage/types" "github.com/chrislusf/seaweedfs/weed/topology" ) +func (ms *MasterServer) ProcessGrowRequest() { + go func() { + filter := sync.Map{} + for { + req, ok := <-ms.vgCh + if !ok { + break + } + + if !ms.Topo.IsLeader() { + //discard buffered requests + time.Sleep(time.Second * 1) + continue + } + + // filter out identical requests being processed + found := false + filter.Range(func(k, v interface{}) bool { + if reflect.DeepEqual(k, req) { + found = true + } + return !found + }) + + // not atomic but it's okay + if !found && ms.shouldVolumeGrow(req.Option) { + filter.Store(req, nil) + // we have lock called inside vg + go func() { + glog.V(1).Infoln("starting automatic volume grow") + start := time.Now() + _, err := ms.vg.AutomaticGrowByType(req.Option, ms.grpcDialOption, ms.Topo, req.Count) + glog.V(1).Infoln("finished automatic volume grow, cost ", time.Now().Sub(start)) + + if req.ErrCh != nil { + req.ErrCh <- err + close(req.ErrCh) + } + + filter.Delete(req) + }() + + } else { + glog.V(4).Infoln("discard volume grow request") + } + } + }() +} + func (ms *MasterServer) LookupVolume(ctx context.Context, req *master_pb.LookupVolumeRequest) (*master_pb.LookupVolumeResponse, error) { if !ms.Topo.IsLeader() { @@ -68,38 +121,45 @@ func (ms *MasterServer) Assign(ctx context.Context, req *master_pb.AssignRequest ReplicaPlacement: replicaPlacement, Ttl: ttl, DiskType: diskType, - Prealloacte: ms.preallocateSize, + Preallocate: ms.preallocateSize, DataCenter: req.DataCenter, Rack: req.Rack, DataNode: req.DataNode, MemoryMapMaxSizeMb: req.MemoryMapMaxSizeMb, } - if !ms.Topo.HasWritableVolume(option) { + if ms.shouldVolumeGrow(option) { if ms.Topo.AvailableSpaceFor(option) <= 0 { return nil, fmt.Errorf("no free volumes left for " + option.String()) } - ms.vgLock.Lock() - if !ms.Topo.HasWritableVolume(option) { - if _, err = ms.vg.AutomaticGrowByType(option, ms.grpcDialOption, ms.Topo, int(req.WritableVolumeCount)); err != nil { - ms.vgLock.Unlock() - return nil, fmt.Errorf("Cannot grow volume group! %v", err) - } + ms.vgCh <- &topology.VolumeGrowRequest{ + Option: option, + Count: int(req.WritableVolumeCount), } - ms.vgLock.Unlock() - } - fid, count, dn, err := ms.Topo.PickForWrite(req.Count, option) - if err != nil { - return nil, fmt.Errorf("%v", err) } - return &master_pb.AssignResponse{ - Fid: fid, - Url: dn.Url(), - PublicUrl: dn.PublicUrl, - Count: count, - Auth: string(security.GenJwt(ms.guard.SigningKey, ms.guard.ExpiresAfterSec, fid)), - }, nil + var ( + lastErr error + maxTimeout = time.Second * 10 + startTime = time.Now() + ) + + for time.Now().Sub(startTime) < maxTimeout { + fid, count, dn, err := ms.Topo.PickForWrite(req.Count, option) + if err == nil { + return &master_pb.AssignResponse{ + Fid: fid, + Url: dn.Url(), + PublicUrl: dn.PublicUrl, + Count: count, + Auth: string(security.GenJwt(ms.guard.SigningKey, ms.guard.ExpiresAfterSec, fid)), + }, nil + } + //glog.V(4).Infoln("waiting for volume growing...") + lastErr = err + time.Sleep(200 * time.Millisecond) + } + return nil, lastErr } func (ms *MasterServer) Statistics(ctx context.Context, req *master_pb.StatisticsRequest) (*master_pb.StatisticsResponse, error) { diff --git a/weed/server/master_server.go b/weed/server/master_server.go index e2b2df18d..838803908 100644 --- a/weed/server/master_server.go +++ b/weed/server/master_server.go @@ -51,9 +51,9 @@ type MasterServer struct { preallocateSize int64 - Topo *topology.Topology - vg *topology.VolumeGrowth - vgLock sync.Mutex + Topo *topology.Topology + vg *topology.VolumeGrowth + vgCh chan *topology.VolumeGrowRequest boundedLeaderChan chan int @@ -82,6 +82,12 @@ func NewMasterServer(r *mux.Router, option *MasterOption, peers []string) *Maste v.SetDefault("master.replication.treat_replication_as_minimums", false) replicationAsMin := v.GetBool("master.replication.treat_replication_as_minimums") + v.SetDefault("master.volume_growth.copy_1", 7) + v.SetDefault("master.volume_growth.copy_2", 6) + v.SetDefault("master.volume_growth.copy_3", 3) + v.SetDefault("master.volume_growth.copy_other", 1) + v.SetDefault("master.volume_growth.threshold", 0.9) + var preallocateSize int64 if option.VolumePreallocate { preallocateSize = int64(option.VolumeSizeLimitMB) * (1 << 20) @@ -91,6 +97,7 @@ func NewMasterServer(r *mux.Router, option *MasterOption, peers []string) *Maste ms := &MasterServer{ option: option, preallocateSize: preallocateSize, + vgCh: make(chan *topology.VolumeGrowRequest, 1 << 6), clientChans: make(map[string]chan *master_pb.VolumeLocation), grpcDialOption: grpcDialOption, MasterClient: wdclient.NewMasterClient(grpcDialOption, "master", option.Host, 0, "", peers), @@ -128,7 +135,14 @@ func NewMasterServer(r *mux.Router, option *MasterOption, peers []string) *Maste r.HandleFunc("/{fileId}", ms.redirectHandler) } - ms.Topo.StartRefreshWritableVolumes(ms.grpcDialOption, ms.option.GarbageThreshold, ms.preallocateSize) + ms.Topo.StartRefreshWritableVolumes( + ms.grpcDialOption, + ms.option.GarbageThreshold, + v.GetFloat64("master.volume_growth.threshold"), + ms.preallocateSize, + ) + + ms.ProcessGrowRequest() ms.startAdminScripts() diff --git a/weed/server/master_server_handlers.go b/weed/server/master_server_handlers.go index a9fecc5bd..974b3308f 100644 --- a/weed/server/master_server_handlers.go +++ b/weed/server/master_server_handlers.go @@ -10,6 +10,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/stats" "github.com/chrislusf/seaweedfs/weed/storage/needle" + "github.com/chrislusf/seaweedfs/weed/topology" ) func (ms *MasterServer) lookupVolumeId(vids []string, collection string) (volumeLocations map[string]operation.LookupResult) { @@ -111,19 +112,20 @@ func (ms *MasterServer) dirAssignHandler(w http.ResponseWriter, r *http.Request) return } - if !ms.Topo.HasWritableVolume(option) { + if ms.shouldVolumeGrow(option) { if ms.Topo.AvailableSpaceFor(option) <= 0 { writeJsonQuiet(w, r, http.StatusNotFound, operation.AssignResult{Error: "No free volumes left for " + option.String()}) return } - ms.vgLock.Lock() - defer ms.vgLock.Unlock() - if !ms.Topo.HasWritableVolume(option) { - if _, err = ms.vg.AutomaticGrowByType(option, ms.grpcDialOption, ms.Topo, writableVolumeCount); err != nil { - writeJsonError(w, r, http.StatusInternalServerError, - fmt.Errorf("Cannot grow volume group! %v", err)) - return - } + errCh := make(chan error, 1) + ms.vgCh <- &topology.VolumeGrowRequest{ + Option: option, + Count: writableVolumeCount, + ErrCh: errCh, + } + if err := <- errCh; err != nil { + writeJsonError(w, r, http.StatusInternalServerError, fmt.Errorf("cannot grow volume group! %v", err)) + return } } fid, count, dn, err := ms.Topo.PickForWrite(requestedCount, option) diff --git a/weed/server/master_server_handlers_admin.go b/weed/server/master_server_handlers_admin.go index f24d4e924..fb16ef78c 100644 --- a/weed/server/master_server_handlers_admin.go +++ b/weed/server/master_server_handlers_admin.go @@ -3,7 +3,6 @@ package weed_server import ( "context" "fmt" - "github.com/chrislusf/seaweedfs/weed/storage/types" "math/rand" "net/http" "strconv" @@ -14,6 +13,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/storage/backend/memory_map" "github.com/chrislusf/seaweedfs/weed/storage/needle" "github.com/chrislusf/seaweedfs/weed/storage/super_block" + "github.com/chrislusf/seaweedfs/weed/storage/types" "github.com/chrislusf/seaweedfs/weed/topology" "github.com/chrislusf/seaweedfs/weed/util" ) @@ -136,9 +136,11 @@ func (ms *MasterServer) submitFromMasterServerHandler(w http.ResponseWriter, r * } } -func (ms *MasterServer) HasWritableVolume(option *topology.VolumeGrowOption) bool { +func (ms *MasterServer) shouldVolumeGrow(option *topology.VolumeGrowOption) bool { vl := ms.Topo.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl, option.DiskType) - return vl.GetActiveVolumeCount(option) > 0 + active, high := vl.GetActiveVolumeCount(option) + //glog.V(0).Infof("active volume: %d, high usage volume: %d\n", active, high) + return active <= high } func (ms *MasterServer) getVolumeGrowOption(r *http.Request) (*topology.VolumeGrowOption, error) { @@ -172,7 +174,7 @@ func (ms *MasterServer) getVolumeGrowOption(r *http.Request) (*topology.VolumeGr ReplicaPlacement: replicaPlacement, Ttl: ttl, DiskType: diskType, - Prealloacte: preallocate, + Preallocate: preallocate, DataCenter: r.FormValue("dataCenter"), Rack: r.FormValue("rack"), DataNode: r.FormValue("dataNode"), |
