Diffstat (limited to 'weed/server')
-rw-r--r--   weed/server/common.go                                 |  1
-rw-r--r--   weed/server/filer_grpc_server.go                      |  6
-rw-r--r--   weed/server/filer_server_handlers_write.go            |  8
-rw-r--r--   weed/server/filer_server_handlers_write_autochunk.go  |  1
-rw-r--r--   weed/server/filer_server_handlers_write_cipher.go     |  1
-rw-r--r--   weed/server/master_grpc_server.go                     |  8
-rw-r--r--   weed/server/master_grpc_server_volume.go              | 12
-rw-r--r--   weed/server/master_server_handlers.go                 |  2
-rw-r--r--   weed/server/master_server_handlers_admin.go           | 12
-rw-r--r--   weed/server/volume_grpc_admin.go                      |  2
-rw-r--r--   weed/server/volume_grpc_copy.go                       | 11
-rw-r--r--   weed/server/volume_grpc_erasure_coding.go             |  2
-rw-r--r--   weed/server/volume_server.go                          |  4
-rw-r--r--   weed/server/volume_server_handlers_admin.go           |  8
-rw-r--r--   weed/server/volume_server_handlers_ui.go              |  4
-rw-r--r--   weed/server/volume_server_ui/templates.go             |  2
-rw-r--r--   weed/server/webdav_server.go                          |  2
17 files changed, 60 insertions, 26 deletions
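
Taken together, these hunks thread a disk type end to end: HTTP handlers pick up a new "disk" form value, the filer's storage-option detection and the master's volume growth and layout lookups carry a DiskType, the heartbeat and topology track separate SSD volume counts, and the volume server reports a disk type per storage location. A minimal client-side sketch of how the new field is used, assuming the operation.VolumeAssignRequest fields and the operation.Assign call shown in the common.go hunk below; the master address, the Count field, and the "ssd" value are illustrative assumptions, not part of this change:

package main

import (
	"fmt"

	"google.golang.org/grpc"

	"github.com/chrislusf/seaweedfs/weed/operation"
)

func main() {
	// Ask the master to assign a file id on a specific disk type. DiskType is
	// the field this change adds; "ssd" is assumed to be a value accepted by
	// storage.ToDiskType on the master side.
	ar := &operation.VolumeAssignRequest{
		Count:       1,
		Replication: "001",
		Collection:  "pictures",
		Ttl:         "",
		DiskType:    "ssd",
	}

	assignResult, err := operation.Assign("localhost:9333", grpc.WithInsecure(), ar)
	if err != nil {
		fmt.Println("assign failed:", err)
		return
	}
	fmt.Println("assigned", assignResult.Fid, "on", assignResult.Url)
}
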
diff --git a/weed/server/common.go b/weed/server/common.go
index cf9547950..7d32d6a9a 100644
--- a/weed/server/common.go
+++ b/weed/server/common.go
@@ -131,6 +131,7 @@ func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterUrl st
 		Replication: r.FormValue("replication"),
 		Collection:  r.FormValue("collection"),
 		Ttl:         r.FormValue("ttl"),
+		DiskType:    r.FormValue("disk"),
 	}
 	assignResult, ae := operation.Assign(masterUrl, grpcDialOption, ar)
 	if ae != nil {
diff --git a/weed/server/filer_grpc_server.go b/weed/server/filer_grpc_server.go
index b0563d8bd..04145f2f9 100644
--- a/weed/server/filer_grpc_server.go
+++ b/weed/server/filer_grpc_server.go
@@ -263,6 +263,7 @@ func (fs *FilerServer) cleanupChunks(fullpath string, existingEntry *filer.Entry
 			newEntry.Attributes.Collection,
 			newEntry.Attributes.Replication,
 			newEntry.Attributes.TtlSec,
+			newEntry.Attributes.DiskType,
 			"",
 			"",
 		)
@@ -306,7 +307,7 @@ func (fs *FilerServer) AppendToEntry(ctx context.Context, req *filer_pb.AppendTo
 	}
 	entry.Chunks = append(entry.Chunks, req.Chunks...)
 
-	so := fs.detectStorageOption(string(fullpath), entry.Collection, entry.Replication, entry.TtlSec, "", "")
+	so := fs.detectStorageOption(string(fullpath), entry.Collection, entry.Replication, entry.TtlSec, entry.DiskType, "", "")
 	entry.Chunks, err = filer.MaybeManifestize(fs.saveAsChunk(so), entry.Chunks)
 	if err != nil {
 		// not good, but should be ok
@@ -332,7 +333,7 @@ func (fs *FilerServer) DeleteEntry(ctx context.Context, req *filer_pb.DeleteEntr
 
 func (fs *FilerServer) AssignVolume(ctx context.Context, req *filer_pb.AssignVolumeRequest) (resp *filer_pb.AssignVolumeResponse, err error) {
 
-	so := fs.detectStorageOption(req.Path, req.Collection, req.Replication, req.TtlSec, req.DataCenter, req.Rack)
+	so := fs.detectStorageOption(req.Path, req.Collection, req.Replication, req.TtlSec, req.DiskType, req.DataCenter, req.Rack)
 
 	assignRequest, altRequest := so.ToAssignRequests(int(req.Count))
 
@@ -402,6 +403,7 @@ func (fs *FilerServer) Statistics(ctx context.Context, req *filer_pb.StatisticsR
 		Replication: req.Replication,
 		Collection:  req.Collection,
 		Ttl:         req.Ttl,
+		DiskType:    req.DiskType,
 	})
 	if grpcErr != nil {
 		return grpcErr
diff --git a/weed/server/filer_server_handlers_write.go b/weed/server/filer_server_handlers_write.go
index 6f33e8494..28dff07c6 100644
--- a/weed/server/filer_server_handlers_write.go
+++ b/weed/server/filer_server_handlers_write.go
@@ -61,6 +61,7 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
 		query.Get("collection"),
 		query.Get("replication"),
 		query.Get("ttl"),
+		query.Get("disk"),
 		query.Get("dataCenter"),
 		query.Get("rack"),
 	)
@@ -104,7 +105,7 @@ func (fs *FilerServer) DeleteHandler(w http.ResponseWriter, r *http.Request) {
 	w.WriteHeader(http.StatusNoContent)
 }
 
-func (fs *FilerServer) detectStorageOption(requestURI, qCollection, qReplication string, ttlSeconds int32, dataCenter, rack string) *operation.StorageOption {
+func (fs *FilerServer) detectStorageOption(requestURI, qCollection, qReplication string, ttlSeconds int32, diskType string, dataCenter, rack string) *operation.StorageOption {
 
 	collection := util.Nvl(qCollection, fs.option.Collection)
 	replication := util.Nvl(qReplication, fs.option.DefaultReplication)
@@ -134,17 +135,18 @@ func (fs *FilerServer) detectStorageOption(requestURI, qCollection, qReplication
 		DataCenter:        util.Nvl(dataCenter, fs.option.DataCenter),
 		Rack:              util.Nvl(rack, fs.option.Rack),
 		TtlSeconds:        ttlSeconds,
+		DiskType:          util.Nvl(diskType, rule.DiskType),
 		Fsync:             fsync || rule.Fsync,
 		VolumeGrowthCount: rule.VolumeGrowthCount,
 	}
 }
 
-func (fs *FilerServer) detectStorageOption0(requestURI, qCollection, qReplication string, qTtl string, dataCenter, rack string) *operation.StorageOption {
+func (fs *FilerServer) detectStorageOption0(requestURI, qCollection, qReplication string, qTtl string, diskType string, dataCenter, rack string) *operation.StorageOption {
 
 	ttl, err := needle.ReadTTL(qTtl)
 	if err != nil {
 		glog.Errorf("fail to parse ttl %s: %v", qTtl, err)
 	}
 
-	return fs.detectStorageOption(requestURI, qCollection, qReplication, int32(ttl.Minutes())*60, dataCenter, rack)
+	return fs.detectStorageOption(requestURI, qCollection, qReplication, int32(ttl.Minutes())*60, diskType, dataCenter, rack)
 }
diff --git a/weed/server/filer_server_handlers_write_autochunk.go b/weed/server/filer_server_handlers_write_autochunk.go
index ec1ecbe0d..145989445 100644
--- a/weed/server/filer_server_handlers_write_autochunk.go
+++ b/weed/server/filer_server_handlers_write_autochunk.go
@@ -186,6 +186,7 @@ func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileNa
 			Replication: so.Replication,
 			Collection:  so.Collection,
 			TtlSec:      so.TtlSeconds,
+			DiskType:    so.DiskType,
 			Mime:        contentType,
 			Md5:         md5bytes,
 			FileSize:    uint64(chunkOffset),
diff --git a/weed/server/filer_server_handlers_write_cipher.go b/weed/server/filer_server_handlers_write_cipher.go
index 3cc0d0c41..8334d1618 100644
--- a/weed/server/filer_server_handlers_write_cipher.go
+++ b/weed/server/filer_server_handlers_write_cipher.go
@@ -68,6 +68,7 @@ func (fs *FilerServer) encrypt(ctx context.Context, w http.ResponseWriter, r *ht
 			Replication: so.Replication,
 			Collection:  so.Collection,
 			TtlSec:      so.TtlSeconds,
+			DiskType:    so.DiskType,
 			Mime:        pu.MimeType,
 			Md5:         util.Base64Md5ToBytes(pu.ContentMd5),
 		},
diff --git a/weed/server/master_grpc_server.go b/weed/server/master_grpc_server.go
index 9df88e956..62b8cdf07 100644
--- a/weed/server/master_grpc_server.go
+++ b/weed/server/master_grpc_server.go
@@ -67,9 +67,7 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
 			dcName, rackName := ms.Topo.Configuration.Locate(heartbeat.Ip, heartbeat.DataCenter, heartbeat.Rack)
 			dc := ms.Topo.GetOrCreateDataCenter(dcName)
 			rack := dc.GetOrCreateRack(rackName)
-			dn = rack.GetOrCreateDataNode(heartbeat.Ip,
-				int(heartbeat.Port), heartbeat.PublicUrl,
-				int64(heartbeat.MaxVolumeCount))
+			dn = rack.GetOrCreateDataNode(heartbeat.Ip, int(heartbeat.Port), heartbeat.PublicUrl, int64(heartbeat.MaxVolumeCount), int64(heartbeat.MaxSsdVolumeCount))
 			glog.V(0).Infof("added volume server %v:%d", heartbeat.GetIp(), heartbeat.GetPort())
 			if err := stream.Send(&master_pb.HeartbeatResponse{
 				VolumeSizeLimit: uint64(ms.option.VolumeSizeLimitMB) * 1024 * 1024,
@@ -83,6 +81,10 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
 			delta := int64(heartbeat.MaxVolumeCount) - dn.GetMaxVolumeCount()
 			dn.UpAdjustMaxVolumeCountDelta(delta)
 		}
+		if heartbeat.MaxSsdVolumeCount != 0 && dn.GetMaxSsdVolumeCount() != int64(heartbeat.MaxSsdVolumeCount) {
+			delta := int64(heartbeat.MaxSsdVolumeCount) - dn.GetMaxSsdVolumeCount()
+			dn.UpAdjustMaxSsdVolumeCountDelta(delta)
+		}
 
 		glog.V(4).Infof("master received heartbeat %s", heartbeat.String())
 		message := &master_pb.VolumeLocation{
diff --git a/weed/server/master_grpc_server_volume.go b/weed/server/master_grpc_server_volume.go
index 20a22d126..f1d65896a 100644
--- a/weed/server/master_grpc_server_volume.go
+++ b/weed/server/master_grpc_server_volume.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"fmt"
 	"github.com/chrislusf/raft"
+	"github.com/chrislusf/seaweedfs/weed/storage"
 
 	"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
 	"github.com/chrislusf/seaweedfs/weed/security"
@@ -60,11 +61,16 @@ func (ms *MasterServer) Assign(ctx context.Context, req *master_pb.AssignRequest
 	if err != nil {
 		return nil, err
 	}
+	diskType, err := storage.ToDiskType(req.DiskType)
+	if err != nil {
+		return nil, err
+	}
 
 	option := &topology.VolumeGrowOption{
 		Collection:       req.Collection,
 		ReplicaPlacement: replicaPlacement,
 		Ttl:              ttl,
+		DiskType:         diskType,
 		Prealloacte:      ms.preallocateSize,
 		DataCenter:       req.DataCenter,
 		Rack:             req.Rack,
@@ -73,7 +79,7 @@ func (ms *MasterServer) Assign(ctx context.Context, req *master_pb.AssignRequest
 	}
 
 	if !ms.Topo.HasWritableVolume(option) {
-		if ms.Topo.FreeSpace() <= 0 {
+		if ms.Topo.AvailableSpaceFor(option) <= 0 {
 			return nil, fmt.Errorf("No free volumes left!")
 		}
 		ms.vgLock.Lock()
@@ -117,10 +123,10 @@ func (ms *MasterServer) Statistics(ctx context.Context, req *master_pb.Statistic
 		return nil, err
 	}
 
-	volumeLayout := ms.Topo.GetVolumeLayout(req.Collection, replicaPlacement, ttl)
+	volumeLayout := ms.Topo.GetVolumeLayout(req.Collection, replicaPlacement, ttl, storage.DiskType(req.DiskType))
 	stats := volumeLayout.Stats()
 
-	totalSize := ms.Topo.GetMaxVolumeCount() * int64(ms.option.VolumeSizeLimitMB) * 1024 * 1024
+	totalSize := (ms.Topo.GetMaxVolumeCount() + ms.Topo.GetMaxSsdVolumeCount()) * int64(ms.option.VolumeSizeLimitMB) * 1024 * 1024
 
 	resp := &master_pb.StatisticsResponse{
 		TotalSize: uint64(totalSize),
diff --git a/weed/server/master_server_handlers.go b/weed/server/master_server_handlers.go
index ebcb7efd2..dc3df2348 100644
--- a/weed/server/master_server_handlers.go
+++ b/weed/server/master_server_handlers.go
@@ -112,7 +112,7 @@ func (ms *MasterServer) dirAssignHandler(w http.ResponseWriter, r *http.Request)
 	}
 
 	if !ms.Topo.HasWritableVolume(option) {
-		if ms.Topo.FreeSpace() <= 0 {
+		if ms.Topo.AvailableSpaceFor(option) <= 0 {
 			writeJsonQuiet(w, r, http.StatusNotFound, operation.AssignResult{Error: "No free volumes left!"})
 			return
 		}
diff --git a/weed/server/master_server_handlers_admin.go b/weed/server/master_server_handlers_admin.go
index 34235384f..7b1ec1f87 100644
--- a/weed/server/master_server_handlers_admin.go
+++ b/weed/server/master_server_handlers_admin.go
@@ -3,6 +3,7 @@ package weed_server
 import (
 	"context"
 	"fmt"
+	"github.com/chrislusf/seaweedfs/weed/storage"
 	"math/rand"
 	"net/http"
 	"strconv"
@@ -75,8 +76,8 @@ func (ms *MasterServer) volumeGrowHandler(w http.ResponseWriter, r *http.Request
 	}
 
 	if count, err = strconv.Atoi(r.FormValue("count")); err == nil {
-		if ms.Topo.FreeSpace() < int64(count*option.ReplicaPlacement.GetCopyCount()) {
-			err = fmt.Errorf("only %d volumes left, not enough for %d", ms.Topo.FreeSpace(), count*option.ReplicaPlacement.GetCopyCount())
+		if ms.Topo.AvailableSpaceFor(option) < int64(count*option.ReplicaPlacement.GetCopyCount()) {
+			err = fmt.Errorf("only %d volumes left, not enough for %d", ms.Topo.AvailableSpaceFor(option), count*option.ReplicaPlacement.GetCopyCount())
 		} else {
 			count, err = ms.vg.GrowByCountAndType(ms.grpcDialOption, count, option, ms.Topo)
 		}
@@ -136,7 +137,7 @@ func (ms *MasterServer) submitFromMasterServerHandler(w http.ResponseWriter, r *
 }
 
 func (ms *MasterServer) HasWritableVolume(option *topology.VolumeGrowOption) bool {
-	vl := ms.Topo.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl)
+	vl := ms.Topo.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl, option.DiskType)
 	return vl.GetActiveVolumeCount(option) > 0
 }
 
@@ -157,6 +158,10 @@ func (ms *MasterServer) getVolumeGrowOption(r *http.Request) (*topology.VolumeGr
 	if err != nil {
 		return nil, err
 	}
+	diskType, err := storage.ToDiskType(r.FormValue("disk"))
+	if err != nil {
+		return nil, err
+	}
 
 	preallocate := ms.preallocateSize
 	if r.FormValue("preallocate") != "" {
@@ -169,6 +174,7 @@ func (ms *MasterServer) getVolumeGrowOption(r *http.Request) (*topology.VolumeGr
 		Collection:       r.FormValue("collection"),
 		ReplicaPlacement: replicaPlacement,
 		Ttl:              ttl,
+		DiskType:         diskType,
 		Prealloacte:      preallocate,
 		DataCenter:       r.FormValue("dataCenter"),
 		Rack:             r.FormValue("rack"),
diff --git a/weed/server/volume_grpc_admin.go b/weed/server/volume_grpc_admin.go
index 9296c63e9..55bb6573e 100644
--- a/weed/server/volume_grpc_admin.go
+++ b/weed/server/volume_grpc_admin.go
@@ -3,6 +3,7 @@ package weed_server
 import (
 	"context"
 	"fmt"
+	"github.com/chrislusf/seaweedfs/weed/storage"
 	"path/filepath"
 
 	"github.com/chrislusf/seaweedfs/weed/glog"
@@ -41,6 +42,7 @@ func (vs *VolumeServer) AllocateVolume(ctx context.Context, req *volume_server_p
 		req.Ttl,
 		req.Preallocate,
 		req.MemoryMapMaxSizeMb,
+		storage.DiskType(req.DiskType),
 	)
 
 	if err != nil {
diff --git a/weed/server/volume_grpc_copy.go b/weed/server/volume_grpc_copy.go
index cfa3710a8..26c6b28e7 100644
--- a/weed/server/volume_grpc_copy.go
+++ b/weed/server/volume_grpc_copy.go
@@ -36,11 +36,6 @@ func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.Vo
 		glog.V(0).Infof("deleted existing volume %d before copying.", req.VolumeId)
 	}
 
-	location := vs.store.FindFreeLocation()
-	if location == nil {
-		return nil, fmt.Errorf("no space left")
-	}
-
 	// the master will not start compaction for read-only volumes, so it is safe to just copy files directly
 	// copy .dat and .idx files
 	// read .idx .dat file size and timestamp
@@ -59,6 +54,11 @@ func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.Vo
 			return fmt.Errorf("read volume file status failed, %v", err)
 		}
 
+		location := vs.store.FindFreeLocation(storage.DiskType(volFileInfoResp.DiskType))
+		if location == nil {
+			return fmt.Errorf("no space left")
+		}
+
 		dataBaseFileName = storage.VolumeFileName(location.Directory, volFileInfoResp.Collection, int(req.VolumeId))
 		indexBaseFileName = storage.VolumeFileName(location.IdxDirectory, volFileInfoResp.Collection, int(req.VolumeId))
 
@@ -206,6 +206,7 @@ func (vs *VolumeServer) ReadVolumeFileStatus(ctx context.Context, req *volume_se
 	resp.FileCount = v.FileCount()
 	resp.CompactionRevision = uint32(v.CompactionRevision)
 	resp.Collection = v.Collection
+	resp.DiskType = string(v.DiskType())
 	return resp, nil
 }
 
diff --git a/weed/server/volume_grpc_erasure_coding.go b/weed/server/volume_grpc_erasure_coding.go
index 2a7076e04..e13afdf55 100644
--- a/weed/server/volume_grpc_erasure_coding.go
+++ b/weed/server/volume_grpc_erasure_coding.go
@@ -105,7 +105,7 @@ func (vs *VolumeServer) VolumeEcShardsCopy(ctx context.Context, req *volume_serv
 
 	glog.V(0).Infof("VolumeEcShardsCopy: %v", req)
 
-	location := vs.store.FindFreeLocation()
+	location := vs.store.FindFreeLocation(storage.HardDriveType)
 	if location == nil {
 		return nil, fmt.Errorf("no space left")
 	}
diff --git a/weed/server/volume_server.go b/weed/server/volume_server.go
index 0443309fb..4cdb1628b 100644
--- a/weed/server/volume_server.go
+++ b/weed/server/volume_server.go
@@ -37,7 +37,7 @@ type VolumeServer struct {
 
 func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
 	port int, publicUrl string,
-	folders []string, maxCounts []int, minFreeSpacePercents []float32,
+	folders []string, maxCounts []int, minFreeSpacePercents []float32, diskTypes []storage.DiskType,
 	idxFolder string,
 	needleMapKind storage.NeedleMapKind,
 	masterNodes []string, pulseSeconds int,
@@ -76,7 +76,7 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
 
 	vs.checkWithMaster()
 
-	vs.store = storage.NewStore(vs.grpcDialOption, port, ip, publicUrl, folders, maxCounts, minFreeSpacePercents, idxFolder, vs.needleMapKind)
+	vs.store = storage.NewStore(vs.grpcDialOption, port, ip, publicUrl, folders, maxCounts, minFreeSpacePercents, idxFolder, vs.needleMapKind, diskTypes)
 	vs.guard = security.NewGuard(whiteList, signingKey, expiresAfterSec, readSigningKey, readExpiresAfterSec)
 
 	handleStaticResources(adminMux)
diff --git a/weed/server/volume_server_handlers_admin.go b/weed/server/volume_server_handlers_admin.go
index 4d84c9c4d..29bd70d71 100644
--- a/weed/server/volume_server_handlers_admin.go
+++ b/weed/server/volume_server_handlers_admin.go
@@ -16,7 +16,9 @@ func (vs *VolumeServer) statusHandler(w http.ResponseWriter, r *http.Request) {
 	var ds []*volume_server_pb.DiskStatus
 	for _, loc := range vs.store.Locations {
 		if dir, e := filepath.Abs(loc.Directory); e == nil {
-			ds = append(ds, stats.NewDiskStatus(dir))
+			newDiskStatus := stats.NewDiskStatus(dir)
+			newDiskStatus.DiskType = loc.GetDiskType()
+			ds = append(ds, newDiskStatus)
 		}
 	}
 	m["DiskStatuses"] = ds
@@ -31,7 +33,9 @@ func (vs *VolumeServer) statsDiskHandler(w http.ResponseWriter, r *http.Request)
 	var ds []*volume_server_pb.DiskStatus
 	for _, loc := range vs.store.Locations {
 		if dir, e := filepath.Abs(loc.Directory); e == nil {
-			ds = append(ds, stats.NewDiskStatus(dir))
+			newDiskStatus := stats.NewDiskStatus(dir)
+			newDiskStatus.DiskType = loc.GetDiskType()
+			ds = append(ds, newDiskStatus)
 		}
 	}
 	m["DiskStatuses"] = ds
diff --git a/weed/server/volume_server_handlers_ui.go b/weed/server/volume_server_handlers_ui.go
index e535327e2..95c7549d2 100644
--- a/weed/server/volume_server_handlers_ui.go
+++ b/weed/server/volume_server_handlers_ui.go
@@ -19,7 +19,9 @@ func (vs *VolumeServer) uiStatusHandler(w http.ResponseWriter, r *http.Request)
 	var ds []*volume_server_pb.DiskStatus
 	for _, loc := range vs.store.Locations {
 		if dir, e := filepath.Abs(loc.Directory); e == nil {
-			ds = append(ds, stats.NewDiskStatus(dir))
+			newDiskStatus := stats.NewDiskStatus(dir)
+			newDiskStatus.DiskType = loc.GetDiskType()
+			ds = append(ds, newDiskStatus)
 		}
 	}
 	volumeInfos := vs.store.VolumeInfos()
diff --git a/weed/server/volume_server_ui/templates.go b/weed/server/volume_server_ui/templates.go
index 8705bc088..2a93c3441 100644
--- a/weed/server/volume_server_ui/templates.go
+++ b/weed/server/volume_server_ui/templates.go
@@ -69,6 +69,7 @@ var StatusTpl = template.Must(template.New("status").Funcs(funcMap).Parse(`<!DOC
         <thead>
           <tr>
             <th>Path</th>
+            <th>Type</th>
             <th>Total</th>
             <th>Free</th>
             <th>Usage</th>
@@ -78,6 +79,7 @@ var StatusTpl = template.Must(template.New("status").Funcs(funcMap).Parse(`<!DOC
         {{ range .DiskStatuses }}
           <tr>
             <td>{{ .Dir }}</td>
+            <td>{{ .DiskType }}</td>
            <td>{{ bytesToHumanReadable .All }}</td>
            <td>{{ bytesToHumanReadable .Free }}</td>
            <td>{{ percentFrom .All .Used}}%</td>
diff --git a/weed/server/webdav_server.go b/weed/server/webdav_server.go
index ec9798997..4b57c7afe 100644
--- a/weed/server/webdav_server.go
+++ b/weed/server/webdav_server.go
@@ -33,6 +33,7 @@ type WebDavOption struct {
 	BucketsPath    string
 	GrpcDialOption grpc.DialOption
 	Collection     string
+	DiskType       string
 	Uid            uint32
 	Gid            uint32
 	Cipher         bool
@@ -382,6 +383,7 @@ func (f *WebDavFile) saveDataAsChunk(reader io.Reader, name string, offset int64
 		Count:       1,
 		Replication: "",
 		Collection:  f.fs.option.Collection,
+		DiskType:    f.fs.option.DiskType,
 		Path:        name,
 	}
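
On the filer side, the same setting is exposed as a "disk" query argument: PostHandler forwards query.Get("disk") into detectStorageOption0, which falls back to the matching path rule's DiskType when the query value is empty. A hypothetical usage sketch, assuming a filer listening on localhost:8888 and an "ssd" disk type; the path and payload are made up for illustration:

package main

import (
	"fmt"
	"net/http"
	"strings"
)

func main() {
	// POST through the filer; the ?disk=ssd query parameter is read by
	// PostHandler and carried into the storage option used for assignment.
	resp, err := http.Post(
		"http://localhost:8888/buckets/pictures/hello.txt?disk=ssd",
		"text/plain",
		strings.NewReader("hello on ssd"),
	)
	if err != nil {
		fmt.Println("write failed:", err)
		return
	}
	defer resp.Body.Close()
	fmt.Println("filer replied:", resp.Status)
}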
