aboutsummaryrefslogtreecommitdiff
path: root/weed/server
diff options
context:
space:
mode:
Diffstat (limited to 'weed/server')
-rw-r--r--weed/server/filer_server_handlers_read.go6
-rw-r--r--weed/server/filer_server_handlers_write.go70
-rw-r--r--weed/server/filer_server_handlers_write_autochunk.go21
-rw-r--r--weed/server/filer_server_handlers_write_upload.go20
-rw-r--r--weed/server/raft_server.go38
-rw-r--r--weed/server/volume_server.go1
-rw-r--r--weed/server/volume_server_handlers_admin.go19
7 files changed, 151 insertions, 24 deletions
diff --git a/weed/server/filer_server_handlers_read.go b/weed/server/filer_server_handlers_read.go
index 8037b1d94..2c51931c1 100644
--- a/weed/server/filer_server_handlers_read.go
+++ b/weed/server/filer_server_handlers_read.go
@@ -21,7 +21,6 @@ import (
"github.com/chrislusf/seaweedfs/weed/util"
)
-
// Validates the preconditions. Returns true if GET/HEAD operation should not proceed.
// Preconditions supported are:
// If-Modified-Since
@@ -119,6 +118,11 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
return
}
+ if r.URL.Query().Has("metadata") {
+ writeJsonQuiet(w, r, http.StatusOK, entry)
+ return
+ }
+
etag := filer.ETagEntry(entry)
if checkPreconditions(w, r, entry) {
return
diff --git a/weed/server/filer_server_handlers_write.go b/weed/server/filer_server_handlers_write.go
index 1ebe66d43..3bbae8197 100644
--- a/weed/server/filer_server_handlers_write.go
+++ b/weed/server/filer_server_handlers_write.go
@@ -3,6 +3,7 @@ package weed_server
import (
"context"
"errors"
+ "fmt"
"net/http"
"os"
"strings"
@@ -78,11 +79,78 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request, conte
return
}
- fs.autoChunk(ctx, w, r, contentLength, so)
+ if query.Has("mv.from") {
+ fs.move(ctx, w, r, so)
+ } else {
+ fs.autoChunk(ctx, w, r, contentLength, so)
+ }
+
util.CloseRequest(r)
}
+func (fs *FilerServer) move(ctx context.Context, w http.ResponseWriter, r *http.Request, so *operation.StorageOption) {
+ src := r.URL.Query().Get("mv.from")
+ dst := r.URL.Path
+
+ glog.V(2).Infof("FilerServer.move %v to %v", src, dst)
+
+ var err error
+ if src, err = clearName(src); err != nil {
+ writeJsonError(w, r, http.StatusBadRequest, err)
+ return
+ }
+ if dst, err = clearName(dst); err != nil {
+ writeJsonError(w, r, http.StatusBadRequest, err)
+ return
+ }
+ src = strings.TrimRight(src, "/")
+ if src == "" {
+ err = fmt.Errorf("invalid source '/'")
+ writeJsonError(w, r, http.StatusBadRequest, err)
+ return
+ }
+
+ srcPath := util.FullPath(src)
+ dstPath := util.FullPath(dst)
+ srcEntry, err := fs.filer.FindEntry(ctx, srcPath)
+ if err != nil {
+ err = fmt.Errorf("failed to get src entry '%s', err: %s", src, err)
+ writeJsonError(w, r, http.StatusBadRequest, err)
+ return
+ }
+
+ oldDir, oldName := srcPath.DirAndName()
+ newDir, newName := dstPath.DirAndName()
+ newName = util.Nvl(newName, oldName)
+
+ dstEntry, err := fs.filer.FindEntry(ctx, util.FullPath(strings.TrimRight(dst, "/")))
+ if err != nil && err != filer_pb.ErrNotFound {
+ err = fmt.Errorf("failed to get dst entry '%s', err: %s", dst, err)
+ writeJsonError(w, r, http.StatusInternalServerError, err)
+ return
+ }
+ if err == nil && !dstEntry.IsDirectory() && srcEntry.IsDirectory() {
+ err = fmt.Errorf("move: cannot overwrite non-directory '%s' with directory '%s'", dst, src)
+ writeJsonError(w, r, http.StatusBadRequest, err)
+ return
+ }
+
+ _, err = fs.AtomicRenameEntry(ctx, &filer_pb.AtomicRenameEntryRequest{
+ OldDirectory: oldDir,
+ OldName: oldName,
+ NewDirectory: newDir,
+ NewName: newName,
+ })
+ if err != nil {
+ err = fmt.Errorf("failed to move entry from '%s' to '%s', err: %s", src, dst, err)
+ writeJsonError(w, r, http.StatusBadRequest, err)
+ return
+ }
+
+ w.WriteHeader(http.StatusNoContent)
+}
+
// curl -X DELETE http://localhost:8888/path/to
// curl -X DELETE http://localhost:8888/path/to?recursive=true
// curl -X DELETE http://localhost:8888/path/to?recursive=true&ignoreRecursiveError=true
diff --git a/weed/server/filer_server_handlers_write_autochunk.go b/weed/server/filer_server_handlers_write_autochunk.go
index 61d30372b..be6e0c652 100644
--- a/weed/server/filer_server_handlers_write_autochunk.go
+++ b/weed/server/filer_server_handlers_write_autochunk.go
@@ -126,10 +126,6 @@ func (fs *FilerServer) doPutAutoChunk(ctx context.Context, w http.ResponseWriter
return
}
-func isAppend(r *http.Request) bool {
- return r.URL.Query().Get("op") == "append"
-}
-
func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileName string, contentType string, so *operation.StorageOption, md5bytes []byte, fileChunks []*filer_pb.FileChunk, chunkOffset int64, content []byte) (filerResult *FilerPostResult, replyerr error) {
// detect file mode
@@ -161,8 +157,11 @@ func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileNa
var entry *filer.Entry
var mergedChunks []*filer_pb.FileChunk
+
+ isAppend := r.URL.Query().Get("op") == "append"
+ isOffsetWrite := fileChunks[0].Offset > 0
// when it is an append
- if isAppend(r) {
+ if isAppend || isOffsetWrite {
existingEntry, findErr := fs.filer.FindEntry(ctx, util.FullPath(path))
if findErr != nil && findErr != filer_pb.ErrNotFound {
glog.V(0).Infof("failing to find %s: %v", path, findErr)
@@ -173,11 +172,13 @@ func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileNa
entry.Mtime = time.Now()
entry.Md5 = nil
// adjust chunk offsets
- for _, chunk := range fileChunks {
- chunk.Offset += int64(entry.FileSize)
+ if isAppend {
+ for _, chunk := range fileChunks {
+ chunk.Offset += int64(entry.FileSize)
+ }
+ entry.FileSize += uint64(chunkOffset)
}
mergedChunks = append(entry.Chunks, fileChunks...)
- entry.FileSize += uint64(chunkOffset)
// TODO
if len(entry.Content) > 0 {
@@ -215,6 +216,10 @@ func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileNa
return
}
entry.Chunks = mergedChunks
+ if isOffsetWrite {
+ entry.Md5 = nil
+ entry.FileSize = entry.Size()
+ }
filerResult = &FilerPostResult{
Name: fileName,
diff --git a/weed/server/filer_server_handlers_write_upload.go b/weed/server/filer_server_handlers_write_upload.go
index a7716ef02..294a97582 100644
--- a/weed/server/filer_server_handlers_write_upload.go
+++ b/weed/server/filer_server_handlers_write_upload.go
@@ -3,10 +3,12 @@ package weed_server
import (
"bytes"
"crypto/md5"
+ "fmt"
"hash"
"io"
"net/http"
"sort"
+ "strconv"
"strings"
"sync"
"sync/atomic"
@@ -28,6 +30,22 @@ var bufPool = sync.Pool{
}
func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Request, reader io.Reader, chunkSize int32, fileName, contentType string, contentLength int64, so *operation.StorageOption) (fileChunks []*filer_pb.FileChunk, md5Hash hash.Hash, chunkOffset int64, uploadErr error, smallContent []byte) {
+ query := r.URL.Query()
+ isAppend := query.Get("op") == "append"
+
+ if query.Has("offset") {
+ offset := query.Get("offset")
+ offsetInt, err := strconv.ParseInt(offset, 10, 64)
+ if err != nil || offsetInt < 0 {
+ err = fmt.Errorf("invalid 'offset': '%s'", offset)
+ return nil, nil, 0, err, nil
+ }
+ if isAppend && offsetInt > 0 {
+ err = fmt.Errorf("cannot set offset when op=append")
+ return nil, nil, 0, err, nil
+ }
+ chunkOffset = offsetInt
+ }
md5Hash = md5.New()
var partReader = io.NopCloser(io.TeeReader(reader, md5Hash))
@@ -63,7 +81,7 @@ func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Reque
bytesBufferLimitCond.Signal()
break
}
- if chunkOffset == 0 && !isAppend(r) {
+ if chunkOffset == 0 && !isAppend {
if dataSize < fs.option.SaveToFilerLimit || strings.HasPrefix(r.URL.Path, filer.DirectoryEtcRoot) {
chunkOffset += dataSize
smallContent = make([]byte, dataSize)
diff --git a/weed/server/raft_server.go b/weed/server/raft_server.go
index 568bfc7b5..91dd185c8 100644
--- a/weed/server/raft_server.go
+++ b/weed/server/raft_server.go
@@ -19,6 +19,17 @@ import (
"github.com/chrislusf/seaweedfs/weed/topology"
)
+type RaftServerOption struct {
+ GrpcDialOption grpc.DialOption
+ Peers []pb.ServerAddress
+ ServerAddr pb.ServerAddress
+ DataDir string
+ Topo *topology.Topology
+ RaftResumeState bool
+ HeartbeatInterval time.Duration
+ ElectionTimeout time.Duration
+}
+
type RaftServer struct {
peers []pb.ServerAddress // initial peers to join with
raftServer raft.Server
@@ -52,12 +63,12 @@ func (s StateMachine) Recovery(data []byte) error {
return nil
}
-func NewRaftServer(grpcDialOption grpc.DialOption, peers []pb.ServerAddress, serverAddr pb.ServerAddress, dataDir string, topo *topology.Topology, raftResumeState bool) (*RaftServer, error) {
+func NewRaftServer(option *RaftServerOption) (*RaftServer, error) {
s := &RaftServer{
- peers: peers,
- serverAddr: serverAddr,
- dataDir: dataDir,
- topo: topo,
+ peers: option.Peers,
+ serverAddr: option.ServerAddr,
+ dataDir: option.DataDir,
+ topo: option.Topo,
}
if glog.V(4) {
@@ -67,10 +78,10 @@ func NewRaftServer(grpcDialOption grpc.DialOption, peers []pb.ServerAddress, ser
raft.RegisterCommand(&topology.MaxVolumeIdCommand{})
var err error
- transporter := raft.NewGrpcTransporter(grpcDialOption)
- glog.V(0).Infof("Starting RaftServer with %v", serverAddr)
+ transporter := raft.NewGrpcTransporter(option.GrpcDialOption)
+ glog.V(0).Infof("Starting RaftServer with %v", option.ServerAddr)
- if !raftResumeState {
+ if !option.RaftResumeState {
// always clear previous metadata
os.RemoveAll(path.Join(s.dataDir, "conf"))
os.RemoveAll(path.Join(s.dataDir, "log"))
@@ -80,14 +91,15 @@ func NewRaftServer(grpcDialOption grpc.DialOption, peers []pb.ServerAddress, ser
return nil, err
}
- stateMachine := StateMachine{topo: topo}
- s.raftServer, err = raft.NewServer(string(s.serverAddr), s.dataDir, transporter, stateMachine, topo, "")
+ stateMachine := StateMachine{topo: option.Topo}
+ s.raftServer, err = raft.NewServer(string(s.serverAddr), s.dataDir, transporter, stateMachine, option.Topo, "")
if err != nil {
glog.V(0).Infoln(err)
return nil, err
}
- s.raftServer.SetHeartbeatInterval(time.Duration(300+rand.Intn(150)) * time.Millisecond)
- s.raftServer.SetElectionTimeout(10 * time.Second)
+ heartbeatInterval := time.Duration(float64(option.HeartbeatInterval) * (rand.Float64()*0.25 + 1))
+ s.raftServer.SetHeartbeatInterval(heartbeatInterval)
+ s.raftServer.SetElectionTimeout(option.ElectionTimeout)
if err := s.raftServer.LoadSnapshot(); err != nil {
return nil, err
}
@@ -123,7 +135,7 @@ func NewRaftServer(grpcDialOption grpc.DialOption, peers []pb.ServerAddress, ser
s.GrpcServer = raft.NewGrpcServer(s.raftServer)
- if s.raftServer.IsLogEmpty() && isTheFirstOne(serverAddr, s.peers) {
+ if s.raftServer.IsLogEmpty() && isTheFirstOne(option.ServerAddr, s.peers) {
// Initialize the server by joining itself.
// s.DoJoinCommand()
}
diff --git a/weed/server/volume_server.go b/weed/server/volume_server.go
index 2551cc6e6..4199ae36b 100644
--- a/weed/server/volume_server.go
+++ b/weed/server/volume_server.go
@@ -98,6 +98,7 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
handleStaticResources(adminMux)
adminMux.HandleFunc("/status", vs.statusHandler)
+ adminMux.HandleFunc("/healthz", vs.healthzHandler)
if signingKey == "" || enableUiAccess {
// only expose the volume server details for safe environments
adminMux.HandleFunc("/ui/index.html", vs.uiStatusHandler)
diff --git a/weed/server/volume_server_handlers_admin.go b/weed/server/volume_server_handlers_admin.go
index 7e6c06871..37cf109e2 100644
--- a/weed/server/volume_server_handlers_admin.go
+++ b/weed/server/volume_server_handlers_admin.go
@@ -1,6 +1,7 @@
package weed_server
import (
+ "github.com/chrislusf/seaweedfs/weed/topology"
"net/http"
"path/filepath"
@@ -9,6 +10,24 @@ import (
"github.com/chrislusf/seaweedfs/weed/util"
)
+func (vs *VolumeServer) healthzHandler(w http.ResponseWriter, r *http.Request) {
+ w.Header().Set("Server", "SeaweedFS Volume "+util.VERSION)
+ volumeInfos := vs.store.VolumeInfos()
+ for _, vinfo := range volumeInfos {
+ if len(vinfo.Collection) == 0 {
+ continue
+ }
+ if vinfo.ReplicaPlacement.GetCopyCount() > 1 {
+ _, err := topology.GetWritableRemoteReplications(vs.store, vs.grpcDialOption, vinfo.Id, vs.GetMaster)
+ if err != nil {
+ w.WriteHeader(http.StatusServiceUnavailable)
+ return
+ }
+ }
+ }
+ w.WriteHeader(http.StatusOK)
+}
+
func (vs *VolumeServer) statusHandler(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Server", "SeaweedFS Volume "+util.VERSION)
m := make(map[string]interface{})