diff options
Diffstat (limited to 'weed/server')
| -rw-r--r-- | weed/server/common.go | 1 | ||||
| -rw-r--r-- | weed/server/filer_grpc_server.go | 6 | ||||
| -rw-r--r-- | weed/server/filer_server_handlers_write.go | 8 | ||||
| -rw-r--r-- | weed/server/filer_server_handlers_write_autochunk.go | 1 | ||||
| -rw-r--r-- | weed/server/filer_server_handlers_write_cipher.go | 1 | ||||
| -rw-r--r-- | weed/server/master_grpc_server.go | 8 | ||||
| -rw-r--r-- | weed/server/master_grpc_server_volume.go | 10 | ||||
| -rw-r--r-- | weed/server/master_server_handlers_admin.go | 8 | ||||
| -rw-r--r-- | weed/server/volume_server.go | 6 | ||||
| -rw-r--r-- | weed/server/webdav_server.go | 2 |
10 files changed, 38 insertions, 13 deletions
diff --git a/weed/server/common.go b/weed/server/common.go index 58079032e..852fffb5e 100644 --- a/weed/server/common.go +++ b/weed/server/common.go @@ -131,6 +131,7 @@ func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterUrl st Replication: r.FormValue("replication"), Collection: r.FormValue("collection"), Ttl: r.FormValue("ttl"), + VolumeType: r.FormValue("volumeType"), } assignResult, ae := operation.Assign(masterUrl, grpcDialOption, ar) if ae != nil { diff --git a/weed/server/filer_grpc_server.go b/weed/server/filer_grpc_server.go index 5f1b2d819..bc6d0ceb9 100644 --- a/weed/server/filer_grpc_server.go +++ b/weed/server/filer_grpc_server.go @@ -265,6 +265,7 @@ func (fs *FilerServer) cleanupChunks(fullpath string, existingEntry *filer.Entry newEntry.Attributes.Collection, newEntry.Attributes.Replication, newEntry.Attributes.TtlSec, + newEntry.Attributes.VolumeType, "", "", ) @@ -308,7 +309,7 @@ func (fs *FilerServer) AppendToEntry(ctx context.Context, req *filer_pb.AppendTo } entry.Chunks = append(entry.Chunks, req.Chunks...) - so := fs.detectStorageOption(string(fullpath), entry.Collection, entry.Replication, entry.TtlSec, "", "") + so := fs.detectStorageOption(string(fullpath), entry.Collection, entry.Replication, entry.TtlSec, entry.VolumeType, "", "") entry.Chunks, err = filer.MaybeManifestize(fs.saveAsChunk(so), entry.Chunks) if err != nil { // not good, but should be ok @@ -334,7 +335,7 @@ func (fs *FilerServer) DeleteEntry(ctx context.Context, req *filer_pb.DeleteEntr func (fs *FilerServer) AssignVolume(ctx context.Context, req *filer_pb.AssignVolumeRequest) (resp *filer_pb.AssignVolumeResponse, err error) { - so := fs.detectStorageOption(req.Path, req.Collection, req.Replication, req.TtlSec, req.DataCenter, req.Rack) + so := fs.detectStorageOption(req.Path, req.Collection, req.Replication, req.TtlSec, req.VolumeType, req.DataCenter, req.Rack) assignRequest, altRequest := so.ToAssignRequests(int(req.Count)) @@ -404,6 +405,7 @@ func (fs *FilerServer) Statistics(ctx context.Context, req *filer_pb.StatisticsR Replication: req.Replication, Collection: req.Collection, Ttl: req.Ttl, + VolumeType: req.VolumeType, }) if grpcErr != nil { return grpcErr diff --git a/weed/server/filer_server_handlers_write.go b/weed/server/filer_server_handlers_write.go index 9131b042b..e6fc31d8e 100644 --- a/weed/server/filer_server_handlers_write.go +++ b/weed/server/filer_server_handlers_write.go @@ -61,6 +61,7 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) { query.Get("collection"), query.Get("replication"), query.Get("ttl"), + query.Get("volumeType"), query.Get("dataCenter"), query.Get("rack"), ) @@ -104,7 +105,7 @@ func (fs *FilerServer) DeleteHandler(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusNoContent) } -func (fs *FilerServer) detectStorageOption(requestURI, qCollection, qReplication string, ttlSeconds int32, dataCenter, rack string) *operation.StorageOption { +func (fs *FilerServer) detectStorageOption(requestURI, qCollection, qReplication string, ttlSeconds int32, volumeType string, dataCenter, rack string) *operation.StorageOption { collection := util.Nvl(qCollection, fs.option.Collection) replication := util.Nvl(qReplication, fs.option.DefaultReplication) @@ -134,17 +135,18 @@ func (fs *FilerServer) detectStorageOption(requestURI, qCollection, qReplication DataCenter: util.Nvl(dataCenter, fs.option.DataCenter), Rack: util.Nvl(rack, fs.option.Rack), TtlSeconds: ttlSeconds, + VolumeType: volumeType, Fsync: fsync || rule.Fsync, VolumeGrowthCount: rule.VolumeGrowthCount, } } -func (fs *FilerServer) detectStorageOption0(requestURI, qCollection, qReplication string, qTtl string, dataCenter, rack string) *operation.StorageOption { +func (fs *FilerServer) detectStorageOption0(requestURI, qCollection, qReplication string, qTtl string, volumeType string, dataCenter, rack string) *operation.StorageOption { ttl, err := needle.ReadTTL(qTtl) if err != nil { glog.Errorf("fail to parse ttl %s: %v", qTtl, err) } - return fs.detectStorageOption(requestURI, qCollection, qReplication, int32(ttl.Minutes())*60, dataCenter, rack) + return fs.detectStorageOption(requestURI, qCollection, qReplication, int32(ttl.Minutes())*60, volumeType, dataCenter, rack) } diff --git a/weed/server/filer_server_handlers_write_autochunk.go b/weed/server/filer_server_handlers_write_autochunk.go index 0f6d0dc47..9f88bd202 100644 --- a/weed/server/filer_server_handlers_write_autochunk.go +++ b/weed/server/filer_server_handlers_write_autochunk.go @@ -159,6 +159,7 @@ func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileNa Replication: so.Replication, Collection: so.Collection, TtlSec: so.TtlSeconds, + VolumeType: so.VolumeType, Mime: contentType, Md5: md5bytes, FileSize: uint64(chunkOffset), diff --git a/weed/server/filer_server_handlers_write_cipher.go b/weed/server/filer_server_handlers_write_cipher.go index 3cc0d0c41..c5e8a74d8 100644 --- a/weed/server/filer_server_handlers_write_cipher.go +++ b/weed/server/filer_server_handlers_write_cipher.go @@ -68,6 +68,7 @@ func (fs *FilerServer) encrypt(ctx context.Context, w http.ResponseWriter, r *ht Replication: so.Replication, Collection: so.Collection, TtlSec: so.TtlSeconds, + VolumeType: so.VolumeType, Mime: pu.MimeType, Md5: util.Base64Md5ToBytes(pu.ContentMd5), }, diff --git a/weed/server/master_grpc_server.go b/weed/server/master_grpc_server.go index 9df88e956..62b8cdf07 100644 --- a/weed/server/master_grpc_server.go +++ b/weed/server/master_grpc_server.go @@ -67,9 +67,7 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ dcName, rackName := ms.Topo.Configuration.Locate(heartbeat.Ip, heartbeat.DataCenter, heartbeat.Rack) dc := ms.Topo.GetOrCreateDataCenter(dcName) rack := dc.GetOrCreateRack(rackName) - dn = rack.GetOrCreateDataNode(heartbeat.Ip, - int(heartbeat.Port), heartbeat.PublicUrl, - int64(heartbeat.MaxVolumeCount)) + dn = rack.GetOrCreateDataNode(heartbeat.Ip, int(heartbeat.Port), heartbeat.PublicUrl, int64(heartbeat.MaxVolumeCount), int64(heartbeat.MaxSsdVolumeCount)) glog.V(0).Infof("added volume server %v:%d", heartbeat.GetIp(), heartbeat.GetPort()) if err := stream.Send(&master_pb.HeartbeatResponse{ VolumeSizeLimit: uint64(ms.option.VolumeSizeLimitMB) * 1024 * 1024, @@ -83,6 +81,10 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ delta := int64(heartbeat.MaxVolumeCount) - dn.GetMaxVolumeCount() dn.UpAdjustMaxVolumeCountDelta(delta) } + if heartbeat.MaxSsdVolumeCount != 0 && dn.GetMaxSsdVolumeCount() != int64(heartbeat.MaxSsdVolumeCount) { + delta := int64(heartbeat.MaxSsdVolumeCount) - dn.GetMaxSsdVolumeCount() + dn.UpAdjustMaxSsdVolumeCountDelta(delta) + } glog.V(4).Infof("master received heartbeat %s", heartbeat.String()) message := &master_pb.VolumeLocation{ diff --git a/weed/server/master_grpc_server_volume.go b/weed/server/master_grpc_server_volume.go index 20a22d126..e84d9b386 100644 --- a/weed/server/master_grpc_server_volume.go +++ b/weed/server/master_grpc_server_volume.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "github.com/chrislusf/raft" + "github.com/chrislusf/seaweedfs/weed/storage" "github.com/chrislusf/seaweedfs/weed/pb/master_pb" "github.com/chrislusf/seaweedfs/weed/security" @@ -60,11 +61,16 @@ func (ms *MasterServer) Assign(ctx context.Context, req *master_pb.AssignRequest if err != nil { return nil, err } + volumeType, err := storage.ToVolumeType(req.VolumeType) + if err != nil { + return nil, err + } option := &topology.VolumeGrowOption{ Collection: req.Collection, ReplicaPlacement: replicaPlacement, Ttl: ttl, + VolumeType: volumeType, Prealloacte: ms.preallocateSize, DataCenter: req.DataCenter, Rack: req.Rack, @@ -117,10 +123,10 @@ func (ms *MasterServer) Statistics(ctx context.Context, req *master_pb.Statistic return nil, err } - volumeLayout := ms.Topo.GetVolumeLayout(req.Collection, replicaPlacement, ttl) + volumeLayout := ms.Topo.GetVolumeLayout(req.Collection, replicaPlacement, ttl, storage.VolumeType(req.VolumeType)) stats := volumeLayout.Stats() - totalSize := ms.Topo.GetMaxVolumeCount() * int64(ms.option.VolumeSizeLimitMB) * 1024 * 1024 + totalSize := (ms.Topo.GetMaxVolumeCount() + ms.Topo.GetMaxSsdVolumeCount()) * int64(ms.option.VolumeSizeLimitMB) * 1024 * 1024 resp := &master_pb.StatisticsResponse{ TotalSize: uint64(totalSize), diff --git a/weed/server/master_server_handlers_admin.go b/weed/server/master_server_handlers_admin.go index 34235384f..001ff9cdc 100644 --- a/weed/server/master_server_handlers_admin.go +++ b/weed/server/master_server_handlers_admin.go @@ -3,6 +3,7 @@ package weed_server import ( "context" "fmt" + "github.com/chrislusf/seaweedfs/weed/storage" "math/rand" "net/http" "strconv" @@ -136,7 +137,7 @@ func (ms *MasterServer) submitFromMasterServerHandler(w http.ResponseWriter, r * } func (ms *MasterServer) HasWritableVolume(option *topology.VolumeGrowOption) bool { - vl := ms.Topo.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl) + vl := ms.Topo.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl, option.VolumeType) return vl.GetActiveVolumeCount(option) > 0 } @@ -157,6 +158,10 @@ func (ms *MasterServer) getVolumeGrowOption(r *http.Request) (*topology.VolumeGr if err != nil { return nil, err } + volumeType, err := storage.ToVolumeType(r.FormValue("volumeType")) + if err != nil { + return nil, err + } preallocate := ms.preallocateSize if r.FormValue("preallocate") != "" { @@ -169,6 +174,7 @@ func (ms *MasterServer) getVolumeGrowOption(r *http.Request) (*topology.VolumeGr Collection: r.FormValue("collection"), ReplicaPlacement: replicaPlacement, Ttl: ttl, + VolumeType: volumeType, Prealloacte: preallocate, DataCenter: r.FormValue("dataCenter"), Rack: r.FormValue("rack"), diff --git a/weed/server/volume_server.go b/weed/server/volume_server.go index 468f75890..16f4198e8 100644 --- a/weed/server/volume_server.go +++ b/weed/server/volume_server.go @@ -20,6 +20,7 @@ type VolumeServer struct { pulseSeconds int dataCenter string rack string + VolumeType storage.VolumeType store *storage.Store guard *security.Guard grpcDialOption grpc.DialOption @@ -38,7 +39,7 @@ type VolumeServer struct { func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string, port int, publicUrl string, folders []string, maxCounts []int, minFreeSpacePercents []float32, - idxFolder string, + idxFolder string, volumeType storage.VolumeType, needleMapKind storage.NeedleMapType, masterNodes []string, pulseSeconds int, dataCenter string, rack string, @@ -63,6 +64,7 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string, pulseSeconds: pulseSeconds, dataCenter: dataCenter, rack: rack, + VolumeType: volumeType, needleMapKind: needleMapKind, FixJpgOrientation: fixJpgOrientation, ReadRedirect: readRedirect, @@ -76,7 +78,7 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string, vs.checkWithMaster() - vs.store = storage.NewStore(vs.grpcDialOption, port, ip, publicUrl, folders, maxCounts, minFreeSpacePercents, idxFolder, vs.needleMapKind) + vs.store = storage.NewStore(vs.grpcDialOption, port, ip, publicUrl, folders, maxCounts, minFreeSpacePercents, idxFolder, vs.needleMapKind, vs.VolumeType) vs.guard = security.NewGuard(whiteList, signingKey, expiresAfterSec, readSigningKey, readExpiresAfterSec) handleStaticResources(adminMux) diff --git a/weed/server/webdav_server.go b/weed/server/webdav_server.go index 2b238e534..329560d3f 100644 --- a/weed/server/webdav_server.go +++ b/weed/server/webdav_server.go @@ -33,6 +33,7 @@ type WebDavOption struct { BucketsPath string GrpcDialOption grpc.DialOption Collection string + VolumeType string Uid uint32 Gid uint32 Cipher bool @@ -378,6 +379,7 @@ func (f *WebDavFile) saveDataAsChunk(reader io.Reader, name string, offset int64 Count: 1, Replication: "", Collection: f.fs.option.Collection, + VolumeType: f.fs.option.VolumeType, Path: name, } |
