aboutsummaryrefslogtreecommitdiff
path: root/weed/server
diff options
context:
space:
mode:
Diffstat (limited to 'weed/server')
-rw-r--r--weed/server/common.go12
-rw-r--r--weed/server/filer_grpc_server.go61
-rw-r--r--weed/server/filer_grpc_server_rename.go6
-rw-r--r--weed/server/filer_server.go3
-rw-r--r--weed/server/filer_server_handlers_read.go13
-rw-r--r--weed/server/filer_server_handlers_write.go26
-rw-r--r--weed/server/filer_server_handlers_write_autochunk.go4
-rw-r--r--weed/server/filer_ui/breadcrumb.go10
-rw-r--r--weed/server/filer_ui/templates.go13
-rw-r--r--weed/server/master_grpc_server.go36
-rw-r--r--weed/server/master_grpc_server_collection.go4
-rw-r--r--weed/server/master_grpc_server_volume.go24
-rw-r--r--weed/server/master_server.go79
-rw-r--r--weed/server/master_server_handlers.go64
-rw-r--r--weed/server/master_server_handlers_admin.go77
-rw-r--r--weed/server/raft_server.go2
-rw-r--r--weed/server/volume_grpc_admin.go1
-rw-r--r--weed/server/volume_grpc_client_to_master.go4
-rw-r--r--weed/server/volume_grpc_copy.go29
-rw-r--r--weed/server/volume_grpc_copy_incremental.go8
-rw-r--r--weed/server/volume_grpc_erasure_coding.go97
-rw-r--r--weed/server/volume_grpc_query.go69
-rw-r--r--weed/server/volume_grpc_tail.go71
-rw-r--r--weed/server/volume_grpc_tier_download.go85
-rw-r--r--weed/server/volume_grpc_tier_upload.go100
-rw-r--r--weed/server/volume_server_handlers_admin.go2
-rw-r--r--weed/server/volume_server_handlers_read.go3
-rw-r--r--weed/server/volume_server_handlers_ui.go28
-rw-r--r--weed/server/volume_server_handlers_write.go10
-rw-r--r--weed/server/volume_server_ui/templates.go34
-rw-r--r--weed/server/webdav_server.go27
31 files changed, 773 insertions, 229 deletions
diff --git a/weed/server/common.go b/weed/server/common.go
index e02ab38a6..888ddec49 100644
--- a/weed/server/common.go
+++ b/weed/server/common.go
@@ -11,17 +11,18 @@ import (
"strings"
"time"
- "github.com/chrislusf/seaweedfs/weed/storage/needle"
"google.golang.org/grpc"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/stats"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/util"
- _ "github.com/chrislusf/seaweedfs/weed/statik"
"github.com/gorilla/mux"
statik "github.com/rakyll/statik/fs"
+
+ _ "github.com/chrislusf/seaweedfs/weed/statik"
)
var serverStats *stats.ServerStats
@@ -48,10 +49,16 @@ func writeJson(w http.ResponseWriter, r *http.Request, httpStatus int, obj inter
if callback == "" {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(httpStatus)
+ if httpStatus == http.StatusNotModified {
+ return
+ }
_, err = w.Write(bytes)
} else {
w.Header().Set("Content-Type", "application/javascript")
w.WriteHeader(httpStatus)
+ if httpStatus == http.StatusNotModified {
+ return
+ }
if _, err = w.Write([]uint8(callback)); err != nil {
return
}
@@ -109,6 +116,7 @@ func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterUrl st
}
ar := &operation.VolumeAssignRequest{
Count: count,
+ DataCenter: r.FormValue("dataCenter"),
Replication: r.FormValue("replication"),
Collection: r.FormValue("collection"),
Ttl: r.FormValue("ttl"),
diff --git a/weed/server/filer_grpc_server.go b/weed/server/filer_grpc_server.go
index 8eea2441e..a84feec2d 100644
--- a/weed/server/filer_grpc_server.go
+++ b/weed/server/filer_grpc_server.go
@@ -20,7 +20,7 @@ func (fs *FilerServer) LookupDirectoryEntry(ctx context.Context, req *filer_pb.L
entry, err := fs.filer.FindEntry(ctx, filer2.FullPath(filepath.ToSlash(filepath.Join(req.Directory, req.Name))))
if err != nil {
- return nil, fmt.Errorf("%s not found under %s: %v", req.Name, req.Directory, err)
+ return nil, err
}
return &filer_pb.LookupDirectoryEntryResponse{
@@ -29,27 +29,32 @@ func (fs *FilerServer) LookupDirectoryEntry(ctx context.Context, req *filer_pb.L
IsDirectory: entry.IsDirectory(),
Attributes: filer2.EntryAttributeToPb(entry),
Chunks: entry.Chunks,
+ Extended: entry.Extended,
},
}, nil
}
-func (fs *FilerServer) ListEntries(ctx context.Context, req *filer_pb.ListEntriesRequest) (*filer_pb.ListEntriesResponse, error) {
+func (fs *FilerServer) ListEntries(req *filer_pb.ListEntriesRequest, stream filer_pb.SeaweedFiler_ListEntriesServer) error {
limit := int(req.Limit)
if limit == 0 {
limit = fs.option.DirListingLimit
}
- resp := &filer_pb.ListEntriesResponse{}
+ paginationLimit := filer2.PaginationSize
+ if limit < paginationLimit {
+ paginationLimit = limit
+ }
+
lastFileName := req.StartFromFileName
includeLastFile := req.InclusiveStartFrom
for limit > 0 {
- entries, err := fs.filer.ListDirectoryEntries(ctx, filer2.FullPath(req.Directory), lastFileName, includeLastFile, 1024)
+ entries, err := fs.filer.ListDirectoryEntries(stream.Context(), filer2.FullPath(req.Directory), lastFileName, includeLastFile, paginationLimit)
if err != nil {
- return nil, err
+ return err
}
if len(entries) == 0 {
- return resp, nil
+ return nil
}
includeLastFile = false
@@ -64,22 +69,30 @@ func (fs *FilerServer) ListEntries(ctx context.Context, req *filer_pb.ListEntrie
}
}
- resp.Entries = append(resp.Entries, &filer_pb.Entry{
- Name: entry.Name(),
- IsDirectory: entry.IsDirectory(),
- Chunks: entry.Chunks,
- Attributes: filer2.EntryAttributeToPb(entry),
- })
+ if err := stream.Send(&filer_pb.ListEntriesResponse{
+ Entry: &filer_pb.Entry{
+ Name: entry.Name(),
+ IsDirectory: entry.IsDirectory(),
+ Chunks: entry.Chunks,
+ Attributes: filer2.EntryAttributeToPb(entry),
+ Extended: entry.Extended,
+ },
+ }); err != nil {
+ return err
+ }
limit--
+ if limit == 0 {
+ return nil
+ }
}
- if len(resp.Entries) < 1024 {
+ if len(entries) < paginationLimit {
break
}
}
- return resp, nil
+ return nil
}
func (fs *FilerServer) LookupVolume(ctx context.Context, req *filer_pb.LookupVolumeRequest) (*filer_pb.LookupVolumeResponse, error) {
@@ -95,7 +108,11 @@ func (fs *FilerServer) LookupVolume(ctx context.Context, req *filer_pb.LookupVol
return nil, err
}
var locs []*filer_pb.Location
- for _, loc := range fs.filer.MasterClient.GetLocations(uint32(vid)) {
+ locations, found := fs.filer.MasterClient.GetLocations(uint32(vid))
+ if !found {
+ continue
+ }
+ for _, loc := range locations {
locs = append(locs, &filer_pb.Location{
Url: loc.Url,
PublicUrl: loc.PublicUrl,
@@ -125,7 +142,7 @@ func (fs *FilerServer) CreateEntry(ctx context.Context, req *filer_pb.CreateEntr
})
if err == nil {
- fs.filer.DeleteChunks(fullpath, garbages)
+ fs.filer.DeleteChunks(garbages)
}
return &filer_pb.CreateEntryResponse{}, err
@@ -147,12 +164,14 @@ func (fs *FilerServer) UpdateEntry(ctx context.Context, req *filer_pb.UpdateEntr
newEntry := &filer2.Entry{
FullPath: filer2.FullPath(filepath.ToSlash(filepath.Join(req.Directory, req.Entry.Name))),
Attr: entry.Attr,
+ Extended: req.Entry.Extended,
Chunks: chunks,
}
- glog.V(3).Infof("updating %s: %+v, chunks %d: %v => %+v, chunks %d: %v",
+ glog.V(3).Infof("updating %s: %+v, chunks %d: %v => %+v, chunks %d: %v, extended: %v => %v",
fullpath, entry.Attr, len(entry.Chunks), entry.Chunks,
- req.Entry.Attributes, len(req.Entry.Chunks), req.Entry.Chunks)
+ req.Entry.Attributes, len(req.Entry.Chunks), req.Entry.Chunks,
+ entry.Extended, req.Entry.Extended)
if req.Entry.Attributes != nil {
if req.Entry.Attributes.Mtime != 0 {
@@ -174,8 +193,8 @@ func (fs *FilerServer) UpdateEntry(ctx context.Context, req *filer_pb.UpdateEntr
}
if err = fs.filer.UpdateEntry(ctx, entry, newEntry); err == nil {
- fs.filer.DeleteChunks(entry.FullPath, unusedChunks)
- fs.filer.DeleteChunks(entry.FullPath, garbages)
+ fs.filer.DeleteChunks(unusedChunks)
+ fs.filer.DeleteChunks(garbages)
}
fs.filer.NotifyUpdateEvent(entry, newEntry, true)
@@ -184,7 +203,7 @@ func (fs *FilerServer) UpdateEntry(ctx context.Context, req *filer_pb.UpdateEntr
}
func (fs *FilerServer) DeleteEntry(ctx context.Context, req *filer_pb.DeleteEntryRequest) (resp *filer_pb.DeleteEntryResponse, err error) {
- err = fs.filer.DeleteEntryMetaAndData(ctx, filer2.FullPath(filepath.ToSlash(filepath.Join(req.Directory, req.Name))), req.IsRecursive, req.IsDeleteData)
+ err = fs.filer.DeleteEntryMetaAndData(ctx, filer2.FullPath(filepath.ToSlash(filepath.Join(req.Directory, req.Name))), req.IsRecursive, req.IgnoreRecursiveError, req.IsDeleteData)
return &filer_pb.DeleteEntryResponse{}, err
}
diff --git a/weed/server/filer_grpc_server_rename.go b/weed/server/filer_grpc_server_rename.go
index 7142f7606..dfa59e7fe 100644
--- a/weed/server/filer_grpc_server_rename.go
+++ b/weed/server/filer_grpc_server_rename.go
@@ -73,11 +73,11 @@ func (fs *FilerServer) moveFolderSubEntries(ctx context.Context, oldParent filer
return err
}
- println("found", len(entries), "entries under", currentDirPath)
+ // println("found", len(entries), "entries under", currentDirPath)
for _, item := range entries {
lastFileName = item.Name()
- println("processing", lastFileName)
+ // println("processing", lastFileName)
err := fs.moveEntry(ctx, currentDirPath, item, newDirPath, item.Name(), events)
if err != nil {
return err
@@ -113,7 +113,7 @@ func (fs *FilerServer) moveSelfEntry(ctx context.Context, oldParent filer2.FullP
}
// delete old entry
- deleteErr := fs.filer.DeleteEntryMetaAndData(ctx, oldPath, false, false)
+ deleteErr := fs.filer.DeleteEntryMetaAndData(ctx, oldPath, false, false, false)
if deleteErr != nil {
return deleteErr
}
diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go
index b9e6aa23d..41ba81366 100644
--- a/weed/server/filer_server.go
+++ b/weed/server/filer_server.go
@@ -15,12 +15,13 @@ import (
"github.com/chrislusf/seaweedfs/weed/filer2"
_ "github.com/chrislusf/seaweedfs/weed/filer2/cassandra"
+ _ "github.com/chrislusf/seaweedfs/weed/filer2/etcd"
_ "github.com/chrislusf/seaweedfs/weed/filer2/leveldb"
_ "github.com/chrislusf/seaweedfs/weed/filer2/leveldb2"
- _ "github.com/chrislusf/seaweedfs/weed/filer2/memdb"
_ "github.com/chrislusf/seaweedfs/weed/filer2/mysql"
_ "github.com/chrislusf/seaweedfs/weed/filer2/postgres"
_ "github.com/chrislusf/seaweedfs/weed/filer2/redis"
+ _ "github.com/chrislusf/seaweedfs/weed/filer2/tikv"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/notification"
_ "github.com/chrislusf/seaweedfs/weed/notification/aws_sqs"
diff --git a/weed/server/filer_server_handlers_read.go b/weed/server/filer_server_handlers_read.go
index 0edf501a8..ba21298ba 100644
--- a/weed/server/filer_server_handlers_read.go
+++ b/weed/server/filer_server_handlers_read.go
@@ -32,10 +32,15 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request,
fs.listDirectoryHandler(w, r)
return
}
- glog.V(1).Infof("Not found %s: %v", path, err)
-
- stats.FilerRequestCounter.WithLabelValues("read.notfound").Inc()
- w.WriteHeader(http.StatusNotFound)
+ if err == filer2.ErrNotFound {
+ glog.V(1).Infof("Not found %s: %v", path, err)
+ stats.FilerRequestCounter.WithLabelValues("read.notfound").Inc()
+ w.WriteHeader(http.StatusNotFound)
+ } else {
+ glog.V(0).Infof("Internal %s: %v", path, err)
+ stats.FilerRequestCounter.WithLabelValues("read.internalerror").Inc()
+ w.WriteHeader(http.StatusInternalServerError)
+ }
return
}
diff --git a/weed/server/filer_server_handlers_write.go b/weed/server/filer_server_handlers_write.go
index 0bf1218ce..236e7027d 100644
--- a/weed/server/filer_server_handlers_write.go
+++ b/weed/server/filer_server_handlers_write.go
@@ -149,6 +149,16 @@ func (fs *FilerServer) updateFilerStore(ctx context.Context, r *http.Request, w
stats.FilerRequestHistogram.WithLabelValues("postStoreWrite").Observe(time.Since(start).Seconds())
}()
+ modeStr := r.URL.Query().Get("mode")
+ if modeStr == "" {
+ modeStr = "0660"
+ }
+ mode, err := strconv.ParseUint(modeStr, 8, 32)
+ if err != nil {
+ glog.Errorf("Invalid mode format: %s, use 0660 by default", modeStr)
+ mode = 0660
+ }
+
path := r.URL.Path
if strings.HasSuffix(path, "/") {
if ret.Name != "" {
@@ -165,7 +175,7 @@ func (fs *FilerServer) updateFilerStore(ctx context.Context, r *http.Request, w
Attr: filer2.Attr{
Mtime: time.Now(),
Crtime: crTime,
- Mode: 0660,
+ Mode: os.FileMode(mode),
Uid: OS_UID,
Gid: OS_GID,
Replication: replication,
@@ -184,7 +194,7 @@ func (fs *FilerServer) updateFilerStore(ctx context.Context, r *http.Request, w
}
// glog.V(4).Infof("saving %s => %+v", path, entry)
if dbErr := fs.filer.CreateEntry(ctx, entry); dbErr != nil {
- fs.filer.DeleteChunks(entry.FullPath, entry.Chunks)
+ fs.filer.DeleteChunks(entry.Chunks)
glog.V(0).Infof("failing to write %s to filer server : %v", path, dbErr)
writeJsonError(w, r, http.StatusInternalServerError, dbErr)
err = dbErr
@@ -269,14 +279,22 @@ func (fs *FilerServer) uploadToVolumeServer(r *http.Request, u *url.URL, auth se
// curl -X DELETE http://localhost:8888/path/to
// curl -X DELETE http://localhost:8888/path/to?recursive=true
+// curl -X DELETE http://localhost:8888/path/to?recursive=true&ignoreRecursiveError=true
+// curl -X DELETE http://localhost:8888/path/to?recursive=true&skipChunkDeletion=true
func (fs *FilerServer) DeleteHandler(w http.ResponseWriter, r *http.Request) {
isRecursive := r.FormValue("recursive") == "true"
+ ignoreRecursiveError := r.FormValue("ignoreRecursiveError") == "true"
+ skipChunkDeletion := r.FormValue("skipChunkDeletion") == "true"
- err := fs.filer.DeleteEntryMetaAndData(context.Background(), filer2.FullPath(r.URL.Path), isRecursive, true)
+ err := fs.filer.DeleteEntryMetaAndData(context.Background(), filer2.FullPath(r.URL.Path), isRecursive, ignoreRecursiveError, !skipChunkDeletion)
if err != nil {
glog.V(1).Infoln("deleting", r.URL.Path, ":", err.Error())
- writeJsonError(w, r, http.StatusInternalServerError, err)
+ httpStatus := http.StatusInternalServerError
+ if err == filer2.ErrNotFound {
+ httpStatus = http.StatusNotFound
+ }
+ writeJsonError(w, r, httpStatus, err)
return
}
diff --git a/weed/server/filer_server_handlers_write_autochunk.go b/weed/server/filer_server_handlers_write_autochunk.go
index 492b55943..8ff7ab2c0 100644
--- a/weed/server/filer_server_handlers_write_autochunk.go
+++ b/weed/server/filer_server_handlers_write_autochunk.go
@@ -123,7 +123,7 @@ func (fs *FilerServer) doAutoChunk(ctx context.Context, w http.ResponseWriter, r
// upload the chunk to the volume server
chunkName := fileName + "_chunk_" + strconv.FormatInt(int64(len(fileChunks)+1), 10)
- uploadErr := fs.doUpload(urlLocation, w, r, chunkBuf[0:chunkBufOffset], chunkName, "application/octet-stream", fileId, auth)
+ uploadErr := fs.doUpload(urlLocation, w, r, chunkBuf[0:chunkBufOffset], chunkName, "", fileId, auth)
if uploadErr != nil {
return nil, uploadErr
}
@@ -177,7 +177,7 @@ func (fs *FilerServer) doAutoChunk(ctx context.Context, w http.ResponseWriter, r
Chunks: fileChunks,
}
if dbErr := fs.filer.CreateEntry(ctx, entry); dbErr != nil {
- fs.filer.DeleteChunks(entry.FullPath, entry.Chunks)
+ fs.filer.DeleteChunks(entry.Chunks)
replyerr = dbErr
filerResult.Error = dbErr.Error()
glog.V(0).Infof("failing to write %s to filer server : %v", path, dbErr)
diff --git a/weed/server/filer_ui/breadcrumb.go b/weed/server/filer_ui/breadcrumb.go
index 55a1909a8..2f0df7f91 100644
--- a/weed/server/filer_ui/breadcrumb.go
+++ b/weed/server/filer_ui/breadcrumb.go
@@ -14,10 +14,14 @@ func ToBreadcrumb(fullpath string) (crumbs []Breadcrumb) {
parts := strings.Split(fullpath, "/")
for i := 0; i < len(parts); i++ {
- crumbs = append(crumbs, Breadcrumb{
- Name: parts[i] + "/",
+ crumb := Breadcrumb{
+ Name: parts[i] + " /",
Link: "/" + filepath.ToSlash(filepath.Join(parts[0:i+1]...)),
- })
+ }
+ if !strings.HasSuffix(crumb.Link, "/") {
+ crumb.Link += "/"
+ }
+ crumbs = append(crumbs, crumb)
}
return
diff --git a/weed/server/filer_ui/templates.go b/weed/server/filer_ui/templates.go
index 884798936..e532b27e2 100644
--- a/weed/server/filer_ui/templates.go
+++ b/weed/server/filer_ui/templates.go
@@ -50,7 +50,7 @@ var StatusTpl = template.Must(template.New("status").Funcs(funcMap).Parse(`<!DOC
<div class="row">
<div>
{{ range $entry := .Breadcrumbs }}
- <a href={{ $entry.Link }} >
+ <a href="{{ $entry.Link }}" >
{{ $entry.Name }}
</a>
{{ end }}
@@ -78,20 +78,19 @@ var StatusTpl = template.Must(template.New("status").Funcs(funcMap).Parse(`<!DOC
</a>
{{end}}
</td>
- <td align="right">
+ <td align="right" nowrap>
{{if $entry.IsDirectory}}
{{else}}
- {{ $entry.Mime }}
+ {{ $entry.Mime }}&nbsp;
{{end}}
</td>
- <td align="right">
+ <td align="right" nowrap>
{{if $entry.IsDirectory}}
{{else}}
- {{ $entry.Size | humanizeBytes }}
- &nbsp;&nbsp;&nbsp;
+ {{ $entry.Size | humanizeBytes }}&nbsp;
{{end}}
</td>
- <td>
+ <td nowrap>
{{ $entry.Timestamp.Format "2006-01-02 15:04" }}
</td>
</tr>
diff --git a/weed/server/master_grpc_server.go b/weed/server/master_grpc_server.go
index e0d1fd174..fcfd98f7b 100644
--- a/weed/server/master_grpc_server.go
+++ b/weed/server/master_grpc_server.go
@@ -9,6 +9,7 @@ import (
"github.com/chrislusf/raft"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
+ "github.com/chrislusf/seaweedfs/weed/storage/backend"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/topology"
"google.golang.org/grpc/peer"
@@ -49,6 +50,11 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
for {
heartbeat, err := stream.Recv()
if err != nil {
+ if dn != nil {
+ glog.Warningf("SendHeartbeat.Recv server %s:%d : %v", dn.Ip, dn.Port, err)
+ } else {
+ glog.Warningf("SendHeartbeat.Recv: %v", err)
+ }
return err
}
@@ -71,8 +77,12 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
int64(heartbeat.MaxVolumeCount))
glog.V(0).Infof("added volume server %v:%d", heartbeat.GetIp(), heartbeat.GetPort())
if err := stream.Send(&master_pb.HeartbeatResponse{
- VolumeSizeLimit: uint64(ms.option.VolumeSizeLimitMB) * 1024 * 1024,
+ VolumeSizeLimit: uint64(ms.option.VolumeSizeLimitMB) * 1024 * 1024,
+ MetricsAddress: ms.option.MetricsAddress,
+ MetricsIntervalSeconds: uint32(ms.option.MetricsIntervalSec),
+ StorageBackends: backend.ToPbStorageBackends(),
}); err != nil {
+ glog.Warningf("SendHeartbeat.Send volume size to %s:%d %v", dn.Ip, dn.Port, err)
return err
}
}
@@ -154,13 +164,13 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
// tell the volume servers about the leader
newLeader, err := t.Leader()
if err != nil {
+ glog.Warningf("SendHeartbeat find leader: %v", err)
return err
}
if err := stream.Send(&master_pb.HeartbeatResponse{
- Leader: newLeader,
- MetricsAddress: ms.option.MetricsAddress,
- MetricsIntervalSeconds: uint32(ms.option.MetricsIntervalSec),
+ Leader: newLeader,
}); err != nil {
+ glog.Warningf("SendHeartbeat.Send response to to %s:%d %v", dn.Ip, dn.Port, err)
return err
}
}
@@ -176,7 +186,7 @@ func (ms *MasterServer) KeepConnected(stream master_pb.Seaweed_KeepConnectedServ
}
if !ms.Topo.IsLeader() {
- return raft.NotLeaderError
+ return ms.informNewLeader(stream)
}
// remember client address
@@ -236,7 +246,7 @@ func (ms *MasterServer) KeepConnected(stream master_pb.Seaweed_KeepConnectedServ
}
case <-ticker.C:
if !ms.Topo.IsLeader() {
- return raft.NotLeaderError
+ return ms.informNewLeader(stream)
}
case <-stopChan:
return nil
@@ -245,3 +255,17 @@ func (ms *MasterServer) KeepConnected(stream master_pb.Seaweed_KeepConnectedServ
return nil
}
+
+func (ms *MasterServer) informNewLeader(stream master_pb.Seaweed_KeepConnectedServer) error {
+ leader, err := ms.Topo.Leader()
+ if err != nil {
+ glog.Errorf("topo leader: %v", err)
+ return raft.NotLeaderError
+ }
+ if err := stream.Send(&master_pb.VolumeLocation{
+ Leader: leader,
+ }); err != nil {
+ return err
+ }
+ return nil
+}
diff --git a/weed/server/master_grpc_server_collection.go b/weed/server/master_grpc_server_collection.go
index a50cfa192..f8e0785f6 100644
--- a/weed/server/master_grpc_server_collection.go
+++ b/weed/server/master_grpc_server_collection.go
@@ -57,7 +57,7 @@ func (ms *MasterServer) doDeleteNormalCollection(collectionName string) error {
}
for _, server := range collection.ListVolumeServers() {
- err := operation.WithVolumeServerClient(server.Url(), ms.grpcDialOpiton, func(client volume_server_pb.VolumeServerClient) error {
+ err := operation.WithVolumeServerClient(server.Url(), ms.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
_, deleteErr := client.DeleteCollection(context.Background(), &volume_server_pb.DeleteCollectionRequest{
Collection: collectionName,
})
@@ -77,7 +77,7 @@ func (ms *MasterServer) doDeleteEcCollection(collectionName string) error {
listOfEcServers := ms.Topo.ListEcServersByCollection(collectionName)
for _, server := range listOfEcServers {
- err := operation.WithVolumeServerClient(server, ms.grpcDialOpiton, func(client volume_server_pb.VolumeServerClient) error {
+ err := operation.WithVolumeServerClient(server, ms.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
_, deleteErr := client.DeleteCollection(context.Background(), &volume_server_pb.DeleteCollectionRequest{
Collection: collectionName,
})
diff --git a/weed/server/master_grpc_server_volume.go b/weed/server/master_grpc_server_volume.go
index 19064bcde..856c07890 100644
--- a/weed/server/master_grpc_server_volume.go
+++ b/weed/server/master_grpc_server_volume.go
@@ -5,10 +5,11 @@ import (
"fmt"
"github.com/chrislusf/raft"
+
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/security"
- "github.com/chrislusf/seaweedfs/weed/storage"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "github.com/chrislusf/seaweedfs/weed/storage/super_block"
"github.com/chrislusf/seaweedfs/weed/topology"
)
@@ -52,7 +53,7 @@ func (ms *MasterServer) Assign(ctx context.Context, req *master_pb.AssignRequest
if req.Replication == "" {
req.Replication = ms.option.DefaultReplicaPlacement
}
- replicaPlacement, err := storage.NewReplicaPlacementFromString(req.Replication)
+ replicaPlacement, err := super_block.NewReplicaPlacementFromString(req.Replication)
if err != nil {
return nil, err
}
@@ -62,13 +63,14 @@ func (ms *MasterServer) Assign(ctx context.Context, req *master_pb.AssignRequest
}
option := &topology.VolumeGrowOption{
- Collection: req.Collection,
- ReplicaPlacement: replicaPlacement,
- Ttl: ttl,
- Prealloacte: ms.preallocateSize,
- DataCenter: req.DataCenter,
- Rack: req.Rack,
- DataNode: req.DataNode,
+ Collection: req.Collection,
+ ReplicaPlacement: replicaPlacement,
+ Ttl: ttl,
+ Prealloacte: ms.preallocateSize,
+ DataCenter: req.DataCenter,
+ Rack: req.Rack,
+ DataNode: req.DataNode,
+ MemoryMapMaxSizeMb: req.MemoryMapMaxSizeMb,
}
if !ms.Topo.HasWritableVolume(option) {
@@ -77,7 +79,7 @@ func (ms *MasterServer) Assign(ctx context.Context, req *master_pb.AssignRequest
}
ms.vgLock.Lock()
if !ms.Topo.HasWritableVolume(option) {
- if _, err = ms.vg.AutomaticGrowByType(option, ms.grpcDialOpiton, ms.Topo); err != nil {
+ if _, err = ms.vg.AutomaticGrowByType(option, ms.grpcDialOption, ms.Topo, int(req.WritableVolumeCount)); err != nil {
ms.vgLock.Unlock()
return nil, fmt.Errorf("Cannot grow volume group! %v", err)
}
@@ -107,7 +109,7 @@ func (ms *MasterServer) Statistics(ctx context.Context, req *master_pb.Statistic
if req.Replication == "" {
req.Replication = ms.option.DefaultReplicaPlacement
}
- replicaPlacement, err := storage.NewReplicaPlacementFromString(req.Replication)
+ replicaPlacement, err := super_block.NewReplicaPlacementFromString(req.Replication)
if err != nil {
return nil, err
}
diff --git a/weed/server/master_server.go b/weed/server/master_server.go
index 3689b5495..33a5129da 100644
--- a/weed/server/master_server.go
+++ b/weed/server/master_server.go
@@ -1,9 +1,8 @@
package weed_server
import (
+ "context"
"fmt"
- "github.com/chrislusf/seaweedfs/weed/shell"
- "google.golang.org/grpc"
"net/http"
"net/http/httputil"
"net/url"
@@ -19,10 +18,18 @@ import (
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/sequence"
+ "github.com/chrislusf/seaweedfs/weed/shell"
"github.com/chrislusf/seaweedfs/weed/topology"
"github.com/chrislusf/seaweedfs/weed/util"
+ "github.com/chrislusf/seaweedfs/weed/wdclient"
"github.com/gorilla/mux"
"github.com/spf13/viper"
+ "google.golang.org/grpc"
+)
+
+const (
+ SequencerType = "master.sequencer.type"
+ SequencerEtcdUrls = "master.sequencer.sequencer_etcd_urls"
)
type MasterOption struct {
@@ -55,10 +62,12 @@ type MasterServer struct {
clientChansLock sync.RWMutex
clientChans map[string]chan *master_pb.VolumeLocation
- grpcDialOpiton grpc.DialOption
+ grpcDialOption grpc.DialOption
+
+ MasterClient *wdclient.MasterClient
}
-func NewMasterServer(r *mux.Router, option *MasterOption) *MasterServer {
+func NewMasterServer(r *mux.Router, option *MasterOption, peers []string) *MasterServer {
v := viper.GetViper()
signingKey := v.GetString("jwt.signing.key")
@@ -73,14 +82,21 @@ func NewMasterServer(r *mux.Router, option *MasterOption) *MasterServer {
if option.VolumePreallocate {
preallocateSize = int64(option.VolumeSizeLimitMB) * (1 << 20)
}
+
+ grpcDialOption := security.LoadClientTLS(v.Sub("grpc"), "master")
ms := &MasterServer{
option: option,
preallocateSize: preallocateSize,
clientChans: make(map[string]chan *master_pb.VolumeLocation),
- grpcDialOpiton: security.LoadClientTLS(v.Sub("grpc"), "master"),
+ grpcDialOption: grpcDialOption,
+ MasterClient: wdclient.NewMasterClient(context.Background(), grpcDialOption, "master", peers),
}
ms.bounedLeaderChan = make(chan int, 16)
- seq := sequence.NewMemorySequencer()
+
+ seq := ms.createSequencer(option)
+ if nil == seq {
+ glog.Fatalf("create sequencer failed.")
+ }
ms.Topo = topology.NewTopology("topo", seq, uint64(ms.option.VolumeSizeLimitMB)*1024*1024, ms.option.PulseSeconds)
ms.vg = topology.NewDefaultVolumeGrowth()
glog.V(0).Infoln("Volume Size Limit is", ms.option.VolumeSizeLimitMB, "MB")
@@ -92,7 +108,7 @@ func NewMasterServer(r *mux.Router, option *MasterOption) *MasterServer {
r.HandleFunc("/", ms.proxyToLeader(ms.uiStatusHandler))
r.HandleFunc("/ui/index.html", ms.uiStatusHandler)
r.HandleFunc("/dir/assign", ms.proxyToLeader(ms.guard.WhiteList(ms.dirAssignHandler)))
- r.HandleFunc("/dir/lookup", ms.proxyToLeader(ms.guard.WhiteList(ms.dirLookupHandler)))
+ r.HandleFunc("/dir/lookup", ms.guard.WhiteList(ms.dirLookupHandler))
r.HandleFunc("/dir/status", ms.proxyToLeader(ms.guard.WhiteList(ms.dirStatusHandler)))
r.HandleFunc("/col/delete", ms.proxyToLeader(ms.guard.WhiteList(ms.collectionDeleteHandler)))
r.HandleFunc("/vol/grow", ms.proxyToLeader(ms.guard.WhiteList(ms.volumeGrowHandler)))
@@ -102,10 +118,10 @@ func NewMasterServer(r *mux.Router, option *MasterOption) *MasterServer {
r.HandleFunc("/stats/health", ms.guard.WhiteList(statsHealthHandler))
r.HandleFunc("/stats/counter", ms.guard.WhiteList(statsCounterHandler))
r.HandleFunc("/stats/memory", ms.guard.WhiteList(statsMemoryHandler))
- r.HandleFunc("/{fileId}", ms.proxyToLeader(ms.redirectHandler))
+ r.HandleFunc("/{fileId}", ms.redirectHandler)
}
- ms.Topo.StartRefreshWritableVolumes(ms.grpcDialOpiton, ms.option.GarbageThreshold, ms.preallocateSize)
+ ms.Topo.StartRefreshWritableVolumes(ms.grpcDialOption, ms.option.GarbageThreshold, ms.preallocateSize)
ms.startAdminScripts()
@@ -158,23 +174,28 @@ func (ms *MasterServer) proxyToLeader(f func(w http.ResponseWriter, r *http.Requ
proxy.Transport = util.Transport
proxy.ServeHTTP(w, r)
} else {
- //drop it to the floor
- //writeJsonError(w, r, errors.New(ms.Topo.RaftServer.Name()+" does not know Leader yet:"+ms.Topo.RaftServer.Leader()))
+ // drop it to the floor
+ // writeJsonError(w, r, errors.New(ms.Topo.RaftServer.Name()+" does not know Leader yet:"+ms.Topo.RaftServer.Leader()))
}
}
}
func (ms *MasterServer) startAdminScripts() {
+ var err error
+
v := viper.GetViper()
adminScripts := v.GetString("master.maintenance.scripts")
- v.SetDefault("master.maintenance.sleep_minutes", 17)
- sleepMinutes := v.GetInt("master.maintenance.sleep_minutes")
-
glog.V(0).Infof("adminScripts:\n%v", adminScripts)
if adminScripts == "" {
return
}
+ v.SetDefault("master.maintenance.sleep_minutes", 17)
+ sleepMinutes := v.GetInt("master.maintenance.sleep_minutes")
+
+ v.SetDefault("master.filer.default_filer_url", "http://localhost:8888/")
+ filerURL := v.GetString("master.filer.default_filer_url")
+
scriptLines := strings.Split(adminScripts, "\n")
masterAddress := "localhost:" + strconv.Itoa(ms.option.Port)
@@ -182,9 +203,12 @@ func (ms *MasterServer) startAdminScripts() {
var shellOptions shell.ShellOptions
shellOptions.GrpcDialOption = security.LoadClientTLS(viper.Sub("grpc"), "master")
shellOptions.Masters = &masterAddress
- shellOptions.FilerHost = "localhost"
- shellOptions.FilerPort = 8888
- shellOptions.Directory = "/"
+
+ shellOptions.FilerHost, shellOptions.FilerPort, shellOptions.Directory, err = util.ParseFilerUrl(filerURL)
+ if err != nil {
+ glog.V(0).Infof("failed to parse master.filer.default_filer_urll=%s : %v\n", filerURL, err)
+ return
+ }
commandEnv := shell.NewCommandEnv(shellOptions)
@@ -223,3 +247,24 @@ func (ms *MasterServer) startAdminScripts() {
}
}()
}
+
+func (ms *MasterServer) createSequencer(option *MasterOption) sequence.Sequencer {
+ var seq sequence.Sequencer
+ v := viper.GetViper()
+ seqType := strings.ToLower(v.GetString(SequencerType))
+ glog.V(1).Infof("[%s] : [%s]", SequencerType, seqType)
+ switch strings.ToLower(seqType) {
+ case "etcd":
+ var err error
+ urls := v.GetString(SequencerEtcdUrls)
+ glog.V(0).Infof("[%s] : [%s]", SequencerEtcdUrls, urls)
+ seq, err = sequence.NewEtcdSequencer(urls, option.MetaFolder)
+ if err != nil {
+ glog.Error(err)
+ seq = nil
+ }
+ default:
+ seq = sequence.NewMemorySequencer()
+ }
+ return seq
+}
diff --git a/weed/server/master_server_handlers.go b/weed/server/master_server_handlers.go
index 5c7ff41cf..514d86800 100644
--- a/weed/server/master_server_handlers.go
+++ b/weed/server/master_server_handlers.go
@@ -22,21 +22,7 @@ func (ms *MasterServer) lookupVolumeId(vids []string, collection string) (volume
if _, ok := volumeLocations[vid]; ok {
continue
}
- volumeId, err := needle.NewVolumeId(vid)
- if err == nil {
- machines := ms.Topo.Lookup(collection, volumeId)
- if machines != nil {
- var ret []operation.Location
- for _, dn := range machines {
- ret = append(ret, operation.Location{Url: dn.Url(), PublicUrl: dn.PublicUrl})
- }
- volumeLocations[vid] = operation.LookupResult{VolumeId: vid, Locations: ret}
- } else {
- volumeLocations[vid] = operation.LookupResult{VolumeId: vid, Error: fmt.Sprintf("volumeId %s not found.", vid)}
- }
- } else {
- volumeLocations[vid] = operation.LookupResult{VolumeId: vid, Error: fmt.Sprintf("Unknown volumeId format: %s", vid)}
- }
+ volumeLocations[vid] = ms.findVolumeLocation(collection, vid)
}
return
}
@@ -59,12 +45,10 @@ func (ms *MasterServer) dirLookupHandler(w http.ResponseWriter, r *http.Request)
vid = fileId[0:commaSep]
}
}
- vids := []string{vid}
collection := r.FormValue("collection") //optional, but can be faster if too many collections
- volumeLocations := ms.lookupVolumeId(vids, collection)
- location := volumeLocations[vid]
+ location := ms.findVolumeLocation(collection, vid)
httpStatus := http.StatusOK
- if location.Error != "" {
+ if location.Error != "" || location.Locations == nil {
httpStatus = http.StatusNotFound
} else {
forRead := r.FormValue("read")
@@ -74,6 +58,41 @@ func (ms *MasterServer) dirLookupHandler(w http.ResponseWriter, r *http.Request)
writeJsonQuiet(w, r, httpStatus, location)
}
+// findVolumeLocation finds the volume location from master topo if it is leader,
+// or from master client if not leader
+func (ms *MasterServer) findVolumeLocation(collection, vid string) operation.LookupResult {
+ var locations []operation.Location
+ var err error
+ if ms.Topo.IsLeader() {
+ volumeId, newVolumeIdErr := needle.NewVolumeId(vid)
+ if newVolumeIdErr != nil {
+ err = fmt.Errorf("Unknown volume id %s", vid)
+ } else {
+ machines := ms.Topo.Lookup(collection, volumeId)
+ for _, loc := range machines {
+ locations = append(locations, operation.Location{Url: loc.Url(), PublicUrl: loc.PublicUrl})
+ }
+ if locations == nil {
+ err = fmt.Errorf("volume id %s not found", vid)
+ }
+ }
+ } else {
+ machines, getVidLocationsErr := ms.MasterClient.GetVidLocations(vid)
+ for _, loc := range machines {
+ locations = append(locations, operation.Location{Url: loc.Url, PublicUrl: loc.PublicUrl})
+ }
+ err = getVidLocationsErr
+ }
+ ret := operation.LookupResult{
+ VolumeId: vid,
+ Locations: locations,
+ }
+ if err != nil {
+ ret.Error = err.Error()
+ }
+ return ret
+}
+
func (ms *MasterServer) dirAssignHandler(w http.ResponseWriter, r *http.Request) {
stats.AssignRequest()
requestedCount, e := strconv.ParseUint(r.FormValue("count"), 10, 64)
@@ -81,6 +100,11 @@ func (ms *MasterServer) dirAssignHandler(w http.ResponseWriter, r *http.Request)
requestedCount = 1
}
+ writableVolumeCount, e := strconv.Atoi(r.FormValue("writableVolumeCount"))
+ if e != nil {
+ writableVolumeCount = 0
+ }
+
option, err := ms.getVolumeGrowOption(r)
if err != nil {
writeJsonQuiet(w, r, http.StatusNotAcceptable, operation.AssignResult{Error: err.Error()})
@@ -95,7 +119,7 @@ func (ms *MasterServer) dirAssignHandler(w http.ResponseWriter, r *http.Request)
ms.vgLock.Lock()
defer ms.vgLock.Unlock()
if !ms.Topo.HasWritableVolume(option) {
- if _, err = ms.vg.AutomaticGrowByType(option, ms.grpcDialOpiton, ms.Topo); err != nil {
+ if _, err = ms.vg.AutomaticGrowByType(option, ms.grpcDialOption, ms.Topo, writableVolumeCount); err != nil {
writeJsonError(w, r, http.StatusInternalServerError,
fmt.Errorf("Cannot grow volume group! %v", err))
return
diff --git a/weed/server/master_server_handlers_admin.go b/weed/server/master_server_handlers_admin.go
index 343bcb8da..2965a4863 100644
--- a/weed/server/master_server_handlers_admin.go
+++ b/weed/server/master_server_handlers_admin.go
@@ -2,7 +2,6 @@ package weed_server
import (
"context"
- "errors"
"fmt"
"math/rand"
"net/http"
@@ -11,20 +10,22 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
- "github.com/chrislusf/seaweedfs/weed/storage"
+ "github.com/chrislusf/seaweedfs/weed/storage/backend/memory_map"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "github.com/chrislusf/seaweedfs/weed/storage/super_block"
"github.com/chrislusf/seaweedfs/weed/topology"
"github.com/chrislusf/seaweedfs/weed/util"
)
func (ms *MasterServer) collectionDeleteHandler(w http.ResponseWriter, r *http.Request) {
- collection, ok := ms.Topo.FindCollection(r.FormValue("collection"))
+ collectionName := r.FormValue("collection")
+ collection, ok := ms.Topo.FindCollection(collectionName)
if !ok {
- writeJsonError(w, r, http.StatusBadRequest, fmt.Errorf("collection %s does not exist", r.FormValue("collection")))
+ writeJsonError(w, r, http.StatusBadRequest, fmt.Errorf("collection %s does not exist", collectionName))
return
}
for _, server := range collection.ListVolumeServers() {
- err := operation.WithVolumeServerClient(server.Url(), ms.grpcDialOpiton, func(client volume_server_pb.VolumeServerClient) error {
+ err := operation.WithVolumeServerClient(server.Url(), ms.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
_, deleteErr := client.DeleteCollection(context.Background(), &volume_server_pb.DeleteCollectionRequest{
Collection: collection.Name,
})
@@ -35,7 +36,10 @@ func (ms *MasterServer) collectionDeleteHandler(w http.ResponseWriter, r *http.R
return
}
}
- ms.Topo.DeleteCollection(r.FormValue("collection"))
+ ms.Topo.DeleteCollection(collectionName)
+
+ w.WriteHeader(http.StatusNoContent)
+ return
}
func (ms *MasterServer) dirStatusHandler(w http.ResponseWriter, r *http.Request) {
@@ -53,11 +57,12 @@ func (ms *MasterServer) volumeVacuumHandler(w http.ResponseWriter, r *http.Reque
gcThreshold, err = strconv.ParseFloat(gcString, 32)
if err != nil {
glog.V(0).Infof("garbageThreshold %s is not a valid float number: %v", gcString, err)
+ writeJsonError(w, r, http.StatusNotAcceptable, fmt.Errorf("garbageThreshold %s is not a valid float number", gcString))
return
}
}
glog.Infoln("garbageThreshold =", gcThreshold)
- ms.Topo.Vacuum(ms.grpcDialOpiton, gcThreshold, ms.preallocateSize)
+ ms.Topo.Vacuum(ms.grpcDialOption, gcThreshold, ms.preallocateSize)
ms.dirStatusHandler(w, r)
}
@@ -68,17 +73,17 @@ func (ms *MasterServer) volumeGrowHandler(w http.ResponseWriter, r *http.Request
writeJsonError(w, r, http.StatusNotAcceptable, err)
return
}
- if err == nil {
- if count, err = strconv.Atoi(r.FormValue("count")); err == nil {
- if ms.Topo.FreeSpace() < int64(count*option.ReplicaPlacement.GetCopyCount()) {
- err = fmt.Errorf("only %d volumes left, not enough for %d", ms.Topo.FreeSpace(), count*option.ReplicaPlacement.GetCopyCount())
- } else {
- count, err = ms.vg.GrowByCountAndType(ms.grpcDialOpiton, count, option, ms.Topo)
- }
+
+ if count, err = strconv.Atoi(r.FormValue("count")); err == nil {
+ if ms.Topo.FreeSpace() < int64(count*option.ReplicaPlacement.GetCopyCount()) {
+ err = fmt.Errorf("only %d volumes left, not enough for %d", ms.Topo.FreeSpace(), count*option.ReplicaPlacement.GetCopyCount())
} else {
- err = errors.New("parameter count is not found")
+ count, err = ms.vg.GrowByCountAndType(ms.grpcDialOption, count, option, ms.Topo)
}
+ } else {
+ err = fmt.Errorf("can not parse parameter count %s", r.FormValue("count"))
}
+
if err != nil {
writeJsonError(w, r, http.StatusNotAcceptable, err)
} else {
@@ -95,23 +100,19 @@ func (ms *MasterServer) volumeStatusHandler(w http.ResponseWriter, r *http.Reque
func (ms *MasterServer) redirectHandler(w http.ResponseWriter, r *http.Request) {
vid, _, _, _, _ := parseURLPath(r.URL.Path)
- volumeId, err := needle.NewVolumeId(vid)
- if err != nil {
- debug("parsing error:", err, r.URL.Path)
- return
- }
collection := r.FormValue("collection")
- machines := ms.Topo.Lookup(collection, volumeId)
- if machines != nil && len(machines) > 0 {
+ location := ms.findVolumeLocation(collection, vid)
+ if location.Error == "" {
+ loc := location.Locations[rand.Intn(len(location.Locations))]
var url string
if r.URL.RawQuery != "" {
- url = util.NormalizeUrl(machines[rand.Intn(len(machines))].PublicUrl) + r.URL.Path + "?" + r.URL.RawQuery
+ url = util.NormalizeUrl(loc.PublicUrl) + r.URL.Path + "?" + r.URL.RawQuery
} else {
- url = util.NormalizeUrl(machines[rand.Intn(len(machines))].PublicUrl) + r.URL.Path
+ url = util.NormalizeUrl(loc.PublicUrl) + r.URL.Path
}
http.Redirect(w, r, url, http.StatusMovedPermanently)
} else {
- writeJsonError(w, r, http.StatusNotFound, fmt.Errorf("volume id %d or collection %s not found", volumeId, collection))
+ writeJsonError(w, r, http.StatusNotFound, fmt.Errorf("volume id %s not found: %s", vid, location.Error))
}
}
@@ -123,13 +124,13 @@ func (ms *MasterServer) selfUrl(r *http.Request) string {
}
func (ms *MasterServer) submitFromMasterServerHandler(w http.ResponseWriter, r *http.Request) {
if ms.Topo.IsLeader() {
- submitForClientHandler(w, r, ms.selfUrl(r), ms.grpcDialOpiton)
+ submitForClientHandler(w, r, ms.selfUrl(r), ms.grpcDialOption)
} else {
masterUrl, err := ms.Topo.Leader()
if err != nil {
writeJsonError(w, r, http.StatusInternalServerError, err)
} else {
- submitForClientHandler(w, r, masterUrl, ms.grpcDialOpiton)
+ submitForClientHandler(w, r, masterUrl, ms.grpcDialOption)
}
}
}
@@ -144,7 +145,7 @@ func (ms *MasterServer) getVolumeGrowOption(r *http.Request) (*topology.VolumeGr
if replicationString == "" {
replicationString = ms.option.DefaultReplicaPlacement
}
- replicaPlacement, err := storage.NewReplicaPlacementFromString(replicationString)
+ replicaPlacement, err := super_block.NewReplicaPlacementFromString(replicationString)
if err != nil {
return nil, err
}
@@ -152,6 +153,11 @@ func (ms *MasterServer) getVolumeGrowOption(r *http.Request) (*topology.VolumeGr
if err != nil {
return nil, err
}
+ memoryMapMaxSizeMb, err := memory_map.ReadMemoryMapMaxSizeMb(r.FormValue("memoryMapMaxSizeMb"))
+ if err != nil {
+ return nil, err
+ }
+
preallocate := ms.preallocateSize
if r.FormValue("preallocate") != "" {
preallocate, err = strconv.ParseInt(r.FormValue("preallocate"), 10, 64)
@@ -160,13 +166,14 @@ func (ms *MasterServer) getVolumeGrowOption(r *http.Request) (*topology.VolumeGr
}
}
volumeGrowOption := &topology.VolumeGrowOption{
- Collection: r.FormValue("collection"),
- ReplicaPlacement: replicaPlacement,
- Ttl: ttl,
- Prealloacte: preallocate,
- DataCenter: r.FormValue("dataCenter"),
- Rack: r.FormValue("rack"),
- DataNode: r.FormValue("dataNode"),
+ Collection: r.FormValue("collection"),
+ ReplicaPlacement: replicaPlacement,
+ Ttl: ttl,
+ Prealloacte: preallocate,
+ DataCenter: r.FormValue("dataCenter"),
+ Rack: r.FormValue("rack"),
+ DataNode: r.FormValue("dataNode"),
+ MemoryMapMaxSizeMb: memoryMapMaxSizeMb,
}
return volumeGrowOption, nil
}
diff --git a/weed/server/raft_server.go b/weed/server/raft_server.go
index 88320ed98..53289f1c1 100644
--- a/weed/server/raft_server.go
+++ b/weed/server/raft_server.go
@@ -25,7 +25,7 @@ type RaftServer struct {
*raft.GrpcServer
}
-func NewRaftServer(grpcDialOption grpc.DialOption, peers []string, serverAddr string, dataDir string, topo *topology.Topology, pulseSeconds int) *RaftServer {
+func NewRaftServer(grpcDialOption grpc.DialOption, peers []string, serverAddr, dataDir string, topo *topology.Topology, pulseSeconds int) *RaftServer {
s := &RaftServer{
peers: peers,
serverAddr: serverAddr,
diff --git a/weed/server/volume_grpc_admin.go b/weed/server/volume_grpc_admin.go
index 35c2508a6..c631d2535 100644
--- a/weed/server/volume_grpc_admin.go
+++ b/weed/server/volume_grpc_admin.go
@@ -35,6 +35,7 @@ func (vs *VolumeServer) AllocateVolume(ctx context.Context, req *volume_server_p
req.Replication,
req.Ttl,
req.Preallocate,
+ req.MemoryMapMaxSizeMb,
)
if err != nil {
diff --git a/weed/server/volume_grpc_client_to_master.go b/weed/server/volume_grpc_client_to_master.go
index 731675b48..6038752d2 100644
--- a/weed/server/volume_grpc_client_to_master.go
+++ b/weed/server/volume_grpc_client_to_master.go
@@ -6,6 +6,7 @@ import (
"time"
"github.com/chrislusf/seaweedfs/weed/security"
+ "github.com/chrislusf/seaweedfs/weed/storage/backend"
"github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
"github.com/spf13/viper"
"google.golang.org/grpc"
@@ -90,6 +91,9 @@ func (vs *VolumeServer) doHeartbeat(ctx context.Context, masterNode, masterGrpcA
vs.MetricsAddress = in.GetMetricsAddress()
vs.MetricsIntervalSec = int(in.GetMetricsIntervalSeconds())
}
+ if len(in.StorageBackends) > 0 {
+ backend.LoadFromPbStorageBackends(in.StorageBackends)
+ }
}
}()
diff --git a/weed/server/volume_grpc_copy.go b/weed/server/volume_grpc_copy.go
index 8b39146ee..a54a1e343 100644
--- a/weed/server/volume_grpc_copy.go
+++ b/weed/server/volume_grpc_copy.go
@@ -55,11 +55,11 @@ func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.Vo
// println("source:", volFileInfoResp.String())
// copy ecx file
- if err := vs.doCopyFile(ctx, client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.IdxFileSize, volumeFileName, ".idx", false); err != nil {
+ if err := vs.doCopyFile(ctx, client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.IdxFileSize, volumeFileName, ".idx", false, false); err != nil {
return err
}
- if err := vs.doCopyFile(ctx, client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.DatFileSize, volumeFileName, ".dat", false); err != nil {
+ if err := vs.doCopyFile(ctx, client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.DatFileSize, volumeFileName, ".dat", false, true); err != nil {
return err
}
@@ -95,15 +95,16 @@ func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.Vo
}
func (vs *VolumeServer) doCopyFile(ctx context.Context, client volume_server_pb.VolumeServerClient, isEcVolume bool, collection string, vid uint32,
- compactRevision uint32, stopOffset uint64, baseFileName, ext string, isAppend bool) error {
+ compactRevision uint32, stopOffset uint64, baseFileName, ext string, isAppend bool, ignoreSourceFileNotFound bool) error {
copyFileClient, err := client.CopyFile(ctx, &volume_server_pb.CopyFileRequest{
- VolumeId: vid,
- Ext: ext,
- CompactionRevision: compactRevision,
- StopOffset: stopOffset,
- Collection: collection,
- IsEcVolume: isEcVolume,
+ VolumeId: vid,
+ Ext: ext,
+ CompactionRevision: compactRevision,
+ StopOffset: stopOffset,
+ Collection: collection,
+ IsEcVolume: isEcVolume,
+ IgnoreSourceFileNotFound: ignoreSourceFileNotFound,
})
if err != nil {
return fmt.Errorf("failed to start copying volume %d %s file: %v", vid, ext, err)
@@ -213,6 +214,9 @@ func (vs *VolumeServer) CopyFile(req *volume_server_pb.CopyFileRequest, stream v
}
}
if fileName == "" {
+ if req.IgnoreSourceFileNotFound {
+ return nil
+ }
return fmt.Errorf("CopyFile not found ec volume id %d", req.VolumeId)
}
}
@@ -221,6 +225,9 @@ func (vs *VolumeServer) CopyFile(req *volume_server_pb.CopyFileRequest, stream v
file, err := os.Open(fileName)
if err != nil {
+ if req.IgnoreSourceFileNotFound && err == os.ErrNotExist {
+ return nil
+ }
return err
}
defer file.Close()
@@ -257,7 +264,3 @@ func (vs *VolumeServer) CopyFile(req *volume_server_pb.CopyFileRequest, stream v
return nil
}
-
-func (vs *VolumeServer) findVolumeOrEcVolumeLocation(volumeId needle.VolumeId) {
-
-}
diff --git a/weed/server/volume_grpc_copy_incremental.go b/weed/server/volume_grpc_copy_incremental.go
index f56fbeef4..6d6c3daa3 100644
--- a/weed/server/volume_grpc_copy_incremental.go
+++ b/weed/server/volume_grpc_copy_incremental.go
@@ -4,9 +4,9 @@ import (
"context"
"fmt"
"io"
- "os"
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
+ "github.com/chrislusf/seaweedfs/weed/storage/backend"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
)
@@ -30,7 +30,7 @@ func (vs *VolumeServer) VolumeIncrementalCopy(req *volume_server_pb.VolumeIncrem
startOffset := foundOffset.ToAcutalOffset()
buf := make([]byte, 1024*1024*2)
- return sendFileContent(v.DataFile(), buf, startOffset, int64(stopOffset), stream)
+ return sendFileContent(v.DataBackend, buf, startOffset, int64(stopOffset), stream)
}
@@ -47,10 +47,10 @@ func (vs *VolumeServer) VolumeSyncStatus(ctx context.Context, req *volume_server
}
-func sendFileContent(datFile *os.File, buf []byte, startOffset, stopOffset int64, stream volume_server_pb.VolumeServer_VolumeIncrementalCopyServer) error {
+func sendFileContent(datBackend backend.BackendStorageFile, buf []byte, startOffset, stopOffset int64, stream volume_server_pb.VolumeServer_VolumeIncrementalCopyServer) error {
var blockSizeLimit = int64(len(buf))
for i := int64(0); i < stopOffset-startOffset; i += blockSizeLimit {
- n, readErr := datFile.ReadAt(buf, startOffset+i)
+ n, readErr := datBackend.ReadAt(buf, startOffset+i)
if readErr == nil || readErr == io.EOF {
resp := &volume_server_pb.VolumeIncrementalCopyResponse{}
resp.FileContent = buf[:int64(n)]
diff --git a/weed/server/volume_grpc_erasure_coding.go b/weed/server/volume_grpc_erasure_coding.go
index 8140a06f6..4bca9948e 100644
--- a/weed/server/volume_grpc_erasure_coding.go
+++ b/weed/server/volume_grpc_erasure_coding.go
@@ -8,10 +8,12 @@ import (
"math"
"os"
"path"
+ "path/filepath"
"strings"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/operation"
+ "github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
"github.com/chrislusf/seaweedfs/weed/storage"
"github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
@@ -24,7 +26,7 @@ import (
Steps to apply erasure coding to .dat .idx files
0. ensure the volume is readonly
-1. client call VolumeEcShardsGenerate to generate the .ecx and .ec01~.ec14 files
+1. client call VolumeEcShardsGenerate to generate the .ecx and .ec00 ~ .ec13 files
2. client ask master for possible servers to hold the ec files, at least 4 servers
3. client call VolumeEcShardsCopy on above target servers to copy ec files from the source server
4. target servers report the new ec files to the master
@@ -33,7 +35,7 @@ Steps to apply erasure coding to .dat .idx files
*/
-// VolumeEcShardsGenerate generates the .ecx and .ec01 ~ .ec14 files
+// VolumeEcShardsGenerate generates the .ecx and .ec00 ~ .ec13 files
func (vs *VolumeServer) VolumeEcShardsGenerate(ctx context.Context, req *volume_server_pb.VolumeEcShardsGenerateRequest) (*volume_server_pb.VolumeEcShardsGenerateResponse, error) {
v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
@@ -47,19 +49,24 @@ func (vs *VolumeServer) VolumeEcShardsGenerate(ctx context.Context, req *volume_
}
// write .ecx file
- if err := erasure_coding.WriteSortedEcxFile(baseFileName); err != nil {
- return nil, fmt.Errorf("WriteSortedEcxFile %s: %v", baseFileName, err)
+ if err := erasure_coding.WriteSortedFileFromIdx(baseFileName, ".ecx"); err != nil {
+ return nil, fmt.Errorf("WriteSortedFileFromIdx %s: %v", baseFileName, err)
}
- // write .ec01 ~ .ec14 files
+ // write .ec00 ~ .ec13 files
if err := erasure_coding.WriteEcFiles(baseFileName); err != nil {
return nil, fmt.Errorf("WriteEcFiles %s: %v", baseFileName, err)
}
+ // write .vif files
+ if err := pb.SaveVolumeInfo(baseFileName+".vif", &volume_server_pb.VolumeInfo{Version: uint32(v.Version())}); err != nil {
+ return nil, fmt.Errorf("WriteEcFiles %s: %v", baseFileName, err)
+ }
+
return &volume_server_pb.VolumeEcShardsGenerateResponse{}, nil
}
-// VolumeEcShardsRebuild generates the any of the missing .ec01 ~ .ec14 files
+// VolumeEcShardsRebuild generates the any of the missing .ec00 ~ .ec13 files
func (vs *VolumeServer) VolumeEcShardsRebuild(ctx context.Context, req *volume_server_pb.VolumeEcShardsRebuildRequest) (*volume_server_pb.VolumeEcShardsRebuildResponse, error) {
baseFileName := erasure_coding.EcShardBaseFileName(req.Collection, int(req.VolumeId))
@@ -68,7 +75,7 @@ func (vs *VolumeServer) VolumeEcShardsRebuild(ctx context.Context, req *volume_s
for _, location := range vs.store.Locations {
if util.FileExists(path.Join(location.Directory, baseFileName+".ecx")) {
- // write .ec01 ~ .ec14 files
+ // write .ec00 ~ .ec13 files
baseFileName = path.Join(location.Directory, baseFileName)
if generatedShardIds, err := erasure_coding.RebuildEcFiles(baseFileName); err != nil {
return nil, fmt.Errorf("RebuildEcFiles %s: %v", baseFileName, err)
@@ -103,23 +110,32 @@ func (vs *VolumeServer) VolumeEcShardsCopy(ctx context.Context, req *volume_serv
// copy ec data slices
for _, shardId := range req.ShardIds {
- if err := vs.doCopyFile(ctx, client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, erasure_coding.ToExt(int(shardId)), false); err != nil {
+ if err := vs.doCopyFile(ctx, client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, erasure_coding.ToExt(int(shardId)), false, false); err != nil {
return err
}
}
- if !req.CopyEcxFile {
+ if req.CopyEcxFile {
+
+ // copy ecx file
+ if err := vs.doCopyFile(ctx, client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, ".ecx", false, false); err != nil {
+ return err
+ }
return nil
}
- // copy ecx file
- if err := vs.doCopyFile(ctx, client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, ".ecx", false); err != nil {
- return err
+ if req.CopyEcjFile {
+ // copy ecj file
+ if err := vs.doCopyFile(ctx, client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, ".ecj", true, true); err != nil {
+ return err
+ }
}
- // copy ecj file
- if err := vs.doCopyFile(ctx, client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, ".ecj", true); err != nil {
- return err
+ if req.CopyVifFile {
+ // copy vif file
+ if err := vs.doCopyFile(ctx, client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, ".vif", false, true); err != nil {
+ return err
+ }
}
return nil
@@ -137,6 +153,8 @@ func (vs *VolumeServer) VolumeEcShardsDelete(ctx context.Context, req *volume_se
baseFilename := erasure_coding.EcShardBaseFileName(req.Collection, int(req.VolumeId))
+ glog.V(0).Infof("ec volume %d shard delete %v", req.VolumeId, req.ShardIds)
+
found := false
for _, location := range vs.store.Locations {
if util.FileExists(path.Join(location.Directory, baseFilename+".ecx")) {
@@ -153,21 +171,22 @@ func (vs *VolumeServer) VolumeEcShardsDelete(ctx context.Context, req *volume_se
return nil, nil
}
- // check whether to delete the ecx file also
+ // check whether to delete the .ecx and .ecj file also
hasEcxFile := false
existingShardCount := 0
+ bName := filepath.Base(baseFilename)
for _, location := range vs.store.Locations {
fileInfos, err := ioutil.ReadDir(location.Directory)
if err != nil {
continue
}
for _, fileInfo := range fileInfos {
- if fileInfo.Name() == baseFilename+".ecx" {
+ if fileInfo.Name() == bName+".ecx" || fileInfo.Name() == bName+".ecj" {
hasEcxFile = true
continue
}
- if strings.HasPrefix(fileInfo.Name(), baseFilename+".ec") {
+ if strings.HasPrefix(fileInfo.Name(), bName+".ec") {
existingShardCount++
}
}
@@ -252,9 +271,14 @@ func (vs *VolumeServer) VolumeEcShardRead(req *volume_server_pb.VolumeEcShardRea
startOffset, bytesToRead := req.Offset, req.Size
for bytesToRead > 0 {
- bytesread, err := ecShard.ReadAt(buffer, startOffset)
+ // min of bytesToRead and bufSize
+ bufferSize := bufSize
+ if bufferSize > bytesToRead {
+ bufferSize = bytesToRead
+ }
+ bytesread, err := ecShard.ReadAt(buffer[0:bufferSize], startOffset)
- // println(fileName, "read", bytesread, "bytes, with target", bytesToRead)
+ // println("read", ecShard.FileName(), "startOffset", startOffset, bytesread, "bytes, with target", bufferSize)
if bytesread > 0 {
if int64(bytesread) > bytesToRead {
@@ -268,6 +292,7 @@ func (vs *VolumeServer) VolumeEcShardRead(req *volume_server_pb.VolumeEcShardRea
return err
}
+ startOffset += int64(bytesread)
bytesToRead -= int64(bytesread)
}
@@ -311,3 +336,35 @@ func (vs *VolumeServer) VolumeEcBlobDelete(ctx context.Context, req *volume_serv
return resp, nil
}
+
+// VolumeEcShardsToVolume generates the .idx, .dat files from .ecx, .ecj and .ec01 ~ .ec14 files
+func (vs *VolumeServer) VolumeEcShardsToVolume(ctx context.Context, req *volume_server_pb.VolumeEcShardsToVolumeRequest) (*volume_server_pb.VolumeEcShardsToVolumeResponse, error) {
+
+ v, found := vs.store.FindEcVolume(needle.VolumeId(req.VolumeId))
+ if !found {
+ return nil, fmt.Errorf("ec volume %d not found", req.VolumeId)
+ }
+ baseFileName := v.FileName()
+
+ if v.Collection != req.Collection {
+ return nil, fmt.Errorf("existing collection:%v unexpected input: %v", v.Collection, req.Collection)
+ }
+
+ // calculate .dat file size
+ datFileSize, err := erasure_coding.FindDatFileSize(baseFileName)
+ if err != nil {
+ return nil, fmt.Errorf("FindDatFileSize %s: %v", baseFileName, err)
+ }
+
+ // write .dat file from .ec00 ~ .ec09 files
+ if err := erasure_coding.WriteDatFile(baseFileName, datFileSize); err != nil {
+ return nil, fmt.Errorf("WriteEcFiles %s: %v", baseFileName, err)
+ }
+
+ // write .idx file from .ecx and .ecj files
+ if err := erasure_coding.WriteIdxFileFromEcIndex(baseFileName); err != nil {
+ return nil, fmt.Errorf("WriteIdxFileFromEcIndex %s: %v", baseFileName, err)
+ }
+
+ return &volume_server_pb.VolumeEcShardsToVolumeResponse{}, nil
+}
diff --git a/weed/server/volume_grpc_query.go b/weed/server/volume_grpc_query.go
new file mode 100644
index 000000000..767e28e7b
--- /dev/null
+++ b/weed/server/volume_grpc_query.go
@@ -0,0 +1,69 @@
+package weed_server
+
+import (
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/operation"
+ "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
+ "github.com/chrislusf/seaweedfs/weed/query/json"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "github.com/tidwall/gjson"
+)
+
+func (vs *VolumeServer) Query(req *volume_server_pb.QueryRequest, stream volume_server_pb.VolumeServer_QueryServer) error {
+
+ for _, fid := range req.FromFileIds {
+
+ vid, id_cookie, err := operation.ParseFileId(fid)
+ if err != nil {
+ glog.V(0).Infof("volume query failed to parse fid %s: %v", fid, err)
+ return err
+ }
+
+ n := new(needle.Needle)
+ volumeId, _ := needle.NewVolumeId(vid)
+ n.ParsePath(id_cookie)
+
+ cookie := n.Cookie
+ if _, err := vs.store.ReadVolumeNeedle(volumeId, n); err != nil {
+ glog.V(0).Infof("volume query failed to read fid %s: %v", fid, err)
+ return err
+ }
+
+ if n.Cookie != cookie {
+ glog.V(0).Infof("volume query failed to read fid cookie %s: %v", fid, err)
+ return err
+ }
+
+ if req.InputSerialization.CsvInput != nil {
+
+ }
+
+ if req.InputSerialization.JsonInput != nil {
+
+ stripe := &volume_server_pb.QueriedStripe{
+ Records: nil,
+ }
+
+ filter := json.Query{
+ Field: req.Filter.Field,
+ Op: req.Filter.Operand,
+ Value: req.Filter.Value,
+ }
+ gjson.ForEachLine(string(n.Data), func(line gjson.Result) bool {
+ passedFilter, values := json.QueryJson(line.Raw, req.Selections, filter)
+ if !passedFilter {
+ return true
+ }
+ stripe.Records = json.ToJson(stripe.Records, req.Selections, values)
+ return true
+ })
+ err = stream.Send(stripe)
+ if err != nil {
+ return err
+ }
+ }
+
+ }
+
+ return nil
+}
diff --git a/weed/server/volume_grpc_tail.go b/weed/server/volume_grpc_tail.go
index 34c55a599..c26d6ed8f 100644
--- a/weed/server/volume_grpc_tail.go
+++ b/weed/server/volume_grpc_tail.go
@@ -10,6 +10,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
"github.com/chrislusf/seaweedfs/weed/storage"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "github.com/chrislusf/seaweedfs/weed/storage/super_block"
)
func (vs *VolumeServer) VolumeTailSender(req *volume_server_pb.VolumeTailSenderRequest, stream volume_server_pb.VolumeServer_VolumeTailSenderServer) error {
@@ -67,34 +68,13 @@ func sendNeedlesSince(stream volume_server_pb.VolumeServer_VolumeTailSenderServe
return lastTimestampNs, sendErr
}
- err = storage.ScanVolumeFileNeedleFrom(v.Version(), v.DataFile(), foundOffset.ToAcutalOffset(), func(needleHeader, needleBody []byte, needleAppendAtNs uint64) error {
-
- isLastChunk := false
-
- // need to send body by chunks
- for i := 0; i < len(needleBody); i += BufferSizeLimit {
- stopOffset := i + BufferSizeLimit
- if stopOffset >= len(needleBody) {
- isLastChunk = true
- stopOffset = len(needleBody)
- }
-
- sendErr := stream.Send(&volume_server_pb.VolumeTailSenderResponse{
- NeedleHeader: needleHeader,
- NeedleBody: needleBody[i:stopOffset],
- IsLastChunk: isLastChunk,
- })
- if sendErr != nil {
- return sendErr
- }
- }
-
- lastProcessedTimestampNs = needleAppendAtNs
- return nil
+ scanner := &VolumeFileScanner4Tailing{
+ stream: stream,
+ }
- })
+ err = storage.ScanVolumeFileFrom(v.Version(), v.DataBackend, foundOffset.ToAcutalOffset(), scanner)
- return
+ return scanner.lastProcessedTimestampNs, err
}
@@ -115,3 +95,42 @@ func (vs *VolumeServer) VolumeTailReceiver(ctx context.Context, req *volume_serv
})
}
+
+// generate the volume idx
+type VolumeFileScanner4Tailing struct {
+ stream volume_server_pb.VolumeServer_VolumeTailSenderServer
+ lastProcessedTimestampNs uint64
+}
+
+func (scanner *VolumeFileScanner4Tailing) VisitSuperBlock(superBlock super_block.SuperBlock) error {
+ return nil
+
+}
+func (scanner *VolumeFileScanner4Tailing) ReadNeedleBody() bool {
+ return true
+}
+
+func (scanner *VolumeFileScanner4Tailing) VisitNeedle(n *needle.Needle, offset int64, needleHeader, needleBody []byte) error {
+ isLastChunk := false
+
+ // need to send body by chunks
+ for i := 0; i < len(needleBody); i += BufferSizeLimit {
+ stopOffset := i + BufferSizeLimit
+ if stopOffset >= len(needleBody) {
+ isLastChunk = true
+ stopOffset = len(needleBody)
+ }
+
+ sendErr := scanner.stream.Send(&volume_server_pb.VolumeTailSenderResponse{
+ NeedleHeader: needleHeader,
+ NeedleBody: needleBody[i:stopOffset],
+ IsLastChunk: isLastChunk,
+ })
+ if sendErr != nil {
+ return sendErr
+ }
+ }
+
+ scanner.lastProcessedTimestampNs = n.AppendAtNs
+ return nil
+}
diff --git a/weed/server/volume_grpc_tier_download.go b/weed/server/volume_grpc_tier_download.go
new file mode 100644
index 000000000..7b3982e40
--- /dev/null
+++ b/weed/server/volume_grpc_tier_download.go
@@ -0,0 +1,85 @@
+package weed_server
+
+import (
+ "fmt"
+ "time"
+
+ "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
+ "github.com/chrislusf/seaweedfs/weed/storage/backend"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
+)
+
+// VolumeTierMoveDatFromRemote copy dat file from a remote tier to local volume server
+func (vs *VolumeServer) VolumeTierMoveDatFromRemote(req *volume_server_pb.VolumeTierMoveDatFromRemoteRequest, stream volume_server_pb.VolumeServer_VolumeTierMoveDatFromRemoteServer) error {
+
+ // find existing volume
+ v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
+ if v == nil {
+ return fmt.Errorf("volume %d not found", req.VolumeId)
+ }
+
+ // verify the collection
+ if v.Collection != req.Collection {
+ return fmt.Errorf("existing collection:%v unexpected input: %v", v.Collection, req.Collection)
+ }
+
+ // locate the disk file
+ storageName, storageKey := v.RemoteStorageNameKey()
+ if storageName == "" || storageKey == "" {
+ return fmt.Errorf("volume %d is already on local disk", req.VolumeId)
+ }
+
+ // check whether the local .dat already exists
+ _, ok := v.DataBackend.(*backend.DiskFile)
+ if ok {
+ return fmt.Errorf("volume %d is already on local disk", req.VolumeId)
+ }
+
+ // check valid storage backend type
+ backendStorage, found := backend.BackendStorages[storageName]
+ if !found {
+ var keys []string
+ for key := range backend.BackendStorages {
+ keys = append(keys, key)
+ }
+ return fmt.Errorf("remote storage %s not found from suppported: %v", storageName, keys)
+ }
+
+ startTime := time.Now()
+ fn := func(progressed int64, percentage float32) error {
+ now := time.Now()
+ if now.Sub(startTime) < time.Second {
+ return nil
+ }
+ startTime = now
+ return stream.Send(&volume_server_pb.VolumeTierMoveDatFromRemoteResponse{
+ Processed: progressed,
+ ProcessedPercentage: percentage,
+ })
+ }
+ // copy the data file
+ _, err := backendStorage.DownloadFile(v.FileName()+".dat", storageKey, fn)
+ if err != nil {
+ return fmt.Errorf("backend %s copy file %s: %v", storageName, v.FileName()+".dat", err)
+ }
+
+ if req.KeepRemoteDatFile {
+ return nil
+ }
+
+ // remove remote file
+ if err := backendStorage.DeleteFile(storageKey); err != nil {
+ return fmt.Errorf("volume %d fail to delete remote file %s: %v", v.Id, storageKey, err)
+ }
+
+ // forget remote file
+ v.GetVolumeInfo().Files = v.GetVolumeInfo().Files[1:]
+ if err := v.SaveVolumeInfo(); err != nil {
+ return fmt.Errorf("volume %d fail to save remote file info: %v", v.Id, err)
+ }
+
+ v.DataBackend.Close()
+ v.DataBackend = nil
+
+ return nil
+}
diff --git a/weed/server/volume_grpc_tier_upload.go b/weed/server/volume_grpc_tier_upload.go
new file mode 100644
index 000000000..c9694df59
--- /dev/null
+++ b/weed/server/volume_grpc_tier_upload.go
@@ -0,0 +1,100 @@
+package weed_server
+
+import (
+ "fmt"
+ "os"
+ "time"
+
+ "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
+ "github.com/chrislusf/seaweedfs/weed/storage/backend"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
+)
+
+// VolumeTierMoveDatToRemote copy dat file to a remote tier
+func (vs *VolumeServer) VolumeTierMoveDatToRemote(req *volume_server_pb.VolumeTierMoveDatToRemoteRequest, stream volume_server_pb.VolumeServer_VolumeTierMoveDatToRemoteServer) error {
+
+ // find existing volume
+ v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
+ if v == nil {
+ return fmt.Errorf("volume %d not found", req.VolumeId)
+ }
+
+ // verify the collection
+ if v.Collection != req.Collection {
+ return fmt.Errorf("existing collection:%v unexpected input: %v", v.Collection, req.Collection)
+ }
+
+ // locate the disk file
+ diskFile, ok := v.DataBackend.(*backend.DiskFile)
+ if !ok {
+ return fmt.Errorf("volume %d is not on local disk", req.VolumeId)
+ }
+
+ // check valid storage backend type
+ backendStorage, found := backend.BackendStorages[req.DestinationBackendName]
+ if !found {
+ var keys []string
+ for key := range backend.BackendStorages {
+ keys = append(keys, key)
+ }
+ return fmt.Errorf("destination %s not found, suppported: %v", req.DestinationBackendName, keys)
+ }
+
+ // check whether the existing backend storage is the same as requested
+ // if same, skip
+ backendType, backendId := backend.BackendNameToTypeId(req.DestinationBackendName)
+ for _, remoteFile := range v.GetVolumeInfo().GetFiles() {
+ if remoteFile.BackendType == backendType && remoteFile.BackendId == backendId {
+ return fmt.Errorf("destination %s already exists", req.DestinationBackendName)
+ }
+ }
+
+ startTime := time.Now()
+ fn := func(progressed int64, percentage float32) error {
+ now := time.Now()
+ if now.Sub(startTime) < time.Second {
+ return nil
+ }
+ startTime = now
+ return stream.Send(&volume_server_pb.VolumeTierMoveDatToRemoteResponse{
+ Processed: progressed,
+ ProcessedPercentage: percentage,
+ })
+ }
+
+ // remember the file original source
+ attributes := make(map[string]string)
+ attributes["volumeId"] = v.Id.String()
+ attributes["collection"] = v.Collection
+ attributes["ext"] = ".dat"
+ // copy the data file
+ key, size, err := backendStorage.CopyFile(diskFile.File, attributes, fn)
+ if err != nil {
+ return fmt.Errorf("backend %s copy file %s: %v", req.DestinationBackendName, diskFile.Name(), err)
+ }
+
+ // save the remote file to volume tier info
+ v.GetVolumeInfo().Files = append(v.GetVolumeInfo().GetFiles(), &volume_server_pb.RemoteFile{
+ BackendType: backendType,
+ BackendId: backendId,
+ Key: key,
+ Offset: 0,
+ FileSize: uint64(size),
+ ModifiedTime: uint64(time.Now().Unix()),
+ Extension: ".dat",
+ })
+
+ if err := v.SaveVolumeInfo(); err != nil {
+ return fmt.Errorf("volume %d fail to save remote file info: %v", v.Id, err)
+ }
+
+ if err := v.LoadRemoteFile(); err != nil {
+ return fmt.Errorf("volume %d fail to load remote file: %v", v.Id, err)
+ }
+
+ if !req.KeepLocalDatFile {
+ os.Remove(v.FileName() + ".dat")
+ }
+
+ return nil
+}
diff --git a/weed/server/volume_server_handlers_admin.go b/weed/server/volume_server_handlers_admin.go
index 25b6582f7..1938a34c4 100644
--- a/weed/server/volume_server_handlers_admin.go
+++ b/weed/server/volume_server_handlers_admin.go
@@ -12,7 +12,7 @@ import (
func (vs *VolumeServer) statusHandler(w http.ResponseWriter, r *http.Request) {
m := make(map[string]interface{})
m["Version"] = util.VERSION
- m["Volumes"] = vs.store.Status()
+ m["Volumes"] = vs.store.VolumeInfos()
writeJsonQuiet(w, r, http.StatusOK, m)
}
diff --git a/weed/server/volume_server_handlers_read.go b/weed/server/volume_server_handlers_read.go
index f30ffefaf..cd11356b9 100644
--- a/weed/server/volume_server_handlers_read.go
+++ b/weed/server/volume_server_handlers_read.go
@@ -4,6 +4,7 @@ import (
"bytes"
"context"
"errors"
+ "fmt"
"io"
"mime"
"mime/multipart"
@@ -66,7 +67,7 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
glog.V(2).Infoln("volume", volumeId, "found on", lookupResult, "error", err)
if err == nil && len(lookupResult.Locations) > 0 {
u, _ := url.Parse(util.NormalizeUrl(lookupResult.Locations[0].PublicUrl))
- u.Path = r.URL.Path
+ u.Path = fmt.Sprintf("%s/%s,%s", u.Path, vid, fid)
arg := url.Values{}
if c := r.FormValue("collection"); c != "" {
arg.Set("collection", c)
diff --git a/weed/server/volume_server_handlers_ui.go b/weed/server/volume_server_handlers_ui.go
index 852f0b751..8d35c9c8b 100644
--- a/weed/server/volume_server_handlers_ui.go
+++ b/weed/server/volume_server_handlers_ui.go
@@ -8,6 +8,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
ui "github.com/chrislusf/seaweedfs/weed/server/volume_server_ui"
"github.com/chrislusf/seaweedfs/weed/stats"
+ "github.com/chrislusf/seaweedfs/weed/storage"
"github.com/chrislusf/seaweedfs/weed/util"
)
@@ -20,19 +21,30 @@ func (vs *VolumeServer) uiStatusHandler(w http.ResponseWriter, r *http.Request)
ds = append(ds, stats.NewDiskStatus(dir))
}
}
+ volumeInfos := vs.store.VolumeInfos()
+ var normalVolumeInfos, remoteVolumeInfos []*storage.VolumeInfo
+ for _, vinfo := range volumeInfos {
+ if vinfo.IsRemote() {
+ remoteVolumeInfos = append(remoteVolumeInfos, vinfo)
+ } else {
+ normalVolumeInfos = append(normalVolumeInfos, vinfo)
+ }
+ }
args := struct {
- Version string
- Masters []string
- Volumes interface{}
- EcVolumes interface{}
- DiskStatuses interface{}
- Stats interface{}
- Counters *stats.ServerStats
+ Version string
+ Masters []string
+ Volumes interface{}
+ EcVolumes interface{}
+ RemoteVolumes interface{}
+ DiskStatuses interface{}
+ Stats interface{}
+ Counters *stats.ServerStats
}{
util.VERSION,
vs.SeedMasterNodes,
- vs.store.Status(),
+ normalVolumeInfos,
vs.store.EcVolumes(),
+ remoteVolumeInfos,
ds,
infos,
serverStats,
diff --git a/weed/server/volume_server_handlers_write.go b/weed/server/volume_server_handlers_write.go
index db8fcb555..05e21612b 100644
--- a/weed/server/volume_server_handlers_write.go
+++ b/weed/server/volume_server_handlers_write.go
@@ -51,10 +51,14 @@ func (vs *VolumeServer) PostHandler(w http.ResponseWriter, r *http.Request) {
ret := operation.UploadResult{}
_, isUnchanged, writeError := topology.ReplicatedWrite(vs.GetMaster(), vs.store, volumeId, needle, r)
- httpStatus := http.StatusCreated
- if isUnchanged {
- httpStatus = http.StatusNotModified
+
+ // http 304 status code does not allow body
+ if writeError == nil && isUnchanged {
+ w.WriteHeader(http.StatusNotModified)
+ return
}
+
+ httpStatus := http.StatusCreated
if writeError != nil {
httpStatus = http.StatusInternalServerError
ret.Error = writeError.Error()
diff --git a/weed/server/volume_server_ui/templates.go b/weed/server/volume_server_ui/templates.go
index eafc0aaeb..81496b1de 100644
--- a/weed/server/volume_server_ui/templates.go
+++ b/weed/server/volume_server_ui/templates.go
@@ -107,10 +107,11 @@ var StatusTpl = template.Must(template.New("status").Funcs(funcMap).Parse(`<!DOC
<tr>
<th>Id</th>
<th>Collection</th>
- <th>Size</th>
+ <th>Data Size</th>
<th>Files</th>
<th>Trash</th>
<th>TTL</th>
+ <th>ReadOnly</th>
</tr>
</thead>
<tbody>
@@ -122,6 +123,37 @@ var StatusTpl = template.Must(template.New("status").Funcs(funcMap).Parse(`<!DOC
<td>{{ .FileCount }}</td>
<td>{{ .DeleteCount }} / {{.DeletedByteCount}} Bytes</td>
<td>{{ .Ttl }}</td>
+ <td>{{ .ReadOnly }}</td>
+ </tr>
+ {{ end }}
+ </tbody>
+ </table>
+ </div>
+
+ <div class="row">
+ <h2>Remote Volumes</h2>
+ <table class="table table-striped">
+ <thead>
+ <tr>
+ <th>Id</th>
+ <th>Collection</th>
+ <th>Size</th>
+ <th>Files</th>
+ <th>Trash</th>
+ <th>Remote</th>
+ <th>Key</th>
+ </tr>
+ </thead>
+ <tbody>
+ {{ range .RemoteVolumes }}
+ <tr>
+ <td><code>{{ .Id }}</code></td>
+ <td>{{ .Collection }}</td>
+ <td>{{ .Size }} Bytes</td>
+ <td>{{ .FileCount }}</td>
+ <td>{{ .DeleteCount }} / {{.DeletedByteCount}} Bytes</td>
+ <td>{{ .RemoteStorageName }}</td>
+ <td>{{ .RemoteStorageKey }}</td>
</tr>
{{ end }}
</tbody>
diff --git a/weed/server/webdav_server.go b/weed/server/webdav_server.go
index 151b48a78..abd0b66eb 100644
--- a/weed/server/webdav_server.go
+++ b/weed/server/webdav_server.go
@@ -10,16 +10,18 @@ import (
"strings"
"time"
+ "golang.org/x/net/webdav"
+ "google.golang.org/grpc"
+
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/util"
- "golang.org/x/net/webdav"
- "google.golang.org/grpc"
+
+ "github.com/spf13/viper"
"github.com/chrislusf/seaweedfs/weed/filer2"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/security"
- "github.com/spf13/viper"
)
type WebDavOption struct {
@@ -163,7 +165,7 @@ func (fs *WebDavFileSystem) Mkdir(ctx context.Context, fullDirPath string, perm
func (fs *WebDavFileSystem) OpenFile(ctx context.Context, fullFilePath string, flag int, perm os.FileMode) (webdav.File, error) {
- glog.V(2).Infof("WebDavFileSystem.OpenFile %v", fullFilePath)
+ glog.V(2).Infof("WebDavFileSystem.OpenFile %v %x", fullFilePath, flag)
var err error
if fullFilePath, err = clearName(fullFilePath); err != nil {
@@ -175,12 +177,6 @@ func (fs *WebDavFileSystem) OpenFile(ctx context.Context, fullFilePath string, f
if strings.HasSuffix(fullFilePath, "/") {
return nil, os.ErrInvalid
}
- // based directory should be exists.
- dir, _ := path.Split(fullFilePath)
- _, err := fs.stat(ctx, dir)
- if err != nil {
- return nil, os.ErrInvalid
- }
_, err = fs.stat(ctx, fullFilePath)
if err == nil {
if flag&os.O_EXCL != 0 {
@@ -412,7 +408,7 @@ func (f *WebDavFile) Write(buf []byte) (int, error) {
fileUrl := fmt.Sprintf("http://%s/%s", host, fileId)
bufReader := bytes.NewReader(buf)
- uploadResult, err := operation.Upload(fileUrl, f.name, bufReader, false, "application/octet-stream", nil, auth)
+ uploadResult, err := operation.Upload(fileUrl, f.name, bufReader, false, "", nil, auth)
if err != nil {
glog.V(0).Infof("upload data %v to %s: %v", f.name, fileUrl, err)
return 0, fmt.Errorf("upload data: %v", err)
@@ -448,9 +444,11 @@ func (f *WebDavFile) Write(buf []byte) (int, error) {
return nil
})
- if err != nil {
+ if err == nil {
+ glog.V(3).Infof("WebDavFileSystem.Write %v: written [%d,%d)", f.name, f.off, f.off+int64(len(buf)))
f.off += int64(len(buf))
}
+
return len(buf), err
}
@@ -494,10 +492,13 @@ func (f *WebDavFile) Read(p []byte) (readSize int, err error) {
}
readSize = int(totalRead)
+ glog.V(3).Infof("WebDavFileSystem.Read %v: [%d,%d)", f.name, f.off, f.off+totalRead)
+
f.off += totalRead
if readSize == 0 {
return 0, io.EOF
}
+
return
}
@@ -511,7 +512,7 @@ func (f *WebDavFile) Readdir(count int) (ret []os.FileInfo, err error) {
dir = dir[:len(dir)-1]
}
- err = filer2.ReadDirAllEntries(ctx, f.fs, dir, func(entry *filer_pb.Entry) {
+ err = filer2.ReadDirAllEntries(ctx, f.fs, dir, "", func(entry *filer_pb.Entry, isLast bool) {
fi := FileInfo{
size: int64(filer2.TotalSize(entry.GetChunks())),
name: entry.Name,