diff options
| author | Chris Lu <chris.lu@gmail.com> | 2019-04-18 21:43:36 -0700 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2019-04-18 21:43:36 -0700 |
| commit | e5506152c0a27d38fa334b2e338d82ee02669ab9 (patch) | |
| tree | 1f589cbbf7244cbe5dbfe84ca89f5996e4ca9ff3 /weed/server | |
| parent | 33c92b819a334b5709e6f1cbe304e4b8855c1238 (diff) | |
| download | seaweedfs-e5506152c0a27d38fa334b2e338d82ee02669ab9.tar.xz seaweedfs-e5506152c0a27d38fa334b2e338d82ee02669ab9.zip | |
refactoring
Diffstat (limited to 'weed/server')
| -rw-r--r-- | weed/server/common.go | 7 | ||||
| -rw-r--r-- | weed/server/master_grpc_server_volume.go | 5 | ||||
| -rw-r--r-- | weed/server/master_server_handlers.go | 4 | ||||
| -rw-r--r-- | weed/server/master_server_handlers_admin.go | 12 | ||||
| -rw-r--r-- | weed/server/volume_grpc_admin.go | 10 | ||||
| -rw-r--r-- | weed/server/volume_grpc_batch_delete.go | 6 | ||||
| -rw-r--r-- | weed/server/volume_grpc_copy.go | 13 | ||||
| -rw-r--r-- | weed/server/volume_grpc_copy_incremental.go | 6 | ||||
| -rw-r--r-- | weed/server/volume_grpc_tail.go | 3 | ||||
| -rw-r--r-- | weed/server/volume_grpc_vacuum.go | 10 | ||||
| -rw-r--r-- | weed/server/volume_server_handlers_read.go | 10 | ||||
| -rw-r--r-- | weed/server/volume_server_handlers_write.go | 10 |
12 files changed, 52 insertions, 44 deletions
diff --git a/weed/server/common.go b/weed/server/common.go index 1c75d44cf..e02ab38a6 100644 --- a/weed/server/common.go +++ b/weed/server/common.go @@ -5,17 +5,18 @@ import ( "encoding/json" "errors" "fmt" - "google.golang.org/grpc" "net/http" "path/filepath" "strconv" "strings" "time" + "github.com/chrislusf/seaweedfs/weed/storage/needle" + "google.golang.org/grpc" + "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/stats" - "github.com/chrislusf/seaweedfs/weed/storage" "github.com/chrislusf/seaweedfs/weed/util" _ "github.com/chrislusf/seaweedfs/weed/statik" @@ -90,7 +91,7 @@ func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterUrl st } debug("parsing upload file...") - fname, data, mimeType, pairMap, isGzipped, originalDataSize, lastModified, _, _, pe := storage.ParseUpload(r) + fname, data, mimeType, pairMap, isGzipped, originalDataSize, lastModified, _, _, pe := needle.ParseUpload(r) if pe != nil { writeJsonError(w, r, http.StatusBadRequest, pe) return diff --git a/weed/server/master_grpc_server_volume.go b/weed/server/master_grpc_server_volume.go index 7b8efb933..2265cee3b 100644 --- a/weed/server/master_grpc_server_volume.go +++ b/weed/server/master_grpc_server_volume.go @@ -8,6 +8,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/pb/master_pb" "github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/storage" + "github.com/chrislusf/seaweedfs/weed/storage/needle" "github.com/chrislusf/seaweedfs/weed/topology" ) @@ -55,7 +56,7 @@ func (ms *MasterServer) Assign(ctx context.Context, req *master_pb.AssignRequest if err != nil { return nil, err } - ttl, err := storage.ReadTTL(req.Ttl) + ttl, err := needle.ReadTTL(req.Ttl) if err != nil { return nil, err } @@ -110,7 +111,7 @@ func (ms *MasterServer) Statistics(ctx context.Context, req *master_pb.Statistic if err != nil { return nil, err } - ttl, err := storage.ReadTTL(req.Ttl) + ttl, err := needle.ReadTTL(req.Ttl) if err != nil { return nil, err } diff --git a/weed/server/master_server_handlers.go b/weed/server/master_server_handlers.go index 5bdb448c1..60b593013 100644 --- a/weed/server/master_server_handlers.go +++ b/weed/server/master_server_handlers.go @@ -9,7 +9,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/stats" - "github.com/chrislusf/seaweedfs/weed/storage" + "github.com/chrislusf/seaweedfs/weed/storage/needle" ) func (ms *MasterServer) lookupVolumeId(vids []string, collection string) (volumeLocations map[string]operation.LookupResult) { @@ -22,7 +22,7 @@ func (ms *MasterServer) lookupVolumeId(vids []string, collection string) (volume if _, ok := volumeLocations[vid]; ok { continue } - volumeId, err := storage.NewVolumeId(vid) + volumeId, err := needle.NewVolumeId(vid) if err == nil { machines := ms.Topo.Lookup(collection, volumeId) if machines != nil { diff --git a/weed/server/master_server_handlers_admin.go b/weed/server/master_server_handlers_admin.go index 4f0195084..244098515 100644 --- a/weed/server/master_server_handlers_admin.go +++ b/weed/server/master_server_handlers_admin.go @@ -4,15 +4,17 @@ import ( "context" "errors" "fmt" + "math/rand" + "net/http" + "strconv" + "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" "github.com/chrislusf/seaweedfs/weed/storage" + "github.com/chrislusf/seaweedfs/weed/storage/needle" "github.com/chrislusf/seaweedfs/weed/topology" "github.com/chrislusf/seaweedfs/weed/util" - "math/rand" - "net/http" - "strconv" ) func (ms *MasterServer) collectionDeleteHandler(w http.ResponseWriter, r *http.Request) { @@ -93,7 +95,7 @@ func (ms *MasterServer) volumeStatusHandler(w http.ResponseWriter, r *http.Reque func (ms *MasterServer) redirectHandler(w http.ResponseWriter, r *http.Request) { vid, _, _, _, _ := parseURLPath(r.URL.Path) - volumeId, err := storage.NewVolumeId(vid) + volumeId, err := needle.NewVolumeId(vid) if err != nil { debug("parsing error:", err, r.URL.Path) return @@ -146,7 +148,7 @@ func (ms *MasterServer) getVolumeGrowOption(r *http.Request) (*topology.VolumeGr if err != nil { return nil, err } - ttl, err := storage.ReadTTL(r.FormValue("ttl")) + ttl, err := needle.ReadTTL(r.FormValue("ttl")) if err != nil { return nil, err } diff --git a/weed/server/volume_grpc_admin.go b/weed/server/volume_grpc_admin.go index c32f8a086..d9244aa64 100644 --- a/weed/server/volume_grpc_admin.go +++ b/weed/server/volume_grpc_admin.go @@ -5,7 +5,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" - "github.com/chrislusf/seaweedfs/weed/storage" + "github.com/chrislusf/seaweedfs/weed/storage/needle" ) func (vs *VolumeServer) DeleteCollection(ctx context.Context, req *volume_server_pb.DeleteCollectionRequest) (*volume_server_pb.DeleteCollectionResponse, error) { @@ -29,7 +29,7 @@ func (vs *VolumeServer) AllocateVolume(ctx context.Context, req *volume_server_p resp := &volume_server_pb.AllocateVolumeResponse{} err := vs.store.AddVolume( - storage.VolumeId(req.VolumeId), + needle.VolumeId(req.VolumeId), req.Collection, vs.needleMapKind, req.Replication, @@ -51,7 +51,7 @@ func (vs *VolumeServer) VolumeMount(ctx context.Context, req *volume_server_pb.V resp := &volume_server_pb.VolumeMountResponse{} - err := vs.store.MountVolume(storage.VolumeId(req.VolumeId)) + err := vs.store.MountVolume(needle.VolumeId(req.VolumeId)) if err != nil { glog.Errorf("volume mount %v: %v", req, err) @@ -67,7 +67,7 @@ func (vs *VolumeServer) VolumeUnmount(ctx context.Context, req *volume_server_pb resp := &volume_server_pb.VolumeUnmountResponse{} - err := vs.store.UnmountVolume(storage.VolumeId(req.VolumeId)) + err := vs.store.UnmountVolume(needle.VolumeId(req.VolumeId)) if err != nil { glog.Errorf("volume unmount %v: %v", req, err) @@ -83,7 +83,7 @@ func (vs *VolumeServer) VolumeDelete(ctx context.Context, req *volume_server_pb. resp := &volume_server_pb.VolumeDeleteResponse{} - err := vs.store.DeleteVolume(storage.VolumeId(req.VolumeId)) + err := vs.store.DeleteVolume(needle.VolumeId(req.VolumeId)) if err != nil { glog.Errorf("volume delete %v: %v", req, err) diff --git a/weed/server/volume_grpc_batch_delete.go b/weed/server/volume_grpc_batch_delete.go index 3554d97ae..d7fbb6edf 100644 --- a/weed/server/volume_grpc_batch_delete.go +++ b/weed/server/volume_grpc_batch_delete.go @@ -7,7 +7,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" - "github.com/chrislusf/seaweedfs/weed/storage" + "github.com/chrislusf/seaweedfs/weed/storage/needle" ) func (vs *VolumeServer) BatchDelete(ctx context.Context, req *volume_server_pb.BatchDeleteRequest) (*volume_server_pb.BatchDeleteResponse, error) { @@ -26,8 +26,8 @@ func (vs *VolumeServer) BatchDelete(ctx context.Context, req *volume_server_pb.B continue } - n := new(storage.Needle) - volumeId, _ := storage.NewVolumeId(vid) + n := new(needle.Needle) + volumeId, _ := needle.NewVolumeId(vid) n.ParsePath(id_cookie) cookie := n.Cookie diff --git a/weed/server/volume_grpc_copy.go b/weed/server/volume_grpc_copy.go index 2bac7d2ee..b7789f88d 100644 --- a/weed/server/volume_grpc_copy.go +++ b/weed/server/volume_grpc_copy.go @@ -3,10 +3,13 @@ package weed_server import ( "context" "fmt" + "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" "github.com/chrislusf/seaweedfs/weed/storage" + "github.com/chrislusf/seaweedfs/weed/storage/needle" + "io" "os" ) @@ -14,10 +17,10 @@ import ( // VolumeCopy copy the .idx .dat files, and mount the volume func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.VolumeCopyRequest) (*volume_server_pb.VolumeCopyResponse, error) { - v := vs.store.GetVolume(storage.VolumeId(req.VolumeId)) + v := vs.store.GetVolume(needle.VolumeId(req.VolumeId)) if v != nil { // unmount the volume - err := vs.store.UnmountVolume(storage.VolumeId(req.VolumeId)) + err := vs.store.UnmountVolume(needle.VolumeId(req.VolumeId)) if err != nil { return nil, fmt.Errorf("failed to unmount volume %d: %v", req.VolumeId, err) } @@ -88,7 +91,7 @@ func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.Vo } // mount the volume - err = vs.store.MountVolume(storage.VolumeId(req.VolumeId)) + err = vs.store.MountVolume(needle.VolumeId(req.VolumeId)) if err != nil { return nil, fmt.Errorf("failed to mount volume %d: %v", req.VolumeId, err) } @@ -144,7 +147,7 @@ func writeToFile(client volume_server_pb.VolumeServer_CopyFileClient, fileName s func (vs *VolumeServer) ReadVolumeFileStatus(ctx context.Context, req *volume_server_pb.ReadVolumeFileStatusRequest) (*volume_server_pb.ReadVolumeFileStatusResponse, error) { resp := &volume_server_pb.ReadVolumeFileStatusResponse{} - v := vs.store.GetVolume(storage.VolumeId(req.VolumeId)) + v := vs.store.GetVolume(needle.VolumeId(req.VolumeId)) if v == nil { return nil, fmt.Errorf("not found volume id %d", req.VolumeId) } @@ -160,7 +163,7 @@ func (vs *VolumeServer) ReadVolumeFileStatus(ctx context.Context, req *volume_se func (vs *VolumeServer) CopyFile(req *volume_server_pb.CopyFileRequest, stream volume_server_pb.VolumeServer_CopyFileServer) error { - v := vs.store.GetVolume(storage.VolumeId(req.VolumeId)) + v := vs.store.GetVolume(needle.VolumeId(req.VolumeId)) if v == nil { return fmt.Errorf("not found volume id %d", req.VolumeId) } diff --git a/weed/server/volume_grpc_copy_incremental.go b/weed/server/volume_grpc_copy_incremental.go index 06e7017e8..5977c44f6 100644 --- a/weed/server/volume_grpc_copy_incremental.go +++ b/weed/server/volume_grpc_copy_incremental.go @@ -7,12 +7,12 @@ import ( "os" "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" - "github.com/chrislusf/seaweedfs/weed/storage" + "github.com/chrislusf/seaweedfs/weed/storage/needle" ) func (vs *VolumeServer) VolumeIncrementalCopy(req *volume_server_pb.VolumeIncrementalCopyRequest, stream volume_server_pb.VolumeServer_VolumeIncrementalCopyServer) error { - v := vs.store.GetVolume(storage.VolumeId(req.VolumeId)) + v := vs.store.GetVolume(needle.VolumeId(req.VolumeId)) if v == nil { return fmt.Errorf("not found volume id %d", req.VolumeId) } @@ -36,7 +36,7 @@ func (vs *VolumeServer) VolumeIncrementalCopy(req *volume_server_pb.VolumeIncrem func (vs *VolumeServer) VolumeSyncStatus(ctx context.Context, req *volume_server_pb.VolumeSyncStatusRequest) (*volume_server_pb.VolumeSyncStatusResponse, error) { - v := vs.store.GetVolume(storage.VolumeId(req.VolumeId)) + v := vs.store.GetVolume(needle.VolumeId(req.VolumeId)) if v == nil { return nil, fmt.Errorf("not found volume id %d", req.VolumeId) } diff --git a/weed/server/volume_grpc_tail.go b/weed/server/volume_grpc_tail.go index da248498f..87db6e146 100644 --- a/weed/server/volume_grpc_tail.go +++ b/weed/server/volume_grpc_tail.go @@ -7,11 +7,12 @@ import ( "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" "github.com/chrislusf/seaweedfs/weed/storage" + "github.com/chrislusf/seaweedfs/weed/storage/needle" ) func (vs *VolumeServer) VolumeTail(req *volume_server_pb.VolumeTailRequest, stream volume_server_pb.VolumeServer_VolumeTailServer) error { - v := vs.store.GetVolume(storage.VolumeId(req.VolumeId)) + v := vs.store.GetVolume(needle.VolumeId(req.VolumeId)) if v == nil { return fmt.Errorf("not found volume id %d", req.VolumeId) } diff --git a/weed/server/volume_grpc_vacuum.go b/weed/server/volume_grpc_vacuum.go index d31b8f8e7..4aa6588cb 100644 --- a/weed/server/volume_grpc_vacuum.go +++ b/weed/server/volume_grpc_vacuum.go @@ -5,14 +5,14 @@ import ( "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" - "github.com/chrislusf/seaweedfs/weed/storage" + "github.com/chrislusf/seaweedfs/weed/storage/needle" ) func (vs *VolumeServer) VacuumVolumeCheck(ctx context.Context, req *volume_server_pb.VacuumVolumeCheckRequest) (*volume_server_pb.VacuumVolumeCheckResponse, error) { resp := &volume_server_pb.VacuumVolumeCheckResponse{} - garbageRatio, err := vs.store.CheckCompactVolume(storage.VolumeId(req.VolumeId)) + garbageRatio, err := vs.store.CheckCompactVolume(needle.VolumeId(req.VolumeId)) resp.GarbageRatio = garbageRatio @@ -28,7 +28,7 @@ func (vs *VolumeServer) VacuumVolumeCompact(ctx context.Context, req *volume_ser resp := &volume_server_pb.VacuumVolumeCompactResponse{} - err := vs.store.CompactVolume(storage.VolumeId(req.VolumeId), req.Preallocate) + err := vs.store.CompactVolume(needle.VolumeId(req.VolumeId), req.Preallocate) if err != nil { glog.Errorf("compact volume %d: %v", req.VolumeId, err) @@ -44,7 +44,7 @@ func (vs *VolumeServer) VacuumVolumeCommit(ctx context.Context, req *volume_serv resp := &volume_server_pb.VacuumVolumeCommitResponse{} - err := vs.store.CommitCompactVolume(storage.VolumeId(req.VolumeId)) + err := vs.store.CommitCompactVolume(needle.VolumeId(req.VolumeId)) if err != nil { glog.Errorf("commit volume %d: %v", req.VolumeId, err) @@ -60,7 +60,7 @@ func (vs *VolumeServer) VacuumVolumeCleanup(ctx context.Context, req *volume_ser resp := &volume_server_pb.VacuumVolumeCleanupResponse{} - err := vs.store.CommitCleanupVolume(storage.VolumeId(req.VolumeId)) + err := vs.store.CommitCleanupVolume(needle.VolumeId(req.VolumeId)) if err != nil { glog.Errorf("cleanup volume %d: %v", req.VolumeId, err) diff --git a/weed/server/volume_server_handlers_read.go b/weed/server/volume_server_handlers_read.go index 92c728141..816afcb8b 100644 --- a/weed/server/volume_server_handlers_read.go +++ b/weed/server/volume_server_handlers_read.go @@ -17,16 +17,16 @@ import ( "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/images" "github.com/chrislusf/seaweedfs/weed/operation" - "github.com/chrislusf/seaweedfs/weed/storage" + "github.com/chrislusf/seaweedfs/weed/storage/needle" "github.com/chrislusf/seaweedfs/weed/util" ) var fileNameEscaper = strings.NewReplacer("\\", "\\\\", "\"", "\\\"") func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) { - n := new(storage.Needle) + n := new(needle.Needle) vid, fid, filename, ext, _ := parseURLPath(r.URL.Path) - volumeId, err := storage.NewVolumeId(vid) + volumeId, err := needle.NewVolumeId(vid) if err != nil { glog.V(2).Infoln("parsing error:", err, r.URL.Path) w.WriteHeader(http.StatusBadRequest) @@ -132,7 +132,7 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) if strings.Contains(r.Header.Get("Accept-Encoding"), "gzip") { w.Header().Set("Content-Encoding", "gzip") } else { - if n.Data, err = operation.UnGzipData(n.Data); err != nil { + if n.Data, err = util.UnGzipData(n.Data); err != nil { glog.V(0).Infoln("ungzip error:", err, r.URL.Path) } } @@ -146,7 +146,7 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) } } -func (vs *VolumeServer) tryHandleChunkedFile(n *storage.Needle, fileName string, w http.ResponseWriter, r *http.Request) (processed bool) { +func (vs *VolumeServer) tryHandleChunkedFile(n *needle.Needle, fileName string, w http.ResponseWriter, r *http.Request) (processed bool) { if !n.IsChunkedManifest() || r.URL.Query().Get("cm") == "false" { return false } diff --git a/weed/server/volume_server_handlers_write.go b/weed/server/volume_server_handlers_write.go index 9fb252eb7..45c868c33 100644 --- a/weed/server/volume_server_handlers_write.go +++ b/weed/server/volume_server_handlers_write.go @@ -10,7 +10,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/operation" - "github.com/chrislusf/seaweedfs/weed/storage" + "github.com/chrislusf/seaweedfs/weed/storage/needle" "github.com/chrislusf/seaweedfs/weed/topology" ) @@ -22,7 +22,7 @@ func (vs *VolumeServer) PostHandler(w http.ResponseWriter, r *http.Request) { } vid, fid, _, _, _ := parseURLPath(r.URL.Path) - volumeId, ve := storage.NewVolumeId(vid) + volumeId, ve := needle.NewVolumeId(vid) if ve != nil { glog.V(0).Infoln("NewVolumeId error:", ve) writeJsonError(w, r, http.StatusBadRequest, ve) @@ -34,7 +34,7 @@ func (vs *VolumeServer) PostHandler(w http.ResponseWriter, r *http.Request) { return } - needle, originalSize, ne := storage.CreateNeedleFromRequest(r, vs.FixJpgOrientation) + needle, originalSize, ne := needle.CreateNeedleFromRequest(r, vs.FixJpgOrientation) if ne != nil { writeJsonError(w, r, http.StatusBadRequest, ne) return @@ -57,9 +57,9 @@ func (vs *VolumeServer) PostHandler(w http.ResponseWriter, r *http.Request) { } func (vs *VolumeServer) DeleteHandler(w http.ResponseWriter, r *http.Request) { - n := new(storage.Needle) + n := new(needle.Needle) vid, fid, _, _, _ := parseURLPath(r.URL.Path) - volumeId, _ := storage.NewVolumeId(vid) + volumeId, _ := needle.NewVolumeId(vid) n.ParsePath(fid) if !vs.maybeCheckJwtAuthorization(r, vid, fid) { |
