diff options
Diffstat (limited to 'weed/server')
31 files changed, 773 insertions, 229 deletions
diff --git a/weed/server/common.go b/weed/server/common.go index e02ab38a6..888ddec49 100644 --- a/weed/server/common.go +++ b/weed/server/common.go @@ -11,17 +11,18 @@ import ( "strings" "time" - "github.com/chrislusf/seaweedfs/weed/storage/needle" "google.golang.org/grpc" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/stats" + "github.com/chrislusf/seaweedfs/weed/storage/needle" "github.com/chrislusf/seaweedfs/weed/util" - _ "github.com/chrislusf/seaweedfs/weed/statik" "github.com/gorilla/mux" statik "github.com/rakyll/statik/fs" + + _ "github.com/chrislusf/seaweedfs/weed/statik" ) var serverStats *stats.ServerStats @@ -48,10 +49,16 @@ func writeJson(w http.ResponseWriter, r *http.Request, httpStatus int, obj inter if callback == "" { w.Header().Set("Content-Type", "application/json") w.WriteHeader(httpStatus) + if httpStatus == http.StatusNotModified { + return + } _, err = w.Write(bytes) } else { w.Header().Set("Content-Type", "application/javascript") w.WriteHeader(httpStatus) + if httpStatus == http.StatusNotModified { + return + } if _, err = w.Write([]uint8(callback)); err != nil { return } @@ -109,6 +116,7 @@ func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterUrl st } ar := &operation.VolumeAssignRequest{ Count: count, + DataCenter: r.FormValue("dataCenter"), Replication: r.FormValue("replication"), Collection: r.FormValue("collection"), Ttl: r.FormValue("ttl"), diff --git a/weed/server/filer_grpc_server.go b/weed/server/filer_grpc_server.go index 8eea2441e..a84feec2d 100644 --- a/weed/server/filer_grpc_server.go +++ b/weed/server/filer_grpc_server.go @@ -20,7 +20,7 @@ func (fs *FilerServer) LookupDirectoryEntry(ctx context.Context, req *filer_pb.L entry, err := fs.filer.FindEntry(ctx, filer2.FullPath(filepath.ToSlash(filepath.Join(req.Directory, req.Name)))) if err != nil { - return nil, fmt.Errorf("%s not found under %s: %v", req.Name, req.Directory, err) + return nil, err } return &filer_pb.LookupDirectoryEntryResponse{ @@ -29,27 +29,32 @@ func (fs *FilerServer) LookupDirectoryEntry(ctx context.Context, req *filer_pb.L IsDirectory: entry.IsDirectory(), Attributes: filer2.EntryAttributeToPb(entry), Chunks: entry.Chunks, + Extended: entry.Extended, }, }, nil } -func (fs *FilerServer) ListEntries(ctx context.Context, req *filer_pb.ListEntriesRequest) (*filer_pb.ListEntriesResponse, error) { +func (fs *FilerServer) ListEntries(req *filer_pb.ListEntriesRequest, stream filer_pb.SeaweedFiler_ListEntriesServer) error { limit := int(req.Limit) if limit == 0 { limit = fs.option.DirListingLimit } - resp := &filer_pb.ListEntriesResponse{} + paginationLimit := filer2.PaginationSize + if limit < paginationLimit { + paginationLimit = limit + } + lastFileName := req.StartFromFileName includeLastFile := req.InclusiveStartFrom for limit > 0 { - entries, err := fs.filer.ListDirectoryEntries(ctx, filer2.FullPath(req.Directory), lastFileName, includeLastFile, 1024) + entries, err := fs.filer.ListDirectoryEntries(stream.Context(), filer2.FullPath(req.Directory), lastFileName, includeLastFile, paginationLimit) if err != nil { - return nil, err + return err } if len(entries) == 0 { - return resp, nil + return nil } includeLastFile = false @@ -64,22 +69,30 @@ func (fs *FilerServer) ListEntries(ctx context.Context, req *filer_pb.ListEntrie } } - resp.Entries = append(resp.Entries, &filer_pb.Entry{ - Name: entry.Name(), - IsDirectory: entry.IsDirectory(), - Chunks: entry.Chunks, - Attributes: filer2.EntryAttributeToPb(entry), - }) + if err := stream.Send(&filer_pb.ListEntriesResponse{ + Entry: &filer_pb.Entry{ + Name: entry.Name(), + IsDirectory: entry.IsDirectory(), + Chunks: entry.Chunks, + Attributes: filer2.EntryAttributeToPb(entry), + Extended: entry.Extended, + }, + }); err != nil { + return err + } limit-- + if limit == 0 { + return nil + } } - if len(resp.Entries) < 1024 { + if len(entries) < paginationLimit { break } } - return resp, nil + return nil } func (fs *FilerServer) LookupVolume(ctx context.Context, req *filer_pb.LookupVolumeRequest) (*filer_pb.LookupVolumeResponse, error) { @@ -95,7 +108,11 @@ func (fs *FilerServer) LookupVolume(ctx context.Context, req *filer_pb.LookupVol return nil, err } var locs []*filer_pb.Location - for _, loc := range fs.filer.MasterClient.GetLocations(uint32(vid)) { + locations, found := fs.filer.MasterClient.GetLocations(uint32(vid)) + if !found { + continue + } + for _, loc := range locations { locs = append(locs, &filer_pb.Location{ Url: loc.Url, PublicUrl: loc.PublicUrl, @@ -125,7 +142,7 @@ func (fs *FilerServer) CreateEntry(ctx context.Context, req *filer_pb.CreateEntr }) if err == nil { - fs.filer.DeleteChunks(fullpath, garbages) + fs.filer.DeleteChunks(garbages) } return &filer_pb.CreateEntryResponse{}, err @@ -147,12 +164,14 @@ func (fs *FilerServer) UpdateEntry(ctx context.Context, req *filer_pb.UpdateEntr newEntry := &filer2.Entry{ FullPath: filer2.FullPath(filepath.ToSlash(filepath.Join(req.Directory, req.Entry.Name))), Attr: entry.Attr, + Extended: req.Entry.Extended, Chunks: chunks, } - glog.V(3).Infof("updating %s: %+v, chunks %d: %v => %+v, chunks %d: %v", + glog.V(3).Infof("updating %s: %+v, chunks %d: %v => %+v, chunks %d: %v, extended: %v => %v", fullpath, entry.Attr, len(entry.Chunks), entry.Chunks, - req.Entry.Attributes, len(req.Entry.Chunks), req.Entry.Chunks) + req.Entry.Attributes, len(req.Entry.Chunks), req.Entry.Chunks, + entry.Extended, req.Entry.Extended) if req.Entry.Attributes != nil { if req.Entry.Attributes.Mtime != 0 { @@ -174,8 +193,8 @@ func (fs *FilerServer) UpdateEntry(ctx context.Context, req *filer_pb.UpdateEntr } if err = fs.filer.UpdateEntry(ctx, entry, newEntry); err == nil { - fs.filer.DeleteChunks(entry.FullPath, unusedChunks) - fs.filer.DeleteChunks(entry.FullPath, garbages) + fs.filer.DeleteChunks(unusedChunks) + fs.filer.DeleteChunks(garbages) } fs.filer.NotifyUpdateEvent(entry, newEntry, true) @@ -184,7 +203,7 @@ func (fs *FilerServer) UpdateEntry(ctx context.Context, req *filer_pb.UpdateEntr } func (fs *FilerServer) DeleteEntry(ctx context.Context, req *filer_pb.DeleteEntryRequest) (resp *filer_pb.DeleteEntryResponse, err error) { - err = fs.filer.DeleteEntryMetaAndData(ctx, filer2.FullPath(filepath.ToSlash(filepath.Join(req.Directory, req.Name))), req.IsRecursive, req.IsDeleteData) + err = fs.filer.DeleteEntryMetaAndData(ctx, filer2.FullPath(filepath.ToSlash(filepath.Join(req.Directory, req.Name))), req.IsRecursive, req.IgnoreRecursiveError, req.IsDeleteData) return &filer_pb.DeleteEntryResponse{}, err } diff --git a/weed/server/filer_grpc_server_rename.go b/weed/server/filer_grpc_server_rename.go index 7142f7606..dfa59e7fe 100644 --- a/weed/server/filer_grpc_server_rename.go +++ b/weed/server/filer_grpc_server_rename.go @@ -73,11 +73,11 @@ func (fs *FilerServer) moveFolderSubEntries(ctx context.Context, oldParent filer return err } - println("found", len(entries), "entries under", currentDirPath) + // println("found", len(entries), "entries under", currentDirPath) for _, item := range entries { lastFileName = item.Name() - println("processing", lastFileName) + // println("processing", lastFileName) err := fs.moveEntry(ctx, currentDirPath, item, newDirPath, item.Name(), events) if err != nil { return err @@ -113,7 +113,7 @@ func (fs *FilerServer) moveSelfEntry(ctx context.Context, oldParent filer2.FullP } // delete old entry - deleteErr := fs.filer.DeleteEntryMetaAndData(ctx, oldPath, false, false) + deleteErr := fs.filer.DeleteEntryMetaAndData(ctx, oldPath, false, false, false) if deleteErr != nil { return deleteErr } diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go index b9e6aa23d..41ba81366 100644 --- a/weed/server/filer_server.go +++ b/weed/server/filer_server.go @@ -15,12 +15,13 @@ import ( "github.com/chrislusf/seaweedfs/weed/filer2" _ "github.com/chrislusf/seaweedfs/weed/filer2/cassandra" + _ "github.com/chrislusf/seaweedfs/weed/filer2/etcd" _ "github.com/chrislusf/seaweedfs/weed/filer2/leveldb" _ "github.com/chrislusf/seaweedfs/weed/filer2/leveldb2" - _ "github.com/chrislusf/seaweedfs/weed/filer2/memdb" _ "github.com/chrislusf/seaweedfs/weed/filer2/mysql" _ "github.com/chrislusf/seaweedfs/weed/filer2/postgres" _ "github.com/chrislusf/seaweedfs/weed/filer2/redis" + _ "github.com/chrislusf/seaweedfs/weed/filer2/tikv" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/notification" _ "github.com/chrislusf/seaweedfs/weed/notification/aws_sqs" diff --git a/weed/server/filer_server_handlers_read.go b/weed/server/filer_server_handlers_read.go index 0edf501a8..ba21298ba 100644 --- a/weed/server/filer_server_handlers_read.go +++ b/weed/server/filer_server_handlers_read.go @@ -32,10 +32,15 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request, fs.listDirectoryHandler(w, r) return } - glog.V(1).Infof("Not found %s: %v", path, err) - - stats.FilerRequestCounter.WithLabelValues("read.notfound").Inc() - w.WriteHeader(http.StatusNotFound) + if err == filer2.ErrNotFound { + glog.V(1).Infof("Not found %s: %v", path, err) + stats.FilerRequestCounter.WithLabelValues("read.notfound").Inc() + w.WriteHeader(http.StatusNotFound) + } else { + glog.V(0).Infof("Internal %s: %v", path, err) + stats.FilerRequestCounter.WithLabelValues("read.internalerror").Inc() + w.WriteHeader(http.StatusInternalServerError) + } return } diff --git a/weed/server/filer_server_handlers_write.go b/weed/server/filer_server_handlers_write.go index 0bf1218ce..236e7027d 100644 --- a/weed/server/filer_server_handlers_write.go +++ b/weed/server/filer_server_handlers_write.go @@ -149,6 +149,16 @@ func (fs *FilerServer) updateFilerStore(ctx context.Context, r *http.Request, w stats.FilerRequestHistogram.WithLabelValues("postStoreWrite").Observe(time.Since(start).Seconds()) }() + modeStr := r.URL.Query().Get("mode") + if modeStr == "" { + modeStr = "0660" + } + mode, err := strconv.ParseUint(modeStr, 8, 32) + if err != nil { + glog.Errorf("Invalid mode format: %s, use 0660 by default", modeStr) + mode = 0660 + } + path := r.URL.Path if strings.HasSuffix(path, "/") { if ret.Name != "" { @@ -165,7 +175,7 @@ func (fs *FilerServer) updateFilerStore(ctx context.Context, r *http.Request, w Attr: filer2.Attr{ Mtime: time.Now(), Crtime: crTime, - Mode: 0660, + Mode: os.FileMode(mode), Uid: OS_UID, Gid: OS_GID, Replication: replication, @@ -184,7 +194,7 @@ func (fs *FilerServer) updateFilerStore(ctx context.Context, r *http.Request, w } // glog.V(4).Infof("saving %s => %+v", path, entry) if dbErr := fs.filer.CreateEntry(ctx, entry); dbErr != nil { - fs.filer.DeleteChunks(entry.FullPath, entry.Chunks) + fs.filer.DeleteChunks(entry.Chunks) glog.V(0).Infof("failing to write %s to filer server : %v", path, dbErr) writeJsonError(w, r, http.StatusInternalServerError, dbErr) err = dbErr @@ -269,14 +279,22 @@ func (fs *FilerServer) uploadToVolumeServer(r *http.Request, u *url.URL, auth se // curl -X DELETE http://localhost:8888/path/to // curl -X DELETE http://localhost:8888/path/to?recursive=true +// curl -X DELETE http://localhost:8888/path/to?recursive=true&ignoreRecursiveError=true +// curl -X DELETE http://localhost:8888/path/to?recursive=true&skipChunkDeletion=true func (fs *FilerServer) DeleteHandler(w http.ResponseWriter, r *http.Request) { isRecursive := r.FormValue("recursive") == "true" + ignoreRecursiveError := r.FormValue("ignoreRecursiveError") == "true" + skipChunkDeletion := r.FormValue("skipChunkDeletion") == "true" - err := fs.filer.DeleteEntryMetaAndData(context.Background(), filer2.FullPath(r.URL.Path), isRecursive, true) + err := fs.filer.DeleteEntryMetaAndData(context.Background(), filer2.FullPath(r.URL.Path), isRecursive, ignoreRecursiveError, !skipChunkDeletion) if err != nil { glog.V(1).Infoln("deleting", r.URL.Path, ":", err.Error()) - writeJsonError(w, r, http.StatusInternalServerError, err) + httpStatus := http.StatusInternalServerError + if err == filer2.ErrNotFound { + httpStatus = http.StatusNotFound + } + writeJsonError(w, r, httpStatus, err) return } diff --git a/weed/server/filer_server_handlers_write_autochunk.go b/weed/server/filer_server_handlers_write_autochunk.go index 492b55943..8ff7ab2c0 100644 --- a/weed/server/filer_server_handlers_write_autochunk.go +++ b/weed/server/filer_server_handlers_write_autochunk.go @@ -123,7 +123,7 @@ func (fs *FilerServer) doAutoChunk(ctx context.Context, w http.ResponseWriter, r // upload the chunk to the volume server chunkName := fileName + "_chunk_" + strconv.FormatInt(int64(len(fileChunks)+1), 10) - uploadErr := fs.doUpload(urlLocation, w, r, chunkBuf[0:chunkBufOffset], chunkName, "application/octet-stream", fileId, auth) + uploadErr := fs.doUpload(urlLocation, w, r, chunkBuf[0:chunkBufOffset], chunkName, "", fileId, auth) if uploadErr != nil { return nil, uploadErr } @@ -177,7 +177,7 @@ func (fs *FilerServer) doAutoChunk(ctx context.Context, w http.ResponseWriter, r Chunks: fileChunks, } if dbErr := fs.filer.CreateEntry(ctx, entry); dbErr != nil { - fs.filer.DeleteChunks(entry.FullPath, entry.Chunks) + fs.filer.DeleteChunks(entry.Chunks) replyerr = dbErr filerResult.Error = dbErr.Error() glog.V(0).Infof("failing to write %s to filer server : %v", path, dbErr) diff --git a/weed/server/filer_ui/breadcrumb.go b/weed/server/filer_ui/breadcrumb.go index 55a1909a8..2f0df7f91 100644 --- a/weed/server/filer_ui/breadcrumb.go +++ b/weed/server/filer_ui/breadcrumb.go @@ -14,10 +14,14 @@ func ToBreadcrumb(fullpath string) (crumbs []Breadcrumb) { parts := strings.Split(fullpath, "/") for i := 0; i < len(parts); i++ { - crumbs = append(crumbs, Breadcrumb{ - Name: parts[i] + "/", + crumb := Breadcrumb{ + Name: parts[i] + " /", Link: "/" + filepath.ToSlash(filepath.Join(parts[0:i+1]...)), - }) + } + if !strings.HasSuffix(crumb.Link, "/") { + crumb.Link += "/" + } + crumbs = append(crumbs, crumb) } return diff --git a/weed/server/filer_ui/templates.go b/weed/server/filer_ui/templates.go index 884798936..e532b27e2 100644 --- a/weed/server/filer_ui/templates.go +++ b/weed/server/filer_ui/templates.go @@ -50,7 +50,7 @@ var StatusTpl = template.Must(template.New("status").Funcs(funcMap).Parse(`<!DOC <div class="row"> <div> {{ range $entry := .Breadcrumbs }} - <a href={{ $entry.Link }} > + <a href="{{ $entry.Link }}" > {{ $entry.Name }} </a> {{ end }} @@ -78,20 +78,19 @@ var StatusTpl = template.Must(template.New("status").Funcs(funcMap).Parse(`<!DOC </a> {{end}} </td> - <td align="right"> + <td align="right" nowrap> {{if $entry.IsDirectory}} {{else}} - {{ $entry.Mime }} + {{ $entry.Mime }} {{end}} </td> - <td align="right"> + <td align="right" nowrap> {{if $entry.IsDirectory}} {{else}} - {{ $entry.Size | humanizeBytes }} - + {{ $entry.Size | humanizeBytes }} {{end}} </td> - <td> + <td nowrap> {{ $entry.Timestamp.Format "2006-01-02 15:04" }} </td> </tr> diff --git a/weed/server/master_grpc_server.go b/weed/server/master_grpc_server.go index e0d1fd174..fcfd98f7b 100644 --- a/weed/server/master_grpc_server.go +++ b/weed/server/master_grpc_server.go @@ -9,6 +9,7 @@ import ( "github.com/chrislusf/raft" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/master_pb" + "github.com/chrislusf/seaweedfs/weed/storage/backend" "github.com/chrislusf/seaweedfs/weed/storage/needle" "github.com/chrislusf/seaweedfs/weed/topology" "google.golang.org/grpc/peer" @@ -49,6 +50,11 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ for { heartbeat, err := stream.Recv() if err != nil { + if dn != nil { + glog.Warningf("SendHeartbeat.Recv server %s:%d : %v", dn.Ip, dn.Port, err) + } else { + glog.Warningf("SendHeartbeat.Recv: %v", err) + } return err } @@ -71,8 +77,12 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ int64(heartbeat.MaxVolumeCount)) glog.V(0).Infof("added volume server %v:%d", heartbeat.GetIp(), heartbeat.GetPort()) if err := stream.Send(&master_pb.HeartbeatResponse{ - VolumeSizeLimit: uint64(ms.option.VolumeSizeLimitMB) * 1024 * 1024, + VolumeSizeLimit: uint64(ms.option.VolumeSizeLimitMB) * 1024 * 1024, + MetricsAddress: ms.option.MetricsAddress, + MetricsIntervalSeconds: uint32(ms.option.MetricsIntervalSec), + StorageBackends: backend.ToPbStorageBackends(), }); err != nil { + glog.Warningf("SendHeartbeat.Send volume size to %s:%d %v", dn.Ip, dn.Port, err) return err } } @@ -154,13 +164,13 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ // tell the volume servers about the leader newLeader, err := t.Leader() if err != nil { + glog.Warningf("SendHeartbeat find leader: %v", err) return err } if err := stream.Send(&master_pb.HeartbeatResponse{ - Leader: newLeader, - MetricsAddress: ms.option.MetricsAddress, - MetricsIntervalSeconds: uint32(ms.option.MetricsIntervalSec), + Leader: newLeader, }); err != nil { + glog.Warningf("SendHeartbeat.Send response to to %s:%d %v", dn.Ip, dn.Port, err) return err } } @@ -176,7 +186,7 @@ func (ms *MasterServer) KeepConnected(stream master_pb.Seaweed_KeepConnectedServ } if !ms.Topo.IsLeader() { - return raft.NotLeaderError + return ms.informNewLeader(stream) } // remember client address @@ -236,7 +246,7 @@ func (ms *MasterServer) KeepConnected(stream master_pb.Seaweed_KeepConnectedServ } case <-ticker.C: if !ms.Topo.IsLeader() { - return raft.NotLeaderError + return ms.informNewLeader(stream) } case <-stopChan: return nil @@ -245,3 +255,17 @@ func (ms *MasterServer) KeepConnected(stream master_pb.Seaweed_KeepConnectedServ return nil } + +func (ms *MasterServer) informNewLeader(stream master_pb.Seaweed_KeepConnectedServer) error { + leader, err := ms.Topo.Leader() + if err != nil { + glog.Errorf("topo leader: %v", err) + return raft.NotLeaderError + } + if err := stream.Send(&master_pb.VolumeLocation{ + Leader: leader, + }); err != nil { + return err + } + return nil +} diff --git a/weed/server/master_grpc_server_collection.go b/weed/server/master_grpc_server_collection.go index a50cfa192..f8e0785f6 100644 --- a/weed/server/master_grpc_server_collection.go +++ b/weed/server/master_grpc_server_collection.go @@ -57,7 +57,7 @@ func (ms *MasterServer) doDeleteNormalCollection(collectionName string) error { } for _, server := range collection.ListVolumeServers() { - err := operation.WithVolumeServerClient(server.Url(), ms.grpcDialOpiton, func(client volume_server_pb.VolumeServerClient) error { + err := operation.WithVolumeServerClient(server.Url(), ms.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { _, deleteErr := client.DeleteCollection(context.Background(), &volume_server_pb.DeleteCollectionRequest{ Collection: collectionName, }) @@ -77,7 +77,7 @@ func (ms *MasterServer) doDeleteEcCollection(collectionName string) error { listOfEcServers := ms.Topo.ListEcServersByCollection(collectionName) for _, server := range listOfEcServers { - err := operation.WithVolumeServerClient(server, ms.grpcDialOpiton, func(client volume_server_pb.VolumeServerClient) error { + err := operation.WithVolumeServerClient(server, ms.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { _, deleteErr := client.DeleteCollection(context.Background(), &volume_server_pb.DeleteCollectionRequest{ Collection: collectionName, }) diff --git a/weed/server/master_grpc_server_volume.go b/weed/server/master_grpc_server_volume.go index 19064bcde..856c07890 100644 --- a/weed/server/master_grpc_server_volume.go +++ b/weed/server/master_grpc_server_volume.go @@ -5,10 +5,11 @@ import ( "fmt" "github.com/chrislusf/raft" + "github.com/chrislusf/seaweedfs/weed/pb/master_pb" "github.com/chrislusf/seaweedfs/weed/security" - "github.com/chrislusf/seaweedfs/weed/storage" "github.com/chrislusf/seaweedfs/weed/storage/needle" + "github.com/chrislusf/seaweedfs/weed/storage/super_block" "github.com/chrislusf/seaweedfs/weed/topology" ) @@ -52,7 +53,7 @@ func (ms *MasterServer) Assign(ctx context.Context, req *master_pb.AssignRequest if req.Replication == "" { req.Replication = ms.option.DefaultReplicaPlacement } - replicaPlacement, err := storage.NewReplicaPlacementFromString(req.Replication) + replicaPlacement, err := super_block.NewReplicaPlacementFromString(req.Replication) if err != nil { return nil, err } @@ -62,13 +63,14 @@ func (ms *MasterServer) Assign(ctx context.Context, req *master_pb.AssignRequest } option := &topology.VolumeGrowOption{ - Collection: req.Collection, - ReplicaPlacement: replicaPlacement, - Ttl: ttl, - Prealloacte: ms.preallocateSize, - DataCenter: req.DataCenter, - Rack: req.Rack, - DataNode: req.DataNode, + Collection: req.Collection, + ReplicaPlacement: replicaPlacement, + Ttl: ttl, + Prealloacte: ms.preallocateSize, + DataCenter: req.DataCenter, + Rack: req.Rack, + DataNode: req.DataNode, + MemoryMapMaxSizeMb: req.MemoryMapMaxSizeMb, } if !ms.Topo.HasWritableVolume(option) { @@ -77,7 +79,7 @@ func (ms *MasterServer) Assign(ctx context.Context, req *master_pb.AssignRequest } ms.vgLock.Lock() if !ms.Topo.HasWritableVolume(option) { - if _, err = ms.vg.AutomaticGrowByType(option, ms.grpcDialOpiton, ms.Topo); err != nil { + if _, err = ms.vg.AutomaticGrowByType(option, ms.grpcDialOption, ms.Topo, int(req.WritableVolumeCount)); err != nil { ms.vgLock.Unlock() return nil, fmt.Errorf("Cannot grow volume group! %v", err) } @@ -107,7 +109,7 @@ func (ms *MasterServer) Statistics(ctx context.Context, req *master_pb.Statistic if req.Replication == "" { req.Replication = ms.option.DefaultReplicaPlacement } - replicaPlacement, err := storage.NewReplicaPlacementFromString(req.Replication) + replicaPlacement, err := super_block.NewReplicaPlacementFromString(req.Replication) if err != nil { return nil, err } diff --git a/weed/server/master_server.go b/weed/server/master_server.go index 3689b5495..33a5129da 100644 --- a/weed/server/master_server.go +++ b/weed/server/master_server.go @@ -1,9 +1,8 @@ package weed_server import ( + "context" "fmt" - "github.com/chrislusf/seaweedfs/weed/shell" - "google.golang.org/grpc" "net/http" "net/http/httputil" "net/url" @@ -19,10 +18,18 @@ import ( "github.com/chrislusf/seaweedfs/weed/pb/master_pb" "github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/sequence" + "github.com/chrislusf/seaweedfs/weed/shell" "github.com/chrislusf/seaweedfs/weed/topology" "github.com/chrislusf/seaweedfs/weed/util" + "github.com/chrislusf/seaweedfs/weed/wdclient" "github.com/gorilla/mux" "github.com/spf13/viper" + "google.golang.org/grpc" +) + +const ( + SequencerType = "master.sequencer.type" + SequencerEtcdUrls = "master.sequencer.sequencer_etcd_urls" ) type MasterOption struct { @@ -55,10 +62,12 @@ type MasterServer struct { clientChansLock sync.RWMutex clientChans map[string]chan *master_pb.VolumeLocation - grpcDialOpiton grpc.DialOption + grpcDialOption grpc.DialOption + + MasterClient *wdclient.MasterClient } -func NewMasterServer(r *mux.Router, option *MasterOption) *MasterServer { +func NewMasterServer(r *mux.Router, option *MasterOption, peers []string) *MasterServer { v := viper.GetViper() signingKey := v.GetString("jwt.signing.key") @@ -73,14 +82,21 @@ func NewMasterServer(r *mux.Router, option *MasterOption) *MasterServer { if option.VolumePreallocate { preallocateSize = int64(option.VolumeSizeLimitMB) * (1 << 20) } + + grpcDialOption := security.LoadClientTLS(v.Sub("grpc"), "master") ms := &MasterServer{ option: option, preallocateSize: preallocateSize, clientChans: make(map[string]chan *master_pb.VolumeLocation), - grpcDialOpiton: security.LoadClientTLS(v.Sub("grpc"), "master"), + grpcDialOption: grpcDialOption, + MasterClient: wdclient.NewMasterClient(context.Background(), grpcDialOption, "master", peers), } ms.bounedLeaderChan = make(chan int, 16) - seq := sequence.NewMemorySequencer() + + seq := ms.createSequencer(option) + if nil == seq { + glog.Fatalf("create sequencer failed.") + } ms.Topo = topology.NewTopology("topo", seq, uint64(ms.option.VolumeSizeLimitMB)*1024*1024, ms.option.PulseSeconds) ms.vg = topology.NewDefaultVolumeGrowth() glog.V(0).Infoln("Volume Size Limit is", ms.option.VolumeSizeLimitMB, "MB") @@ -92,7 +108,7 @@ func NewMasterServer(r *mux.Router, option *MasterOption) *MasterServer { r.HandleFunc("/", ms.proxyToLeader(ms.uiStatusHandler)) r.HandleFunc("/ui/index.html", ms.uiStatusHandler) r.HandleFunc("/dir/assign", ms.proxyToLeader(ms.guard.WhiteList(ms.dirAssignHandler))) - r.HandleFunc("/dir/lookup", ms.proxyToLeader(ms.guard.WhiteList(ms.dirLookupHandler))) + r.HandleFunc("/dir/lookup", ms.guard.WhiteList(ms.dirLookupHandler)) r.HandleFunc("/dir/status", ms.proxyToLeader(ms.guard.WhiteList(ms.dirStatusHandler))) r.HandleFunc("/col/delete", ms.proxyToLeader(ms.guard.WhiteList(ms.collectionDeleteHandler))) r.HandleFunc("/vol/grow", ms.proxyToLeader(ms.guard.WhiteList(ms.volumeGrowHandler))) @@ -102,10 +118,10 @@ func NewMasterServer(r *mux.Router, option *MasterOption) *MasterServer { r.HandleFunc("/stats/health", ms.guard.WhiteList(statsHealthHandler)) r.HandleFunc("/stats/counter", ms.guard.WhiteList(statsCounterHandler)) r.HandleFunc("/stats/memory", ms.guard.WhiteList(statsMemoryHandler)) - r.HandleFunc("/{fileId}", ms.proxyToLeader(ms.redirectHandler)) + r.HandleFunc("/{fileId}", ms.redirectHandler) } - ms.Topo.StartRefreshWritableVolumes(ms.grpcDialOpiton, ms.option.GarbageThreshold, ms.preallocateSize) + ms.Topo.StartRefreshWritableVolumes(ms.grpcDialOption, ms.option.GarbageThreshold, ms.preallocateSize) ms.startAdminScripts() @@ -158,23 +174,28 @@ func (ms *MasterServer) proxyToLeader(f func(w http.ResponseWriter, r *http.Requ proxy.Transport = util.Transport proxy.ServeHTTP(w, r) } else { - //drop it to the floor - //writeJsonError(w, r, errors.New(ms.Topo.RaftServer.Name()+" does not know Leader yet:"+ms.Topo.RaftServer.Leader())) + // drop it to the floor + // writeJsonError(w, r, errors.New(ms.Topo.RaftServer.Name()+" does not know Leader yet:"+ms.Topo.RaftServer.Leader())) } } } func (ms *MasterServer) startAdminScripts() { + var err error + v := viper.GetViper() adminScripts := v.GetString("master.maintenance.scripts") - v.SetDefault("master.maintenance.sleep_minutes", 17) - sleepMinutes := v.GetInt("master.maintenance.sleep_minutes") - glog.V(0).Infof("adminScripts:\n%v", adminScripts) if adminScripts == "" { return } + v.SetDefault("master.maintenance.sleep_minutes", 17) + sleepMinutes := v.GetInt("master.maintenance.sleep_minutes") + + v.SetDefault("master.filer.default_filer_url", "http://localhost:8888/") + filerURL := v.GetString("master.filer.default_filer_url") + scriptLines := strings.Split(adminScripts, "\n") masterAddress := "localhost:" + strconv.Itoa(ms.option.Port) @@ -182,9 +203,12 @@ func (ms *MasterServer) startAdminScripts() { var shellOptions shell.ShellOptions shellOptions.GrpcDialOption = security.LoadClientTLS(viper.Sub("grpc"), "master") shellOptions.Masters = &masterAddress - shellOptions.FilerHost = "localhost" - shellOptions.FilerPort = 8888 - shellOptions.Directory = "/" + + shellOptions.FilerHost, shellOptions.FilerPort, shellOptions.Directory, err = util.ParseFilerUrl(filerURL) + if err != nil { + glog.V(0).Infof("failed to parse master.filer.default_filer_urll=%s : %v\n", filerURL, err) + return + } commandEnv := shell.NewCommandEnv(shellOptions) @@ -223,3 +247,24 @@ func (ms *MasterServer) startAdminScripts() { } }() } + +func (ms *MasterServer) createSequencer(option *MasterOption) sequence.Sequencer { + var seq sequence.Sequencer + v := viper.GetViper() + seqType := strings.ToLower(v.GetString(SequencerType)) + glog.V(1).Infof("[%s] : [%s]", SequencerType, seqType) + switch strings.ToLower(seqType) { + case "etcd": + var err error + urls := v.GetString(SequencerEtcdUrls) + glog.V(0).Infof("[%s] : [%s]", SequencerEtcdUrls, urls) + seq, err = sequence.NewEtcdSequencer(urls, option.MetaFolder) + if err != nil { + glog.Error(err) + seq = nil + } + default: + seq = sequence.NewMemorySequencer() + } + return seq +} diff --git a/weed/server/master_server_handlers.go b/weed/server/master_server_handlers.go index 5c7ff41cf..514d86800 100644 --- a/weed/server/master_server_handlers.go +++ b/weed/server/master_server_handlers.go @@ -22,21 +22,7 @@ func (ms *MasterServer) lookupVolumeId(vids []string, collection string) (volume if _, ok := volumeLocations[vid]; ok { continue } - volumeId, err := needle.NewVolumeId(vid) - if err == nil { - machines := ms.Topo.Lookup(collection, volumeId) - if machines != nil { - var ret []operation.Location - for _, dn := range machines { - ret = append(ret, operation.Location{Url: dn.Url(), PublicUrl: dn.PublicUrl}) - } - volumeLocations[vid] = operation.LookupResult{VolumeId: vid, Locations: ret} - } else { - volumeLocations[vid] = operation.LookupResult{VolumeId: vid, Error: fmt.Sprintf("volumeId %s not found.", vid)} - } - } else { - volumeLocations[vid] = operation.LookupResult{VolumeId: vid, Error: fmt.Sprintf("Unknown volumeId format: %s", vid)} - } + volumeLocations[vid] = ms.findVolumeLocation(collection, vid) } return } @@ -59,12 +45,10 @@ func (ms *MasterServer) dirLookupHandler(w http.ResponseWriter, r *http.Request) vid = fileId[0:commaSep] } } - vids := []string{vid} collection := r.FormValue("collection") //optional, but can be faster if too many collections - volumeLocations := ms.lookupVolumeId(vids, collection) - location := volumeLocations[vid] + location := ms.findVolumeLocation(collection, vid) httpStatus := http.StatusOK - if location.Error != "" { + if location.Error != "" || location.Locations == nil { httpStatus = http.StatusNotFound } else { forRead := r.FormValue("read") @@ -74,6 +58,41 @@ func (ms *MasterServer) dirLookupHandler(w http.ResponseWriter, r *http.Request) writeJsonQuiet(w, r, httpStatus, location) } +// findVolumeLocation finds the volume location from master topo if it is leader, +// or from master client if not leader +func (ms *MasterServer) findVolumeLocation(collection, vid string) operation.LookupResult { + var locations []operation.Location + var err error + if ms.Topo.IsLeader() { + volumeId, newVolumeIdErr := needle.NewVolumeId(vid) + if newVolumeIdErr != nil { + err = fmt.Errorf("Unknown volume id %s", vid) + } else { + machines := ms.Topo.Lookup(collection, volumeId) + for _, loc := range machines { + locations = append(locations, operation.Location{Url: loc.Url(), PublicUrl: loc.PublicUrl}) + } + if locations == nil { + err = fmt.Errorf("volume id %s not found", vid) + } + } + } else { + machines, getVidLocationsErr := ms.MasterClient.GetVidLocations(vid) + for _, loc := range machines { + locations = append(locations, operation.Location{Url: loc.Url, PublicUrl: loc.PublicUrl}) + } + err = getVidLocationsErr + } + ret := operation.LookupResult{ + VolumeId: vid, + Locations: locations, + } + if err != nil { + ret.Error = err.Error() + } + return ret +} + func (ms *MasterServer) dirAssignHandler(w http.ResponseWriter, r *http.Request) { stats.AssignRequest() requestedCount, e := strconv.ParseUint(r.FormValue("count"), 10, 64) @@ -81,6 +100,11 @@ func (ms *MasterServer) dirAssignHandler(w http.ResponseWriter, r *http.Request) requestedCount = 1 } + writableVolumeCount, e := strconv.Atoi(r.FormValue("writableVolumeCount")) + if e != nil { + writableVolumeCount = 0 + } + option, err := ms.getVolumeGrowOption(r) if err != nil { writeJsonQuiet(w, r, http.StatusNotAcceptable, operation.AssignResult{Error: err.Error()}) @@ -95,7 +119,7 @@ func (ms *MasterServer) dirAssignHandler(w http.ResponseWriter, r *http.Request) ms.vgLock.Lock() defer ms.vgLock.Unlock() if !ms.Topo.HasWritableVolume(option) { - if _, err = ms.vg.AutomaticGrowByType(option, ms.grpcDialOpiton, ms.Topo); err != nil { + if _, err = ms.vg.AutomaticGrowByType(option, ms.grpcDialOption, ms.Topo, writableVolumeCount); err != nil { writeJsonError(w, r, http.StatusInternalServerError, fmt.Errorf("Cannot grow volume group! %v", err)) return diff --git a/weed/server/master_server_handlers_admin.go b/weed/server/master_server_handlers_admin.go index 343bcb8da..2965a4863 100644 --- a/weed/server/master_server_handlers_admin.go +++ b/weed/server/master_server_handlers_admin.go @@ -2,7 +2,6 @@ package weed_server import ( "context" - "errors" "fmt" "math/rand" "net/http" @@ -11,20 +10,22 @@ import ( "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" - "github.com/chrislusf/seaweedfs/weed/storage" + "github.com/chrislusf/seaweedfs/weed/storage/backend/memory_map" "github.com/chrislusf/seaweedfs/weed/storage/needle" + "github.com/chrislusf/seaweedfs/weed/storage/super_block" "github.com/chrislusf/seaweedfs/weed/topology" "github.com/chrislusf/seaweedfs/weed/util" ) func (ms *MasterServer) collectionDeleteHandler(w http.ResponseWriter, r *http.Request) { - collection, ok := ms.Topo.FindCollection(r.FormValue("collection")) + collectionName := r.FormValue("collection") + collection, ok := ms.Topo.FindCollection(collectionName) if !ok { - writeJsonError(w, r, http.StatusBadRequest, fmt.Errorf("collection %s does not exist", r.FormValue("collection"))) + writeJsonError(w, r, http.StatusBadRequest, fmt.Errorf("collection %s does not exist", collectionName)) return } for _, server := range collection.ListVolumeServers() { - err := operation.WithVolumeServerClient(server.Url(), ms.grpcDialOpiton, func(client volume_server_pb.VolumeServerClient) error { + err := operation.WithVolumeServerClient(server.Url(), ms.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { _, deleteErr := client.DeleteCollection(context.Background(), &volume_server_pb.DeleteCollectionRequest{ Collection: collection.Name, }) @@ -35,7 +36,10 @@ func (ms *MasterServer) collectionDeleteHandler(w http.ResponseWriter, r *http.R return } } - ms.Topo.DeleteCollection(r.FormValue("collection")) + ms.Topo.DeleteCollection(collectionName) + + w.WriteHeader(http.StatusNoContent) + return } func (ms *MasterServer) dirStatusHandler(w http.ResponseWriter, r *http.Request) { @@ -53,11 +57,12 @@ func (ms *MasterServer) volumeVacuumHandler(w http.ResponseWriter, r *http.Reque gcThreshold, err = strconv.ParseFloat(gcString, 32) if err != nil { glog.V(0).Infof("garbageThreshold %s is not a valid float number: %v", gcString, err) + writeJsonError(w, r, http.StatusNotAcceptable, fmt.Errorf("garbageThreshold %s is not a valid float number", gcString)) return } } glog.Infoln("garbageThreshold =", gcThreshold) - ms.Topo.Vacuum(ms.grpcDialOpiton, gcThreshold, ms.preallocateSize) + ms.Topo.Vacuum(ms.grpcDialOption, gcThreshold, ms.preallocateSize) ms.dirStatusHandler(w, r) } @@ -68,17 +73,17 @@ func (ms *MasterServer) volumeGrowHandler(w http.ResponseWriter, r *http.Request writeJsonError(w, r, http.StatusNotAcceptable, err) return } - if err == nil { - if count, err = strconv.Atoi(r.FormValue("count")); err == nil { - if ms.Topo.FreeSpace() < int64(count*option.ReplicaPlacement.GetCopyCount()) { - err = fmt.Errorf("only %d volumes left, not enough for %d", ms.Topo.FreeSpace(), count*option.ReplicaPlacement.GetCopyCount()) - } else { - count, err = ms.vg.GrowByCountAndType(ms.grpcDialOpiton, count, option, ms.Topo) - } + + if count, err = strconv.Atoi(r.FormValue("count")); err == nil { + if ms.Topo.FreeSpace() < int64(count*option.ReplicaPlacement.GetCopyCount()) { + err = fmt.Errorf("only %d volumes left, not enough for %d", ms.Topo.FreeSpace(), count*option.ReplicaPlacement.GetCopyCount()) } else { - err = errors.New("parameter count is not found") + count, err = ms.vg.GrowByCountAndType(ms.grpcDialOption, count, option, ms.Topo) } + } else { + err = fmt.Errorf("can not parse parameter count %s", r.FormValue("count")) } + if err != nil { writeJsonError(w, r, http.StatusNotAcceptable, err) } else { @@ -95,23 +100,19 @@ func (ms *MasterServer) volumeStatusHandler(w http.ResponseWriter, r *http.Reque func (ms *MasterServer) redirectHandler(w http.ResponseWriter, r *http.Request) { vid, _, _, _, _ := parseURLPath(r.URL.Path) - volumeId, err := needle.NewVolumeId(vid) - if err != nil { - debug("parsing error:", err, r.URL.Path) - return - } collection := r.FormValue("collection") - machines := ms.Topo.Lookup(collection, volumeId) - if machines != nil && len(machines) > 0 { + location := ms.findVolumeLocation(collection, vid) + if location.Error == "" { + loc := location.Locations[rand.Intn(len(location.Locations))] var url string if r.URL.RawQuery != "" { - url = util.NormalizeUrl(machines[rand.Intn(len(machines))].PublicUrl) + r.URL.Path + "?" + r.URL.RawQuery + url = util.NormalizeUrl(loc.PublicUrl) + r.URL.Path + "?" + r.URL.RawQuery } else { - url = util.NormalizeUrl(machines[rand.Intn(len(machines))].PublicUrl) + r.URL.Path + url = util.NormalizeUrl(loc.PublicUrl) + r.URL.Path } http.Redirect(w, r, url, http.StatusMovedPermanently) } else { - writeJsonError(w, r, http.StatusNotFound, fmt.Errorf("volume id %d or collection %s not found", volumeId, collection)) + writeJsonError(w, r, http.StatusNotFound, fmt.Errorf("volume id %s not found: %s", vid, location.Error)) } } @@ -123,13 +124,13 @@ func (ms *MasterServer) selfUrl(r *http.Request) string { } func (ms *MasterServer) submitFromMasterServerHandler(w http.ResponseWriter, r *http.Request) { if ms.Topo.IsLeader() { - submitForClientHandler(w, r, ms.selfUrl(r), ms.grpcDialOpiton) + submitForClientHandler(w, r, ms.selfUrl(r), ms.grpcDialOption) } else { masterUrl, err := ms.Topo.Leader() if err != nil { writeJsonError(w, r, http.StatusInternalServerError, err) } else { - submitForClientHandler(w, r, masterUrl, ms.grpcDialOpiton) + submitForClientHandler(w, r, masterUrl, ms.grpcDialOption) } } } @@ -144,7 +145,7 @@ func (ms *MasterServer) getVolumeGrowOption(r *http.Request) (*topology.VolumeGr if replicationString == "" { replicationString = ms.option.DefaultReplicaPlacement } - replicaPlacement, err := storage.NewReplicaPlacementFromString(replicationString) + replicaPlacement, err := super_block.NewReplicaPlacementFromString(replicationString) if err != nil { return nil, err } @@ -152,6 +153,11 @@ func (ms *MasterServer) getVolumeGrowOption(r *http.Request) (*topology.VolumeGr if err != nil { return nil, err } + memoryMapMaxSizeMb, err := memory_map.ReadMemoryMapMaxSizeMb(r.FormValue("memoryMapMaxSizeMb")) + if err != nil { + return nil, err + } + preallocate := ms.preallocateSize if r.FormValue("preallocate") != "" { preallocate, err = strconv.ParseInt(r.FormValue("preallocate"), 10, 64) @@ -160,13 +166,14 @@ func (ms *MasterServer) getVolumeGrowOption(r *http.Request) (*topology.VolumeGr } } volumeGrowOption := &topology.VolumeGrowOption{ - Collection: r.FormValue("collection"), - ReplicaPlacement: replicaPlacement, - Ttl: ttl, - Prealloacte: preallocate, - DataCenter: r.FormValue("dataCenter"), - Rack: r.FormValue("rack"), - DataNode: r.FormValue("dataNode"), + Collection: r.FormValue("collection"), + ReplicaPlacement: replicaPlacement, + Ttl: ttl, + Prealloacte: preallocate, + DataCenter: r.FormValue("dataCenter"), + Rack: r.FormValue("rack"), + DataNode: r.FormValue("dataNode"), + MemoryMapMaxSizeMb: memoryMapMaxSizeMb, } return volumeGrowOption, nil } diff --git a/weed/server/raft_server.go b/weed/server/raft_server.go index 88320ed98..53289f1c1 100644 --- a/weed/server/raft_server.go +++ b/weed/server/raft_server.go @@ -25,7 +25,7 @@ type RaftServer struct { *raft.GrpcServer } -func NewRaftServer(grpcDialOption grpc.DialOption, peers []string, serverAddr string, dataDir string, topo *topology.Topology, pulseSeconds int) *RaftServer { +func NewRaftServer(grpcDialOption grpc.DialOption, peers []string, serverAddr, dataDir string, topo *topology.Topology, pulseSeconds int) *RaftServer { s := &RaftServer{ peers: peers, serverAddr: serverAddr, diff --git a/weed/server/volume_grpc_admin.go b/weed/server/volume_grpc_admin.go index 35c2508a6..c631d2535 100644 --- a/weed/server/volume_grpc_admin.go +++ b/weed/server/volume_grpc_admin.go @@ -35,6 +35,7 @@ func (vs *VolumeServer) AllocateVolume(ctx context.Context, req *volume_server_p req.Replication, req.Ttl, req.Preallocate, + req.MemoryMapMaxSizeMb, ) if err != nil { diff --git a/weed/server/volume_grpc_client_to_master.go b/weed/server/volume_grpc_client_to_master.go index 731675b48..6038752d2 100644 --- a/weed/server/volume_grpc_client_to_master.go +++ b/weed/server/volume_grpc_client_to_master.go @@ -6,6 +6,7 @@ import ( "time" "github.com/chrislusf/seaweedfs/weed/security" + "github.com/chrislusf/seaweedfs/weed/storage/backend" "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding" "github.com/spf13/viper" "google.golang.org/grpc" @@ -90,6 +91,9 @@ func (vs *VolumeServer) doHeartbeat(ctx context.Context, masterNode, masterGrpcA vs.MetricsAddress = in.GetMetricsAddress() vs.MetricsIntervalSec = int(in.GetMetricsIntervalSeconds()) } + if len(in.StorageBackends) > 0 { + backend.LoadFromPbStorageBackends(in.StorageBackends) + } } }() diff --git a/weed/server/volume_grpc_copy.go b/weed/server/volume_grpc_copy.go index 8b39146ee..a54a1e343 100644 --- a/weed/server/volume_grpc_copy.go +++ b/weed/server/volume_grpc_copy.go @@ -55,11 +55,11 @@ func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.Vo // println("source:", volFileInfoResp.String()) // copy ecx file - if err := vs.doCopyFile(ctx, client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.IdxFileSize, volumeFileName, ".idx", false); err != nil { + if err := vs.doCopyFile(ctx, client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.IdxFileSize, volumeFileName, ".idx", false, false); err != nil { return err } - if err := vs.doCopyFile(ctx, client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.DatFileSize, volumeFileName, ".dat", false); err != nil { + if err := vs.doCopyFile(ctx, client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.DatFileSize, volumeFileName, ".dat", false, true); err != nil { return err } @@ -95,15 +95,16 @@ func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.Vo } func (vs *VolumeServer) doCopyFile(ctx context.Context, client volume_server_pb.VolumeServerClient, isEcVolume bool, collection string, vid uint32, - compactRevision uint32, stopOffset uint64, baseFileName, ext string, isAppend bool) error { + compactRevision uint32, stopOffset uint64, baseFileName, ext string, isAppend bool, ignoreSourceFileNotFound bool) error { copyFileClient, err := client.CopyFile(ctx, &volume_server_pb.CopyFileRequest{ - VolumeId: vid, - Ext: ext, - CompactionRevision: compactRevision, - StopOffset: stopOffset, - Collection: collection, - IsEcVolume: isEcVolume, + VolumeId: vid, + Ext: ext, + CompactionRevision: compactRevision, + StopOffset: stopOffset, + Collection: collection, + IsEcVolume: isEcVolume, + IgnoreSourceFileNotFound: ignoreSourceFileNotFound, }) if err != nil { return fmt.Errorf("failed to start copying volume %d %s file: %v", vid, ext, err) @@ -213,6 +214,9 @@ func (vs *VolumeServer) CopyFile(req *volume_server_pb.CopyFileRequest, stream v } } if fileName == "" { + if req.IgnoreSourceFileNotFound { + return nil + } return fmt.Errorf("CopyFile not found ec volume id %d", req.VolumeId) } } @@ -221,6 +225,9 @@ func (vs *VolumeServer) CopyFile(req *volume_server_pb.CopyFileRequest, stream v file, err := os.Open(fileName) if err != nil { + if req.IgnoreSourceFileNotFound && err == os.ErrNotExist { + return nil + } return err } defer file.Close() @@ -257,7 +264,3 @@ func (vs *VolumeServer) CopyFile(req *volume_server_pb.CopyFileRequest, stream v return nil } - -func (vs *VolumeServer) findVolumeOrEcVolumeLocation(volumeId needle.VolumeId) { - -} diff --git a/weed/server/volume_grpc_copy_incremental.go b/weed/server/volume_grpc_copy_incremental.go index f56fbeef4..6d6c3daa3 100644 --- a/weed/server/volume_grpc_copy_incremental.go +++ b/weed/server/volume_grpc_copy_incremental.go @@ -4,9 +4,9 @@ import ( "context" "fmt" "io" - "os" "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" + "github.com/chrislusf/seaweedfs/weed/storage/backend" "github.com/chrislusf/seaweedfs/weed/storage/needle" ) @@ -30,7 +30,7 @@ func (vs *VolumeServer) VolumeIncrementalCopy(req *volume_server_pb.VolumeIncrem startOffset := foundOffset.ToAcutalOffset() buf := make([]byte, 1024*1024*2) - return sendFileContent(v.DataFile(), buf, startOffset, int64(stopOffset), stream) + return sendFileContent(v.DataBackend, buf, startOffset, int64(stopOffset), stream) } @@ -47,10 +47,10 @@ func (vs *VolumeServer) VolumeSyncStatus(ctx context.Context, req *volume_server } -func sendFileContent(datFile *os.File, buf []byte, startOffset, stopOffset int64, stream volume_server_pb.VolumeServer_VolumeIncrementalCopyServer) error { +func sendFileContent(datBackend backend.BackendStorageFile, buf []byte, startOffset, stopOffset int64, stream volume_server_pb.VolumeServer_VolumeIncrementalCopyServer) error { var blockSizeLimit = int64(len(buf)) for i := int64(0); i < stopOffset-startOffset; i += blockSizeLimit { - n, readErr := datFile.ReadAt(buf, startOffset+i) + n, readErr := datBackend.ReadAt(buf, startOffset+i) if readErr == nil || readErr == io.EOF { resp := &volume_server_pb.VolumeIncrementalCopyResponse{} resp.FileContent = buf[:int64(n)] diff --git a/weed/server/volume_grpc_erasure_coding.go b/weed/server/volume_grpc_erasure_coding.go index 8140a06f6..4bca9948e 100644 --- a/weed/server/volume_grpc_erasure_coding.go +++ b/weed/server/volume_grpc_erasure_coding.go @@ -8,10 +8,12 @@ import ( "math" "os" "path" + "path/filepath" "strings" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/operation" + "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" "github.com/chrislusf/seaweedfs/weed/storage" "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding" @@ -24,7 +26,7 @@ import ( Steps to apply erasure coding to .dat .idx files 0. ensure the volume is readonly -1. client call VolumeEcShardsGenerate to generate the .ecx and .ec01~.ec14 files +1. client call VolumeEcShardsGenerate to generate the .ecx and .ec00 ~ .ec13 files 2. client ask master for possible servers to hold the ec files, at least 4 servers 3. client call VolumeEcShardsCopy on above target servers to copy ec files from the source server 4. target servers report the new ec files to the master @@ -33,7 +35,7 @@ Steps to apply erasure coding to .dat .idx files */ -// VolumeEcShardsGenerate generates the .ecx and .ec01 ~ .ec14 files +// VolumeEcShardsGenerate generates the .ecx and .ec00 ~ .ec13 files func (vs *VolumeServer) VolumeEcShardsGenerate(ctx context.Context, req *volume_server_pb.VolumeEcShardsGenerateRequest) (*volume_server_pb.VolumeEcShardsGenerateResponse, error) { v := vs.store.GetVolume(needle.VolumeId(req.VolumeId)) @@ -47,19 +49,24 @@ func (vs *VolumeServer) VolumeEcShardsGenerate(ctx context.Context, req *volume_ } // write .ecx file - if err := erasure_coding.WriteSortedEcxFile(baseFileName); err != nil { - return nil, fmt.Errorf("WriteSortedEcxFile %s: %v", baseFileName, err) + if err := erasure_coding.WriteSortedFileFromIdx(baseFileName, ".ecx"); err != nil { + return nil, fmt.Errorf("WriteSortedFileFromIdx %s: %v", baseFileName, err) } - // write .ec01 ~ .ec14 files + // write .ec00 ~ .ec13 files if err := erasure_coding.WriteEcFiles(baseFileName); err != nil { return nil, fmt.Errorf("WriteEcFiles %s: %v", baseFileName, err) } + // write .vif files + if err := pb.SaveVolumeInfo(baseFileName+".vif", &volume_server_pb.VolumeInfo{Version: uint32(v.Version())}); err != nil { + return nil, fmt.Errorf("WriteEcFiles %s: %v", baseFileName, err) + } + return &volume_server_pb.VolumeEcShardsGenerateResponse{}, nil } -// VolumeEcShardsRebuild generates the any of the missing .ec01 ~ .ec14 files +// VolumeEcShardsRebuild generates the any of the missing .ec00 ~ .ec13 files func (vs *VolumeServer) VolumeEcShardsRebuild(ctx context.Context, req *volume_server_pb.VolumeEcShardsRebuildRequest) (*volume_server_pb.VolumeEcShardsRebuildResponse, error) { baseFileName := erasure_coding.EcShardBaseFileName(req.Collection, int(req.VolumeId)) @@ -68,7 +75,7 @@ func (vs *VolumeServer) VolumeEcShardsRebuild(ctx context.Context, req *volume_s for _, location := range vs.store.Locations { if util.FileExists(path.Join(location.Directory, baseFileName+".ecx")) { - // write .ec01 ~ .ec14 files + // write .ec00 ~ .ec13 files baseFileName = path.Join(location.Directory, baseFileName) if generatedShardIds, err := erasure_coding.RebuildEcFiles(baseFileName); err != nil { return nil, fmt.Errorf("RebuildEcFiles %s: %v", baseFileName, err) @@ -103,23 +110,32 @@ func (vs *VolumeServer) VolumeEcShardsCopy(ctx context.Context, req *volume_serv // copy ec data slices for _, shardId := range req.ShardIds { - if err := vs.doCopyFile(ctx, client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, erasure_coding.ToExt(int(shardId)), false); err != nil { + if err := vs.doCopyFile(ctx, client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, erasure_coding.ToExt(int(shardId)), false, false); err != nil { return err } } - if !req.CopyEcxFile { + if req.CopyEcxFile { + + // copy ecx file + if err := vs.doCopyFile(ctx, client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, ".ecx", false, false); err != nil { + return err + } return nil } - // copy ecx file - if err := vs.doCopyFile(ctx, client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, ".ecx", false); err != nil { - return err + if req.CopyEcjFile { + // copy ecj file + if err := vs.doCopyFile(ctx, client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, ".ecj", true, true); err != nil { + return err + } } - // copy ecj file - if err := vs.doCopyFile(ctx, client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, ".ecj", true); err != nil { - return err + if req.CopyVifFile { + // copy vif file + if err := vs.doCopyFile(ctx, client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, ".vif", false, true); err != nil { + return err + } } return nil @@ -137,6 +153,8 @@ func (vs *VolumeServer) VolumeEcShardsDelete(ctx context.Context, req *volume_se baseFilename := erasure_coding.EcShardBaseFileName(req.Collection, int(req.VolumeId)) + glog.V(0).Infof("ec volume %d shard delete %v", req.VolumeId, req.ShardIds) + found := false for _, location := range vs.store.Locations { if util.FileExists(path.Join(location.Directory, baseFilename+".ecx")) { @@ -153,21 +171,22 @@ func (vs *VolumeServer) VolumeEcShardsDelete(ctx context.Context, req *volume_se return nil, nil } - // check whether to delete the ecx file also + // check whether to delete the .ecx and .ecj file also hasEcxFile := false existingShardCount := 0 + bName := filepath.Base(baseFilename) for _, location := range vs.store.Locations { fileInfos, err := ioutil.ReadDir(location.Directory) if err != nil { continue } for _, fileInfo := range fileInfos { - if fileInfo.Name() == baseFilename+".ecx" { + if fileInfo.Name() == bName+".ecx" || fileInfo.Name() == bName+".ecj" { hasEcxFile = true continue } - if strings.HasPrefix(fileInfo.Name(), baseFilename+".ec") { + if strings.HasPrefix(fileInfo.Name(), bName+".ec") { existingShardCount++ } } @@ -252,9 +271,14 @@ func (vs *VolumeServer) VolumeEcShardRead(req *volume_server_pb.VolumeEcShardRea startOffset, bytesToRead := req.Offset, req.Size for bytesToRead > 0 { - bytesread, err := ecShard.ReadAt(buffer, startOffset) + // min of bytesToRead and bufSize + bufferSize := bufSize + if bufferSize > bytesToRead { + bufferSize = bytesToRead + } + bytesread, err := ecShard.ReadAt(buffer[0:bufferSize], startOffset) - // println(fileName, "read", bytesread, "bytes, with target", bytesToRead) + // println("read", ecShard.FileName(), "startOffset", startOffset, bytesread, "bytes, with target", bufferSize) if bytesread > 0 { if int64(bytesread) > bytesToRead { @@ -268,6 +292,7 @@ func (vs *VolumeServer) VolumeEcShardRead(req *volume_server_pb.VolumeEcShardRea return err } + startOffset += int64(bytesread) bytesToRead -= int64(bytesread) } @@ -311,3 +336,35 @@ func (vs *VolumeServer) VolumeEcBlobDelete(ctx context.Context, req *volume_serv return resp, nil } + +// VolumeEcShardsToVolume generates the .idx, .dat files from .ecx, .ecj and .ec01 ~ .ec14 files +func (vs *VolumeServer) VolumeEcShardsToVolume(ctx context.Context, req *volume_server_pb.VolumeEcShardsToVolumeRequest) (*volume_server_pb.VolumeEcShardsToVolumeResponse, error) { + + v, found := vs.store.FindEcVolume(needle.VolumeId(req.VolumeId)) + if !found { + return nil, fmt.Errorf("ec volume %d not found", req.VolumeId) + } + baseFileName := v.FileName() + + if v.Collection != req.Collection { + return nil, fmt.Errorf("existing collection:%v unexpected input: %v", v.Collection, req.Collection) + } + + // calculate .dat file size + datFileSize, err := erasure_coding.FindDatFileSize(baseFileName) + if err != nil { + return nil, fmt.Errorf("FindDatFileSize %s: %v", baseFileName, err) + } + + // write .dat file from .ec00 ~ .ec09 files + if err := erasure_coding.WriteDatFile(baseFileName, datFileSize); err != nil { + return nil, fmt.Errorf("WriteEcFiles %s: %v", baseFileName, err) + } + + // write .idx file from .ecx and .ecj files + if err := erasure_coding.WriteIdxFileFromEcIndex(baseFileName); err != nil { + return nil, fmt.Errorf("WriteIdxFileFromEcIndex %s: %v", baseFileName, err) + } + + return &volume_server_pb.VolumeEcShardsToVolumeResponse{}, nil +} diff --git a/weed/server/volume_grpc_query.go b/weed/server/volume_grpc_query.go new file mode 100644 index 000000000..767e28e7b --- /dev/null +++ b/weed/server/volume_grpc_query.go @@ -0,0 +1,69 @@ +package weed_server + +import ( + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/operation" + "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" + "github.com/chrislusf/seaweedfs/weed/query/json" + "github.com/chrislusf/seaweedfs/weed/storage/needle" + "github.com/tidwall/gjson" +) + +func (vs *VolumeServer) Query(req *volume_server_pb.QueryRequest, stream volume_server_pb.VolumeServer_QueryServer) error { + + for _, fid := range req.FromFileIds { + + vid, id_cookie, err := operation.ParseFileId(fid) + if err != nil { + glog.V(0).Infof("volume query failed to parse fid %s: %v", fid, err) + return err + } + + n := new(needle.Needle) + volumeId, _ := needle.NewVolumeId(vid) + n.ParsePath(id_cookie) + + cookie := n.Cookie + if _, err := vs.store.ReadVolumeNeedle(volumeId, n); err != nil { + glog.V(0).Infof("volume query failed to read fid %s: %v", fid, err) + return err + } + + if n.Cookie != cookie { + glog.V(0).Infof("volume query failed to read fid cookie %s: %v", fid, err) + return err + } + + if req.InputSerialization.CsvInput != nil { + + } + + if req.InputSerialization.JsonInput != nil { + + stripe := &volume_server_pb.QueriedStripe{ + Records: nil, + } + + filter := json.Query{ + Field: req.Filter.Field, + Op: req.Filter.Operand, + Value: req.Filter.Value, + } + gjson.ForEachLine(string(n.Data), func(line gjson.Result) bool { + passedFilter, values := json.QueryJson(line.Raw, req.Selections, filter) + if !passedFilter { + return true + } + stripe.Records = json.ToJson(stripe.Records, req.Selections, values) + return true + }) + err = stream.Send(stripe) + if err != nil { + return err + } + } + + } + + return nil +} diff --git a/weed/server/volume_grpc_tail.go b/weed/server/volume_grpc_tail.go index 34c55a599..c26d6ed8f 100644 --- a/weed/server/volume_grpc_tail.go +++ b/weed/server/volume_grpc_tail.go @@ -10,6 +10,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" "github.com/chrislusf/seaweedfs/weed/storage" "github.com/chrislusf/seaweedfs/weed/storage/needle" + "github.com/chrislusf/seaweedfs/weed/storage/super_block" ) func (vs *VolumeServer) VolumeTailSender(req *volume_server_pb.VolumeTailSenderRequest, stream volume_server_pb.VolumeServer_VolumeTailSenderServer) error { @@ -67,34 +68,13 @@ func sendNeedlesSince(stream volume_server_pb.VolumeServer_VolumeTailSenderServe return lastTimestampNs, sendErr } - err = storage.ScanVolumeFileNeedleFrom(v.Version(), v.DataFile(), foundOffset.ToAcutalOffset(), func(needleHeader, needleBody []byte, needleAppendAtNs uint64) error { - - isLastChunk := false - - // need to send body by chunks - for i := 0; i < len(needleBody); i += BufferSizeLimit { - stopOffset := i + BufferSizeLimit - if stopOffset >= len(needleBody) { - isLastChunk = true - stopOffset = len(needleBody) - } - - sendErr := stream.Send(&volume_server_pb.VolumeTailSenderResponse{ - NeedleHeader: needleHeader, - NeedleBody: needleBody[i:stopOffset], - IsLastChunk: isLastChunk, - }) - if sendErr != nil { - return sendErr - } - } - - lastProcessedTimestampNs = needleAppendAtNs - return nil + scanner := &VolumeFileScanner4Tailing{ + stream: stream, + } - }) + err = storage.ScanVolumeFileFrom(v.Version(), v.DataBackend, foundOffset.ToAcutalOffset(), scanner) - return + return scanner.lastProcessedTimestampNs, err } @@ -115,3 +95,42 @@ func (vs *VolumeServer) VolumeTailReceiver(ctx context.Context, req *volume_serv }) } + +// generate the volume idx +type VolumeFileScanner4Tailing struct { + stream volume_server_pb.VolumeServer_VolumeTailSenderServer + lastProcessedTimestampNs uint64 +} + +func (scanner *VolumeFileScanner4Tailing) VisitSuperBlock(superBlock super_block.SuperBlock) error { + return nil + +} +func (scanner *VolumeFileScanner4Tailing) ReadNeedleBody() bool { + return true +} + +func (scanner *VolumeFileScanner4Tailing) VisitNeedle(n *needle.Needle, offset int64, needleHeader, needleBody []byte) error { + isLastChunk := false + + // need to send body by chunks + for i := 0; i < len(needleBody); i += BufferSizeLimit { + stopOffset := i + BufferSizeLimit + if stopOffset >= len(needleBody) { + isLastChunk = true + stopOffset = len(needleBody) + } + + sendErr := scanner.stream.Send(&volume_server_pb.VolumeTailSenderResponse{ + NeedleHeader: needleHeader, + NeedleBody: needleBody[i:stopOffset], + IsLastChunk: isLastChunk, + }) + if sendErr != nil { + return sendErr + } + } + + scanner.lastProcessedTimestampNs = n.AppendAtNs + return nil +} diff --git a/weed/server/volume_grpc_tier_download.go b/weed/server/volume_grpc_tier_download.go new file mode 100644 index 000000000..7b3982e40 --- /dev/null +++ b/weed/server/volume_grpc_tier_download.go @@ -0,0 +1,85 @@ +package weed_server + +import ( + "fmt" + "time" + + "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" + "github.com/chrislusf/seaweedfs/weed/storage/backend" + "github.com/chrislusf/seaweedfs/weed/storage/needle" +) + +// VolumeTierMoveDatFromRemote copy dat file from a remote tier to local volume server +func (vs *VolumeServer) VolumeTierMoveDatFromRemote(req *volume_server_pb.VolumeTierMoveDatFromRemoteRequest, stream volume_server_pb.VolumeServer_VolumeTierMoveDatFromRemoteServer) error { + + // find existing volume + v := vs.store.GetVolume(needle.VolumeId(req.VolumeId)) + if v == nil { + return fmt.Errorf("volume %d not found", req.VolumeId) + } + + // verify the collection + if v.Collection != req.Collection { + return fmt.Errorf("existing collection:%v unexpected input: %v", v.Collection, req.Collection) + } + + // locate the disk file + storageName, storageKey := v.RemoteStorageNameKey() + if storageName == "" || storageKey == "" { + return fmt.Errorf("volume %d is already on local disk", req.VolumeId) + } + + // check whether the local .dat already exists + _, ok := v.DataBackend.(*backend.DiskFile) + if ok { + return fmt.Errorf("volume %d is already on local disk", req.VolumeId) + } + + // check valid storage backend type + backendStorage, found := backend.BackendStorages[storageName] + if !found { + var keys []string + for key := range backend.BackendStorages { + keys = append(keys, key) + } + return fmt.Errorf("remote storage %s not found from suppported: %v", storageName, keys) + } + + startTime := time.Now() + fn := func(progressed int64, percentage float32) error { + now := time.Now() + if now.Sub(startTime) < time.Second { + return nil + } + startTime = now + return stream.Send(&volume_server_pb.VolumeTierMoveDatFromRemoteResponse{ + Processed: progressed, + ProcessedPercentage: percentage, + }) + } + // copy the data file + _, err := backendStorage.DownloadFile(v.FileName()+".dat", storageKey, fn) + if err != nil { + return fmt.Errorf("backend %s copy file %s: %v", storageName, v.FileName()+".dat", err) + } + + if req.KeepRemoteDatFile { + return nil + } + + // remove remote file + if err := backendStorage.DeleteFile(storageKey); err != nil { + return fmt.Errorf("volume %d fail to delete remote file %s: %v", v.Id, storageKey, err) + } + + // forget remote file + v.GetVolumeInfo().Files = v.GetVolumeInfo().Files[1:] + if err := v.SaveVolumeInfo(); err != nil { + return fmt.Errorf("volume %d fail to save remote file info: %v", v.Id, err) + } + + v.DataBackend.Close() + v.DataBackend = nil + + return nil +} diff --git a/weed/server/volume_grpc_tier_upload.go b/weed/server/volume_grpc_tier_upload.go new file mode 100644 index 000000000..c9694df59 --- /dev/null +++ b/weed/server/volume_grpc_tier_upload.go @@ -0,0 +1,100 @@ +package weed_server + +import ( + "fmt" + "os" + "time" + + "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" + "github.com/chrislusf/seaweedfs/weed/storage/backend" + "github.com/chrislusf/seaweedfs/weed/storage/needle" +) + +// VolumeTierMoveDatToRemote copy dat file to a remote tier +func (vs *VolumeServer) VolumeTierMoveDatToRemote(req *volume_server_pb.VolumeTierMoveDatToRemoteRequest, stream volume_server_pb.VolumeServer_VolumeTierMoveDatToRemoteServer) error { + + // find existing volume + v := vs.store.GetVolume(needle.VolumeId(req.VolumeId)) + if v == nil { + return fmt.Errorf("volume %d not found", req.VolumeId) + } + + // verify the collection + if v.Collection != req.Collection { + return fmt.Errorf("existing collection:%v unexpected input: %v", v.Collection, req.Collection) + } + + // locate the disk file + diskFile, ok := v.DataBackend.(*backend.DiskFile) + if !ok { + return fmt.Errorf("volume %d is not on local disk", req.VolumeId) + } + + // check valid storage backend type + backendStorage, found := backend.BackendStorages[req.DestinationBackendName] + if !found { + var keys []string + for key := range backend.BackendStorages { + keys = append(keys, key) + } + return fmt.Errorf("destination %s not found, suppported: %v", req.DestinationBackendName, keys) + } + + // check whether the existing backend storage is the same as requested + // if same, skip + backendType, backendId := backend.BackendNameToTypeId(req.DestinationBackendName) + for _, remoteFile := range v.GetVolumeInfo().GetFiles() { + if remoteFile.BackendType == backendType && remoteFile.BackendId == backendId { + return fmt.Errorf("destination %s already exists", req.DestinationBackendName) + } + } + + startTime := time.Now() + fn := func(progressed int64, percentage float32) error { + now := time.Now() + if now.Sub(startTime) < time.Second { + return nil + } + startTime = now + return stream.Send(&volume_server_pb.VolumeTierMoveDatToRemoteResponse{ + Processed: progressed, + ProcessedPercentage: percentage, + }) + } + + // remember the file original source + attributes := make(map[string]string) + attributes["volumeId"] = v.Id.String() + attributes["collection"] = v.Collection + attributes["ext"] = ".dat" + // copy the data file + key, size, err := backendStorage.CopyFile(diskFile.File, attributes, fn) + if err != nil { + return fmt.Errorf("backend %s copy file %s: %v", req.DestinationBackendName, diskFile.Name(), err) + } + + // save the remote file to volume tier info + v.GetVolumeInfo().Files = append(v.GetVolumeInfo().GetFiles(), &volume_server_pb.RemoteFile{ + BackendType: backendType, + BackendId: backendId, + Key: key, + Offset: 0, + FileSize: uint64(size), + ModifiedTime: uint64(time.Now().Unix()), + Extension: ".dat", + }) + + if err := v.SaveVolumeInfo(); err != nil { + return fmt.Errorf("volume %d fail to save remote file info: %v", v.Id, err) + } + + if err := v.LoadRemoteFile(); err != nil { + return fmt.Errorf("volume %d fail to load remote file: %v", v.Id, err) + } + + if !req.KeepLocalDatFile { + os.Remove(v.FileName() + ".dat") + } + + return nil +} diff --git a/weed/server/volume_server_handlers_admin.go b/weed/server/volume_server_handlers_admin.go index 25b6582f7..1938a34c4 100644 --- a/weed/server/volume_server_handlers_admin.go +++ b/weed/server/volume_server_handlers_admin.go @@ -12,7 +12,7 @@ import ( func (vs *VolumeServer) statusHandler(w http.ResponseWriter, r *http.Request) { m := make(map[string]interface{}) m["Version"] = util.VERSION - m["Volumes"] = vs.store.Status() + m["Volumes"] = vs.store.VolumeInfos() writeJsonQuiet(w, r, http.StatusOK, m) } diff --git a/weed/server/volume_server_handlers_read.go b/weed/server/volume_server_handlers_read.go index f30ffefaf..cd11356b9 100644 --- a/weed/server/volume_server_handlers_read.go +++ b/weed/server/volume_server_handlers_read.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "errors" + "fmt" "io" "mime" "mime/multipart" @@ -66,7 +67,7 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) glog.V(2).Infoln("volume", volumeId, "found on", lookupResult, "error", err) if err == nil && len(lookupResult.Locations) > 0 { u, _ := url.Parse(util.NormalizeUrl(lookupResult.Locations[0].PublicUrl)) - u.Path = r.URL.Path + u.Path = fmt.Sprintf("%s/%s,%s", u.Path, vid, fid) arg := url.Values{} if c := r.FormValue("collection"); c != "" { arg.Set("collection", c) diff --git a/weed/server/volume_server_handlers_ui.go b/weed/server/volume_server_handlers_ui.go index 852f0b751..8d35c9c8b 100644 --- a/weed/server/volume_server_handlers_ui.go +++ b/weed/server/volume_server_handlers_ui.go @@ -8,6 +8,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" ui "github.com/chrislusf/seaweedfs/weed/server/volume_server_ui" "github.com/chrislusf/seaweedfs/weed/stats" + "github.com/chrislusf/seaweedfs/weed/storage" "github.com/chrislusf/seaweedfs/weed/util" ) @@ -20,19 +21,30 @@ func (vs *VolumeServer) uiStatusHandler(w http.ResponseWriter, r *http.Request) ds = append(ds, stats.NewDiskStatus(dir)) } } + volumeInfos := vs.store.VolumeInfos() + var normalVolumeInfos, remoteVolumeInfos []*storage.VolumeInfo + for _, vinfo := range volumeInfos { + if vinfo.IsRemote() { + remoteVolumeInfos = append(remoteVolumeInfos, vinfo) + } else { + normalVolumeInfos = append(normalVolumeInfos, vinfo) + } + } args := struct { - Version string - Masters []string - Volumes interface{} - EcVolumes interface{} - DiskStatuses interface{} - Stats interface{} - Counters *stats.ServerStats + Version string + Masters []string + Volumes interface{} + EcVolumes interface{} + RemoteVolumes interface{} + DiskStatuses interface{} + Stats interface{} + Counters *stats.ServerStats }{ util.VERSION, vs.SeedMasterNodes, - vs.store.Status(), + normalVolumeInfos, vs.store.EcVolumes(), + remoteVolumeInfos, ds, infos, serverStats, diff --git a/weed/server/volume_server_handlers_write.go b/weed/server/volume_server_handlers_write.go index db8fcb555..05e21612b 100644 --- a/weed/server/volume_server_handlers_write.go +++ b/weed/server/volume_server_handlers_write.go @@ -51,10 +51,14 @@ func (vs *VolumeServer) PostHandler(w http.ResponseWriter, r *http.Request) { ret := operation.UploadResult{} _, isUnchanged, writeError := topology.ReplicatedWrite(vs.GetMaster(), vs.store, volumeId, needle, r) - httpStatus := http.StatusCreated - if isUnchanged { - httpStatus = http.StatusNotModified + + // http 304 status code does not allow body + if writeError == nil && isUnchanged { + w.WriteHeader(http.StatusNotModified) + return } + + httpStatus := http.StatusCreated if writeError != nil { httpStatus = http.StatusInternalServerError ret.Error = writeError.Error() diff --git a/weed/server/volume_server_ui/templates.go b/weed/server/volume_server_ui/templates.go index eafc0aaeb..81496b1de 100644 --- a/weed/server/volume_server_ui/templates.go +++ b/weed/server/volume_server_ui/templates.go @@ -107,10 +107,11 @@ var StatusTpl = template.Must(template.New("status").Funcs(funcMap).Parse(`<!DOC <tr> <th>Id</th> <th>Collection</th> - <th>Size</th> + <th>Data Size</th> <th>Files</th> <th>Trash</th> <th>TTL</th> + <th>ReadOnly</th> </tr> </thead> <tbody> @@ -122,6 +123,37 @@ var StatusTpl = template.Must(template.New("status").Funcs(funcMap).Parse(`<!DOC <td>{{ .FileCount }}</td> <td>{{ .DeleteCount }} / {{.DeletedByteCount}} Bytes</td> <td>{{ .Ttl }}</td> + <td>{{ .ReadOnly }}</td> + </tr> + {{ end }} + </tbody> + </table> + </div> + + <div class="row"> + <h2>Remote Volumes</h2> + <table class="table table-striped"> + <thead> + <tr> + <th>Id</th> + <th>Collection</th> + <th>Size</th> + <th>Files</th> + <th>Trash</th> + <th>Remote</th> + <th>Key</th> + </tr> + </thead> + <tbody> + {{ range .RemoteVolumes }} + <tr> + <td><code>{{ .Id }}</code></td> + <td>{{ .Collection }}</td> + <td>{{ .Size }} Bytes</td> + <td>{{ .FileCount }}</td> + <td>{{ .DeleteCount }} / {{.DeletedByteCount}} Bytes</td> + <td>{{ .RemoteStorageName }}</td> + <td>{{ .RemoteStorageKey }}</td> </tr> {{ end }} </tbody> diff --git a/weed/server/webdav_server.go b/weed/server/webdav_server.go index 151b48a78..abd0b66eb 100644 --- a/weed/server/webdav_server.go +++ b/weed/server/webdav_server.go @@ -10,16 +10,18 @@ import ( "strings" "time" + "golang.org/x/net/webdav" + "google.golang.org/grpc" + "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/util" - "golang.org/x/net/webdav" - "google.golang.org/grpc" + + "github.com/spf13/viper" "github.com/chrislusf/seaweedfs/weed/filer2" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/security" - "github.com/spf13/viper" ) type WebDavOption struct { @@ -163,7 +165,7 @@ func (fs *WebDavFileSystem) Mkdir(ctx context.Context, fullDirPath string, perm func (fs *WebDavFileSystem) OpenFile(ctx context.Context, fullFilePath string, flag int, perm os.FileMode) (webdav.File, error) { - glog.V(2).Infof("WebDavFileSystem.OpenFile %v", fullFilePath) + glog.V(2).Infof("WebDavFileSystem.OpenFile %v %x", fullFilePath, flag) var err error if fullFilePath, err = clearName(fullFilePath); err != nil { @@ -175,12 +177,6 @@ func (fs *WebDavFileSystem) OpenFile(ctx context.Context, fullFilePath string, f if strings.HasSuffix(fullFilePath, "/") { return nil, os.ErrInvalid } - // based directory should be exists. - dir, _ := path.Split(fullFilePath) - _, err := fs.stat(ctx, dir) - if err != nil { - return nil, os.ErrInvalid - } _, err = fs.stat(ctx, fullFilePath) if err == nil { if flag&os.O_EXCL != 0 { @@ -412,7 +408,7 @@ func (f *WebDavFile) Write(buf []byte) (int, error) { fileUrl := fmt.Sprintf("http://%s/%s", host, fileId) bufReader := bytes.NewReader(buf) - uploadResult, err := operation.Upload(fileUrl, f.name, bufReader, false, "application/octet-stream", nil, auth) + uploadResult, err := operation.Upload(fileUrl, f.name, bufReader, false, "", nil, auth) if err != nil { glog.V(0).Infof("upload data %v to %s: %v", f.name, fileUrl, err) return 0, fmt.Errorf("upload data: %v", err) @@ -448,9 +444,11 @@ func (f *WebDavFile) Write(buf []byte) (int, error) { return nil }) - if err != nil { + if err == nil { + glog.V(3).Infof("WebDavFileSystem.Write %v: written [%d,%d)", f.name, f.off, f.off+int64(len(buf))) f.off += int64(len(buf)) } + return len(buf), err } @@ -494,10 +492,13 @@ func (f *WebDavFile) Read(p []byte) (readSize int, err error) { } readSize = int(totalRead) + glog.V(3).Infof("WebDavFileSystem.Read %v: [%d,%d)", f.name, f.off, f.off+totalRead) + f.off += totalRead if readSize == 0 { return 0, io.EOF } + return } @@ -511,7 +512,7 @@ func (f *WebDavFile) Readdir(count int) (ret []os.FileInfo, err error) { dir = dir[:len(dir)-1] } - err = filer2.ReadDirAllEntries(ctx, f.fs, dir, func(entry *filer_pb.Entry) { + err = filer2.ReadDirAllEntries(ctx, f.fs, dir, "", func(entry *filer_pb.Entry, isLast bool) { fi := FileInfo{ size: int64(filer2.TotalSize(entry.GetChunks())), name: entry.Name, |
