aboutsummaryrefslogtreecommitdiff
path: root/weed/server
diff options
context:
space:
mode:
Diffstat (limited to 'weed/server')
-rw-r--r--weed/server/common.go1
-rw-r--r--weed/server/filer_grpc_server.go6
-rw-r--r--weed/server/filer_server_handlers_write.go8
-rw-r--r--weed/server/filer_server_handlers_write_autochunk.go1
-rw-r--r--weed/server/filer_server_handlers_write_cipher.go1
-rw-r--r--weed/server/master_grpc_server.go8
-rw-r--r--weed/server/master_grpc_server_volume.go12
-rw-r--r--weed/server/master_server_handlers.go2
-rw-r--r--weed/server/master_server_handlers_admin.go12
-rw-r--r--weed/server/volume_grpc_admin.go2
-rw-r--r--weed/server/volume_grpc_copy.go11
-rw-r--r--weed/server/volume_grpc_erasure_coding.go2
-rw-r--r--weed/server/volume_server.go4
-rw-r--r--weed/server/volume_server_handlers_admin.go8
-rw-r--r--weed/server/volume_server_handlers_ui.go4
-rw-r--r--weed/server/volume_server_ui/templates.go2
-rw-r--r--weed/server/webdav_server.go2
17 files changed, 60 insertions, 26 deletions
diff --git a/weed/server/common.go b/weed/server/common.go
index cf9547950..7d32d6a9a 100644
--- a/weed/server/common.go
+++ b/weed/server/common.go
@@ -131,6 +131,7 @@ func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterUrl st
Replication: r.FormValue("replication"),
Collection: r.FormValue("collection"),
Ttl: r.FormValue("ttl"),
+ DiskType: r.FormValue("disk"),
}
assignResult, ae := operation.Assign(masterUrl, grpcDialOption, ar)
if ae != nil {
diff --git a/weed/server/filer_grpc_server.go b/weed/server/filer_grpc_server.go
index b0563d8bd..04145f2f9 100644
--- a/weed/server/filer_grpc_server.go
+++ b/weed/server/filer_grpc_server.go
@@ -263,6 +263,7 @@ func (fs *FilerServer) cleanupChunks(fullpath string, existingEntry *filer.Entry
newEntry.Attributes.Collection,
newEntry.Attributes.Replication,
newEntry.Attributes.TtlSec,
+ newEntry.Attributes.DiskType,
"",
"",
)
@@ -306,7 +307,7 @@ func (fs *FilerServer) AppendToEntry(ctx context.Context, req *filer_pb.AppendTo
}
entry.Chunks = append(entry.Chunks, req.Chunks...)
- so := fs.detectStorageOption(string(fullpath), entry.Collection, entry.Replication, entry.TtlSec, "", "")
+ so := fs.detectStorageOption(string(fullpath), entry.Collection, entry.Replication, entry.TtlSec, entry.DiskType, "", "")
entry.Chunks, err = filer.MaybeManifestize(fs.saveAsChunk(so), entry.Chunks)
if err != nil {
// not good, but should be ok
@@ -332,7 +333,7 @@ func (fs *FilerServer) DeleteEntry(ctx context.Context, req *filer_pb.DeleteEntr
func (fs *FilerServer) AssignVolume(ctx context.Context, req *filer_pb.AssignVolumeRequest) (resp *filer_pb.AssignVolumeResponse, err error) {
- so := fs.detectStorageOption(req.Path, req.Collection, req.Replication, req.TtlSec, req.DataCenter, req.Rack)
+ so := fs.detectStorageOption(req.Path, req.Collection, req.Replication, req.TtlSec, req.DiskType, req.DataCenter, req.Rack)
assignRequest, altRequest := so.ToAssignRequests(int(req.Count))
@@ -402,6 +403,7 @@ func (fs *FilerServer) Statistics(ctx context.Context, req *filer_pb.StatisticsR
Replication: req.Replication,
Collection: req.Collection,
Ttl: req.Ttl,
+ DiskType: req.DiskType,
})
if grpcErr != nil {
return grpcErr
diff --git a/weed/server/filer_server_handlers_write.go b/weed/server/filer_server_handlers_write.go
index 6f33e8494..28dff07c6 100644
--- a/weed/server/filer_server_handlers_write.go
+++ b/weed/server/filer_server_handlers_write.go
@@ -61,6 +61,7 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
query.Get("collection"),
query.Get("replication"),
query.Get("ttl"),
+ query.Get("disk"),
query.Get("dataCenter"),
query.Get("rack"),
)
@@ -104,7 +105,7 @@ func (fs *FilerServer) DeleteHandler(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusNoContent)
}
-func (fs *FilerServer) detectStorageOption(requestURI, qCollection, qReplication string, ttlSeconds int32, dataCenter, rack string) *operation.StorageOption {
+func (fs *FilerServer) detectStorageOption(requestURI, qCollection, qReplication string, ttlSeconds int32, diskType string, dataCenter, rack string) *operation.StorageOption {
collection := util.Nvl(qCollection, fs.option.Collection)
replication := util.Nvl(qReplication, fs.option.DefaultReplication)
@@ -134,17 +135,18 @@ func (fs *FilerServer) detectStorageOption(requestURI, qCollection, qReplication
DataCenter: util.Nvl(dataCenter, fs.option.DataCenter),
Rack: util.Nvl(rack, fs.option.Rack),
TtlSeconds: ttlSeconds,
+ DiskType: util.Nvl(diskType, rule.DiskType),
Fsync: fsync || rule.Fsync,
VolumeGrowthCount: rule.VolumeGrowthCount,
}
}
-func (fs *FilerServer) detectStorageOption0(requestURI, qCollection, qReplication string, qTtl string, dataCenter, rack string) *operation.StorageOption {
+func (fs *FilerServer) detectStorageOption0(requestURI, qCollection, qReplication string, qTtl string, diskType string, dataCenter, rack string) *operation.StorageOption {
ttl, err := needle.ReadTTL(qTtl)
if err != nil {
glog.Errorf("fail to parse ttl %s: %v", qTtl, err)
}
- return fs.detectStorageOption(requestURI, qCollection, qReplication, int32(ttl.Minutes())*60, dataCenter, rack)
+ return fs.detectStorageOption(requestURI, qCollection, qReplication, int32(ttl.Minutes())*60, diskType, dataCenter, rack)
}
diff --git a/weed/server/filer_server_handlers_write_autochunk.go b/weed/server/filer_server_handlers_write_autochunk.go
index ec1ecbe0d..145989445 100644
--- a/weed/server/filer_server_handlers_write_autochunk.go
+++ b/weed/server/filer_server_handlers_write_autochunk.go
@@ -186,6 +186,7 @@ func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileNa
Replication: so.Replication,
Collection: so.Collection,
TtlSec: so.TtlSeconds,
+ DiskType: so.DiskType,
Mime: contentType,
Md5: md5bytes,
FileSize: uint64(chunkOffset),
diff --git a/weed/server/filer_server_handlers_write_cipher.go b/weed/server/filer_server_handlers_write_cipher.go
index 3cc0d0c41..8334d1618 100644
--- a/weed/server/filer_server_handlers_write_cipher.go
+++ b/weed/server/filer_server_handlers_write_cipher.go
@@ -68,6 +68,7 @@ func (fs *FilerServer) encrypt(ctx context.Context, w http.ResponseWriter, r *ht
Replication: so.Replication,
Collection: so.Collection,
TtlSec: so.TtlSeconds,
+ DiskType: so.DiskType,
Mime: pu.MimeType,
Md5: util.Base64Md5ToBytes(pu.ContentMd5),
},
diff --git a/weed/server/master_grpc_server.go b/weed/server/master_grpc_server.go
index 9df88e956..62b8cdf07 100644
--- a/weed/server/master_grpc_server.go
+++ b/weed/server/master_grpc_server.go
@@ -67,9 +67,7 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
dcName, rackName := ms.Topo.Configuration.Locate(heartbeat.Ip, heartbeat.DataCenter, heartbeat.Rack)
dc := ms.Topo.GetOrCreateDataCenter(dcName)
rack := dc.GetOrCreateRack(rackName)
- dn = rack.GetOrCreateDataNode(heartbeat.Ip,
- int(heartbeat.Port), heartbeat.PublicUrl,
- int64(heartbeat.MaxVolumeCount))
+ dn = rack.GetOrCreateDataNode(heartbeat.Ip, int(heartbeat.Port), heartbeat.PublicUrl, int64(heartbeat.MaxVolumeCount), int64(heartbeat.MaxSsdVolumeCount))
glog.V(0).Infof("added volume server %v:%d", heartbeat.GetIp(), heartbeat.GetPort())
if err := stream.Send(&master_pb.HeartbeatResponse{
VolumeSizeLimit: uint64(ms.option.VolumeSizeLimitMB) * 1024 * 1024,
@@ -83,6 +81,10 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
delta := int64(heartbeat.MaxVolumeCount) - dn.GetMaxVolumeCount()
dn.UpAdjustMaxVolumeCountDelta(delta)
}
+ if heartbeat.MaxSsdVolumeCount != 0 && dn.GetMaxSsdVolumeCount() != int64(heartbeat.MaxSsdVolumeCount) {
+ delta := int64(heartbeat.MaxSsdVolumeCount) - dn.GetMaxSsdVolumeCount()
+ dn.UpAdjustMaxSsdVolumeCountDelta(delta)
+ }
glog.V(4).Infof("master received heartbeat %s", heartbeat.String())
message := &master_pb.VolumeLocation{
diff --git a/weed/server/master_grpc_server_volume.go b/weed/server/master_grpc_server_volume.go
index 20a22d126..f1d65896a 100644
--- a/weed/server/master_grpc_server_volume.go
+++ b/weed/server/master_grpc_server_volume.go
@@ -4,6 +4,7 @@ import (
"context"
"fmt"
"github.com/chrislusf/raft"
+ "github.com/chrislusf/seaweedfs/weed/storage"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/security"
@@ -60,11 +61,16 @@ func (ms *MasterServer) Assign(ctx context.Context, req *master_pb.AssignRequest
if err != nil {
return nil, err
}
+ diskType, err := storage.ToDiskType(req.DiskType)
+ if err != nil {
+ return nil, err
+ }
option := &topology.VolumeGrowOption{
Collection: req.Collection,
ReplicaPlacement: replicaPlacement,
Ttl: ttl,
+ DiskType: diskType,
Prealloacte: ms.preallocateSize,
DataCenter: req.DataCenter,
Rack: req.Rack,
@@ -73,7 +79,7 @@ func (ms *MasterServer) Assign(ctx context.Context, req *master_pb.AssignRequest
}
if !ms.Topo.HasWritableVolume(option) {
- if ms.Topo.FreeSpace() <= 0 {
+ if ms.Topo.AvailableSpaceFor(option) <= 0 {
return nil, fmt.Errorf("No free volumes left!")
}
ms.vgLock.Lock()
@@ -117,10 +123,10 @@ func (ms *MasterServer) Statistics(ctx context.Context, req *master_pb.Statistic
return nil, err
}
- volumeLayout := ms.Topo.GetVolumeLayout(req.Collection, replicaPlacement, ttl)
+ volumeLayout := ms.Topo.GetVolumeLayout(req.Collection, replicaPlacement, ttl, storage.DiskType(req.DiskType))
stats := volumeLayout.Stats()
- totalSize := ms.Topo.GetMaxVolumeCount() * int64(ms.option.VolumeSizeLimitMB) * 1024 * 1024
+ totalSize := (ms.Topo.GetMaxVolumeCount() + ms.Topo.GetMaxSsdVolumeCount()) * int64(ms.option.VolumeSizeLimitMB) * 1024 * 1024
resp := &master_pb.StatisticsResponse{
TotalSize: uint64(totalSize),
diff --git a/weed/server/master_server_handlers.go b/weed/server/master_server_handlers.go
index ebcb7efd2..dc3df2348 100644
--- a/weed/server/master_server_handlers.go
+++ b/weed/server/master_server_handlers.go
@@ -112,7 +112,7 @@ func (ms *MasterServer) dirAssignHandler(w http.ResponseWriter, r *http.Request)
}
if !ms.Topo.HasWritableVolume(option) {
- if ms.Topo.FreeSpace() <= 0 {
+ if ms.Topo.AvailableSpaceFor(option) <= 0 {
writeJsonQuiet(w, r, http.StatusNotFound, operation.AssignResult{Error: "No free volumes left!"})
return
}
diff --git a/weed/server/master_server_handlers_admin.go b/weed/server/master_server_handlers_admin.go
index 34235384f..7b1ec1f87 100644
--- a/weed/server/master_server_handlers_admin.go
+++ b/weed/server/master_server_handlers_admin.go
@@ -3,6 +3,7 @@ package weed_server
import (
"context"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/storage"
"math/rand"
"net/http"
"strconv"
@@ -75,8 +76,8 @@ func (ms *MasterServer) volumeGrowHandler(w http.ResponseWriter, r *http.Request
}
if count, err = strconv.Atoi(r.FormValue("count")); err == nil {
- if ms.Topo.FreeSpace() < int64(count*option.ReplicaPlacement.GetCopyCount()) {
- err = fmt.Errorf("only %d volumes left, not enough for %d", ms.Topo.FreeSpace(), count*option.ReplicaPlacement.GetCopyCount())
+ if ms.Topo.AvailableSpaceFor(option) < int64(count*option.ReplicaPlacement.GetCopyCount()) {
+ err = fmt.Errorf("only %d volumes left, not enough for %d", ms.Topo.AvailableSpaceFor(option), count*option.ReplicaPlacement.GetCopyCount())
} else {
count, err = ms.vg.GrowByCountAndType(ms.grpcDialOption, count, option, ms.Topo)
}
@@ -136,7 +137,7 @@ func (ms *MasterServer) submitFromMasterServerHandler(w http.ResponseWriter, r *
}
func (ms *MasterServer) HasWritableVolume(option *topology.VolumeGrowOption) bool {
- vl := ms.Topo.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl)
+ vl := ms.Topo.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl, option.DiskType)
return vl.GetActiveVolumeCount(option) > 0
}
@@ -157,6 +158,10 @@ func (ms *MasterServer) getVolumeGrowOption(r *http.Request) (*topology.VolumeGr
if err != nil {
return nil, err
}
+ diskType, err := storage.ToDiskType(r.FormValue("disk"))
+ if err != nil {
+ return nil, err
+ }
preallocate := ms.preallocateSize
if r.FormValue("preallocate") != "" {
@@ -169,6 +174,7 @@ func (ms *MasterServer) getVolumeGrowOption(r *http.Request) (*topology.VolumeGr
Collection: r.FormValue("collection"),
ReplicaPlacement: replicaPlacement,
Ttl: ttl,
+ DiskType: diskType,
Prealloacte: preallocate,
DataCenter: r.FormValue("dataCenter"),
Rack: r.FormValue("rack"),
diff --git a/weed/server/volume_grpc_admin.go b/weed/server/volume_grpc_admin.go
index 9296c63e9..55bb6573e 100644
--- a/weed/server/volume_grpc_admin.go
+++ b/weed/server/volume_grpc_admin.go
@@ -3,6 +3,7 @@ package weed_server
import (
"context"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/storage"
"path/filepath"
"github.com/chrislusf/seaweedfs/weed/glog"
@@ -41,6 +42,7 @@ func (vs *VolumeServer) AllocateVolume(ctx context.Context, req *volume_server_p
req.Ttl,
req.Preallocate,
req.MemoryMapMaxSizeMb,
+ storage.DiskType(req.DiskType),
)
if err != nil {
diff --git a/weed/server/volume_grpc_copy.go b/weed/server/volume_grpc_copy.go
index cfa3710a8..26c6b28e7 100644
--- a/weed/server/volume_grpc_copy.go
+++ b/weed/server/volume_grpc_copy.go
@@ -36,11 +36,6 @@ func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.Vo
glog.V(0).Infof("deleted existing volume %d before copying.", req.VolumeId)
}
- location := vs.store.FindFreeLocation()
- if location == nil {
- return nil, fmt.Errorf("no space left")
- }
-
// the master will not start compaction for read-only volumes, so it is safe to just copy files directly
// copy .dat and .idx files
// read .idx .dat file size and timestamp
@@ -59,6 +54,11 @@ func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.Vo
return fmt.Errorf("read volume file status failed, %v", err)
}
+ location := vs.store.FindFreeLocation(storage.DiskType(volFileInfoResp.DiskType))
+ if location == nil {
+ return fmt.Errorf("no space left")
+ }
+
dataBaseFileName = storage.VolumeFileName(location.Directory, volFileInfoResp.Collection, int(req.VolumeId))
indexBaseFileName = storage.VolumeFileName(location.IdxDirectory, volFileInfoResp.Collection, int(req.VolumeId))
@@ -206,6 +206,7 @@ func (vs *VolumeServer) ReadVolumeFileStatus(ctx context.Context, req *volume_se
resp.FileCount = v.FileCount()
resp.CompactionRevision = uint32(v.CompactionRevision)
resp.Collection = v.Collection
+ resp.DiskType = string(v.DiskType())
return resp, nil
}
diff --git a/weed/server/volume_grpc_erasure_coding.go b/weed/server/volume_grpc_erasure_coding.go
index 2a7076e04..e13afdf55 100644
--- a/weed/server/volume_grpc_erasure_coding.go
+++ b/weed/server/volume_grpc_erasure_coding.go
@@ -105,7 +105,7 @@ func (vs *VolumeServer) VolumeEcShardsCopy(ctx context.Context, req *volume_serv
glog.V(0).Infof("VolumeEcShardsCopy: %v", req)
- location := vs.store.FindFreeLocation()
+ location := vs.store.FindFreeLocation(storage.HardDriveType)
if location == nil {
return nil, fmt.Errorf("no space left")
}
diff --git a/weed/server/volume_server.go b/weed/server/volume_server.go
index 0443309fb..4cdb1628b 100644
--- a/weed/server/volume_server.go
+++ b/weed/server/volume_server.go
@@ -37,7 +37,7 @@ type VolumeServer struct {
func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
port int, publicUrl string,
- folders []string, maxCounts []int, minFreeSpacePercents []float32,
+ folders []string, maxCounts []int, minFreeSpacePercents []float32, diskTypes []storage.DiskType,
idxFolder string,
needleMapKind storage.NeedleMapKind,
masterNodes []string, pulseSeconds int,
@@ -76,7 +76,7 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
vs.checkWithMaster()
- vs.store = storage.NewStore(vs.grpcDialOption, port, ip, publicUrl, folders, maxCounts, minFreeSpacePercents, idxFolder, vs.needleMapKind)
+ vs.store = storage.NewStore(vs.grpcDialOption, port, ip, publicUrl, folders, maxCounts, minFreeSpacePercents, idxFolder, vs.needleMapKind, diskTypes)
vs.guard = security.NewGuard(whiteList, signingKey, expiresAfterSec, readSigningKey, readExpiresAfterSec)
handleStaticResources(adminMux)
diff --git a/weed/server/volume_server_handlers_admin.go b/weed/server/volume_server_handlers_admin.go
index 4d84c9c4d..29bd70d71 100644
--- a/weed/server/volume_server_handlers_admin.go
+++ b/weed/server/volume_server_handlers_admin.go
@@ -16,7 +16,9 @@ func (vs *VolumeServer) statusHandler(w http.ResponseWriter, r *http.Request) {
var ds []*volume_server_pb.DiskStatus
for _, loc := range vs.store.Locations {
if dir, e := filepath.Abs(loc.Directory); e == nil {
- ds = append(ds, stats.NewDiskStatus(dir))
+ newDiskStatus := stats.NewDiskStatus(dir)
+ newDiskStatus.DiskType = loc.GetDiskType()
+ ds = append(ds, newDiskStatus)
}
}
m["DiskStatuses"] = ds
@@ -31,7 +33,9 @@ func (vs *VolumeServer) statsDiskHandler(w http.ResponseWriter, r *http.Request)
var ds []*volume_server_pb.DiskStatus
for _, loc := range vs.store.Locations {
if dir, e := filepath.Abs(loc.Directory); e == nil {
- ds = append(ds, stats.NewDiskStatus(dir))
+ newDiskStatus := stats.NewDiskStatus(dir)
+ newDiskStatus.DiskType = loc.GetDiskType()
+ ds = append(ds, newDiskStatus)
}
}
m["DiskStatuses"] = ds
diff --git a/weed/server/volume_server_handlers_ui.go b/weed/server/volume_server_handlers_ui.go
index e535327e2..95c7549d2 100644
--- a/weed/server/volume_server_handlers_ui.go
+++ b/weed/server/volume_server_handlers_ui.go
@@ -19,7 +19,9 @@ func (vs *VolumeServer) uiStatusHandler(w http.ResponseWriter, r *http.Request)
var ds []*volume_server_pb.DiskStatus
for _, loc := range vs.store.Locations {
if dir, e := filepath.Abs(loc.Directory); e == nil {
- ds = append(ds, stats.NewDiskStatus(dir))
+ newDiskStatus := stats.NewDiskStatus(dir)
+ newDiskStatus.DiskType = loc.GetDiskType()
+ ds = append(ds, newDiskStatus)
}
}
volumeInfos := vs.store.VolumeInfos()
diff --git a/weed/server/volume_server_ui/templates.go b/weed/server/volume_server_ui/templates.go
index 8705bc088..2a93c3441 100644
--- a/weed/server/volume_server_ui/templates.go
+++ b/weed/server/volume_server_ui/templates.go
@@ -69,6 +69,7 @@ var StatusTpl = template.Must(template.New("status").Funcs(funcMap).Parse(`<!DOC
<thead>
<tr>
<th>Path</th>
+ <th>Type</th>
<th>Total</th>
<th>Free</th>
<th>Usage</th>
@@ -78,6 +79,7 @@ var StatusTpl = template.Must(template.New("status").Funcs(funcMap).Parse(`<!DOC
{{ range .DiskStatuses }}
<tr>
<td>{{ .Dir }}</td>
+ <td>{{ .DiskType }}</td>
<td>{{ bytesToHumanReadable .All }}</td>
<td>{{ bytesToHumanReadable .Free }}</td>
<td>{{ percentFrom .All .Used}}%</td>
diff --git a/weed/server/webdav_server.go b/weed/server/webdav_server.go
index ec9798997..4b57c7afe 100644
--- a/weed/server/webdav_server.go
+++ b/weed/server/webdav_server.go
@@ -33,6 +33,7 @@ type WebDavOption struct {
BucketsPath string
GrpcDialOption grpc.DialOption
Collection string
+ DiskType string
Uid uint32
Gid uint32
Cipher bool
@@ -382,6 +383,7 @@ func (f *WebDavFile) saveDataAsChunk(reader io.Reader, name string, offset int64
Count: 1,
Replication: "",
Collection: f.fs.option.Collection,
+ DiskType: f.fs.option.DiskType,
Path: name,
}