diff options
Diffstat (limited to 'weed/server')
| -rw-r--r-- | weed/server/filer_server_handlers_read.go | 6 | ||||
| -rw-r--r-- | weed/server/filer_server_handlers_write.go | 70 | ||||
| -rw-r--r-- | weed/server/filer_server_handlers_write_autochunk.go | 21 | ||||
| -rw-r--r-- | weed/server/filer_server_handlers_write_upload.go | 20 | ||||
| -rw-r--r-- | weed/server/raft_server.go | 38 | ||||
| -rw-r--r-- | weed/server/volume_server.go | 1 | ||||
| -rw-r--r-- | weed/server/volume_server_handlers_admin.go | 19 |
7 files changed, 151 insertions, 24 deletions
diff --git a/weed/server/filer_server_handlers_read.go b/weed/server/filer_server_handlers_read.go index 8037b1d94..2c51931c1 100644 --- a/weed/server/filer_server_handlers_read.go +++ b/weed/server/filer_server_handlers_read.go @@ -21,7 +21,6 @@ import ( "github.com/chrislusf/seaweedfs/weed/util" ) - // Validates the preconditions. Returns true if GET/HEAD operation should not proceed. // Preconditions supported are: // If-Modified-Since @@ -119,6 +118,11 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) return } + if r.URL.Query().Has("metadata") { + writeJsonQuiet(w, r, http.StatusOK, entry) + return + } + etag := filer.ETagEntry(entry) if checkPreconditions(w, r, entry) { return diff --git a/weed/server/filer_server_handlers_write.go b/weed/server/filer_server_handlers_write.go index 1ebe66d43..3bbae8197 100644 --- a/weed/server/filer_server_handlers_write.go +++ b/weed/server/filer_server_handlers_write.go @@ -3,6 +3,7 @@ package weed_server import ( "context" "errors" + "fmt" "net/http" "os" "strings" @@ -78,11 +79,78 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request, conte return } - fs.autoChunk(ctx, w, r, contentLength, so) + if query.Has("mv.from") { + fs.move(ctx, w, r, so) + } else { + fs.autoChunk(ctx, w, r, contentLength, so) + } + util.CloseRequest(r) } +func (fs *FilerServer) move(ctx context.Context, w http.ResponseWriter, r *http.Request, so *operation.StorageOption) { + src := r.URL.Query().Get("mv.from") + dst := r.URL.Path + + glog.V(2).Infof("FilerServer.move %v to %v", src, dst) + + var err error + if src, err = clearName(src); err != nil { + writeJsonError(w, r, http.StatusBadRequest, err) + return + } + if dst, err = clearName(dst); err != nil { + writeJsonError(w, r, http.StatusBadRequest, err) + return + } + src = strings.TrimRight(src, "/") + if src == "" { + err = fmt.Errorf("invalid source '/'") + writeJsonError(w, r, http.StatusBadRequest, err) + return + } + + srcPath := util.FullPath(src) + dstPath := util.FullPath(dst) + srcEntry, err := fs.filer.FindEntry(ctx, srcPath) + if err != nil { + err = fmt.Errorf("failed to get src entry '%s', err: %s", src, err) + writeJsonError(w, r, http.StatusBadRequest, err) + return + } + + oldDir, oldName := srcPath.DirAndName() + newDir, newName := dstPath.DirAndName() + newName = util.Nvl(newName, oldName) + + dstEntry, err := fs.filer.FindEntry(ctx, util.FullPath(strings.TrimRight(dst, "/"))) + if err != nil && err != filer_pb.ErrNotFound { + err = fmt.Errorf("failed to get dst entry '%s', err: %s", dst, err) + writeJsonError(w, r, http.StatusInternalServerError, err) + return + } + if err == nil && !dstEntry.IsDirectory() && srcEntry.IsDirectory() { + err = fmt.Errorf("move: cannot overwrite non-directory '%s' with directory '%s'", dst, src) + writeJsonError(w, r, http.StatusBadRequest, err) + return + } + + _, err = fs.AtomicRenameEntry(ctx, &filer_pb.AtomicRenameEntryRequest{ + OldDirectory: oldDir, + OldName: oldName, + NewDirectory: newDir, + NewName: newName, + }) + if err != nil { + err = fmt.Errorf("failed to move entry from '%s' to '%s', err: %s", src, dst, err) + writeJsonError(w, r, http.StatusBadRequest, err) + return + } + + w.WriteHeader(http.StatusNoContent) +} + // curl -X DELETE http://localhost:8888/path/to // curl -X DELETE http://localhost:8888/path/to?recursive=true // curl -X DELETE http://localhost:8888/path/to?recursive=true&ignoreRecursiveError=true diff --git a/weed/server/filer_server_handlers_write_autochunk.go b/weed/server/filer_server_handlers_write_autochunk.go index 61d30372b..be6e0c652 100644 --- a/weed/server/filer_server_handlers_write_autochunk.go +++ b/weed/server/filer_server_handlers_write_autochunk.go @@ -126,10 +126,6 @@ func (fs *FilerServer) doPutAutoChunk(ctx context.Context, w http.ResponseWriter return } -func isAppend(r *http.Request) bool { - return r.URL.Query().Get("op") == "append" -} - func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileName string, contentType string, so *operation.StorageOption, md5bytes []byte, fileChunks []*filer_pb.FileChunk, chunkOffset int64, content []byte) (filerResult *FilerPostResult, replyerr error) { // detect file mode @@ -161,8 +157,11 @@ func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileNa var entry *filer.Entry var mergedChunks []*filer_pb.FileChunk + + isAppend := r.URL.Query().Get("op") == "append" + isOffsetWrite := fileChunks[0].Offset > 0 // when it is an append - if isAppend(r) { + if isAppend || isOffsetWrite { existingEntry, findErr := fs.filer.FindEntry(ctx, util.FullPath(path)) if findErr != nil && findErr != filer_pb.ErrNotFound { glog.V(0).Infof("failing to find %s: %v", path, findErr) @@ -173,11 +172,13 @@ func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileNa entry.Mtime = time.Now() entry.Md5 = nil // adjust chunk offsets - for _, chunk := range fileChunks { - chunk.Offset += int64(entry.FileSize) + if isAppend { + for _, chunk := range fileChunks { + chunk.Offset += int64(entry.FileSize) + } + entry.FileSize += uint64(chunkOffset) } mergedChunks = append(entry.Chunks, fileChunks...) - entry.FileSize += uint64(chunkOffset) // TODO if len(entry.Content) > 0 { @@ -215,6 +216,10 @@ func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileNa return } entry.Chunks = mergedChunks + if isOffsetWrite { + entry.Md5 = nil + entry.FileSize = entry.Size() + } filerResult = &FilerPostResult{ Name: fileName, diff --git a/weed/server/filer_server_handlers_write_upload.go b/weed/server/filer_server_handlers_write_upload.go index a7716ef02..294a97582 100644 --- a/weed/server/filer_server_handlers_write_upload.go +++ b/weed/server/filer_server_handlers_write_upload.go @@ -3,10 +3,12 @@ package weed_server import ( "bytes" "crypto/md5" + "fmt" "hash" "io" "net/http" "sort" + "strconv" "strings" "sync" "sync/atomic" @@ -28,6 +30,22 @@ var bufPool = sync.Pool{ } func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Request, reader io.Reader, chunkSize int32, fileName, contentType string, contentLength int64, so *operation.StorageOption) (fileChunks []*filer_pb.FileChunk, md5Hash hash.Hash, chunkOffset int64, uploadErr error, smallContent []byte) { + query := r.URL.Query() + isAppend := query.Get("op") == "append" + + if query.Has("offset") { + offset := query.Get("offset") + offsetInt, err := strconv.ParseInt(offset, 10, 64) + if err != nil || offsetInt < 0 { + err = fmt.Errorf("invalid 'offset': '%s'", offset) + return nil, nil, 0, err, nil + } + if isAppend && offsetInt > 0 { + err = fmt.Errorf("cannot set offset when op=append") + return nil, nil, 0, err, nil + } + chunkOffset = offsetInt + } md5Hash = md5.New() var partReader = io.NopCloser(io.TeeReader(reader, md5Hash)) @@ -63,7 +81,7 @@ func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Reque bytesBufferLimitCond.Signal() break } - if chunkOffset == 0 && !isAppend(r) { + if chunkOffset == 0 && !isAppend { if dataSize < fs.option.SaveToFilerLimit || strings.HasPrefix(r.URL.Path, filer.DirectoryEtcRoot) { chunkOffset += dataSize smallContent = make([]byte, dataSize) diff --git a/weed/server/raft_server.go b/weed/server/raft_server.go index 568bfc7b5..91dd185c8 100644 --- a/weed/server/raft_server.go +++ b/weed/server/raft_server.go @@ -19,6 +19,17 @@ import ( "github.com/chrislusf/seaweedfs/weed/topology" ) +type RaftServerOption struct { + GrpcDialOption grpc.DialOption + Peers []pb.ServerAddress + ServerAddr pb.ServerAddress + DataDir string + Topo *topology.Topology + RaftResumeState bool + HeartbeatInterval time.Duration + ElectionTimeout time.Duration +} + type RaftServer struct { peers []pb.ServerAddress // initial peers to join with raftServer raft.Server @@ -52,12 +63,12 @@ func (s StateMachine) Recovery(data []byte) error { return nil } -func NewRaftServer(grpcDialOption grpc.DialOption, peers []pb.ServerAddress, serverAddr pb.ServerAddress, dataDir string, topo *topology.Topology, raftResumeState bool) (*RaftServer, error) { +func NewRaftServer(option *RaftServerOption) (*RaftServer, error) { s := &RaftServer{ - peers: peers, - serverAddr: serverAddr, - dataDir: dataDir, - topo: topo, + peers: option.Peers, + serverAddr: option.ServerAddr, + dataDir: option.DataDir, + topo: option.Topo, } if glog.V(4) { @@ -67,10 +78,10 @@ func NewRaftServer(grpcDialOption grpc.DialOption, peers []pb.ServerAddress, ser raft.RegisterCommand(&topology.MaxVolumeIdCommand{}) var err error - transporter := raft.NewGrpcTransporter(grpcDialOption) - glog.V(0).Infof("Starting RaftServer with %v", serverAddr) + transporter := raft.NewGrpcTransporter(option.GrpcDialOption) + glog.V(0).Infof("Starting RaftServer with %v", option.ServerAddr) - if !raftResumeState { + if !option.RaftResumeState { // always clear previous metadata os.RemoveAll(path.Join(s.dataDir, "conf")) os.RemoveAll(path.Join(s.dataDir, "log")) @@ -80,14 +91,15 @@ func NewRaftServer(grpcDialOption grpc.DialOption, peers []pb.ServerAddress, ser return nil, err } - stateMachine := StateMachine{topo: topo} - s.raftServer, err = raft.NewServer(string(s.serverAddr), s.dataDir, transporter, stateMachine, topo, "") + stateMachine := StateMachine{topo: option.Topo} + s.raftServer, err = raft.NewServer(string(s.serverAddr), s.dataDir, transporter, stateMachine, option.Topo, "") if err != nil { glog.V(0).Infoln(err) return nil, err } - s.raftServer.SetHeartbeatInterval(time.Duration(300+rand.Intn(150)) * time.Millisecond) - s.raftServer.SetElectionTimeout(10 * time.Second) + heartbeatInterval := time.Duration(float64(option.HeartbeatInterval) * (rand.Float64()*0.25 + 1)) + s.raftServer.SetHeartbeatInterval(heartbeatInterval) + s.raftServer.SetElectionTimeout(option.ElectionTimeout) if err := s.raftServer.LoadSnapshot(); err != nil { return nil, err } @@ -123,7 +135,7 @@ func NewRaftServer(grpcDialOption grpc.DialOption, peers []pb.ServerAddress, ser s.GrpcServer = raft.NewGrpcServer(s.raftServer) - if s.raftServer.IsLogEmpty() && isTheFirstOne(serverAddr, s.peers) { + if s.raftServer.IsLogEmpty() && isTheFirstOne(option.ServerAddr, s.peers) { // Initialize the server by joining itself. // s.DoJoinCommand() } diff --git a/weed/server/volume_server.go b/weed/server/volume_server.go index 2551cc6e6..4199ae36b 100644 --- a/weed/server/volume_server.go +++ b/weed/server/volume_server.go @@ -98,6 +98,7 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string, handleStaticResources(adminMux) adminMux.HandleFunc("/status", vs.statusHandler) + adminMux.HandleFunc("/healthz", vs.healthzHandler) if signingKey == "" || enableUiAccess { // only expose the volume server details for safe environments adminMux.HandleFunc("/ui/index.html", vs.uiStatusHandler) diff --git a/weed/server/volume_server_handlers_admin.go b/weed/server/volume_server_handlers_admin.go index 7e6c06871..37cf109e2 100644 --- a/weed/server/volume_server_handlers_admin.go +++ b/weed/server/volume_server_handlers_admin.go @@ -1,6 +1,7 @@ package weed_server import ( + "github.com/chrislusf/seaweedfs/weed/topology" "net/http" "path/filepath" @@ -9,6 +10,24 @@ import ( "github.com/chrislusf/seaweedfs/weed/util" ) +func (vs *VolumeServer) healthzHandler(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Server", "SeaweedFS Volume "+util.VERSION) + volumeInfos := vs.store.VolumeInfos() + for _, vinfo := range volumeInfos { + if len(vinfo.Collection) == 0 { + continue + } + if vinfo.ReplicaPlacement.GetCopyCount() > 1 { + _, err := topology.GetWritableRemoteReplications(vs.store, vs.grpcDialOption, vinfo.Id, vs.GetMaster) + if err != nil { + w.WriteHeader(http.StatusServiceUnavailable) + return + } + } + } + w.WriteHeader(http.StatusOK) +} + func (vs *VolumeServer) statusHandler(w http.ResponseWriter, r *http.Request) { w.Header().Set("Server", "SeaweedFS Volume "+util.VERSION) m := make(map[string]interface{}) |
