aboutsummaryrefslogtreecommitdiff
path: root/weed/server
diff options
context:
space:
mode:
Diffstat (limited to 'weed/server')
-rw-r--r--weed/server/common.go1
-rw-r--r--weed/server/filer_grpc_server.go6
-rw-r--r--weed/server/filer_server_handlers_write.go8
-rw-r--r--weed/server/filer_server_handlers_write_autochunk.go1
-rw-r--r--weed/server/filer_server_handlers_write_cipher.go1
-rw-r--r--weed/server/master_grpc_server.go8
-rw-r--r--weed/server/master_grpc_server_volume.go10
-rw-r--r--weed/server/master_server_handlers_admin.go8
-rw-r--r--weed/server/volume_server.go6
-rw-r--r--weed/server/webdav_server.go2
10 files changed, 38 insertions, 13 deletions
diff --git a/weed/server/common.go b/weed/server/common.go
index 58079032e..852fffb5e 100644
--- a/weed/server/common.go
+++ b/weed/server/common.go
@@ -131,6 +131,7 @@ func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterUrl st
Replication: r.FormValue("replication"),
Collection: r.FormValue("collection"),
Ttl: r.FormValue("ttl"),
+ VolumeType: r.FormValue("volumeType"),
}
assignResult, ae := operation.Assign(masterUrl, grpcDialOption, ar)
if ae != nil {
diff --git a/weed/server/filer_grpc_server.go b/weed/server/filer_grpc_server.go
index 5f1b2d819..bc6d0ceb9 100644
--- a/weed/server/filer_grpc_server.go
+++ b/weed/server/filer_grpc_server.go
@@ -265,6 +265,7 @@ func (fs *FilerServer) cleanupChunks(fullpath string, existingEntry *filer.Entry
newEntry.Attributes.Collection,
newEntry.Attributes.Replication,
newEntry.Attributes.TtlSec,
+ newEntry.Attributes.VolumeType,
"",
"",
)
@@ -308,7 +309,7 @@ func (fs *FilerServer) AppendToEntry(ctx context.Context, req *filer_pb.AppendTo
}
entry.Chunks = append(entry.Chunks, req.Chunks...)
- so := fs.detectStorageOption(string(fullpath), entry.Collection, entry.Replication, entry.TtlSec, "", "")
+ so := fs.detectStorageOption(string(fullpath), entry.Collection, entry.Replication, entry.TtlSec, entry.VolumeType, "", "")
entry.Chunks, err = filer.MaybeManifestize(fs.saveAsChunk(so), entry.Chunks)
if err != nil {
// not good, but should be ok
@@ -334,7 +335,7 @@ func (fs *FilerServer) DeleteEntry(ctx context.Context, req *filer_pb.DeleteEntr
func (fs *FilerServer) AssignVolume(ctx context.Context, req *filer_pb.AssignVolumeRequest) (resp *filer_pb.AssignVolumeResponse, err error) {
- so := fs.detectStorageOption(req.Path, req.Collection, req.Replication, req.TtlSec, req.DataCenter, req.Rack)
+ so := fs.detectStorageOption(req.Path, req.Collection, req.Replication, req.TtlSec, req.VolumeType, req.DataCenter, req.Rack)
assignRequest, altRequest := so.ToAssignRequests(int(req.Count))
@@ -404,6 +405,7 @@ func (fs *FilerServer) Statistics(ctx context.Context, req *filer_pb.StatisticsR
Replication: req.Replication,
Collection: req.Collection,
Ttl: req.Ttl,
+ VolumeType: req.VolumeType,
})
if grpcErr != nil {
return grpcErr
diff --git a/weed/server/filer_server_handlers_write.go b/weed/server/filer_server_handlers_write.go
index 9131b042b..e6fc31d8e 100644
--- a/weed/server/filer_server_handlers_write.go
+++ b/weed/server/filer_server_handlers_write.go
@@ -61,6 +61,7 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
query.Get("collection"),
query.Get("replication"),
query.Get("ttl"),
+ query.Get("volumeType"),
query.Get("dataCenter"),
query.Get("rack"),
)
@@ -104,7 +105,7 @@ func (fs *FilerServer) DeleteHandler(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusNoContent)
}
-func (fs *FilerServer) detectStorageOption(requestURI, qCollection, qReplication string, ttlSeconds int32, dataCenter, rack string) *operation.StorageOption {
+func (fs *FilerServer) detectStorageOption(requestURI, qCollection, qReplication string, ttlSeconds int32, volumeType string, dataCenter, rack string) *operation.StorageOption {
collection := util.Nvl(qCollection, fs.option.Collection)
replication := util.Nvl(qReplication, fs.option.DefaultReplication)
@@ -134,17 +135,18 @@ func (fs *FilerServer) detectStorageOption(requestURI, qCollection, qReplication
DataCenter: util.Nvl(dataCenter, fs.option.DataCenter),
Rack: util.Nvl(rack, fs.option.Rack),
TtlSeconds: ttlSeconds,
+ VolumeType: volumeType,
Fsync: fsync || rule.Fsync,
VolumeGrowthCount: rule.VolumeGrowthCount,
}
}
-func (fs *FilerServer) detectStorageOption0(requestURI, qCollection, qReplication string, qTtl string, dataCenter, rack string) *operation.StorageOption {
+func (fs *FilerServer) detectStorageOption0(requestURI, qCollection, qReplication string, qTtl string, volumeType string, dataCenter, rack string) *operation.StorageOption {
ttl, err := needle.ReadTTL(qTtl)
if err != nil {
glog.Errorf("fail to parse ttl %s: %v", qTtl, err)
}
- return fs.detectStorageOption(requestURI, qCollection, qReplication, int32(ttl.Minutes())*60, dataCenter, rack)
+ return fs.detectStorageOption(requestURI, qCollection, qReplication, int32(ttl.Minutes())*60, volumeType, dataCenter, rack)
}
diff --git a/weed/server/filer_server_handlers_write_autochunk.go b/weed/server/filer_server_handlers_write_autochunk.go
index 0f6d0dc47..9f88bd202 100644
--- a/weed/server/filer_server_handlers_write_autochunk.go
+++ b/weed/server/filer_server_handlers_write_autochunk.go
@@ -159,6 +159,7 @@ func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileNa
Replication: so.Replication,
Collection: so.Collection,
TtlSec: so.TtlSeconds,
+ VolumeType: so.VolumeType,
Mime: contentType,
Md5: md5bytes,
FileSize: uint64(chunkOffset),
diff --git a/weed/server/filer_server_handlers_write_cipher.go b/weed/server/filer_server_handlers_write_cipher.go
index 3cc0d0c41..c5e8a74d8 100644
--- a/weed/server/filer_server_handlers_write_cipher.go
+++ b/weed/server/filer_server_handlers_write_cipher.go
@@ -68,6 +68,7 @@ func (fs *FilerServer) encrypt(ctx context.Context, w http.ResponseWriter, r *ht
Replication: so.Replication,
Collection: so.Collection,
TtlSec: so.TtlSeconds,
+ VolumeType: so.VolumeType,
Mime: pu.MimeType,
Md5: util.Base64Md5ToBytes(pu.ContentMd5),
},
diff --git a/weed/server/master_grpc_server.go b/weed/server/master_grpc_server.go
index 9df88e956..62b8cdf07 100644
--- a/weed/server/master_grpc_server.go
+++ b/weed/server/master_grpc_server.go
@@ -67,9 +67,7 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
dcName, rackName := ms.Topo.Configuration.Locate(heartbeat.Ip, heartbeat.DataCenter, heartbeat.Rack)
dc := ms.Topo.GetOrCreateDataCenter(dcName)
rack := dc.GetOrCreateRack(rackName)
- dn = rack.GetOrCreateDataNode(heartbeat.Ip,
- int(heartbeat.Port), heartbeat.PublicUrl,
- int64(heartbeat.MaxVolumeCount))
+ dn = rack.GetOrCreateDataNode(heartbeat.Ip, int(heartbeat.Port), heartbeat.PublicUrl, int64(heartbeat.MaxVolumeCount), int64(heartbeat.MaxSsdVolumeCount))
glog.V(0).Infof("added volume server %v:%d", heartbeat.GetIp(), heartbeat.GetPort())
if err := stream.Send(&master_pb.HeartbeatResponse{
VolumeSizeLimit: uint64(ms.option.VolumeSizeLimitMB) * 1024 * 1024,
@@ -83,6 +81,10 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
delta := int64(heartbeat.MaxVolumeCount) - dn.GetMaxVolumeCount()
dn.UpAdjustMaxVolumeCountDelta(delta)
}
+ if heartbeat.MaxSsdVolumeCount != 0 && dn.GetMaxSsdVolumeCount() != int64(heartbeat.MaxSsdVolumeCount) {
+ delta := int64(heartbeat.MaxSsdVolumeCount) - dn.GetMaxSsdVolumeCount()
+ dn.UpAdjustMaxSsdVolumeCountDelta(delta)
+ }
glog.V(4).Infof("master received heartbeat %s", heartbeat.String())
message := &master_pb.VolumeLocation{
diff --git a/weed/server/master_grpc_server_volume.go b/weed/server/master_grpc_server_volume.go
index 20a22d126..e84d9b386 100644
--- a/weed/server/master_grpc_server_volume.go
+++ b/weed/server/master_grpc_server_volume.go
@@ -4,6 +4,7 @@ import (
"context"
"fmt"
"github.com/chrislusf/raft"
+ "github.com/chrislusf/seaweedfs/weed/storage"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/security"
@@ -60,11 +61,16 @@ func (ms *MasterServer) Assign(ctx context.Context, req *master_pb.AssignRequest
if err != nil {
return nil, err
}
+ volumeType, err := storage.ToVolumeType(req.VolumeType)
+ if err != nil {
+ return nil, err
+ }
option := &topology.VolumeGrowOption{
Collection: req.Collection,
ReplicaPlacement: replicaPlacement,
Ttl: ttl,
+ VolumeType: volumeType,
Prealloacte: ms.preallocateSize,
DataCenter: req.DataCenter,
Rack: req.Rack,
@@ -117,10 +123,10 @@ func (ms *MasterServer) Statistics(ctx context.Context, req *master_pb.Statistic
return nil, err
}
- volumeLayout := ms.Topo.GetVolumeLayout(req.Collection, replicaPlacement, ttl)
+ volumeLayout := ms.Topo.GetVolumeLayout(req.Collection, replicaPlacement, ttl, storage.VolumeType(req.VolumeType))
stats := volumeLayout.Stats()
- totalSize := ms.Topo.GetMaxVolumeCount() * int64(ms.option.VolumeSizeLimitMB) * 1024 * 1024
+ totalSize := (ms.Topo.GetMaxVolumeCount() + ms.Topo.GetMaxSsdVolumeCount()) * int64(ms.option.VolumeSizeLimitMB) * 1024 * 1024
resp := &master_pb.StatisticsResponse{
TotalSize: uint64(totalSize),
diff --git a/weed/server/master_server_handlers_admin.go b/weed/server/master_server_handlers_admin.go
index 34235384f..001ff9cdc 100644
--- a/weed/server/master_server_handlers_admin.go
+++ b/weed/server/master_server_handlers_admin.go
@@ -3,6 +3,7 @@ package weed_server
import (
"context"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/storage"
"math/rand"
"net/http"
"strconv"
@@ -136,7 +137,7 @@ func (ms *MasterServer) submitFromMasterServerHandler(w http.ResponseWriter, r *
}
func (ms *MasterServer) HasWritableVolume(option *topology.VolumeGrowOption) bool {
- vl := ms.Topo.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl)
+ vl := ms.Topo.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl, option.VolumeType)
return vl.GetActiveVolumeCount(option) > 0
}
@@ -157,6 +158,10 @@ func (ms *MasterServer) getVolumeGrowOption(r *http.Request) (*topology.VolumeGr
if err != nil {
return nil, err
}
+ volumeType, err := storage.ToVolumeType(r.FormValue("volumeType"))
+ if err != nil {
+ return nil, err
+ }
preallocate := ms.preallocateSize
if r.FormValue("preallocate") != "" {
@@ -169,6 +174,7 @@ func (ms *MasterServer) getVolumeGrowOption(r *http.Request) (*topology.VolumeGr
Collection: r.FormValue("collection"),
ReplicaPlacement: replicaPlacement,
Ttl: ttl,
+ VolumeType: volumeType,
Prealloacte: preallocate,
DataCenter: r.FormValue("dataCenter"),
Rack: r.FormValue("rack"),
diff --git a/weed/server/volume_server.go b/weed/server/volume_server.go
index 468f75890..16f4198e8 100644
--- a/weed/server/volume_server.go
+++ b/weed/server/volume_server.go
@@ -20,6 +20,7 @@ type VolumeServer struct {
pulseSeconds int
dataCenter string
rack string
+ VolumeType storage.VolumeType
store *storage.Store
guard *security.Guard
grpcDialOption grpc.DialOption
@@ -38,7 +39,7 @@ type VolumeServer struct {
func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
port int, publicUrl string,
folders []string, maxCounts []int, minFreeSpacePercents []float32,
- idxFolder string,
+ idxFolder string, volumeType storage.VolumeType,
needleMapKind storage.NeedleMapType,
masterNodes []string, pulseSeconds int,
dataCenter string, rack string,
@@ -63,6 +64,7 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
pulseSeconds: pulseSeconds,
dataCenter: dataCenter,
rack: rack,
+ VolumeType: volumeType,
needleMapKind: needleMapKind,
FixJpgOrientation: fixJpgOrientation,
ReadRedirect: readRedirect,
@@ -76,7 +78,7 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
vs.checkWithMaster()
- vs.store = storage.NewStore(vs.grpcDialOption, port, ip, publicUrl, folders, maxCounts, minFreeSpacePercents, idxFolder, vs.needleMapKind)
+ vs.store = storage.NewStore(vs.grpcDialOption, port, ip, publicUrl, folders, maxCounts, minFreeSpacePercents, idxFolder, vs.needleMapKind, vs.VolumeType)
vs.guard = security.NewGuard(whiteList, signingKey, expiresAfterSec, readSigningKey, readExpiresAfterSec)
handleStaticResources(adminMux)
diff --git a/weed/server/webdav_server.go b/weed/server/webdav_server.go
index 2b238e534..329560d3f 100644
--- a/weed/server/webdav_server.go
+++ b/weed/server/webdav_server.go
@@ -33,6 +33,7 @@ type WebDavOption struct {
BucketsPath string
GrpcDialOption grpc.DialOption
Collection string
+ VolumeType string
Uid uint32
Gid uint32
Cipher bool
@@ -378,6 +379,7 @@ func (f *WebDavFile) saveDataAsChunk(reader io.Reader, name string, offset int64
Count: 1,
Replication: "",
Collection: f.fs.option.Collection,
+ VolumeType: f.fs.option.VolumeType,
Path: name,
}