aboutsummaryrefslogtreecommitdiff
path: root/weed/server
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2019-04-18 21:43:36 -0700
committerChris Lu <chris.lu@gmail.com>2019-04-18 21:43:36 -0700
commite5506152c0a27d38fa334b2e338d82ee02669ab9 (patch)
tree1f589cbbf7244cbe5dbfe84ca89f5996e4ca9ff3 /weed/server
parent33c92b819a334b5709e6f1cbe304e4b8855c1238 (diff)
downloadseaweedfs-e5506152c0a27d38fa334b2e338d82ee02669ab9.tar.xz
seaweedfs-e5506152c0a27d38fa334b2e338d82ee02669ab9.zip
refactoring
Diffstat (limited to 'weed/server')
-rw-r--r--weed/server/common.go7
-rw-r--r--weed/server/master_grpc_server_volume.go5
-rw-r--r--weed/server/master_server_handlers.go4
-rw-r--r--weed/server/master_server_handlers_admin.go12
-rw-r--r--weed/server/volume_grpc_admin.go10
-rw-r--r--weed/server/volume_grpc_batch_delete.go6
-rw-r--r--weed/server/volume_grpc_copy.go13
-rw-r--r--weed/server/volume_grpc_copy_incremental.go6
-rw-r--r--weed/server/volume_grpc_tail.go3
-rw-r--r--weed/server/volume_grpc_vacuum.go10
-rw-r--r--weed/server/volume_server_handlers_read.go10
-rw-r--r--weed/server/volume_server_handlers_write.go10
12 files changed, 52 insertions, 44 deletions
diff --git a/weed/server/common.go b/weed/server/common.go
index 1c75d44cf..e02ab38a6 100644
--- a/weed/server/common.go
+++ b/weed/server/common.go
@@ -5,17 +5,18 @@ import (
"encoding/json"
"errors"
"fmt"
- "google.golang.org/grpc"
"net/http"
"path/filepath"
"strconv"
"strings"
"time"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "google.golang.org/grpc"
+
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/stats"
- "github.com/chrislusf/seaweedfs/weed/storage"
"github.com/chrislusf/seaweedfs/weed/util"
_ "github.com/chrislusf/seaweedfs/weed/statik"
@@ -90,7 +91,7 @@ func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterUrl st
}
debug("parsing upload file...")
- fname, data, mimeType, pairMap, isGzipped, originalDataSize, lastModified, _, _, pe := storage.ParseUpload(r)
+ fname, data, mimeType, pairMap, isGzipped, originalDataSize, lastModified, _, _, pe := needle.ParseUpload(r)
if pe != nil {
writeJsonError(w, r, http.StatusBadRequest, pe)
return
diff --git a/weed/server/master_grpc_server_volume.go b/weed/server/master_grpc_server_volume.go
index 7b8efb933..2265cee3b 100644
--- a/weed/server/master_grpc_server_volume.go
+++ b/weed/server/master_grpc_server_volume.go
@@ -8,6 +8,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/storage"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/topology"
)
@@ -55,7 +56,7 @@ func (ms *MasterServer) Assign(ctx context.Context, req *master_pb.AssignRequest
if err != nil {
return nil, err
}
- ttl, err := storage.ReadTTL(req.Ttl)
+ ttl, err := needle.ReadTTL(req.Ttl)
if err != nil {
return nil, err
}
@@ -110,7 +111,7 @@ func (ms *MasterServer) Statistics(ctx context.Context, req *master_pb.Statistic
if err != nil {
return nil, err
}
- ttl, err := storage.ReadTTL(req.Ttl)
+ ttl, err := needle.ReadTTL(req.Ttl)
if err != nil {
return nil, err
}
diff --git a/weed/server/master_server_handlers.go b/weed/server/master_server_handlers.go
index 5bdb448c1..60b593013 100644
--- a/weed/server/master_server_handlers.go
+++ b/weed/server/master_server_handlers.go
@@ -9,7 +9,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/stats"
- "github.com/chrislusf/seaweedfs/weed/storage"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
)
func (ms *MasterServer) lookupVolumeId(vids []string, collection string) (volumeLocations map[string]operation.LookupResult) {
@@ -22,7 +22,7 @@ func (ms *MasterServer) lookupVolumeId(vids []string, collection string) (volume
if _, ok := volumeLocations[vid]; ok {
continue
}
- volumeId, err := storage.NewVolumeId(vid)
+ volumeId, err := needle.NewVolumeId(vid)
if err == nil {
machines := ms.Topo.Lookup(collection, volumeId)
if machines != nil {
diff --git a/weed/server/master_server_handlers_admin.go b/weed/server/master_server_handlers_admin.go
index 4f0195084..244098515 100644
--- a/weed/server/master_server_handlers_admin.go
+++ b/weed/server/master_server_handlers_admin.go
@@ -4,15 +4,17 @@ import (
"context"
"errors"
"fmt"
+ "math/rand"
+ "net/http"
+ "strconv"
+
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
"github.com/chrislusf/seaweedfs/weed/storage"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/topology"
"github.com/chrislusf/seaweedfs/weed/util"
- "math/rand"
- "net/http"
- "strconv"
)
func (ms *MasterServer) collectionDeleteHandler(w http.ResponseWriter, r *http.Request) {
@@ -93,7 +95,7 @@ func (ms *MasterServer) volumeStatusHandler(w http.ResponseWriter, r *http.Reque
func (ms *MasterServer) redirectHandler(w http.ResponseWriter, r *http.Request) {
vid, _, _, _, _ := parseURLPath(r.URL.Path)
- volumeId, err := storage.NewVolumeId(vid)
+ volumeId, err := needle.NewVolumeId(vid)
if err != nil {
debug("parsing error:", err, r.URL.Path)
return
@@ -146,7 +148,7 @@ func (ms *MasterServer) getVolumeGrowOption(r *http.Request) (*topology.VolumeGr
if err != nil {
return nil, err
}
- ttl, err := storage.ReadTTL(r.FormValue("ttl"))
+ ttl, err := needle.ReadTTL(r.FormValue("ttl"))
if err != nil {
return nil, err
}
diff --git a/weed/server/volume_grpc_admin.go b/weed/server/volume_grpc_admin.go
index c32f8a086..d9244aa64 100644
--- a/weed/server/volume_grpc_admin.go
+++ b/weed/server/volume_grpc_admin.go
@@ -5,7 +5,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
- "github.com/chrislusf/seaweedfs/weed/storage"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
)
func (vs *VolumeServer) DeleteCollection(ctx context.Context, req *volume_server_pb.DeleteCollectionRequest) (*volume_server_pb.DeleteCollectionResponse, error) {
@@ -29,7 +29,7 @@ func (vs *VolumeServer) AllocateVolume(ctx context.Context, req *volume_server_p
resp := &volume_server_pb.AllocateVolumeResponse{}
err := vs.store.AddVolume(
- storage.VolumeId(req.VolumeId),
+ needle.VolumeId(req.VolumeId),
req.Collection,
vs.needleMapKind,
req.Replication,
@@ -51,7 +51,7 @@ func (vs *VolumeServer) VolumeMount(ctx context.Context, req *volume_server_pb.V
resp := &volume_server_pb.VolumeMountResponse{}
- err := vs.store.MountVolume(storage.VolumeId(req.VolumeId))
+ err := vs.store.MountVolume(needle.VolumeId(req.VolumeId))
if err != nil {
glog.Errorf("volume mount %v: %v", req, err)
@@ -67,7 +67,7 @@ func (vs *VolumeServer) VolumeUnmount(ctx context.Context, req *volume_server_pb
resp := &volume_server_pb.VolumeUnmountResponse{}
- err := vs.store.UnmountVolume(storage.VolumeId(req.VolumeId))
+ err := vs.store.UnmountVolume(needle.VolumeId(req.VolumeId))
if err != nil {
glog.Errorf("volume unmount %v: %v", req, err)
@@ -83,7 +83,7 @@ func (vs *VolumeServer) VolumeDelete(ctx context.Context, req *volume_server_pb.
resp := &volume_server_pb.VolumeDeleteResponse{}
- err := vs.store.DeleteVolume(storage.VolumeId(req.VolumeId))
+ err := vs.store.DeleteVolume(needle.VolumeId(req.VolumeId))
if err != nil {
glog.Errorf("volume delete %v: %v", req, err)
diff --git a/weed/server/volume_grpc_batch_delete.go b/weed/server/volume_grpc_batch_delete.go
index 3554d97ae..d7fbb6edf 100644
--- a/weed/server/volume_grpc_batch_delete.go
+++ b/weed/server/volume_grpc_batch_delete.go
@@ -7,7 +7,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
- "github.com/chrislusf/seaweedfs/weed/storage"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
)
func (vs *VolumeServer) BatchDelete(ctx context.Context, req *volume_server_pb.BatchDeleteRequest) (*volume_server_pb.BatchDeleteResponse, error) {
@@ -26,8 +26,8 @@ func (vs *VolumeServer) BatchDelete(ctx context.Context, req *volume_server_pb.B
continue
}
- n := new(storage.Needle)
- volumeId, _ := storage.NewVolumeId(vid)
+ n := new(needle.Needle)
+ volumeId, _ := needle.NewVolumeId(vid)
n.ParsePath(id_cookie)
cookie := n.Cookie
diff --git a/weed/server/volume_grpc_copy.go b/weed/server/volume_grpc_copy.go
index 2bac7d2ee..b7789f88d 100644
--- a/weed/server/volume_grpc_copy.go
+++ b/weed/server/volume_grpc_copy.go
@@ -3,10 +3,13 @@ package weed_server
import (
"context"
"fmt"
+
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
"github.com/chrislusf/seaweedfs/weed/storage"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
+
"io"
"os"
)
@@ -14,10 +17,10 @@ import (
// VolumeCopy copy the .idx .dat files, and mount the volume
func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.VolumeCopyRequest) (*volume_server_pb.VolumeCopyResponse, error) {
- v := vs.store.GetVolume(storage.VolumeId(req.VolumeId))
+ v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
if v != nil {
// unmount the volume
- err := vs.store.UnmountVolume(storage.VolumeId(req.VolumeId))
+ err := vs.store.UnmountVolume(needle.VolumeId(req.VolumeId))
if err != nil {
return nil, fmt.Errorf("failed to unmount volume %d: %v", req.VolumeId, err)
}
@@ -88,7 +91,7 @@ func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.Vo
}
// mount the volume
- err = vs.store.MountVolume(storage.VolumeId(req.VolumeId))
+ err = vs.store.MountVolume(needle.VolumeId(req.VolumeId))
if err != nil {
return nil, fmt.Errorf("failed to mount volume %d: %v", req.VolumeId, err)
}
@@ -144,7 +147,7 @@ func writeToFile(client volume_server_pb.VolumeServer_CopyFileClient, fileName s
func (vs *VolumeServer) ReadVolumeFileStatus(ctx context.Context, req *volume_server_pb.ReadVolumeFileStatusRequest) (*volume_server_pb.ReadVolumeFileStatusResponse, error) {
resp := &volume_server_pb.ReadVolumeFileStatusResponse{}
- v := vs.store.GetVolume(storage.VolumeId(req.VolumeId))
+ v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
if v == nil {
return nil, fmt.Errorf("not found volume id %d", req.VolumeId)
}
@@ -160,7 +163,7 @@ func (vs *VolumeServer) ReadVolumeFileStatus(ctx context.Context, req *volume_se
func (vs *VolumeServer) CopyFile(req *volume_server_pb.CopyFileRequest, stream volume_server_pb.VolumeServer_CopyFileServer) error {
- v := vs.store.GetVolume(storage.VolumeId(req.VolumeId))
+ v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
if v == nil {
return fmt.Errorf("not found volume id %d", req.VolumeId)
}
diff --git a/weed/server/volume_grpc_copy_incremental.go b/weed/server/volume_grpc_copy_incremental.go
index 06e7017e8..5977c44f6 100644
--- a/weed/server/volume_grpc_copy_incremental.go
+++ b/weed/server/volume_grpc_copy_incremental.go
@@ -7,12 +7,12 @@ import (
"os"
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
- "github.com/chrislusf/seaweedfs/weed/storage"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
)
func (vs *VolumeServer) VolumeIncrementalCopy(req *volume_server_pb.VolumeIncrementalCopyRequest, stream volume_server_pb.VolumeServer_VolumeIncrementalCopyServer) error {
- v := vs.store.GetVolume(storage.VolumeId(req.VolumeId))
+ v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
if v == nil {
return fmt.Errorf("not found volume id %d", req.VolumeId)
}
@@ -36,7 +36,7 @@ func (vs *VolumeServer) VolumeIncrementalCopy(req *volume_server_pb.VolumeIncrem
func (vs *VolumeServer) VolumeSyncStatus(ctx context.Context, req *volume_server_pb.VolumeSyncStatusRequest) (*volume_server_pb.VolumeSyncStatusResponse, error) {
- v := vs.store.GetVolume(storage.VolumeId(req.VolumeId))
+ v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
if v == nil {
return nil, fmt.Errorf("not found volume id %d", req.VolumeId)
}
diff --git a/weed/server/volume_grpc_tail.go b/weed/server/volume_grpc_tail.go
index da248498f..87db6e146 100644
--- a/weed/server/volume_grpc_tail.go
+++ b/weed/server/volume_grpc_tail.go
@@ -7,11 +7,12 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
"github.com/chrislusf/seaweedfs/weed/storage"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
)
func (vs *VolumeServer) VolumeTail(req *volume_server_pb.VolumeTailRequest, stream volume_server_pb.VolumeServer_VolumeTailServer) error {
- v := vs.store.GetVolume(storage.VolumeId(req.VolumeId))
+ v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
if v == nil {
return fmt.Errorf("not found volume id %d", req.VolumeId)
}
diff --git a/weed/server/volume_grpc_vacuum.go b/weed/server/volume_grpc_vacuum.go
index d31b8f8e7..4aa6588cb 100644
--- a/weed/server/volume_grpc_vacuum.go
+++ b/weed/server/volume_grpc_vacuum.go
@@ -5,14 +5,14 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
- "github.com/chrislusf/seaweedfs/weed/storage"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
)
func (vs *VolumeServer) VacuumVolumeCheck(ctx context.Context, req *volume_server_pb.VacuumVolumeCheckRequest) (*volume_server_pb.VacuumVolumeCheckResponse, error) {
resp := &volume_server_pb.VacuumVolumeCheckResponse{}
- garbageRatio, err := vs.store.CheckCompactVolume(storage.VolumeId(req.VolumeId))
+ garbageRatio, err := vs.store.CheckCompactVolume(needle.VolumeId(req.VolumeId))
resp.GarbageRatio = garbageRatio
@@ -28,7 +28,7 @@ func (vs *VolumeServer) VacuumVolumeCompact(ctx context.Context, req *volume_ser
resp := &volume_server_pb.VacuumVolumeCompactResponse{}
- err := vs.store.CompactVolume(storage.VolumeId(req.VolumeId), req.Preallocate)
+ err := vs.store.CompactVolume(needle.VolumeId(req.VolumeId), req.Preallocate)
if err != nil {
glog.Errorf("compact volume %d: %v", req.VolumeId, err)
@@ -44,7 +44,7 @@ func (vs *VolumeServer) VacuumVolumeCommit(ctx context.Context, req *volume_serv
resp := &volume_server_pb.VacuumVolumeCommitResponse{}
- err := vs.store.CommitCompactVolume(storage.VolumeId(req.VolumeId))
+ err := vs.store.CommitCompactVolume(needle.VolumeId(req.VolumeId))
if err != nil {
glog.Errorf("commit volume %d: %v", req.VolumeId, err)
@@ -60,7 +60,7 @@ func (vs *VolumeServer) VacuumVolumeCleanup(ctx context.Context, req *volume_ser
resp := &volume_server_pb.VacuumVolumeCleanupResponse{}
- err := vs.store.CommitCleanupVolume(storage.VolumeId(req.VolumeId))
+ err := vs.store.CommitCleanupVolume(needle.VolumeId(req.VolumeId))
if err != nil {
glog.Errorf("cleanup volume %d: %v", req.VolumeId, err)
diff --git a/weed/server/volume_server_handlers_read.go b/weed/server/volume_server_handlers_read.go
index 92c728141..816afcb8b 100644
--- a/weed/server/volume_server_handlers_read.go
+++ b/weed/server/volume_server_handlers_read.go
@@ -17,16 +17,16 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/images"
"github.com/chrislusf/seaweedfs/weed/operation"
- "github.com/chrislusf/seaweedfs/weed/storage"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/util"
)
var fileNameEscaper = strings.NewReplacer("\\", "\\\\", "\"", "\\\"")
func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) {
- n := new(storage.Needle)
+ n := new(needle.Needle)
vid, fid, filename, ext, _ := parseURLPath(r.URL.Path)
- volumeId, err := storage.NewVolumeId(vid)
+ volumeId, err := needle.NewVolumeId(vid)
if err != nil {
glog.V(2).Infoln("parsing error:", err, r.URL.Path)
w.WriteHeader(http.StatusBadRequest)
@@ -132,7 +132,7 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
if strings.Contains(r.Header.Get("Accept-Encoding"), "gzip") {
w.Header().Set("Content-Encoding", "gzip")
} else {
- if n.Data, err = operation.UnGzipData(n.Data); err != nil {
+ if n.Data, err = util.UnGzipData(n.Data); err != nil {
glog.V(0).Infoln("ungzip error:", err, r.URL.Path)
}
}
@@ -146,7 +146,7 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
}
}
-func (vs *VolumeServer) tryHandleChunkedFile(n *storage.Needle, fileName string, w http.ResponseWriter, r *http.Request) (processed bool) {
+func (vs *VolumeServer) tryHandleChunkedFile(n *needle.Needle, fileName string, w http.ResponseWriter, r *http.Request) (processed bool) {
if !n.IsChunkedManifest() || r.URL.Query().Get("cm") == "false" {
return false
}
diff --git a/weed/server/volume_server_handlers_write.go b/weed/server/volume_server_handlers_write.go
index 9fb252eb7..45c868c33 100644
--- a/weed/server/volume_server_handlers_write.go
+++ b/weed/server/volume_server_handlers_write.go
@@ -10,7 +10,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/operation"
- "github.com/chrislusf/seaweedfs/weed/storage"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/topology"
)
@@ -22,7 +22,7 @@ func (vs *VolumeServer) PostHandler(w http.ResponseWriter, r *http.Request) {
}
vid, fid, _, _, _ := parseURLPath(r.URL.Path)
- volumeId, ve := storage.NewVolumeId(vid)
+ volumeId, ve := needle.NewVolumeId(vid)
if ve != nil {
glog.V(0).Infoln("NewVolumeId error:", ve)
writeJsonError(w, r, http.StatusBadRequest, ve)
@@ -34,7 +34,7 @@ func (vs *VolumeServer) PostHandler(w http.ResponseWriter, r *http.Request) {
return
}
- needle, originalSize, ne := storage.CreateNeedleFromRequest(r, vs.FixJpgOrientation)
+ needle, originalSize, ne := needle.CreateNeedleFromRequest(r, vs.FixJpgOrientation)
if ne != nil {
writeJsonError(w, r, http.StatusBadRequest, ne)
return
@@ -57,9 +57,9 @@ func (vs *VolumeServer) PostHandler(w http.ResponseWriter, r *http.Request) {
}
func (vs *VolumeServer) DeleteHandler(w http.ResponseWriter, r *http.Request) {
- n := new(storage.Needle)
+ n := new(needle.Needle)
vid, fid, _, _, _ := parseURLPath(r.URL.Path)
- volumeId, _ := storage.NewVolumeId(vid)
+ volumeId, _ := needle.NewVolumeId(vid)
n.ParsePath(fid)
if !vs.maybeCheckJwtAuthorization(r, vid, fid) {