aboutsummaryrefslogtreecommitdiff
path: root/weed/server
diff options
context:
space:
mode:
authorHongyanShen <763987993@qq.com>2020-03-11 12:55:24 +0800
committerGitHub <noreply@github.com>2020-03-11 12:55:24 +0800
commit03529fc0c29072f6f26e11ffbd7229cf92dc71ce (patch)
treeed8833386a712c850dcef0815509774681a6ab56 /weed/server
parent0fca1ae776783b37481549df40f477b7d9248d3c (diff)
parent60f5f05c78a2918d5219c925cea5847759281a2c (diff)
downloadseaweedfs-03529fc0c29072f6f26e11ffbd7229cf92dc71ce.tar.xz
seaweedfs-03529fc0c29072f6f26e11ffbd7229cf92dc71ce.zip
Merge pull request #1 from chrislusf/master
sync
Diffstat (limited to 'weed/server')
-rw-r--r--weed/server/common.go130
-rw-r--r--weed/server/filer_grpc_server.go131
-rw-r--r--weed/server/filer_grpc_server_rename.go2
-rw-r--r--weed/server/filer_server.go26
-rw-r--r--weed/server/filer_server_handlers_read.go190
-rw-r--r--weed/server/filer_server_handlers_write.go146
-rw-r--r--weed/server/filer_server_handlers_write_autochunk.go123
-rw-r--r--weed/server/filer_server_handlers_write_cipher.go98
-rw-r--r--weed/server/filer_ui/breadcrumb.go10
-rw-r--r--weed/server/filer_ui/templates.go13
-rw-r--r--weed/server/master_grpc_server.go104
-rw-r--r--weed/server/master_grpc_server_collection.go1
-rw-r--r--weed/server/master_grpc_server_volume.go7
-rw-r--r--weed/server/master_server.go29
-rw-r--r--weed/server/master_server_handlers_admin.go6
-rw-r--r--weed/server/msg_broker_grpc_server.go23
-rw-r--r--weed/server/msg_broker_server.go121
-rw-r--r--weed/server/raft_server.go11
-rw-r--r--weed/server/volume_grpc_admin.go55
-rw-r--r--weed/server/volume_grpc_client_to_master.go24
-rw-r--r--weed/server/volume_grpc_copy.go45
-rw-r--r--weed/server/volume_grpc_copy_incremental.go2
-rw-r--r--weed/server/volume_grpc_erasure_coding.go96
-rw-r--r--weed/server/volume_grpc_file.go129
-rw-r--r--weed/server/volume_grpc_tail.go3
-rw-r--r--weed/server/volume_grpc_tier_download.go85
-rw-r--r--weed/server/volume_grpc_tier_upload.go100
-rw-r--r--weed/server/volume_server.go20
-rw-r--r--weed/server/volume_server_handlers_admin.go9
-rw-r--r--weed/server/volume_server_handlers_read.go126
-rw-r--r--weed/server/volume_server_handlers_ui.go28
-rw-r--r--weed/server/volume_server_handlers_write.go17
-rw-r--r--weed/server/volume_server_ui/templates.go74
-rw-r--r--weed/server/webdav_server.go102
34 files changed, 1438 insertions, 648 deletions
diff --git a/weed/server/common.go b/weed/server/common.go
index d50c283f2..e06142d7f 100644
--- a/weed/server/common.go
+++ b/weed/server/common.go
@@ -1,27 +1,29 @@
package weed_server
import (
- "bytes"
"encoding/json"
"errors"
"fmt"
+ "io"
+ "mime/multipart"
"net/http"
"path/filepath"
"strconv"
"strings"
"time"
- "github.com/chrislusf/seaweedfs/weed/storage/needle"
"google.golang.org/grpc"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/stats"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/util"
- _ "github.com/chrislusf/seaweedfs/weed/statik"
"github.com/gorilla/mux"
statik "github.com/rakyll/statik/fs"
+
+ _ "github.com/chrislusf/seaweedfs/weed/statik"
)
var serverStats *stats.ServerStats
@@ -76,7 +78,8 @@ func writeJson(w http.ResponseWriter, r *http.Request, httpStatus int, obj inter
// wrapper for writeJson - just logs errors
func writeJsonQuiet(w http.ResponseWriter, r *http.Request, httpStatus int, obj interface{}) {
if err := writeJson(w, r, httpStatus, obj); err != nil {
- glog.V(0).Infof("error writing JSON %+v status %d: %v", obj, httpStatus, err)
+ glog.V(0).Infof("error writing JSON status %d: %v", httpStatus, err)
+ glog.V(1).Infof("JSON content: %+v", obj)
}
}
func writeJsonError(w http.ResponseWriter, r *http.Request, httpStatus int, err error) {
@@ -97,13 +100,13 @@ func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterUrl st
}
debug("parsing upload file...")
- fname, data, mimeType, pairMap, isGzipped, originalDataSize, lastModified, _, _, pe := needle.ParseUpload(r)
+ pu, pe := needle.ParseUpload(r, 256*1024*1024)
if pe != nil {
writeJsonError(w, r, http.StatusBadRequest, pe)
return
}
- debug("assigning file id for", fname)
+ debug("assigning file id for", pu.FileName)
r.ParseForm()
count := uint64(1)
if r.FormValue("count") != "" {
@@ -115,6 +118,7 @@ func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterUrl st
}
ar := &operation.VolumeAssignRequest{
Count: count,
+ DataCenter: r.FormValue("dataCenter"),
Replication: r.FormValue("replication"),
Collection: r.FormValue("collection"),
Ttl: r.FormValue("ttl"),
@@ -126,21 +130,21 @@ func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterUrl st
}
url := "http://" + assignResult.Url + "/" + assignResult.Fid
- if lastModified != 0 {
- url = url + "?ts=" + strconv.FormatUint(lastModified, 10)
+ if pu.ModifiedTime != 0 {
+ url = url + "?ts=" + strconv.FormatUint(pu.ModifiedTime, 10)
}
debug("upload file to store", url)
- uploadResult, err := operation.Upload(url, fname, bytes.NewReader(data), isGzipped, mimeType, pairMap, assignResult.Auth)
+ uploadResult, err := operation.UploadData(url, pu.FileName, false, pu.Data, pu.IsGzipped, pu.MimeType, pu.PairMap, assignResult.Auth)
if err != nil {
writeJsonError(w, r, http.StatusInternalServerError, err)
return
}
- m["fileName"] = fname
+ m["fileName"] = pu.FileName
m["fid"] = assignResult.Fid
m["fileUrl"] = assignResult.PublicUrl + "/" + assignResult.Fid
- m["size"] = originalDataSize
+ m["size"] = pu.OriginalDataSize
m["eTag"] = uploadResult.ETag
writeJsonQuiet(w, r, http.StatusCreated, m)
return
@@ -207,3 +211,107 @@ func handleStaticResources2(r *mux.Router) {
r.Handle("/favicon.ico", http.FileServer(statikFS))
r.PathPrefix("/seaweedfsstatic/").Handler(http.StripPrefix("/seaweedfsstatic", http.FileServer(statikFS)))
}
+
+func adjustHeadersAfterHEAD(w http.ResponseWriter, r *http.Request, filename string) {
+ if filename != "" {
+ contentDisposition := "inline"
+ if r.FormValue("dl") != "" {
+ if dl, _ := strconv.ParseBool(r.FormValue("dl")); dl {
+ contentDisposition = "attachment"
+ }
+ }
+ w.Header().Set("Content-Disposition", contentDisposition+`; filename="`+fileNameEscaper.Replace(filename)+`"`)
+ }
+}
+
+func processRangeRequst(r *http.Request, w http.ResponseWriter, totalSize int64, mimeType string, writeFn func(writer io.Writer, offset int64, size int64) error) {
+ rangeReq := r.Header.Get("Range")
+
+ if rangeReq == "" {
+ w.Header().Set("Content-Length", strconv.FormatInt(totalSize, 10))
+ if err := writeFn(w, 0, totalSize); err != nil {
+ http.Error(w, err.Error(), http.StatusInternalServerError)
+ return
+ }
+ return
+ }
+
+ //the rest is dealing with partial content request
+ //mostly copy from src/pkg/net/http/fs.go
+ ranges, err := parseRange(rangeReq, totalSize)
+ if err != nil {
+ http.Error(w, err.Error(), http.StatusRequestedRangeNotSatisfiable)
+ return
+ }
+ if sumRangesSize(ranges) > totalSize {
+ // The total number of bytes in all the ranges
+ // is larger than the size of the file by
+ // itself, so this is probably an attack, or a
+ // dumb client. Ignore the range request.
+ return
+ }
+ if len(ranges) == 0 {
+ return
+ }
+ if len(ranges) == 1 {
+ // RFC 2616, Section 14.16:
+ // "When an HTTP message includes the content of a single
+ // range (for example, a response to a request for a
+ // single range, or to a request for a set of ranges
+ // that overlap without any holes), this content is
+ // transmitted with a Content-Range header, and a
+ // Content-Length header showing the number of bytes
+ // actually transferred.
+ // ...
+ // A response to a request for a single range MUST NOT
+ // be sent using the multipart/byteranges media type."
+ ra := ranges[0]
+ w.Header().Set("Content-Length", strconv.FormatInt(ra.length, 10))
+ w.Header().Set("Content-Range", ra.contentRange(totalSize))
+ w.WriteHeader(http.StatusPartialContent)
+
+ err = writeFn(w, ra.start, ra.length)
+ if err != nil {
+ http.Error(w, err.Error(), http.StatusInternalServerError)
+ return
+ }
+ return
+ }
+
+ // process multiple ranges
+ for _, ra := range ranges {
+ if ra.start > totalSize {
+ http.Error(w, "Out of Range", http.StatusRequestedRangeNotSatisfiable)
+ return
+ }
+ }
+ sendSize := rangesMIMESize(ranges, mimeType, totalSize)
+ pr, pw := io.Pipe()
+ mw := multipart.NewWriter(pw)
+ w.Header().Set("Content-Type", "multipart/byteranges; boundary="+mw.Boundary())
+ sendContent := pr
+ defer pr.Close() // cause writing goroutine to fail and exit if CopyN doesn't finish.
+ go func() {
+ for _, ra := range ranges {
+ part, e := mw.CreatePart(ra.mimeHeader(mimeType, totalSize))
+ if e != nil {
+ pw.CloseWithError(e)
+ return
+ }
+ if e = writeFn(part, ra.start, ra.length); e != nil {
+ pw.CloseWithError(e)
+ return
+ }
+ }
+ mw.Close()
+ pw.Close()
+ }()
+ if w.Header().Get("Content-Encoding") == "" {
+ w.Header().Set("Content-Length", strconv.FormatInt(sendSize, 10))
+ }
+ w.WriteHeader(http.StatusPartialContent)
+ if _, err := io.CopyN(w, sendContent, sendSize); err != nil {
+ http.Error(w, "Internal Error", http.StatusInternalServerError)
+ return
+ }
+}
diff --git a/weed/server/filer_grpc_server.go b/weed/server/filer_grpc_server.go
index feab11c79..b904c1393 100644
--- a/weed/server/filer_grpc_server.go
+++ b/weed/server/filer_grpc_server.go
@@ -19,7 +19,11 @@ import (
func (fs *FilerServer) LookupDirectoryEntry(ctx context.Context, req *filer_pb.LookupDirectoryEntryRequest) (*filer_pb.LookupDirectoryEntryResponse, error) {
entry, err := fs.filer.FindEntry(ctx, filer2.FullPath(filepath.ToSlash(filepath.Join(req.Directory, req.Name))))
+ if err == filer_pb.ErrNotFound {
+ return &filer_pb.LookupDirectoryEntryResponse{}, nil
+ }
if err != nil {
+ glog.V(3).Infof("LookupDirectoryEntry %s: %+v, ", filepath.Join(req.Directory, req.Name), err)
return nil, err
}
@@ -29,32 +33,33 @@ func (fs *FilerServer) LookupDirectoryEntry(ctx context.Context, req *filer_pb.L
IsDirectory: entry.IsDirectory(),
Attributes: filer2.EntryAttributeToPb(entry),
Chunks: entry.Chunks,
+ Extended: entry.Extended,
},
}, nil
}
-func (fs *FilerServer) ListEntries(ctx context.Context, req *filer_pb.ListEntriesRequest) (*filer_pb.ListEntriesResponse, error) {
+func (fs *FilerServer) ListEntries(req *filer_pb.ListEntriesRequest, stream filer_pb.SeaweedFiler_ListEntriesServer) error {
limit := int(req.Limit)
if limit == 0 {
limit = fs.option.DirListingLimit
}
- paginationLimit := 1024
+ paginationLimit := filer2.PaginationSize
if limit < paginationLimit {
paginationLimit = limit
}
- resp := &filer_pb.ListEntriesResponse{}
lastFileName := req.StartFromFileName
includeLastFile := req.InclusiveStartFrom
for limit > 0 {
- entries, err := fs.filer.ListDirectoryEntries(ctx, filer2.FullPath(req.Directory), lastFileName, includeLastFile, paginationLimit)
+ entries, err := fs.filer.ListDirectoryEntries(stream.Context(), filer2.FullPath(req.Directory), lastFileName, includeLastFile, paginationLimit)
+
if err != nil {
- return nil, err
+ return err
}
if len(entries) == 0 {
- return resp, nil
+ return nil
}
includeLastFile = false
@@ -69,15 +74,21 @@ func (fs *FilerServer) ListEntries(ctx context.Context, req *filer_pb.ListEntrie
}
}
- resp.Entries = append(resp.Entries, &filer_pb.Entry{
- Name: entry.Name(),
- IsDirectory: entry.IsDirectory(),
- Chunks: entry.Chunks,
- Attributes: filer2.EntryAttributeToPb(entry),
- })
+ if err := stream.Send(&filer_pb.ListEntriesResponse{
+ Entry: &filer_pb.Entry{
+ Name: entry.Name(),
+ IsDirectory: entry.IsDirectory(),
+ Chunks: entry.Chunks,
+ Attributes: filer2.EntryAttributeToPb(entry),
+ Extended: entry.Extended,
+ },
+ }); err != nil {
+ return err
+ }
+
limit--
if limit == 0 {
- return resp, nil
+ return nil
}
}
@@ -87,7 +98,7 @@ func (fs *FilerServer) ListEntries(ctx context.Context, req *filer_pb.ListEntrie
}
- return resp, nil
+ return nil
}
func (fs *FilerServer) LookupVolume(ctx context.Context, req *filer_pb.LookupVolumeRequest) (*filer_pb.LookupVolumeResponse, error) {
@@ -123,24 +134,31 @@ func (fs *FilerServer) LookupVolume(ctx context.Context, req *filer_pb.LookupVol
func (fs *FilerServer) CreateEntry(ctx context.Context, req *filer_pb.CreateEntryRequest) (resp *filer_pb.CreateEntryResponse, err error) {
+ resp = &filer_pb.CreateEntryResponse{}
+
fullpath := filer2.FullPath(filepath.ToSlash(filepath.Join(req.Directory, req.Entry.Name)))
chunks, garbages := filer2.CompactFileChunks(req.Entry.Chunks)
if req.Entry.Attributes == nil {
- return nil, fmt.Errorf("can not create entry with empty attributes")
+ glog.V(3).Infof("CreateEntry %s: nil attributes", filepath.Join(req.Directory, req.Entry.Name))
+ resp.Error = fmt.Sprintf("can not create entry with empty attributes")
+ return
}
- err = fs.filer.CreateEntry(ctx, &filer2.Entry{
+ createErr := fs.filer.CreateEntry(ctx, &filer2.Entry{
FullPath: fullpath,
Attr: filer2.PbToEntryAttribute(req.Entry.Attributes),
Chunks: chunks,
- })
+ }, req.OExcl)
- if err == nil {
- fs.filer.DeleteChunks(fullpath, garbages)
+ if createErr == nil {
+ fs.filer.DeleteChunks(garbages)
+ } else {
+ glog.V(3).Infof("CreateEntry %s: %v", filepath.Join(req.Directory, req.Entry.Name), createErr)
+ resp.Error = createErr.Error()
}
- return &filer_pb.CreateEntryResponse{}, err
+ return
}
func (fs *FilerServer) UpdateEntry(ctx context.Context, req *filer_pb.UpdateEntryRequest) (*filer_pb.UpdateEntryResponse, error) {
@@ -159,12 +177,14 @@ func (fs *FilerServer) UpdateEntry(ctx context.Context, req *filer_pb.UpdateEntr
newEntry := &filer2.Entry{
FullPath: filer2.FullPath(filepath.ToSlash(filepath.Join(req.Directory, req.Entry.Name))),
Attr: entry.Attr,
+ Extended: req.Entry.Extended,
Chunks: chunks,
}
- glog.V(3).Infof("updating %s: %+v, chunks %d: %v => %+v, chunks %d: %v",
+ glog.V(3).Infof("updating %s: %+v, chunks %d: %v => %+v, chunks %d: %v, extended: %v => %v",
fullpath, entry.Attr, len(entry.Chunks), entry.Chunks,
- req.Entry.Attributes, len(req.Entry.Chunks), req.Entry.Chunks)
+ req.Entry.Attributes, len(req.Entry.Chunks), req.Entry.Chunks,
+ entry.Extended, req.Entry.Extended)
if req.Entry.Attributes != nil {
if req.Entry.Attributes.Mtime != 0 {
@@ -186,8 +206,10 @@ func (fs *FilerServer) UpdateEntry(ctx context.Context, req *filer_pb.UpdateEntr
}
if err = fs.filer.UpdateEntry(ctx, entry, newEntry); err == nil {
- fs.filer.DeleteChunks(entry.FullPath, unusedChunks)
- fs.filer.DeleteChunks(entry.FullPath, garbages)
+ fs.filer.DeleteChunks(unusedChunks)
+ fs.filer.DeleteChunks(garbages)
+ } else {
+ glog.V(3).Infof("UpdateEntry %s: %v", filepath.Join(req.Directory, req.Entry.Name), err)
}
fs.filer.NotifyUpdateEvent(entry, newEntry, true)
@@ -197,7 +219,30 @@ func (fs *FilerServer) UpdateEntry(ctx context.Context, req *filer_pb.UpdateEntr
func (fs *FilerServer) DeleteEntry(ctx context.Context, req *filer_pb.DeleteEntryRequest) (resp *filer_pb.DeleteEntryResponse, err error) {
err = fs.filer.DeleteEntryMetaAndData(ctx, filer2.FullPath(filepath.ToSlash(filepath.Join(req.Directory, req.Name))), req.IsRecursive, req.IgnoreRecursiveError, req.IsDeleteData)
- return &filer_pb.DeleteEntryResponse{}, err
+ resp = &filer_pb.DeleteEntryResponse{}
+ if err != nil {
+ resp.Error = err.Error()
+ }
+ return resp, nil
+}
+
+func (fs *FilerServer) StreamDeleteEntries(stream filer_pb.SeaweedFiler_StreamDeleteEntriesServer) error {
+ for {
+ req, err := stream.Recv()
+ if err != nil {
+ return fmt.Errorf("receive delete entry request: %v", err)
+ }
+ fullpath := filer2.FullPath(filepath.ToSlash(filepath.Join(req.Directory, req.Name)))
+ err = fs.filer.DeleteEntryMetaAndData(context.Background(), fullpath, req.IsRecursive, req.IgnoreRecursiveError, req.IsDeleteData)
+ resp := &filer_pb.DeleteEntryResponse{}
+ if err != nil {
+ resp.Error = err.Error()
+ }
+ if err := stream.Send(resp); err != nil {
+ return err
+ }
+ }
+ return nil
}
func (fs *FilerServer) AssignVolume(ctx context.Context, req *filer_pb.AssignVolumeRequest) (resp *filer_pb.AssignVolumeResponse, err error) {
@@ -206,6 +251,7 @@ func (fs *FilerServer) AssignVolume(ctx context.Context, req *filer_pb.AssignVol
if req.TtlSec > 0 {
ttlStr = strconv.Itoa(int(req.TtlSec))
}
+ collection, replication := fs.detectCollection(req.ParentPath, req.Collection, req.Replication)
var altRequest *operation.VolumeAssignRequest
@@ -216,41 +262,45 @@ func (fs *FilerServer) AssignVolume(ctx context.Context, req *filer_pb.AssignVol
assignRequest := &operation.VolumeAssignRequest{
Count: uint64(req.Count),
- Replication: req.Replication,
- Collection: req.Collection,
+ Replication: replication,
+ Collection: collection,
Ttl: ttlStr,
DataCenter: dataCenter,
}
if dataCenter != "" {
altRequest = &operation.VolumeAssignRequest{
Count: uint64(req.Count),
- Replication: req.Replication,
- Collection: req.Collection,
+ Replication: replication,
+ Collection: collection,
Ttl: ttlStr,
DataCenter: "",
}
}
assignResult, err := operation.Assign(fs.filer.GetMaster(), fs.grpcDialOption, assignRequest, altRequest)
if err != nil {
- return nil, fmt.Errorf("assign volume: %v", err)
+ glog.V(3).Infof("AssignVolume: %v", err)
+ return &filer_pb.AssignVolumeResponse{Error: fmt.Sprintf("assign volume: %v", err)}, nil
}
if assignResult.Error != "" {
- return nil, fmt.Errorf("assign volume result: %v", assignResult.Error)
+ glog.V(3).Infof("AssignVolume error: %v", assignResult.Error)
+ return &filer_pb.AssignVolumeResponse{Error: fmt.Sprintf("assign volume result: %v", assignResult.Error)}, nil
}
return &filer_pb.AssignVolumeResponse{
- FileId: assignResult.Fid,
- Count: int32(assignResult.Count),
- Url: assignResult.Url,
- PublicUrl: assignResult.PublicUrl,
- Auth: string(assignResult.Auth),
- }, err
+ FileId: assignResult.Fid,
+ Count: int32(assignResult.Count),
+ Url: assignResult.Url,
+ PublicUrl: assignResult.PublicUrl,
+ Auth: string(assignResult.Auth),
+ Collection: collection,
+ Replication: replication,
+ }, nil
}
func (fs *FilerServer) DeleteCollection(ctx context.Context, req *filer_pb.DeleteCollectionRequest) (resp *filer_pb.DeleteCollectionResponse, err error) {
- err = fs.filer.MasterClient.WithClient(ctx, func(client master_pb.SeaweedClient) error {
- _, err := client.CollectionDelete(ctx, &master_pb.CollectionDeleteRequest{
+ err = fs.filer.MasterClient.WithClient(func(client master_pb.SeaweedClient) error {
+ _, err := client.CollectionDelete(context.Background(), &master_pb.CollectionDeleteRequest{
Name: req.GetCollection(),
})
return err
@@ -286,5 +336,8 @@ func (fs *FilerServer) GetFilerConfiguration(ctx context.Context, req *filer_pb.
Collection: fs.option.Collection,
Replication: fs.option.DefaultReplication,
MaxMb: uint32(fs.option.MaxMB),
+ DirBuckets: fs.filer.DirBucketsPath,
+ DirQueues: fs.filer.DirQueuesPath,
+ Cipher: fs.filer.Cipher,
}, nil
}
diff --git a/weed/server/filer_grpc_server_rename.go b/weed/server/filer_grpc_server_rename.go
index dfa59e7fe..0669a26f1 100644
--- a/weed/server/filer_grpc_server_rename.go
+++ b/weed/server/filer_grpc_server_rename.go
@@ -107,7 +107,7 @@ func (fs *FilerServer) moveSelfEntry(ctx context.Context, oldParent filer2.FullP
Attr: entry.Attr,
Chunks: entry.Chunks,
}
- createErr := fs.filer.CreateEntry(ctx, newEntry)
+ createErr := fs.filer.CreateEntry(ctx, newEntry, false)
if createErr != nil {
return createErr
}
diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go
index 2cf26b1bb..70da9094b 100644
--- a/weed/server/filer_server.go
+++ b/weed/server/filer_server.go
@@ -7,18 +7,18 @@ import (
"os"
"time"
+ "google.golang.org/grpc"
+
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/stats"
"github.com/chrislusf/seaweedfs/weed/util"
- "google.golang.org/grpc"
"github.com/chrislusf/seaweedfs/weed/filer2"
_ "github.com/chrislusf/seaweedfs/weed/filer2/cassandra"
_ "github.com/chrislusf/seaweedfs/weed/filer2/etcd"
_ "github.com/chrislusf/seaweedfs/weed/filer2/leveldb"
_ "github.com/chrislusf/seaweedfs/weed/filer2/leveldb2"
- _ "github.com/chrislusf/seaweedfs/weed/filer2/memdb"
_ "github.com/chrislusf/seaweedfs/weed/filer2/mysql"
_ "github.com/chrislusf/seaweedfs/weed/filer2/postgres"
_ "github.com/chrislusf/seaweedfs/weed/filer2/redis"
@@ -31,21 +31,21 @@ import (
_ "github.com/chrislusf/seaweedfs/weed/notification/kafka"
_ "github.com/chrislusf/seaweedfs/weed/notification/log"
"github.com/chrislusf/seaweedfs/weed/security"
- "github.com/spf13/viper"
)
type FilerOption struct {
Masters []string
Collection string
DefaultReplication string
- RedirectOnRead bool
DisableDirListing bool
MaxMB int
DirListingLimit int
DataCenter string
DefaultLevelDbDir string
DisableHttp bool
- Port int
+ Port uint32
+ recursiveDelete bool
+ Cipher bool
}
type FilerServer struct {
@@ -59,18 +59,19 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption)
fs = &FilerServer{
option: option,
- grpcDialOption: security.LoadClientTLS(viper.Sub("grpc"), "filer"),
+ grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.filer"),
}
if len(option.Masters) == 0 {
glog.Fatal("master list is required!")
}
- fs.filer = filer2.NewFiler(option.Masters, fs.grpcDialOption)
+ fs.filer = filer2.NewFiler(option.Masters, fs.grpcDialOption, option.Port+10000)
+ fs.filer.Cipher = option.Cipher
go fs.filer.KeepConnectedToMaster()
- v := viper.GetViper()
+ v := util.GetViper()
if !util.LoadConfiguration("filer", false) {
v.Set("leveldb2.enabled", true)
v.Set("leveldb2.dir", option.DefaultLevelDbDir)
@@ -81,9 +82,14 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption)
}
util.LoadConfiguration("notification", false)
+ fs.option.recursiveDelete = v.GetBool("filer.options.recursive_delete")
+ v.Set("filer.option.buckets_folder", "/buckets")
+ v.Set("filer.option.queues_folder", "/queues")
+ fs.filer.DirBucketsPath = v.GetString("filer.option.buckets_folder")
+ fs.filer.DirQueuesPath = v.GetString("filer.option.queues_folder")
fs.filer.LoadConfiguration(v)
- notification.LoadConfiguration(v.Sub("notification"))
+ notification.LoadConfiguration(v, "notification.")
handleStaticResources(defaultMux)
if !option.DisableHttp {
@@ -93,6 +99,8 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption)
readonlyMux.HandleFunc("/", fs.readonlyFilerHandler)
}
+ fs.filer.LoadBuckets(fs.filer.DirBucketsPath)
+
maybeStartMetrics(fs, option)
return fs, nil
diff --git a/weed/server/filer_server_handlers_read.go b/weed/server/filer_server_handlers_read.go
index ba21298ba..5967535b8 100644
--- a/weed/server/filer_server_handlers_read.go
+++ b/weed/server/filer_server_handlers_read.go
@@ -3,19 +3,16 @@ package weed_server
import (
"context"
"io"
- "io/ioutil"
"mime"
- "mime/multipart"
"net/http"
- "net/url"
- "path"
+ "path/filepath"
"strconv"
"strings"
"github.com/chrislusf/seaweedfs/weed/filer2"
"github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/stats"
- "github.com/chrislusf/seaweedfs/weed/util"
)
func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request, isGetMethod bool) {
@@ -32,7 +29,7 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request,
fs.listDirectoryHandler(w, r)
return
}
- if err == filer2.ErrNotFound {
+ if err == filer_pb.ErrNotFound {
glog.V(1).Infof("Not found %s: %v", path, err)
stats.FilerRequestCounter.WithLabelValues("read.notfound").Inc()
w.WriteHeader(http.StatusNotFound)
@@ -66,188 +63,35 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request,
}
w.Header().Set("Accept-Ranges", "bytes")
- if r.Method == "HEAD" {
- w.Header().Set("Content-Length", strconv.FormatInt(int64(filer2.TotalSize(entry.Chunks)), 10))
- w.Header().Set("Last-Modified", entry.Attr.Mtime.Format(http.TimeFormat))
- setEtag(w, filer2.ETag(entry.Chunks))
- return
- }
-
- if len(entry.Chunks) == 1 {
- fs.handleSingleChunk(w, r, entry)
- return
- }
-
- fs.handleMultipleChunks(w, r, entry)
-
-}
-
-func (fs *FilerServer) handleSingleChunk(w http.ResponseWriter, r *http.Request, entry *filer2.Entry) {
-
- fileId := entry.Chunks[0].GetFileIdString()
-
- urlString, err := fs.filer.MasterClient.LookupFileId(fileId)
- if err != nil {
- glog.V(1).Infof("operation LookupFileId %s failed, err: %v", fileId, err)
- w.WriteHeader(http.StatusNotFound)
- return
- }
-
- if fs.option.RedirectOnRead {
- stats.FilerRequestCounter.WithLabelValues("redirect").Inc()
- http.Redirect(w, r, urlString, http.StatusFound)
- return
- }
-
- u, _ := url.Parse(urlString)
- q := u.Query()
- for key, values := range r.URL.Query() {
- for _, value := range values {
- q.Add(key, value)
- }
- }
- u.RawQuery = q.Encode()
- request := &http.Request{
- Method: r.Method,
- URL: u,
- Proto: r.Proto,
- ProtoMajor: r.ProtoMajor,
- ProtoMinor: r.ProtoMinor,
- Header: r.Header,
- Body: r.Body,
- Host: r.Host,
- ContentLength: r.ContentLength,
- }
- glog.V(3).Infoln("retrieving from", u)
- resp, do_err := util.Do(request)
- if do_err != nil {
- glog.V(0).Infoln("failing to connect to volume server", do_err.Error())
- writeJsonError(w, r, http.StatusInternalServerError, do_err)
- return
- }
- defer func() {
- io.Copy(ioutil.Discard, resp.Body)
- resp.Body.Close()
- }()
- for k, v := range resp.Header {
- w.Header()[k] = v
- }
- if entry.Attr.Mime != "" {
- w.Header().Set("Content-Type", entry.Attr.Mime)
- }
- w.WriteHeader(resp.StatusCode)
- io.Copy(w, resp.Body)
-}
-
-func (fs *FilerServer) handleMultipleChunks(w http.ResponseWriter, r *http.Request, entry *filer2.Entry) {
+ w.Header().Set("Last-Modified", entry.Attr.Mtime.Format(http.TimeFormat))
+ // mime type
mimeType := entry.Attr.Mime
if mimeType == "" {
- if ext := path.Ext(entry.Name()); ext != "" {
+ if ext := filepath.Ext(entry.Name()); ext != "" {
mimeType = mime.TypeByExtension(ext)
}
}
if mimeType != "" {
w.Header().Set("Content-Type", mimeType)
}
- setEtag(w, filer2.ETag(entry.Chunks))
- totalSize := int64(filer2.TotalSize(entry.Chunks))
-
- rangeReq := r.Header.Get("Range")
-
- if rangeReq == "" {
- w.Header().Set("Content-Length", strconv.FormatInt(totalSize, 10))
- if err := fs.writeContent(w, entry, 0, int(totalSize)); err != nil {
- http.Error(w, err.Error(), http.StatusInternalServerError)
- return
- }
- return
- }
-
- //the rest is dealing with partial content request
- //mostly copy from src/pkg/net/http/fs.go
- ranges, err := parseRange(rangeReq, totalSize)
- if err != nil {
- http.Error(w, err.Error(), http.StatusRequestedRangeNotSatisfiable)
- return
- }
- if sumRangesSize(ranges) > totalSize {
- // The total number of bytes in all the ranges
- // is larger than the size of the file by
- // itself, so this is probably an attack, or a
- // dumb client. Ignore the range request.
- return
- }
- if len(ranges) == 0 {
- return
- }
- if len(ranges) == 1 {
- // RFC 2616, Section 14.16:
- // "When an HTTP message includes the content of a single
- // range (for example, a response to a request for a
- // single range, or to a request for a set of ranges
- // that overlap without any holes), this content is
- // transmitted with a Content-Range header, and a
- // Content-Length header showing the number of bytes
- // actually transferred.
- // ...
- // A response to a request for a single range MUST NOT
- // be sent using the multipart/byteranges media type."
- ra := ranges[0]
- w.Header().Set("Content-Length", strconv.FormatInt(ra.length, 10))
- w.Header().Set("Content-Range", ra.contentRange(totalSize))
- w.WriteHeader(http.StatusPartialContent)
-
- err = fs.writeContent(w, entry, ra.start, int(ra.length))
- if err != nil {
- http.Error(w, err.Error(), http.StatusInternalServerError)
- return
- }
- return
- }
+ // set etag
+ setEtag(w, filer2.ETag(entry.Chunks))
- // process multiple ranges
- for _, ra := range ranges {
- if ra.start > totalSize {
- http.Error(w, "Out of Range", http.StatusRequestedRangeNotSatisfiable)
- return
- }
- }
- sendSize := rangesMIMESize(ranges, mimeType, totalSize)
- pr, pw := io.Pipe()
- mw := multipart.NewWriter(pw)
- w.Header().Set("Content-Type", "multipart/byteranges; boundary="+mw.Boundary())
- sendContent := pr
- defer pr.Close() // cause writing goroutine to fail and exit if CopyN doesn't finish.
- go func() {
- for _, ra := range ranges {
- part, e := mw.CreatePart(ra.mimeHeader(mimeType, totalSize))
- if e != nil {
- pw.CloseWithError(e)
- return
- }
- if e = fs.writeContent(part, entry, ra.start, int(ra.length)); e != nil {
- pw.CloseWithError(e)
- return
- }
- }
- mw.Close()
- pw.Close()
- }()
- if w.Header().Get("Content-Encoding") == "" {
- w.Header().Set("Content-Length", strconv.FormatInt(sendSize, 10))
- }
- w.WriteHeader(http.StatusPartialContent)
- if _, err := io.CopyN(w, sendContent, sendSize); err != nil {
- http.Error(w, "Internal Error", http.StatusInternalServerError)
+ if r.Method == "HEAD" {
+ w.Header().Set("Content-Length", strconv.FormatInt(int64(filer2.TotalSize(entry.Chunks)), 10))
return
}
-}
+ filename := entry.Name()
+ adjustHeadersAfterHEAD(w, r, filename)
-func (fs *FilerServer) writeContent(w io.Writer, entry *filer2.Entry, offset int64, size int) error {
+ totalSize := int64(filer2.TotalSize(entry.Chunks))
- return filer2.StreamContent(fs.filer.MasterClient, w, entry.Chunks, offset, size)
+ processRangeRequst(r, w, totalSize, mimeType, func(writer io.Writer, offset int64, size int64) error {
+ return filer2.StreamContent(fs.filer.MasterClient, w, entry.Chunks, offset, int(size))
+ })
}
+
diff --git a/weed/server/filer_server_handlers_write.go b/weed/server/filer_server_handlers_write.go
index 5d95a5d7e..5cd174b17 100644
--- a/weed/server/filer_server_handlers_write.go
+++ b/weed/server/filer_server_handlers_write.go
@@ -2,6 +2,7 @@ package weed_server
import (
"context"
+ "crypto/md5"
"encoding/json"
"errors"
"fmt"
@@ -22,6 +23,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/stats"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/util"
)
@@ -32,13 +34,13 @@ var (
type FilerPostResult struct {
Name string `json:"name,omitempty"`
- Size uint32 `json:"size,omitempty"`
+ Size int64 `json:"size,omitempty"`
Error string `json:"error,omitempty"`
Fid string `json:"fid,omitempty"`
Url string `json:"url,omitempty"`
}
-func (fs *FilerServer) assignNewFileInfo(w http.ResponseWriter, r *http.Request, replication, collection string, dataCenter string) (fileId, urlLocation string, auth security.EncodedJwt, err error) {
+func (fs *FilerServer) assignNewFileInfo(w http.ResponseWriter, r *http.Request, replication, collection, dataCenter, ttlString string) (fileId, urlLocation string, auth security.EncodedJwt, err error) {
stats.FilerRequestCounter.WithLabelValues("assign").Inc()
start := time.Now()
@@ -48,7 +50,7 @@ func (fs *FilerServer) assignNewFileInfo(w http.ResponseWriter, r *http.Request,
Count: 1,
Replication: replication,
Collection: collection,
- Ttl: r.URL.Query().Get("ttl"),
+ Ttl: ttlString,
DataCenter: dataCenter,
}
var altRequest *operation.VolumeAssignRequest
@@ -57,7 +59,7 @@ func (fs *FilerServer) assignNewFileInfo(w http.ResponseWriter, r *http.Request,
Count: 1,
Replication: replication,
Collection: collection,
- Ttl: r.URL.Query().Get("ttl"),
+ Ttl: ttlString,
DataCenter: "",
}
}
@@ -80,57 +82,59 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
ctx := context.Background()
query := r.URL.Query()
- replication := query.Get("replication")
- if replication == "" {
- replication = fs.option.DefaultReplication
- }
- collection := query.Get("collection")
- if collection == "" {
- collection = fs.option.Collection
- }
+ collection, replication := fs.detectCollection(r.RequestURI, query.Get("collection"), query.Get("replication"))
dataCenter := query.Get("dataCenter")
if dataCenter == "" {
dataCenter = fs.option.DataCenter
}
+ ttlString := r.URL.Query().Get("ttl")
+
+ // read ttl in seconds
+ ttl, err := needle.ReadTTL(ttlString)
+ ttlSeconds := int32(0)
+ if err == nil {
+ ttlSeconds = int32(ttl.Minutes()) * 60
+ }
+
+ if autoChunked := fs.autoChunk(ctx, w, r, replication, collection, dataCenter, ttlSeconds, ttlString); autoChunked {
+ return
+ }
+
+ if fs.option.Cipher {
+ reply, err := fs.encrypt(ctx, w, r, replication, collection, dataCenter, ttlSeconds, ttlString)
+ if err != nil {
+ writeJsonError(w, r, http.StatusInternalServerError, err)
+ } else if reply != nil {
+ writeJsonQuiet(w, r, http.StatusCreated, reply)
+ }
- if autoChunked := fs.autoChunk(ctx, w, r, replication, collection, dataCenter); autoChunked {
return
}
- fileId, urlLocation, auth, err := fs.assignNewFileInfo(w, r, replication, collection, dataCenter)
+ fileId, urlLocation, auth, err := fs.assignNewFileInfo(w, r, replication, collection, dataCenter, ttlString)
if err != nil || fileId == "" || urlLocation == "" {
glog.V(0).Infof("fail to allocate volume for %s, collection:%s, datacenter:%s", r.URL.Path, collection, dataCenter)
+ writeJsonError(w, r, http.StatusInternalServerError, fmt.Errorf("fail to allocate volume for %s, collection:%s, datacenter:%s", r.URL.Path, collection, dataCenter))
return
}
glog.V(4).Infof("write %s to %v", r.URL.Path, urlLocation)
u, _ := url.Parse(urlLocation)
-
- // This allows a client to generate a chunk manifest and submit it to the filer -- it is a little off
- // because they need to provide FIDs instead of file paths...
- cm, _ := strconv.ParseBool(query.Get("cm"))
- if cm {
- q := u.Query()
- q.Set("cm", "true")
- u.RawQuery = q.Encode()
- }
- glog.V(4).Infoln("post to", u)
-
ret, err := fs.uploadToVolumeServer(r, u, auth, w, fileId)
if err != nil {
return
}
- if err = fs.updateFilerStore(ctx, r, w, replication, collection, ret, fileId); err != nil {
+ if err = fs.updateFilerStore(ctx, r, w, replication, collection, ret, fileId, ttlSeconds); err != nil {
return
}
// send back post result
reply := FilerPostResult{
Name: ret.Name,
- Size: ret.Size,
+ Size: int64(ret.Size),
Error: ret.Error,
Fid: fileId,
Url: urlLocation,
@@ -141,7 +145,7 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
// update metadata in filer store
func (fs *FilerServer) updateFilerStore(ctx context.Context, r *http.Request, w http.ResponseWriter,
- replication string, collection string, ret operation.UploadResult, fileId string) (err error) {
+ replication string, collection string, ret *operation.UploadResult, fileId string, ttlSeconds int32) (err error) {
stats.FilerRequestCounter.WithLabelValues("postStoreWrite").Inc()
start := time.Now()
@@ -149,6 +153,16 @@ func (fs *FilerServer) updateFilerStore(ctx context.Context, r *http.Request, w
stats.FilerRequestHistogram.WithLabelValues("postStoreWrite").Observe(time.Since(start).Seconds())
}()
+ modeStr := r.URL.Query().Get("mode")
+ if modeStr == "" {
+ modeStr = "0660"
+ }
+ mode, err := strconv.ParseUint(modeStr, 8, 32)
+ if err != nil {
+ glog.Errorf("Invalid mode format: %s, use 0660 by default", modeStr)
+ mode = 0660
+ }
+
path := r.URL.Path
if strings.HasSuffix(path, "/") {
if ret.Name != "" {
@@ -165,12 +179,13 @@ func (fs *FilerServer) updateFilerStore(ctx context.Context, r *http.Request, w
Attr: filer2.Attr{
Mtime: time.Now(),
Crtime: crTime,
- Mode: 0660,
+ Mode: os.FileMode(mode),
Uid: OS_UID,
Gid: OS_GID,
Replication: replication,
Collection: collection,
- TtlSec: int32(util.ParseInt(r.URL.Query().Get("ttl"), 0)),
+ TtlSec: ttlSeconds,
+ Mime: ret.Mime,
},
Chunks: []*filer_pb.FileChunk{{
FileId: fileId,
@@ -179,12 +194,14 @@ func (fs *FilerServer) updateFilerStore(ctx context.Context, r *http.Request, w
ETag: ret.ETag,
}},
}
- if ext := filenamePath.Ext(path); ext != "" {
- entry.Attr.Mime = mime.TypeByExtension(ext)
+ if entry.Attr.Mime == "" {
+ if ext := filenamePath.Ext(path); ext != "" {
+ entry.Attr.Mime = mime.TypeByExtension(ext)
+ }
}
// glog.V(4).Infof("saving %s => %+v", path, entry)
- if dbErr := fs.filer.CreateEntry(ctx, entry); dbErr != nil {
- fs.filer.DeleteChunks(entry.FullPath, entry.Chunks)
+ if dbErr := fs.filer.CreateEntry(ctx, entry, false); dbErr != nil {
+ fs.filer.DeleteChunks(entry.Chunks)
glog.V(0).Infof("failing to write %s to filer server : %v", path, dbErr)
writeJsonError(w, r, http.StatusInternalServerError, dbErr)
err = dbErr
@@ -195,12 +212,16 @@ func (fs *FilerServer) updateFilerStore(ctx context.Context, r *http.Request, w
}
// send request to volume server
-func (fs *FilerServer) uploadToVolumeServer(r *http.Request, u *url.URL, auth security.EncodedJwt, w http.ResponseWriter, fileId string) (ret operation.UploadResult, err error) {
+func (fs *FilerServer) uploadToVolumeServer(r *http.Request, u *url.URL, auth security.EncodedJwt, w http.ResponseWriter, fileId string) (ret *operation.UploadResult, err error) {
stats.FilerRequestCounter.WithLabelValues("postUpload").Inc()
start := time.Now()
defer func() { stats.FilerRequestHistogram.WithLabelValues("postUpload").Observe(time.Since(start).Seconds()) }()
+ ret = &operation.UploadResult{}
+ hash := md5.New()
+ var body = ioutil.NopCloser(io.TeeReader(r.Body, hash))
+
request := &http.Request{
Method: r.Method,
URL: u,
@@ -208,10 +229,11 @@ func (fs *FilerServer) uploadToVolumeServer(r *http.Request, u *url.URL, auth se
ProtoMajor: r.ProtoMajor,
ProtoMinor: r.ProtoMinor,
Header: r.Header,
- Body: r.Body,
+ Body: body,
Host: r.Host,
ContentLength: r.ContentLength,
}
+
if auth != "" {
request.Header.Set("Authorization", "BEARER "+string(auth))
}
@@ -226,7 +248,7 @@ func (fs *FilerServer) uploadToVolumeServer(r *http.Request, u *url.URL, auth se
io.Copy(ioutil.Discard, resp.Body)
resp.Body.Close()
}()
- etag := resp.Header.Get("ETag")
+
respBody, raErr := ioutil.ReadAll(resp.Body)
if raErr != nil {
glog.V(0).Infoln("failing to upload to volume server", r.RequestURI, raErr.Error())
@@ -234,6 +256,7 @@ func (fs *FilerServer) uploadToVolumeServer(r *http.Request, u *url.URL, auth se
err = raErr
return
}
+
glog.V(4).Infoln("post result", string(respBody))
unmarshalErr := json.Unmarshal(respBody, &ret)
if unmarshalErr != nil {
@@ -261,26 +284,65 @@ func (fs *FilerServer) uploadToVolumeServer(r *http.Request, u *url.URL, auth se
return
}
}
- if etag != "" {
- ret.ETag = etag
- }
+ // use filer calculated md5 ETag, instead of the volume server crc ETag
+ ret.ETag = fmt.Sprintf("%x", hash.Sum(nil))
return
}
// curl -X DELETE http://localhost:8888/path/to
// curl -X DELETE http://localhost:8888/path/to?recursive=true
// curl -X DELETE http://localhost:8888/path/to?recursive=true&ignoreRecursiveError=true
+// curl -X DELETE http://localhost:8888/path/to?recursive=true&skipChunkDeletion=true
func (fs *FilerServer) DeleteHandler(w http.ResponseWriter, r *http.Request) {
isRecursive := r.FormValue("recursive") == "true"
+ if !isRecursive && fs.option.recursiveDelete {
+ if r.FormValue("recursive") != "false" {
+ isRecursive = true
+ }
+ }
ignoreRecursiveError := r.FormValue("ignoreRecursiveError") == "true"
+ skipChunkDeletion := r.FormValue("skipChunkDeletion") == "true"
- err := fs.filer.DeleteEntryMetaAndData(context.Background(), filer2.FullPath(r.URL.Path), isRecursive, ignoreRecursiveError, true)
+ err := fs.filer.DeleteEntryMetaAndData(context.Background(), filer2.FullPath(r.URL.Path), isRecursive, ignoreRecursiveError, !skipChunkDeletion)
if err != nil {
glog.V(1).Infoln("deleting", r.URL.Path, ":", err.Error())
- writeJsonError(w, r, http.StatusInternalServerError, err)
+ httpStatus := http.StatusInternalServerError
+ if err == filer_pb.ErrNotFound {
+ httpStatus = http.StatusNotFound
+ }
+ writeJsonError(w, r, httpStatus, err)
return
}
w.WriteHeader(http.StatusNoContent)
}
+
+func (fs *FilerServer) detectCollection(requestURI, qCollection, qReplication string) (collection, replication string) {
+ // default
+ collection = fs.option.Collection
+ replication = fs.option.DefaultReplication
+
+ // get default collection settings
+ if qCollection != "" {
+ collection = qCollection
+ }
+ if qReplication != "" {
+ replication = qReplication
+ }
+
+ // required by buckets folder
+ if strings.HasPrefix(requestURI, fs.filer.DirBucketsPath+"/") {
+ bucketAndObjectKey := requestURI[len(fs.filer.DirBucketsPath)+1:]
+ t := strings.Index(bucketAndObjectKey, "/")
+ if t < 0 {
+ collection = bucketAndObjectKey
+ }
+ if t > 0 {
+ collection = bucketAndObjectKey[:t]
+ }
+ replication = fs.filer.ReadBucketOption(collection)
+ }
+
+ return
+}
diff --git a/weed/server/filer_server_handlers_write_autochunk.go b/weed/server/filer_server_handlers_write_autochunk.go
index 492b55943..a2672b836 100644
--- a/weed/server/filer_server_handlers_write_autochunk.go
+++ b/weed/server/filer_server_handlers_write_autochunk.go
@@ -1,10 +1,9 @@
package weed_server
import (
- "bytes"
"context"
+ "fmt"
"io"
- "io/ioutil"
"net/http"
"path"
"strconv"
@@ -17,11 +16,10 @@ import (
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/stats"
- "github.com/chrislusf/seaweedfs/weed/util"
)
func (fs *FilerServer) autoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request,
- replication string, collection string, dataCenter string) bool {
+ replication string, collection string, dataCenter string, ttlSec int32, ttlString string) bool {
if r.Method != "POST" {
glog.V(4).Infoln("AutoChunking not supported for method", r.Method)
return false
@@ -57,7 +55,7 @@ func (fs *FilerServer) autoChunk(ctx context.Context, w http.ResponseWriter, r *
return false
}
- reply, err := fs.doAutoChunk(ctx, w, r, contentLength, chunkSize, replication, collection, dataCenter)
+ reply, err := fs.doAutoChunk(ctx, w, r, contentLength, chunkSize, replication, collection, dataCenter, ttlSec, ttlString)
if err != nil {
writeJsonError(w, r, http.StatusInternalServerError, err)
} else if reply != nil {
@@ -67,7 +65,7 @@ func (fs *FilerServer) autoChunk(ctx context.Context, w http.ResponseWriter, r *
}
func (fs *FilerServer) doAutoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request,
- contentLength int64, chunkSize int32, replication string, collection string, dataCenter string) (filerResult *FilerPostResult, replyerr error) {
+ contentLength int64, chunkSize int32, replication string, collection string, dataCenter string, ttlSec int32, ttlString string) (filerResult *FilerPostResult, replyerr error) {
stats.FilerRequestCounter.WithLabelValues("postAutoChunk").Inc()
start := time.Now()
@@ -89,68 +87,55 @@ func (fs *FilerServer) doAutoChunk(ctx context.Context, w http.ResponseWriter, r
if fileName != "" {
fileName = path.Base(fileName)
}
+ contentType := part1.Header.Get("Content-Type")
+
+ fmt.Printf("autochunk part header: %+v\n", part1.Header)
var fileChunks []*filer_pb.FileChunk
- totalBytesRead := int64(0)
- tmpBufferSize := int32(1024 * 1024)
- tmpBuffer := bytes.NewBuffer(make([]byte, 0, tmpBufferSize))
- chunkBuf := make([]byte, chunkSize+tmpBufferSize, chunkSize+tmpBufferSize) // chunk size plus a little overflow
- chunkBufOffset := int32(0)
chunkOffset := int64(0)
- writtenChunks := 0
- filerResult = &FilerPostResult{
- Name: fileName,
- }
+ for chunkOffset < contentLength {
+ limitedReader := io.LimitReader(part1, int64(chunkSize))
- for totalBytesRead < contentLength {
- tmpBuffer.Reset()
- bytesRead, readErr := io.CopyN(tmpBuffer, part1, int64(tmpBufferSize))
- readFully := readErr != nil && readErr == io.EOF
- tmpBuf := tmpBuffer.Bytes()
- bytesToCopy := tmpBuf[0:int(bytesRead)]
-
- copy(chunkBuf[chunkBufOffset:chunkBufOffset+int32(bytesRead)], bytesToCopy)
- chunkBufOffset = chunkBufOffset + int32(bytesRead)
-
- if chunkBufOffset >= chunkSize || readFully || (chunkBufOffset > 0 && bytesRead == 0) {
- writtenChunks = writtenChunks + 1
- fileId, urlLocation, auth, assignErr := fs.assignNewFileInfo(w, r, replication, collection, dataCenter)
- if assignErr != nil {
- return nil, assignErr
- }
-
- // upload the chunk to the volume server
- chunkName := fileName + "_chunk_" + strconv.FormatInt(int64(len(fileChunks)+1), 10)
- uploadErr := fs.doUpload(urlLocation, w, r, chunkBuf[0:chunkBufOffset], chunkName, "application/octet-stream", fileId, auth)
- if uploadErr != nil {
- return nil, uploadErr
- }
-
- // Save to chunk manifest structure
- fileChunks = append(fileChunks,
- &filer_pb.FileChunk{
- FileId: fileId,
- Offset: chunkOffset,
- Size: uint64(chunkBufOffset),
- Mtime: time.Now().UnixNano(),
- },
- )
-
- // reset variables for the next chunk
- chunkBufOffset = 0
- chunkOffset = totalBytesRead + int64(bytesRead)
+ // assign one file id for one chunk
+ fileId, urlLocation, auth, assignErr := fs.assignNewFileInfo(w, r, replication, collection, dataCenter, ttlString)
+ if assignErr != nil {
+ return nil, assignErr
}
- totalBytesRead = totalBytesRead + int64(bytesRead)
+ // upload the chunk to the volume server
+ uploadResult, uploadErr := fs.doUpload(urlLocation, w, r, limitedReader, fileName, contentType, nil, auth)
+ if uploadErr != nil {
+ return nil, uploadErr
+ }
- if bytesRead == 0 || readFully {
+ // if last chunk exhausted the reader exactly at the border
+ if uploadResult.Size == 0 {
break
}
- if readErr != nil {
- return nil, readErr
+ // Save to chunk manifest structure
+ fileChunks = append(fileChunks,
+ &filer_pb.FileChunk{
+ FileId: fileId,
+ Offset: chunkOffset,
+ Size: uint64(uploadResult.Size),
+ Mtime: time.Now().UnixNano(),
+ ETag: uploadResult.ETag,
+ CipherKey: uploadResult.CipherKey,
+ IsGzipped: uploadResult.Gzip > 0,
+ },
+ )
+
+ glog.V(4).Infof("uploaded %s chunk %d to %s [%d,%d) of %d", fileName, len(fileChunks), fileId, chunkOffset, chunkOffset+int64(uploadResult.Size), contentLength)
+
+ // reset variables for the next chunk
+ chunkOffset = chunkOffset + int64(uploadResult.Size)
+
+ // if last chunk was not at full chunk size, but already exhausted the reader
+ if int64(uploadResult.Size) < int64(chunkSize) {
+ break
}
}
@@ -172,12 +157,19 @@ func (fs *FilerServer) doAutoChunk(ctx context.Context, w http.ResponseWriter, r
Gid: OS_GID,
Replication: replication,
Collection: collection,
- TtlSec: int32(util.ParseInt(r.URL.Query().Get("ttl"), 0)),
+ TtlSec: ttlSec,
+ Mime: contentType,
},
Chunks: fileChunks,
}
- if dbErr := fs.filer.CreateEntry(ctx, entry); dbErr != nil {
- fs.filer.DeleteChunks(entry.FullPath, entry.Chunks)
+
+ filerResult = &FilerPostResult{
+ Name: fileName,
+ Size: chunkOffset,
+ }
+
+ if dbErr := fs.filer.CreateEntry(ctx, entry, false); dbErr != nil {
+ fs.filer.DeleteChunks(entry.Chunks)
replyerr = dbErr
filerResult.Error = dbErr.Error()
glog.V(0).Infof("failing to write %s to filer server : %v", path, dbErr)
@@ -187,8 +179,7 @@ func (fs *FilerServer) doAutoChunk(ctx context.Context, w http.ResponseWriter, r
return
}
-func (fs *FilerServer) doUpload(urlLocation string, w http.ResponseWriter, r *http.Request,
- chunkBuf []byte, fileName string, contentType string, fileId string, auth security.EncodedJwt) (err error) {
+func (fs *FilerServer) doUpload(urlLocation string, w http.ResponseWriter, r *http.Request, limitedReader io.Reader, fileName string, contentType string, pairMap map[string]string, auth security.EncodedJwt) (*operation.UploadResult, error) {
stats.FilerRequestCounter.WithLabelValues("postAutoChunkUpload").Inc()
start := time.Now()
@@ -196,13 +187,5 @@ func (fs *FilerServer) doUpload(urlLocation string, w http.ResponseWriter, r *ht
stats.FilerRequestHistogram.WithLabelValues("postAutoChunkUpload").Observe(time.Since(start).Seconds())
}()
- ioReader := ioutil.NopCloser(bytes.NewBuffer(chunkBuf))
- uploadResult, uploadError := operation.Upload(urlLocation, fileName, ioReader, false, contentType, nil, auth)
- if uploadResult != nil {
- glog.V(0).Infoln("Chunk upload result. Name:", uploadResult.Name, "Fid:", fileId, "Size:", uploadResult.Size)
- }
- if uploadError != nil {
- err = uploadError
- }
- return
+ return operation.Upload(urlLocation, fileName, fs.option.Cipher, limitedReader, false, contentType, pairMap, auth)
}
diff --git a/weed/server/filer_server_handlers_write_cipher.go b/weed/server/filer_server_handlers_write_cipher.go
new file mode 100644
index 000000000..06670399c
--- /dev/null
+++ b/weed/server/filer_server_handlers_write_cipher.go
@@ -0,0 +1,98 @@
+package weed_server
+
+import (
+ "context"
+ "fmt"
+ "net/http"
+ "strings"
+ "time"
+
+ "github.com/chrislusf/seaweedfs/weed/filer2"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/operation"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
+)
+
+// handling single chunk POST or PUT upload
+func (fs *FilerServer) encrypt(ctx context.Context, w http.ResponseWriter, r *http.Request,
+ replication string, collection string, dataCenter string, ttlSeconds int32, ttlString string) (filerResult *FilerPostResult, err error) {
+
+ fileId, urlLocation, auth, err := fs.assignNewFileInfo(w, r, replication, collection, dataCenter, ttlString)
+
+ if err != nil || fileId == "" || urlLocation == "" {
+ return nil, fmt.Errorf("fail to allocate volume for %s, collection:%s, datacenter:%s", r.URL.Path, collection, dataCenter)
+ }
+
+ glog.V(4).Infof("write %s to %v", r.URL.Path, urlLocation)
+
+ // Note: encrypt(gzip(data)), encrypt data first, then gzip
+
+ sizeLimit := int64(fs.option.MaxMB) * 1024 * 1024
+
+ pu, err := needle.ParseUpload(r, sizeLimit)
+ uncompressedData := pu.Data
+ if pu.IsGzipped {
+ uncompressedData = pu.UncompressedData
+ }
+ if pu.MimeType == "" {
+ pu.MimeType = http.DetectContentType(uncompressedData)
+ }
+
+ uploadResult, uploadError := operation.UploadData(urlLocation, pu.FileName, true, uncompressedData, false, pu.MimeType, pu.PairMap, auth)
+ if uploadError != nil {
+ return nil, fmt.Errorf("upload to volume server: %v", uploadError)
+ }
+
+ // Save to chunk manifest structure
+ fileChunks := []*filer_pb.FileChunk{
+ {
+ FileId: fileId,
+ Offset: 0,
+ Size: uint64(uploadResult.Size),
+ Mtime: time.Now().UnixNano(),
+ ETag: uploadResult.Md5,
+ CipherKey: uploadResult.CipherKey,
+ IsGzipped: uploadResult.Gzip > 0,
+ },
+ }
+
+ fmt.Printf("uploaded: %+v\n", uploadResult)
+
+ path := r.URL.Path
+ if strings.HasSuffix(path, "/") {
+ if pu.FileName != "" {
+ path += pu.FileName
+ }
+ }
+
+ entry := &filer2.Entry{
+ FullPath: filer2.FullPath(path),
+ Attr: filer2.Attr{
+ Mtime: time.Now(),
+ Crtime: time.Now(),
+ Mode: 0660,
+ Uid: OS_UID,
+ Gid: OS_GID,
+ Replication: replication,
+ Collection: collection,
+ TtlSec: ttlSeconds,
+ Mime: pu.MimeType,
+ },
+ Chunks: fileChunks,
+ }
+
+ filerResult = &FilerPostResult{
+ Name: pu.FileName,
+ Size: int64(pu.OriginalDataSize),
+ }
+
+ if dbErr := fs.filer.CreateEntry(ctx, entry, false); dbErr != nil {
+ fs.filer.DeleteChunks(entry.Chunks)
+ err = dbErr
+ filerResult.Error = dbErr.Error()
+ return
+ }
+
+ return
+}
diff --git a/weed/server/filer_ui/breadcrumb.go b/weed/server/filer_ui/breadcrumb.go
index 55a1909a8..2f0df7f91 100644
--- a/weed/server/filer_ui/breadcrumb.go
+++ b/weed/server/filer_ui/breadcrumb.go
@@ -14,10 +14,14 @@ func ToBreadcrumb(fullpath string) (crumbs []Breadcrumb) {
parts := strings.Split(fullpath, "/")
for i := 0; i < len(parts); i++ {
- crumbs = append(crumbs, Breadcrumb{
- Name: parts[i] + "/",
+ crumb := Breadcrumb{
+ Name: parts[i] + " /",
Link: "/" + filepath.ToSlash(filepath.Join(parts[0:i+1]...)),
- })
+ }
+ if !strings.HasSuffix(crumb.Link, "/") {
+ crumb.Link += "/"
+ }
+ crumbs = append(crumbs, crumb)
}
return
diff --git a/weed/server/filer_ui/templates.go b/weed/server/filer_ui/templates.go
index 884798936..e532b27e2 100644
--- a/weed/server/filer_ui/templates.go
+++ b/weed/server/filer_ui/templates.go
@@ -50,7 +50,7 @@ var StatusTpl = template.Must(template.New("status").Funcs(funcMap).Parse(`<!DOC
<div class="row">
<div>
{{ range $entry := .Breadcrumbs }}
- <a href={{ $entry.Link }} >
+ <a href="{{ $entry.Link }}" >
{{ $entry.Name }}
</a>
{{ end }}
@@ -78,20 +78,19 @@ var StatusTpl = template.Must(template.New("status").Funcs(funcMap).Parse(`<!DOC
</a>
{{end}}
</td>
- <td align="right">
+ <td align="right" nowrap>
{{if $entry.IsDirectory}}
{{else}}
- {{ $entry.Mime }}
+ {{ $entry.Mime }}&nbsp;
{{end}}
</td>
- <td align="right">
+ <td align="right" nowrap>
{{if $entry.IsDirectory}}
{{else}}
- {{ $entry.Size | humanizeBytes }}
- &nbsp;&nbsp;&nbsp;
+ {{ $entry.Size | humanizeBytes }}&nbsp;
{{end}}
</td>
- <td>
+ <td nowrap>
{{ $entry.Timestamp.Format "2006-01-02 15:04" }}
</td>
</tr>
diff --git a/weed/server/master_grpc_server.go b/weed/server/master_grpc_server.go
index 82a190e39..84087df8b 100644
--- a/weed/server/master_grpc_server.go
+++ b/weed/server/master_grpc_server.go
@@ -1,17 +1,20 @@
package weed_server
import (
+ "context"
"fmt"
"net"
"strings"
"time"
"github.com/chrislusf/raft"
+ "google.golang.org/grpc/peer"
+
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
+ "github.com/chrislusf/seaweedfs/weed/storage/backend"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/topology"
- "google.golang.org/grpc/peer"
)
func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServer) error {
@@ -60,14 +63,6 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
t.Sequence.SetMax(heartbeat.MaxFileKey)
if dn == nil {
- if heartbeat.Ip == "" {
- if pr, ok := peer.FromContext(stream.Context()); ok {
- if pr.Addr != net.Addr(nil) {
- heartbeat.Ip = pr.Addr.String()[0:strings.LastIndex(pr.Addr.String(), ":")]
- glog.V(0).Infof("remote IP address is detected as %v", heartbeat.Ip)
- }
- }
- }
dcName, rackName := t.Configuration.Locate(heartbeat.Ip, heartbeat.DataCenter, heartbeat.Rack)
dc := t.GetOrCreateDataCenter(dcName)
rack := dc.GetOrCreateRack(rackName)
@@ -76,7 +71,10 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
int64(heartbeat.MaxVolumeCount))
glog.V(0).Infof("added volume server %v:%d", heartbeat.GetIp(), heartbeat.GetPort())
if err := stream.Send(&master_pb.HeartbeatResponse{
- VolumeSizeLimit: uint64(ms.option.VolumeSizeLimitMB) * 1024 * 1024,
+ VolumeSizeLimit: uint64(ms.option.VolumeSizeLimitMB) * 1024 * 1024,
+ MetricsAddress: ms.option.MetricsAddress,
+ MetricsIntervalSeconds: uint32(ms.option.MetricsIntervalSec),
+ StorageBackends: backend.ToPbStorageBackends(),
}); err != nil {
glog.Warningf("SendHeartbeat.Send volume size to %s:%d %v", dn.Ip, dn.Port, err)
return err
@@ -164,9 +162,7 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
return err
}
if err := stream.Send(&master_pb.HeartbeatResponse{
- Leader: newLeader,
- MetricsAddress: ms.option.MetricsAddress,
- MetricsIntervalSeconds: uint32(ms.option.MetricsIntervalSec),
+ Leader: newLeader,
}); err != nil {
glog.Warningf("SendHeartbeat.Send response to to %s:%d %v", dn.Ip, dn.Port, err)
return err
@@ -187,35 +183,13 @@ func (ms *MasterServer) KeepConnected(stream master_pb.Seaweed_KeepConnectedServ
return ms.informNewLeader(stream)
}
- // remember client address
- ctx := stream.Context()
- // fmt.Printf("FromContext %+v\n", ctx)
- pr, ok := peer.FromContext(ctx)
- if !ok {
- glog.Error("failed to get peer from ctx")
- return fmt.Errorf("failed to get peer from ctx")
- }
- if pr.Addr == net.Addr(nil) {
- glog.Error("failed to get peer address")
- return fmt.Errorf("failed to get peer address")
- }
+ peerAddress := findClientAddress(stream.Context(), req.GrpcPort)
- clientName := req.Name + pr.Addr.String()
- glog.V(0).Infof("+ client %v", clientName)
-
- messageChan := make(chan *master_pb.VolumeLocation)
stopChan := make(chan bool)
- ms.clientChansLock.Lock()
- ms.clientChans[clientName] = messageChan
- ms.clientChansLock.Unlock()
+ clientName, messageChan := ms.addClient(req.Name, peerAddress)
- defer func() {
- glog.V(0).Infof("- client %v", clientName)
- ms.clientChansLock.Lock()
- delete(ms.clientChans, clientName)
- ms.clientChansLock.Unlock()
- }()
+ defer ms.deleteClient(clientName)
for _, message := range ms.Topo.ToVolumeLocations() {
if err := stream.Send(message); err != nil {
@@ -267,3 +241,57 @@ func (ms *MasterServer) informNewLeader(stream master_pb.Seaweed_KeepConnectedSe
}
return nil
}
+
+func (ms *MasterServer) addClient(clientType string, clientAddress string) (clientName string, messageChan chan *master_pb.VolumeLocation) {
+ clientName = clientType + "@" + clientAddress
+ glog.V(0).Infof("+ client %v", clientName)
+
+ messageChan = make(chan *master_pb.VolumeLocation)
+
+ ms.clientChansLock.Lock()
+ ms.clientChans[clientName] = messageChan
+ ms.clientChansLock.Unlock()
+ return
+}
+
+func (ms *MasterServer) deleteClient(clientName string) {
+ glog.V(0).Infof("- client %v", clientName)
+ ms.clientChansLock.Lock()
+ delete(ms.clientChans, clientName)
+ ms.clientChansLock.Unlock()
+}
+
+func findClientAddress(ctx context.Context, grpcPort uint32) string {
+ // fmt.Printf("FromContext %+v\n", ctx)
+ pr, ok := peer.FromContext(ctx)
+ if !ok {
+ glog.Error("failed to get peer from ctx")
+ return ""
+ }
+ if pr.Addr == net.Addr(nil) {
+ glog.Error("failed to get peer address")
+ return ""
+ }
+ if grpcPort == 0 {
+ return pr.Addr.String()
+ }
+ if tcpAddr, ok := pr.Addr.(*net.TCPAddr); ok {
+ externalIP := tcpAddr.IP
+ return fmt.Sprintf("%s:%d", externalIP, grpcPort)
+ }
+ return pr.Addr.String()
+
+}
+
+func (ms *MasterServer) ListMasterClients(ctx context.Context, req *master_pb.ListMasterClientsRequest) (*master_pb.ListMasterClientsResponse, error) {
+ resp := &master_pb.ListMasterClientsResponse{}
+ ms.clientChansLock.RLock()
+ defer ms.clientChansLock.RUnlock()
+
+ for k := range ms.clientChans {
+ if strings.HasPrefix(k, req.ClientType+"@") {
+ resp.GrpcAddresses = append(resp.GrpcAddresses, k[len(req.ClientType)+1:])
+ }
+ }
+ return resp, nil
+}
diff --git a/weed/server/master_grpc_server_collection.go b/weed/server/master_grpc_server_collection.go
index f8e0785f6..b92d6bcbe 100644
--- a/weed/server/master_grpc_server_collection.go
+++ b/weed/server/master_grpc_server_collection.go
@@ -4,6 +4,7 @@ import (
"context"
"github.com/chrislusf/raft"
+
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
diff --git a/weed/server/master_grpc_server_volume.go b/weed/server/master_grpc_server_volume.go
index 8fc56e9b8..856c07890 100644
--- a/weed/server/master_grpc_server_volume.go
+++ b/weed/server/master_grpc_server_volume.go
@@ -5,10 +5,11 @@ import (
"fmt"
"github.com/chrislusf/raft"
+
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/security"
- "github.com/chrislusf/seaweedfs/weed/storage"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "github.com/chrislusf/seaweedfs/weed/storage/super_block"
"github.com/chrislusf/seaweedfs/weed/topology"
)
@@ -52,7 +53,7 @@ func (ms *MasterServer) Assign(ctx context.Context, req *master_pb.AssignRequest
if req.Replication == "" {
req.Replication = ms.option.DefaultReplicaPlacement
}
- replicaPlacement, err := storage.NewReplicaPlacementFromString(req.Replication)
+ replicaPlacement, err := super_block.NewReplicaPlacementFromString(req.Replication)
if err != nil {
return nil, err
}
@@ -108,7 +109,7 @@ func (ms *MasterServer) Statistics(ctx context.Context, req *master_pb.Statistic
if req.Replication == "" {
req.Replication = ms.option.DefaultReplicaPlacement
}
- replicaPlacement, err := storage.NewReplicaPlacementFromString(req.Replication)
+ replicaPlacement, err := super_block.NewReplicaPlacementFromString(req.Replication)
if err != nil {
return nil, err
}
diff --git a/weed/server/master_server.go b/weed/server/master_server.go
index 33a5129da..a9ae6b888 100644
--- a/weed/server/master_server.go
+++ b/weed/server/master_server.go
@@ -1,7 +1,6 @@
package weed_server
import (
- "context"
"fmt"
"net/http"
"net/http/httputil"
@@ -14,6 +13,9 @@ import (
"time"
"github.com/chrislusf/raft"
+ "github.com/gorilla/mux"
+ "google.golang.org/grpc"
+
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/security"
@@ -22,9 +24,6 @@ import (
"github.com/chrislusf/seaweedfs/weed/topology"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/chrislusf/seaweedfs/weed/wdclient"
- "github.com/gorilla/mux"
- "github.com/spf13/viper"
- "google.golang.org/grpc"
)
const (
@@ -69,7 +68,7 @@ type MasterServer struct {
func NewMasterServer(r *mux.Router, option *MasterOption, peers []string) *MasterServer {
- v := viper.GetViper()
+ v := util.GetViper()
signingKey := v.GetString("jwt.signing.key")
v.SetDefault("jwt.signing.expires_after_seconds", 10)
expiresAfterSec := v.GetInt("jwt.signing.expires_after_seconds")
@@ -83,13 +82,13 @@ func NewMasterServer(r *mux.Router, option *MasterOption, peers []string) *Maste
preallocateSize = int64(option.VolumeSizeLimitMB) * (1 << 20)
}
- grpcDialOption := security.LoadClientTLS(v.Sub("grpc"), "master")
+ grpcDialOption := security.LoadClientTLS(v, "grpc.master")
ms := &MasterServer{
option: option,
preallocateSize: preallocateSize,
clientChans: make(map[string]chan *master_pb.VolumeLocation),
grpcDialOption: grpcDialOption,
- MasterClient: wdclient.NewMasterClient(context.Background(), grpcDialOption, "master", peers),
+ MasterClient: wdclient.NewMasterClient(grpcDialOption, "master", 0, peers),
}
ms.bounedLeaderChan = make(chan int, 16)
@@ -115,9 +114,11 @@ func NewMasterServer(r *mux.Router, option *MasterOption, peers []string) *Maste
r.HandleFunc("/vol/status", ms.proxyToLeader(ms.guard.WhiteList(ms.volumeStatusHandler)))
r.HandleFunc("/vol/vacuum", ms.proxyToLeader(ms.guard.WhiteList(ms.volumeVacuumHandler)))
r.HandleFunc("/submit", ms.guard.WhiteList(ms.submitFromMasterServerHandler))
- r.HandleFunc("/stats/health", ms.guard.WhiteList(statsHealthHandler))
- r.HandleFunc("/stats/counter", ms.guard.WhiteList(statsCounterHandler))
- r.HandleFunc("/stats/memory", ms.guard.WhiteList(statsMemoryHandler))
+ /*
+ r.HandleFunc("/stats/health", ms.guard.WhiteList(statsHealthHandler))
+ r.HandleFunc("/stats/counter", ms.guard.WhiteList(statsCounterHandler))
+ r.HandleFunc("/stats/memory", ms.guard.WhiteList(statsMemoryHandler))
+ */
r.HandleFunc("/{fileId}", ms.redirectHandler)
}
@@ -183,7 +184,7 @@ func (ms *MasterServer) proxyToLeader(f func(w http.ResponseWriter, r *http.Requ
func (ms *MasterServer) startAdminScripts() {
var err error
- v := viper.GetViper()
+ v := util.GetViper()
adminScripts := v.GetString("master.maintenance.scripts")
glog.V(0).Infof("adminScripts:\n%v", adminScripts)
if adminScripts == "" {
@@ -201,7 +202,7 @@ func (ms *MasterServer) startAdminScripts() {
masterAddress := "localhost:" + strconv.Itoa(ms.option.Port)
var shellOptions shell.ShellOptions
- shellOptions.GrpcDialOption = security.LoadClientTLS(viper.Sub("grpc"), "master")
+ shellOptions.GrpcDialOption = security.LoadClientTLS(v, "grpc.master")
shellOptions.Masters = &masterAddress
shellOptions.FilerHost, shellOptions.FilerPort, shellOptions.Directory, err = util.ParseFilerUrl(filerURL)
@@ -220,7 +221,7 @@ func (ms *MasterServer) startAdminScripts() {
commandEnv.MasterClient.WaitUntilConnected()
c := time.Tick(time.Duration(sleepMinutes) * time.Minute)
- for _ = range c {
+ for range c {
if ms.Topo.IsLeader() {
for _, line := range scriptLines {
@@ -250,7 +251,7 @@ func (ms *MasterServer) startAdminScripts() {
func (ms *MasterServer) createSequencer(option *MasterOption) sequence.Sequencer {
var seq sequence.Sequencer
- v := viper.GetViper()
+ v := util.GetViper()
seqType := strings.ToLower(v.GetString(SequencerType))
glog.V(1).Infof("[%s] : [%s]", SequencerType, seqType)
switch strings.ToLower(seqType) {
diff --git a/weed/server/master_server_handlers_admin.go b/weed/server/master_server_handlers_admin.go
index a5d976008..5d0986f97 100644
--- a/weed/server/master_server_handlers_admin.go
+++ b/weed/server/master_server_handlers_admin.go
@@ -10,9 +10,9 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
- "github.com/chrislusf/seaweedfs/weed/storage"
"github.com/chrislusf/seaweedfs/weed/storage/backend/memory_map"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "github.com/chrislusf/seaweedfs/weed/storage/super_block"
"github.com/chrislusf/seaweedfs/weed/topology"
"github.com/chrislusf/seaweedfs/weed/util"
)
@@ -61,7 +61,7 @@ func (ms *MasterServer) volumeVacuumHandler(w http.ResponseWriter, r *http.Reque
return
}
}
- glog.Infoln("garbageThreshold =", gcThreshold)
+ // glog.Infoln("garbageThreshold =", gcThreshold)
ms.Topo.Vacuum(ms.grpcDialOption, gcThreshold, ms.preallocateSize)
ms.dirStatusHandler(w, r)
}
@@ -145,7 +145,7 @@ func (ms *MasterServer) getVolumeGrowOption(r *http.Request) (*topology.VolumeGr
if replicationString == "" {
replicationString = ms.option.DefaultReplicaPlacement
}
- replicaPlacement, err := storage.NewReplicaPlacementFromString(replicationString)
+ replicaPlacement, err := super_block.NewReplicaPlacementFromString(replicationString)
if err != nil {
return nil, err
}
diff --git a/weed/server/msg_broker_grpc_server.go b/weed/server/msg_broker_grpc_server.go
new file mode 100644
index 000000000..8b13aac76
--- /dev/null
+++ b/weed/server/msg_broker_grpc_server.go
@@ -0,0 +1,23 @@
+package weed_server
+
+import (
+ "context"
+
+ "github.com/chrislusf/seaweedfs/weed/pb/queue_pb"
+)
+
+func (broker *MessageBroker) ConfigureTopic(context.Context, *queue_pb.ConfigureTopicRequest) (*queue_pb.ConfigureTopicResponse, error) {
+ panic("implement me")
+}
+
+func (broker *MessageBroker) DeleteTopic(context.Context, *queue_pb.DeleteTopicRequest) (*queue_pb.DeleteTopicResponse, error) {
+ panic("implement me")
+}
+
+func (broker *MessageBroker) StreamWrite(queue_pb.SeaweedQueue_StreamWriteServer) error {
+ panic("implement me")
+}
+
+func (broker *MessageBroker) StreamRead(*queue_pb.ReadMessageRequest, queue_pb.SeaweedQueue_StreamReadServer) error {
+ panic("implement me")
+}
diff --git a/weed/server/msg_broker_server.go b/weed/server/msg_broker_server.go
new file mode 100644
index 000000000..a9d908581
--- /dev/null
+++ b/weed/server/msg_broker_server.go
@@ -0,0 +1,121 @@
+package weed_server
+
+import (
+ "context"
+ "fmt"
+ "time"
+
+ "google.golang.org/grpc"
+
+ "github.com/chrislusf/seaweedfs/weed/pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
+ "github.com/chrislusf/seaweedfs/weed/security"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+type MessageBrokerOption struct {
+ Filers []string
+ DefaultReplication string
+ MaxMB int
+ Port int
+}
+
+type MessageBroker struct {
+ option *MessageBrokerOption
+ grpcDialOption grpc.DialOption
+}
+
+func NewMessageBroker(option *MessageBrokerOption) (messageBroker *MessageBroker, err error) {
+
+ messageBroker = &MessageBroker{
+ option: option,
+ grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.msg_broker"),
+ }
+
+ go messageBroker.loopForEver()
+
+ return messageBroker, nil
+}
+
+func (broker *MessageBroker) loopForEver() {
+
+ for {
+ broker.checkPeers()
+ time.Sleep(3 * time.Second)
+ }
+
+}
+
+func (broker *MessageBroker) checkPeers() {
+
+ // contact a filer about masters
+ var masters []string
+ for _, filer := range broker.option.Filers {
+ err := broker.withFilerClient(filer, func(client filer_pb.SeaweedFilerClient) error {
+ resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
+ if err != nil {
+ return err
+ }
+ masters = append(masters, resp.Masters...)
+ return nil
+ })
+ if err != nil {
+ fmt.Printf("failed to read masters from %+v: %v\n", broker.option.Filers, err)
+ return
+ }
+ }
+
+ // contact each masters for filers
+ var filers []string
+ for _, master := range masters {
+ err := broker.withMasterClient(master, func(client master_pb.SeaweedClient) error {
+ resp, err := client.ListMasterClients(context.Background(), &master_pb.ListMasterClientsRequest{
+ ClientType: "filer",
+ })
+ if err != nil {
+ return err
+ }
+
+ fmt.Printf("filers: %+v\n", resp.GrpcAddresses)
+ filers = append(filers, resp.GrpcAddresses...)
+
+ return nil
+ })
+ if err != nil {
+ fmt.Printf("failed to list filers: %v\n", err)
+ return
+ }
+ }
+
+ // contact each filer about brokers
+ for _, filer := range filers {
+ err := broker.withFilerClient(filer, func(client filer_pb.SeaweedFilerClient) error {
+ resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
+ if err != nil {
+ return err
+ }
+ masters = append(masters, resp.Masters...)
+ return nil
+ })
+ if err != nil {
+ fmt.Printf("failed to read masters from %+v: %v\n", broker.option.Filers, err)
+ return
+ }
+ }
+
+}
+
+func (broker *MessageBroker) withFilerClient(filer string, fn func(filer_pb.SeaweedFilerClient) error) error {
+
+ return pb.WithFilerClient(filer, broker.grpcDialOption, fn)
+
+}
+
+func (broker *MessageBroker) withMasterClient(master string, fn func(client master_pb.SeaweedClient) error) error {
+
+ return pb.WithMasterClient(master, broker.grpcDialOption, func(client master_pb.SeaweedClient) error {
+ return fn(client)
+ })
+
+}
diff --git a/weed/server/raft_server.go b/weed/server/raft_server.go
index 53289f1c1..0381c7feb 100644
--- a/weed/server/raft_server.go
+++ b/weed/server/raft_server.go
@@ -2,8 +2,6 @@ package weed_server
import (
"encoding/json"
- "github.com/chrislusf/seaweedfs/weed/util"
- "google.golang.org/grpc"
"io/ioutil"
"os"
"path"
@@ -11,7 +9,12 @@ import (
"sort"
"time"
+ "google.golang.org/grpc"
+
+ "github.com/chrislusf/seaweedfs/weed/pb"
+
"github.com/chrislusf/raft"
+
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/topology"
)
@@ -61,7 +64,7 @@ func NewRaftServer(grpcDialOption grpc.DialOption, peers []string, serverAddr, d
s.raftServer.Start()
for _, peer := range s.peers {
- s.raftServer.AddPeer(peer, util.ServerToGrpcAddress(peer))
+ s.raftServer.AddPeer(peer, pb.ServerToGrpcAddress(peer))
}
s.GrpcServer = raft.NewGrpcServer(s.raftServer)
@@ -72,7 +75,7 @@ func NewRaftServer(grpcDialOption grpc.DialOption, peers []string, serverAddr, d
_, err := s.raftServer.Do(&raft.DefaultJoinCommand{
Name: s.raftServer.Name(),
- ConnectionString: util.ServerToGrpcAddress(s.serverAddr),
+ ConnectionString: pb.ServerToGrpcAddress(s.serverAddr),
})
if err != nil {
diff --git a/weed/server/volume_grpc_admin.go b/weed/server/volume_grpc_admin.go
index c631d2535..27b21ac09 100644
--- a/weed/server/volume_grpc_admin.go
+++ b/weed/server/volume_grpc_admin.go
@@ -2,10 +2,14 @@ package weed_server
import (
"context"
+ "fmt"
+ "path/filepath"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
+ "github.com/chrislusf/seaweedfs/weed/stats"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "github.com/chrislusf/seaweedfs/weed/storage/super_block"
)
func (vs *VolumeServer) DeleteCollection(ctx context.Context, req *volume_server_pb.DeleteCollectionRequest) (*volume_server_pb.DeleteCollectionResponse, error) {
@@ -96,6 +100,41 @@ func (vs *VolumeServer) VolumeDelete(ctx context.Context, req *volume_server_pb.
}
+func (vs *VolumeServer) VolumeConfigure(ctx context.Context, req *volume_server_pb.VolumeConfigureRequest) (*volume_server_pb.VolumeConfigureResponse, error) {
+
+ resp := &volume_server_pb.VolumeConfigureResponse{}
+
+ // check replication format
+ if _, err := super_block.NewReplicaPlacementFromString(req.Replication); err != nil {
+ resp.Error = fmt.Sprintf("volume configure replication %v: %v", req, err)
+ return resp, nil
+ }
+
+ // unmount
+ if err := vs.store.UnmountVolume(needle.VolumeId(req.VolumeId)); err != nil {
+ glog.Errorf("volume configure unmount %v: %v", req, err)
+ resp.Error = fmt.Sprintf("volume configure unmount %v: %v", req, err)
+ return resp, nil
+ }
+
+ // modify the volume info file
+ if err := vs.store.ConfigureVolume(needle.VolumeId(req.VolumeId), req.Replication); err != nil {
+ glog.Errorf("volume configure %v: %v", req, err)
+ resp.Error = fmt.Sprintf("volume configure %v: %v", req, err)
+ return resp, nil
+ }
+
+ // mount
+ if err := vs.store.MountVolume(needle.VolumeId(req.VolumeId)); err != nil {
+ glog.Errorf("volume configure mount %v: %v", req, err)
+ resp.Error = fmt.Sprintf("volume configure mount %v: %v", req, err)
+ return resp, nil
+ }
+
+ return resp, nil
+
+}
+
func (vs *VolumeServer) VolumeMarkReadonly(ctx context.Context, req *volume_server_pb.VolumeMarkReadonlyRequest) (*volume_server_pb.VolumeMarkReadonlyResponse, error) {
resp := &volume_server_pb.VolumeMarkReadonlyResponse{}
@@ -111,3 +150,19 @@ func (vs *VolumeServer) VolumeMarkReadonly(ctx context.Context, req *volume_serv
return resp, err
}
+
+func (vs *VolumeServer) VolumeServerStatus(ctx context.Context, req *volume_server_pb.VolumeServerStatusRequest) (*volume_server_pb.VolumeServerStatusResponse, error) {
+
+ resp := &volume_server_pb.VolumeServerStatusResponse{}
+
+ for _, loc := range vs.store.Locations {
+ if dir, e := filepath.Abs(loc.Directory); e == nil {
+ resp.DiskStatuses = append(resp.DiskStatuses, stats.NewDiskStatus(dir))
+ }
+ }
+
+ resp.MemoryStatus = stats.MemStat()
+
+ return resp, nil
+
+}
diff --git a/weed/server/volume_grpc_client_to_master.go b/weed/server/volume_grpc_client_to_master.go
index 731675b48..1f4d9df10 100644
--- a/weed/server/volume_grpc_client_to_master.go
+++ b/weed/server/volume_grpc_client_to_master.go
@@ -5,15 +5,18 @@ import (
"net"
"time"
+ "google.golang.org/grpc"
+
+ "github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/security"
+ "github.com/chrislusf/seaweedfs/weed/storage/backend"
"github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
- "github.com/spf13/viper"
- "google.golang.org/grpc"
+
+ "golang.org/x/net/context"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/util"
- "golang.org/x/net/context"
)
func (vs *VolumeServer) GetMaster() string {
@@ -25,7 +28,7 @@ func (vs *VolumeServer) heartbeat() {
vs.store.SetDataCenter(vs.dataCenter)
vs.store.SetRack(vs.rack)
- grpcDialOption := security.LoadClientTLS(viper.Sub("grpc"), "volume")
+ grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.volume")
var err error
var newLeader string
@@ -34,13 +37,13 @@ func (vs *VolumeServer) heartbeat() {
if newLeader != "" {
master = newLeader
}
- masterGrpcAddress, parseErr := util.ParseServerToGrpcAddress(master)
+ masterGrpcAddress, parseErr := pb.ParseServerToGrpcAddress(master)
if parseErr != nil {
glog.V(0).Infof("failed to parse master grpc %v: %v", masterGrpcAddress, parseErr)
continue
}
vs.store.MasterAddress = master
- newLeader, err = vs.doHeartbeat(context.Background(), master, masterGrpcAddress, grpcDialOption, time.Duration(vs.pulseSeconds)*time.Second)
+ newLeader, err = vs.doHeartbeat(master, masterGrpcAddress, grpcDialOption, time.Duration(vs.pulseSeconds)*time.Second)
if err != nil {
glog.V(0).Infof("heartbeat error: %v", err)
time.Sleep(time.Duration(vs.pulseSeconds) * time.Second)
@@ -51,16 +54,16 @@ func (vs *VolumeServer) heartbeat() {
}
}
-func (vs *VolumeServer) doHeartbeat(ctx context.Context, masterNode, masterGrpcAddress string, grpcDialOption grpc.DialOption, sleepInterval time.Duration) (newLeader string, err error) {
+func (vs *VolumeServer) doHeartbeat(masterNode, masterGrpcAddress string, grpcDialOption grpc.DialOption, sleepInterval time.Duration) (newLeader string, err error) {
- grpcConection, err := util.GrpcDial(ctx, masterGrpcAddress, grpcDialOption)
+ grpcConection, err := pb.GrpcDial(context.Background(), masterGrpcAddress, grpcDialOption)
if err != nil {
return "", fmt.Errorf("fail to dial %s : %v", masterNode, err)
}
defer grpcConection.Close()
client := master_pb.NewSeaweedClient(grpcConection)
- stream, err := client.SendHeartbeat(ctx)
+ stream, err := client.SendHeartbeat(context.Background())
if err != nil {
glog.V(0).Infof("SendHeartbeat to %s: %v", masterNode, err)
return "", err
@@ -90,6 +93,9 @@ func (vs *VolumeServer) doHeartbeat(ctx context.Context, masterNode, masterGrpcA
vs.MetricsAddress = in.GetMetricsAddress()
vs.MetricsIntervalSec = int(in.GetMetricsIntervalSeconds())
}
+ if len(in.StorageBackends) > 0 {
+ backend.LoadFromPbStorageBackends(in.StorageBackends)
+ }
}
}()
diff --git a/weed/server/volume_grpc_copy.go b/weed/server/volume_grpc_copy.go
index 711a3ebad..5cc75e74c 100644
--- a/weed/server/volume_grpc_copy.go
+++ b/weed/server/volume_grpc_copy.go
@@ -20,7 +20,7 @@ import (
const BufferSizeLimit = 1024 * 1024 * 2
-// VolumeCopy copy the .idx .dat files, and mount the volume
+// VolumeCopy copy the .idx .dat .vif files, and mount the volume
func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.VolumeCopyRequest) (*volume_server_pb.VolumeCopyResponse, error) {
v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
@@ -43,7 +43,7 @@ func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.Vo
var volumeFileName, idxFileName, datFileName string
err := operation.WithVolumeServerClient(req.SourceDataNode, vs.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
var err error
- volFileInfoResp, err = client.ReadVolumeFileStatus(ctx,
+ volFileInfoResp, err = client.ReadVolumeFileStatus(context.Background(),
&volume_server_pb.ReadVolumeFileStatusRequest{
VolumeId: req.VolumeId,
})
@@ -55,11 +55,15 @@ func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.Vo
// println("source:", volFileInfoResp.String())
// copy ecx file
- if err := vs.doCopyFile(ctx, client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.IdxFileSize, volumeFileName, ".idx", false); err != nil {
+ if err := vs.doCopyFile(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.IdxFileSize, volumeFileName, ".idx", false, false); err != nil {
return err
}
- if err := vs.doCopyFile(ctx, client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.DatFileSize, volumeFileName, ".dat", false); err != nil {
+ if err := vs.doCopyFile(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.DatFileSize, volumeFileName, ".dat", false, true); err != nil {
+ return err
+ }
+
+ if err := vs.doCopyFile(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.DatFileSize, volumeFileName, ".vif", false, true); err != nil {
return err
}
@@ -70,12 +74,9 @@ func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.Vo
datFileName = volumeFileName + ".dat"
if err != nil && volumeFileName != "" {
- if idxFileName != "" {
- os.Remove(idxFileName)
- }
- if datFileName != "" {
- os.Remove(datFileName)
- }
+ os.Remove(idxFileName)
+ os.Remove(datFileName)
+ os.Remove(volumeFileName + ".vif")
return nil, err
}
@@ -94,16 +95,16 @@ func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.Vo
}, err
}
-func (vs *VolumeServer) doCopyFile(ctx context.Context, client volume_server_pb.VolumeServerClient, isEcVolume bool, collection string, vid uint32,
- compactRevision uint32, stopOffset uint64, baseFileName, ext string, isAppend bool) error {
+func (vs *VolumeServer) doCopyFile(client volume_server_pb.VolumeServerClient, isEcVolume bool, collection string, vid, compactRevision uint32, stopOffset uint64, baseFileName, ext string, isAppend, ignoreSourceFileNotFound bool) error {
- copyFileClient, err := client.CopyFile(ctx, &volume_server_pb.CopyFileRequest{
- VolumeId: vid,
- Ext: ext,
- CompactionRevision: compactRevision,
- StopOffset: stopOffset,
- Collection: collection,
- IsEcVolume: isEcVolume,
+ copyFileClient, err := client.CopyFile(context.Background(), &volume_server_pb.CopyFileRequest{
+ VolumeId: vid,
+ Ext: ext,
+ CompactionRevision: compactRevision,
+ StopOffset: stopOffset,
+ Collection: collection,
+ IsEcVolume: isEcVolume,
+ IgnoreSourceFileNotFound: ignoreSourceFileNotFound,
})
if err != nil {
return fmt.Errorf("failed to start copying volume %d %s file: %v", vid, ext, err)
@@ -213,6 +214,9 @@ func (vs *VolumeServer) CopyFile(req *volume_server_pb.CopyFileRequest, stream v
}
}
if fileName == "" {
+ if req.IgnoreSourceFileNotFound {
+ return nil
+ }
return fmt.Errorf("CopyFile not found ec volume id %d", req.VolumeId)
}
}
@@ -221,6 +225,9 @@ func (vs *VolumeServer) CopyFile(req *volume_server_pb.CopyFileRequest, stream v
file, err := os.Open(fileName)
if err != nil {
+ if req.IgnoreSourceFileNotFound && err == os.ErrNotExist {
+ return nil
+ }
return err
}
defer file.Close()
diff --git a/weed/server/volume_grpc_copy_incremental.go b/weed/server/volume_grpc_copy_incremental.go
index 6c5bb8a62..6d6c3daa3 100644
--- a/weed/server/volume_grpc_copy_incremental.go
+++ b/weed/server/volume_grpc_copy_incremental.go
@@ -47,7 +47,7 @@ func (vs *VolumeServer) VolumeSyncStatus(ctx context.Context, req *volume_server
}
-func sendFileContent(datBackend backend.DataStorageBackend, buf []byte, startOffset, stopOffset int64, stream volume_server_pb.VolumeServer_VolumeIncrementalCopyServer) error {
+func sendFileContent(datBackend backend.BackendStorageFile, buf []byte, startOffset, stopOffset int64, stream volume_server_pb.VolumeServer_VolumeIncrementalCopyServer) error {
var blockSizeLimit = int64(len(buf))
for i := int64(0); i < stopOffset-startOffset; i += blockSizeLimit {
n, readErr := datBackend.ReadAt(buf, startOffset+i)
diff --git a/weed/server/volume_grpc_erasure_coding.go b/weed/server/volume_grpc_erasure_coding.go
index 242480197..66dd5bf8d 100644
--- a/weed/server/volume_grpc_erasure_coding.go
+++ b/weed/server/volume_grpc_erasure_coding.go
@@ -8,10 +8,12 @@ import (
"math"
"os"
"path"
+ "path/filepath"
"strings"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/operation"
+ "github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
"github.com/chrislusf/seaweedfs/weed/storage"
"github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
@@ -24,7 +26,7 @@ import (
Steps to apply erasure coding to .dat .idx files
0. ensure the volume is readonly
-1. client call VolumeEcShardsGenerate to generate the .ecx and .ec01~.ec14 files
+1. client call VolumeEcShardsGenerate to generate the .ecx and .ec00 ~ .ec13 files
2. client ask master for possible servers to hold the ec files, at least 4 servers
3. client call VolumeEcShardsCopy on above target servers to copy ec files from the source server
4. target servers report the new ec files to the master
@@ -33,7 +35,7 @@ Steps to apply erasure coding to .dat .idx files
*/
-// VolumeEcShardsGenerate generates the .ecx and .ec01 ~ .ec14 files
+// VolumeEcShardsGenerate generates the .ecx and .ec00 ~ .ec13 files
func (vs *VolumeServer) VolumeEcShardsGenerate(ctx context.Context, req *volume_server_pb.VolumeEcShardsGenerateRequest) (*volume_server_pb.VolumeEcShardsGenerateResponse, error) {
v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
@@ -47,19 +49,24 @@ func (vs *VolumeServer) VolumeEcShardsGenerate(ctx context.Context, req *volume_
}
// write .ecx file
- if err := erasure_coding.WriteSortedEcxFile(baseFileName); err != nil {
- return nil, fmt.Errorf("WriteSortedEcxFile %s: %v", baseFileName, err)
+ if err := erasure_coding.WriteSortedFileFromIdx(baseFileName, ".ecx"); err != nil {
+ return nil, fmt.Errorf("WriteSortedFileFromIdx %s: %v", baseFileName, err)
}
- // write .ec01 ~ .ec14 files
+ // write .ec00 ~ .ec13 files
if err := erasure_coding.WriteEcFiles(baseFileName); err != nil {
return nil, fmt.Errorf("WriteEcFiles %s: %v", baseFileName, err)
}
+ // write .vif files
+ if err := pb.SaveVolumeInfo(baseFileName+".vif", &volume_server_pb.VolumeInfo{Version: uint32(v.Version())}); err != nil {
+ return nil, fmt.Errorf("WriteEcFiles %s: %v", baseFileName, err)
+ }
+
return &volume_server_pb.VolumeEcShardsGenerateResponse{}, nil
}
-// VolumeEcShardsRebuild generates the any of the missing .ec01 ~ .ec14 files
+// VolumeEcShardsRebuild generates the any of the missing .ec00 ~ .ec13 files
func (vs *VolumeServer) VolumeEcShardsRebuild(ctx context.Context, req *volume_server_pb.VolumeEcShardsRebuildRequest) (*volume_server_pb.VolumeEcShardsRebuildResponse, error) {
baseFileName := erasure_coding.EcShardBaseFileName(req.Collection, int(req.VolumeId))
@@ -68,7 +75,7 @@ func (vs *VolumeServer) VolumeEcShardsRebuild(ctx context.Context, req *volume_s
for _, location := range vs.store.Locations {
if util.FileExists(path.Join(location.Directory, baseFileName+".ecx")) {
- // write .ec01 ~ .ec14 files
+ // write .ec00 ~ .ec13 files
baseFileName = path.Join(location.Directory, baseFileName)
if generatedShardIds, err := erasure_coding.RebuildEcFiles(baseFileName); err != nil {
return nil, fmt.Errorf("RebuildEcFiles %s: %v", baseFileName, err)
@@ -103,23 +110,32 @@ func (vs *VolumeServer) VolumeEcShardsCopy(ctx context.Context, req *volume_serv
// copy ec data slices
for _, shardId := range req.ShardIds {
- if err := vs.doCopyFile(ctx, client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, erasure_coding.ToExt(int(shardId)), false); err != nil {
+ if err := vs.doCopyFile(client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, erasure_coding.ToExt(int(shardId)), false, false); err != nil {
return err
}
}
- if !req.CopyEcxFile {
+ if req.CopyEcxFile {
+
+ // copy ecx file
+ if err := vs.doCopyFile(client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, ".ecx", false, false); err != nil {
+ return err
+ }
return nil
}
- // copy ecx file
- if err := vs.doCopyFile(ctx, client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, ".ecx", false); err != nil {
- return err
+ if req.CopyEcjFile {
+ // copy ecj file
+ if err := vs.doCopyFile(client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, ".ecj", true, true); err != nil {
+ return err
+ }
}
- // copy ecj file
- if err := vs.doCopyFile(ctx, client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, ".ecj", true); err != nil {
- return err
+ if req.CopyVifFile {
+ // copy vif file
+ if err := vs.doCopyFile(client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, ".vif", false, true); err != nil {
+ return err
+ }
}
return nil
@@ -137,6 +153,8 @@ func (vs *VolumeServer) VolumeEcShardsDelete(ctx context.Context, req *volume_se
baseFilename := erasure_coding.EcShardBaseFileName(req.Collection, int(req.VolumeId))
+ glog.V(0).Infof("ec volume %d shard delete %v", req.VolumeId, req.ShardIds)
+
found := false
for _, location := range vs.store.Locations {
if util.FileExists(path.Join(location.Directory, baseFilename+".ecx")) {
@@ -153,21 +171,27 @@ func (vs *VolumeServer) VolumeEcShardsDelete(ctx context.Context, req *volume_se
return nil, nil
}
- // check whether to delete the ecx file also
+ // check whether to delete the .ecx and .ecj file also
hasEcxFile := false
+ hasIdxFile := false
existingShardCount := 0
+ bName := filepath.Base(baseFilename)
for _, location := range vs.store.Locations {
fileInfos, err := ioutil.ReadDir(location.Directory)
if err != nil {
continue
}
for _, fileInfo := range fileInfos {
- if fileInfo.Name() == baseFilename+".ecx" {
+ if fileInfo.Name() == bName+".ecx" || fileInfo.Name() == bName+".ecj" {
hasEcxFile = true
continue
}
- if strings.HasPrefix(fileInfo.Name(), baseFilename+".ec") {
+ if fileInfo.Name() == bName+".idx" {
+ hasIdxFile = true
+ continue
+ }
+ if strings.HasPrefix(fileInfo.Name(), bName+".ec") {
existingShardCount++
}
}
@@ -181,6 +205,10 @@ func (vs *VolumeServer) VolumeEcShardsDelete(ctx context.Context, req *volume_se
return nil, err
}
}
+ if !hasIdxFile {
+ // .vif is used for ec volumes and normal volumes
+ os.Remove(baseFilename + ".vif")
+ }
return &volume_server_pb.VolumeEcShardsDeleteResponse{}, nil
}
@@ -317,3 +345,35 @@ func (vs *VolumeServer) VolumeEcBlobDelete(ctx context.Context, req *volume_serv
return resp, nil
}
+
+// VolumeEcShardsToVolume generates the .idx, .dat files from .ecx, .ecj and .ec01 ~ .ec14 files
+func (vs *VolumeServer) VolumeEcShardsToVolume(ctx context.Context, req *volume_server_pb.VolumeEcShardsToVolumeRequest) (*volume_server_pb.VolumeEcShardsToVolumeResponse, error) {
+
+ v, found := vs.store.FindEcVolume(needle.VolumeId(req.VolumeId))
+ if !found {
+ return nil, fmt.Errorf("ec volume %d not found", req.VolumeId)
+ }
+ baseFileName := v.FileName()
+
+ if v.Collection != req.Collection {
+ return nil, fmt.Errorf("existing collection:%v unexpected input: %v", v.Collection, req.Collection)
+ }
+
+ // calculate .dat file size
+ datFileSize, err := erasure_coding.FindDatFileSize(baseFileName)
+ if err != nil {
+ return nil, fmt.Errorf("FindDatFileSize %s: %v", baseFileName, err)
+ }
+
+ // write .dat file from .ec00 ~ .ec09 files
+ if err := erasure_coding.WriteDatFile(baseFileName, datFileSize); err != nil {
+ return nil, fmt.Errorf("WriteEcFiles %s: %v", baseFileName, err)
+ }
+
+ // write .idx file from .ecx and .ecj files
+ if err := erasure_coding.WriteIdxFileFromEcIndex(baseFileName); err != nil {
+ return nil, fmt.Errorf("WriteIdxFileFromEcIndex %s: %v", baseFileName, err)
+ }
+
+ return &volume_server_pb.VolumeEcShardsToVolumeResponse{}, nil
+}
diff --git a/weed/server/volume_grpc_file.go b/weed/server/volume_grpc_file.go
new file mode 100644
index 000000000..4d71ddeb1
--- /dev/null
+++ b/weed/server/volume_grpc_file.go
@@ -0,0 +1,129 @@
+package weed_server
+
+import (
+ "encoding/json"
+ "net/http"
+ "strings"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+func (vs *VolumeServer) FileGet(req *volume_server_pb.FileGetRequest, stream volume_server_pb.VolumeServer_FileGetServer) error {
+
+ headResponse := &volume_server_pb.FileGetResponse{}
+ n := new(needle.Needle)
+
+ commaIndex := strings.LastIndex(req.FileId, ",")
+ vid := req.FileId[:commaIndex]
+ fid := req.FileId[commaIndex+1:]
+
+ volumeId, err := needle.NewVolumeId(vid)
+ if err != nil {
+ headResponse.ErrorCode = http.StatusBadRequest
+ return stream.Send(headResponse)
+ }
+ err = n.ParsePath(fid)
+ if err != nil {
+ headResponse.ErrorCode = http.StatusBadRequest
+ return stream.Send(headResponse)
+ }
+
+ hasVolume := vs.store.HasVolume(volumeId)
+ _, hasEcVolume := vs.store.FindEcVolume(volumeId)
+
+ if !hasVolume && !hasEcVolume {
+ headResponse.ErrorCode = http.StatusMovedPermanently
+ return stream.Send(headResponse)
+ }
+
+ cookie := n.Cookie
+ var count int
+ if hasVolume {
+ count, err = vs.store.ReadVolumeNeedle(volumeId, n)
+ } else if hasEcVolume {
+ count, err = vs.store.ReadEcShardNeedle(volumeId, n)
+ }
+
+ if err != nil || count < 0 {
+ headResponse.ErrorCode = http.StatusNotFound
+ return stream.Send(headResponse)
+ }
+ if n.Cookie != cookie {
+ headResponse.ErrorCode = http.StatusNotFound
+ return stream.Send(headResponse)
+ }
+
+ if n.LastModified != 0 {
+ headResponse.LastModified = n.LastModified
+ }
+
+ headResponse.Etag = n.Etag()
+
+ if n.HasPairs() {
+ pairMap := make(map[string]string)
+ err = json.Unmarshal(n.Pairs, &pairMap)
+ if err != nil {
+ glog.V(0).Infoln("Unmarshal pairs error:", err)
+ }
+ headResponse.Headers = pairMap
+ }
+
+ /*
+ // skip this, no redirection
+ if vs.tryHandleChunkedFile(n, filename, w, r) {
+ return
+ }
+ */
+
+ if n.NameSize > 0 {
+ headResponse.Filename = string(n.Name)
+ }
+ mtype := ""
+ if n.MimeSize > 0 {
+ mt := string(n.Mime)
+ if !strings.HasPrefix(mt, "application/octet-stream") {
+ mtype = mt
+ }
+ }
+ headResponse.ContentType = mtype
+
+ headResponse.IsGzipped = n.IsGzipped()
+
+ if n.IsGzipped() && req.AcceptGzip {
+ if n.Data, err = util.UnGzipData(n.Data); err != nil {
+ glog.V(0).Infof("ungzip %s error: %v", req.FileId, err)
+ }
+ }
+
+ headResponse.ContentLength = uint32(len(n.Data))
+ bytesToRead := len(n.Data)
+ bytesRead := 0
+
+ t := headResponse
+
+ for bytesRead < bytesToRead {
+
+ stopIndex := bytesRead + BufferSizeLimit
+ if stopIndex > bytesToRead {
+ stopIndex = bytesToRead
+ }
+
+ if t == nil {
+ t = &volume_server_pb.FileGetResponse{}
+ }
+ t.Data = n.Data[bytesRead:stopIndex]
+
+ err = stream.Send(t)
+ t = nil
+ if err != nil {
+ return err
+ }
+
+ bytesRead = stopIndex
+ }
+
+ return nil
+}
diff --git a/weed/server/volume_grpc_tail.go b/weed/server/volume_grpc_tail.go
index 1bf61e1c7..c26d6ed8f 100644
--- a/weed/server/volume_grpc_tail.go
+++ b/weed/server/volume_grpc_tail.go
@@ -10,6 +10,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
"github.com/chrislusf/seaweedfs/weed/storage"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "github.com/chrislusf/seaweedfs/weed/storage/super_block"
)
func (vs *VolumeServer) VolumeTailSender(req *volume_server_pb.VolumeTailSenderRequest, stream volume_server_pb.VolumeServer_VolumeTailSenderServer) error {
@@ -101,7 +102,7 @@ type VolumeFileScanner4Tailing struct {
lastProcessedTimestampNs uint64
}
-func (scanner *VolumeFileScanner4Tailing) VisitSuperBlock(superBlock storage.SuperBlock) error {
+func (scanner *VolumeFileScanner4Tailing) VisitSuperBlock(superBlock super_block.SuperBlock) error {
return nil
}
diff --git a/weed/server/volume_grpc_tier_download.go b/weed/server/volume_grpc_tier_download.go
new file mode 100644
index 000000000..7b3982e40
--- /dev/null
+++ b/weed/server/volume_grpc_tier_download.go
@@ -0,0 +1,85 @@
+package weed_server
+
+import (
+ "fmt"
+ "time"
+
+ "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
+ "github.com/chrislusf/seaweedfs/weed/storage/backend"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
+)
+
+// VolumeTierMoveDatFromRemote copy dat file from a remote tier to local volume server
+func (vs *VolumeServer) VolumeTierMoveDatFromRemote(req *volume_server_pb.VolumeTierMoveDatFromRemoteRequest, stream volume_server_pb.VolumeServer_VolumeTierMoveDatFromRemoteServer) error {
+
+ // find existing volume
+ v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
+ if v == nil {
+ return fmt.Errorf("volume %d not found", req.VolumeId)
+ }
+
+ // verify the collection
+ if v.Collection != req.Collection {
+ return fmt.Errorf("existing collection:%v unexpected input: %v", v.Collection, req.Collection)
+ }
+
+ // locate the disk file
+ storageName, storageKey := v.RemoteStorageNameKey()
+ if storageName == "" || storageKey == "" {
+ return fmt.Errorf("volume %d is already on local disk", req.VolumeId)
+ }
+
+ // check whether the local .dat already exists
+ _, ok := v.DataBackend.(*backend.DiskFile)
+ if ok {
+ return fmt.Errorf("volume %d is already on local disk", req.VolumeId)
+ }
+
+ // check valid storage backend type
+ backendStorage, found := backend.BackendStorages[storageName]
+ if !found {
+ var keys []string
+ for key := range backend.BackendStorages {
+ keys = append(keys, key)
+ }
+ return fmt.Errorf("remote storage %s not found from suppported: %v", storageName, keys)
+ }
+
+ startTime := time.Now()
+ fn := func(progressed int64, percentage float32) error {
+ now := time.Now()
+ if now.Sub(startTime) < time.Second {
+ return nil
+ }
+ startTime = now
+ return stream.Send(&volume_server_pb.VolumeTierMoveDatFromRemoteResponse{
+ Processed: progressed,
+ ProcessedPercentage: percentage,
+ })
+ }
+ // copy the data file
+ _, err := backendStorage.DownloadFile(v.FileName()+".dat", storageKey, fn)
+ if err != nil {
+ return fmt.Errorf("backend %s copy file %s: %v", storageName, v.FileName()+".dat", err)
+ }
+
+ if req.KeepRemoteDatFile {
+ return nil
+ }
+
+ // remove remote file
+ if err := backendStorage.DeleteFile(storageKey); err != nil {
+ return fmt.Errorf("volume %d fail to delete remote file %s: %v", v.Id, storageKey, err)
+ }
+
+ // forget remote file
+ v.GetVolumeInfo().Files = v.GetVolumeInfo().Files[1:]
+ if err := v.SaveVolumeInfo(); err != nil {
+ return fmt.Errorf("volume %d fail to save remote file info: %v", v.Id, err)
+ }
+
+ v.DataBackend.Close()
+ v.DataBackend = nil
+
+ return nil
+}
diff --git a/weed/server/volume_grpc_tier_upload.go b/weed/server/volume_grpc_tier_upload.go
new file mode 100644
index 000000000..c9694df59
--- /dev/null
+++ b/weed/server/volume_grpc_tier_upload.go
@@ -0,0 +1,100 @@
+package weed_server
+
+import (
+ "fmt"
+ "os"
+ "time"
+
+ "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
+ "github.com/chrislusf/seaweedfs/weed/storage/backend"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
+)
+
+// VolumeTierMoveDatToRemote copy dat file to a remote tier
+func (vs *VolumeServer) VolumeTierMoveDatToRemote(req *volume_server_pb.VolumeTierMoveDatToRemoteRequest, stream volume_server_pb.VolumeServer_VolumeTierMoveDatToRemoteServer) error {
+
+ // find existing volume
+ v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
+ if v == nil {
+ return fmt.Errorf("volume %d not found", req.VolumeId)
+ }
+
+ // verify the collection
+ if v.Collection != req.Collection {
+ return fmt.Errorf("existing collection:%v unexpected input: %v", v.Collection, req.Collection)
+ }
+
+ // locate the disk file
+ diskFile, ok := v.DataBackend.(*backend.DiskFile)
+ if !ok {
+ return fmt.Errorf("volume %d is not on local disk", req.VolumeId)
+ }
+
+ // check valid storage backend type
+ backendStorage, found := backend.BackendStorages[req.DestinationBackendName]
+ if !found {
+ var keys []string
+ for key := range backend.BackendStorages {
+ keys = append(keys, key)
+ }
+ return fmt.Errorf("destination %s not found, suppported: %v", req.DestinationBackendName, keys)
+ }
+
+ // check whether the existing backend storage is the same as requested
+ // if same, skip
+ backendType, backendId := backend.BackendNameToTypeId(req.DestinationBackendName)
+ for _, remoteFile := range v.GetVolumeInfo().GetFiles() {
+ if remoteFile.BackendType == backendType && remoteFile.BackendId == backendId {
+ return fmt.Errorf("destination %s already exists", req.DestinationBackendName)
+ }
+ }
+
+ startTime := time.Now()
+ fn := func(progressed int64, percentage float32) error {
+ now := time.Now()
+ if now.Sub(startTime) < time.Second {
+ return nil
+ }
+ startTime = now
+ return stream.Send(&volume_server_pb.VolumeTierMoveDatToRemoteResponse{
+ Processed: progressed,
+ ProcessedPercentage: percentage,
+ })
+ }
+
+ // remember the file original source
+ attributes := make(map[string]string)
+ attributes["volumeId"] = v.Id.String()
+ attributes["collection"] = v.Collection
+ attributes["ext"] = ".dat"
+ // copy the data file
+ key, size, err := backendStorage.CopyFile(diskFile.File, attributes, fn)
+ if err != nil {
+ return fmt.Errorf("backend %s copy file %s: %v", req.DestinationBackendName, diskFile.Name(), err)
+ }
+
+ // save the remote file to volume tier info
+ v.GetVolumeInfo().Files = append(v.GetVolumeInfo().GetFiles(), &volume_server_pb.RemoteFile{
+ BackendType: backendType,
+ BackendId: backendId,
+ Key: key,
+ Offset: 0,
+ FileSize: uint64(size),
+ ModifiedTime: uint64(time.Now().Unix()),
+ Extension: ".dat",
+ })
+
+ if err := v.SaveVolumeInfo(); err != nil {
+ return fmt.Errorf("volume %d fail to save remote file info: %v", v.Id, err)
+ }
+
+ if err := v.LoadRemoteFile(); err != nil {
+ return fmt.Errorf("volume %d fail to load remote file: %v", v.Id, err)
+ }
+
+ if !req.KeepLocalDatFile {
+ os.Remove(v.FileName() + ".dat")
+ }
+
+ return nil
+}
diff --git a/weed/server/volume_server.go b/weed/server/volume_server.go
index 6cf654738..2d716edc1 100644
--- a/weed/server/volume_server.go
+++ b/weed/server/volume_server.go
@@ -4,13 +4,14 @@ import (
"fmt"
"net/http"
- "github.com/chrislusf/seaweedfs/weed/stats"
"google.golang.org/grpc"
+ "github.com/chrislusf/seaweedfs/weed/stats"
+ "github.com/chrislusf/seaweedfs/weed/util"
+
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/storage"
- "github.com/spf13/viper"
)
type VolumeServer struct {
@@ -29,6 +30,7 @@ type VolumeServer struct {
compactionBytePerSecond int64
MetricsAddress string
MetricsIntervalSec int
+ fileSizeLimitBytes int64
}
func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
@@ -41,9 +43,10 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
fixJpgOrientation bool,
readRedirect bool,
compactionMBPerSecond int,
+ fileSizeLimitMB int,
) *VolumeServer {
- v := viper.GetViper()
+ v := util.GetViper()
signingKey := v.GetString("jwt.signing.key")
v.SetDefault("jwt.signing.expires_after_seconds", 10)
expiresAfterSec := v.GetInt("jwt.signing.expires_after_seconds")
@@ -60,8 +63,9 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
needleMapKind: needleMapKind,
FixJpgOrientation: fixJpgOrientation,
ReadRedirect: readRedirect,
- grpcDialOption: security.LoadClientTLS(viper.Sub("grpc"), "volume"),
+ grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.volume"),
compactionBytePerSecond: int64(compactionMBPerSecond) * 1024 * 1024,
+ fileSizeLimitBytes: int64(fileSizeLimitMB) * 1024 * 1024,
}
vs.SeedMasterNodes = masterNodes
vs.store = storage.NewStore(vs.grpcDialOption, port, ip, publicUrl, folders, maxCounts, vs.needleMapKind)
@@ -73,9 +77,11 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
// only expose the volume server details for safe environments
adminMux.HandleFunc("/ui/index.html", vs.uiStatusHandler)
adminMux.HandleFunc("/status", vs.guard.WhiteList(vs.statusHandler))
- adminMux.HandleFunc("/stats/counter", vs.guard.WhiteList(statsCounterHandler))
- adminMux.HandleFunc("/stats/memory", vs.guard.WhiteList(statsMemoryHandler))
- adminMux.HandleFunc("/stats/disk", vs.guard.WhiteList(vs.statsDiskHandler))
+ /*
+ adminMux.HandleFunc("/stats/counter", vs.guard.WhiteList(statsCounterHandler))
+ adminMux.HandleFunc("/stats/memory", vs.guard.WhiteList(statsMemoryHandler))
+ adminMux.HandleFunc("/stats/disk", vs.guard.WhiteList(vs.statsDiskHandler))
+ */
}
adminMux.HandleFunc("/", vs.privateStoreHandler)
if publicMux != adminMux {
diff --git a/weed/server/volume_server_handlers_admin.go b/weed/server/volume_server_handlers_admin.go
index 25b6582f7..89bc051c5 100644
--- a/weed/server/volume_server_handlers_admin.go
+++ b/weed/server/volume_server_handlers_admin.go
@@ -12,7 +12,14 @@ import (
func (vs *VolumeServer) statusHandler(w http.ResponseWriter, r *http.Request) {
m := make(map[string]interface{})
m["Version"] = util.VERSION
- m["Volumes"] = vs.store.Status()
+ var ds []*volume_server_pb.DiskStatus
+ for _, loc := range vs.store.Locations {
+ if dir, e := filepath.Abs(loc.Directory); e == nil {
+ ds = append(ds, stats.NewDiskStatus(dir))
+ }
+ }
+ m["DiskStatuses"] = ds
+ m["Volumes"] = vs.store.VolumeInfos()
writeJsonQuiet(w, r, http.StatusOK, m)
}
diff --git a/weed/server/volume_server_handlers_read.go b/weed/server/volume_server_handlers_read.go
index cd11356b9..6e603d158 100644
--- a/weed/server/volume_server_handlers_read.go
+++ b/weed/server/volume_server_handlers_read.go
@@ -2,21 +2,18 @@ package weed_server
import (
"bytes"
- "context"
+ "encoding/json"
"errors"
"fmt"
"io"
"mime"
- "mime/multipart"
"net/http"
"net/url"
- "path"
+ "path/filepath"
"strconv"
"strings"
"time"
- "encoding/json"
-
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/images"
"github.com/chrislusf/seaweedfs/weed/operation"
@@ -54,7 +51,7 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
return
}
- glog.V(4).Infoln("volume", volumeId, "reading", n)
+ // glog.V(4).Infoln("volume", volumeId, "reading", n)
hasVolume := vs.store.HasVolume(volumeId)
_, hasEcVolume := vs.store.FindEcVolume(volumeId)
if !hasVolume && !hasEcVolume {
@@ -86,9 +83,9 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
if hasVolume {
count, err = vs.store.ReadVolumeNeedle(volumeId, n)
} else if hasEcVolume {
- count, err = vs.store.ReadEcShardNeedle(context.Background(), volumeId, n)
+ count, err = vs.store.ReadEcShardNeedle(volumeId, n)
}
- glog.V(4).Infoln("read bytes", count, "error", err)
+ // glog.V(4).Infoln("read bytes", count, "error", err)
if err != nil || count < 0 {
glog.V(0).Infof("read %s isNormalVolume %v error: %v", r.URL.Path, hasVolume, err)
w.WriteHeader(http.StatusNotFound)
@@ -114,11 +111,7 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
w.WriteHeader(http.StatusNotModified)
return
}
- if r.Header.Get("ETag-MD5") == "True" {
- setEtag(w, n.MD5())
- } else {
- setEtag(w, n.Etag())
- }
+ setEtag(w, n.Etag())
if n.HasPairs() {
pairMap := make(map[string]string)
@@ -138,7 +131,7 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
if n.NameSize > 0 && filename == "" {
filename = string(n.Name)
if ext == "" {
- ext = path.Ext(filename)
+ ext = filepath.Ext(filename)
}
}
mtype := ""
@@ -182,7 +175,7 @@ func (vs *VolumeServer) tryHandleChunkedFile(n *needle.Needle, fileName string,
fileName = chunkManifest.Name
}
- ext := path.Ext(fileName)
+ ext := filepath.Ext(fileName)
mType := ""
if chunkManifest.Mime != "" {
@@ -229,113 +222,28 @@ func conditionallyResizeImages(originalDataReaderSeeker io.ReadSeeker, ext strin
func writeResponseContent(filename, mimeType string, rs io.ReadSeeker, w http.ResponseWriter, r *http.Request) error {
totalSize, e := rs.Seek(0, 2)
if mimeType == "" {
- if ext := path.Ext(filename); ext != "" {
+ if ext := filepath.Ext(filename); ext != "" {
mimeType = mime.TypeByExtension(ext)
}
}
if mimeType != "" {
w.Header().Set("Content-Type", mimeType)
}
- if filename != "" {
- contentDisposition := "inline"
- if r.FormValue("dl") != "" {
- if dl, _ := strconv.ParseBool(r.FormValue("dl")); dl {
- contentDisposition = "attachment"
- }
- }
- w.Header().Set("Content-Disposition", contentDisposition+`; filename="`+fileNameEscaper.Replace(filename)+`"`)
- }
w.Header().Set("Accept-Ranges", "bytes")
+
if r.Method == "HEAD" {
w.Header().Set("Content-Length", strconv.FormatInt(totalSize, 10))
return nil
}
- rangeReq := r.Header.Get("Range")
- if rangeReq == "" {
- w.Header().Set("Content-Length", strconv.FormatInt(totalSize, 10))
- if _, e = rs.Seek(0, 0); e != nil {
- return e
- }
- _, e = io.Copy(w, rs)
- return e
- }
- //the rest is dealing with partial content request
- //mostly copy from src/pkg/net/http/fs.go
- ranges, err := parseRange(rangeReq, totalSize)
- if err != nil {
- http.Error(w, err.Error(), http.StatusRequestedRangeNotSatisfiable)
- return nil
- }
- if sumRangesSize(ranges) > totalSize {
- // The total number of bytes in all the ranges
- // is larger than the size of the file by
- // itself, so this is probably an attack, or a
- // dumb client. Ignore the range request.
- return nil
- }
- if len(ranges) == 0 {
- return nil
- }
- if len(ranges) == 1 {
- // RFC 2616, Section 14.16:
- // "When an HTTP message includes the content of a single
- // range (for example, a response to a request for a
- // single range, or to a request for a set of ranges
- // that overlap without any holes), this content is
- // transmitted with a Content-Range header, and a
- // Content-Length header showing the number of bytes
- // actually transferred.
- // ...
- // A response to a request for a single range MUST NOT
- // be sent using the multipart/byteranges media type."
- ra := ranges[0]
- w.Header().Set("Content-Length", strconv.FormatInt(ra.length, 10))
- w.Header().Set("Content-Range", ra.contentRange(totalSize))
- w.WriteHeader(http.StatusPartialContent)
- if _, e = rs.Seek(ra.start, 0); e != nil {
+ adjustHeadersAfterHEAD(w, r, filename)
+
+ processRangeRequst(r, w, totalSize, mimeType, func(writer io.Writer, offset int64, size int64) error {
+ if _, e = rs.Seek(offset, 0); e != nil {
return e
}
-
- _, e = io.CopyN(w, rs, ra.length)
+ _, e = io.CopyN(writer, rs, size)
return e
- }
- // process multiple ranges
- for _, ra := range ranges {
- if ra.start > totalSize {
- http.Error(w, "Out of Range", http.StatusRequestedRangeNotSatisfiable)
- return nil
- }
- }
- sendSize := rangesMIMESize(ranges, mimeType, totalSize)
- pr, pw := io.Pipe()
- mw := multipart.NewWriter(pw)
- w.Header().Set("Content-Type", "multipart/byteranges; boundary="+mw.Boundary())
- sendContent := pr
- defer pr.Close() // cause writing goroutine to fail and exit if CopyN doesn't finish.
- go func() {
- for _, ra := range ranges {
- part, e := mw.CreatePart(ra.mimeHeader(mimeType, totalSize))
- if e != nil {
- pw.CloseWithError(e)
- return
- }
- if _, e = rs.Seek(ra.start, 0); e != nil {
- pw.CloseWithError(e)
- return
- }
- if _, e = io.CopyN(part, rs, ra.length); e != nil {
- pw.CloseWithError(e)
- return
- }
- }
- mw.Close()
- pw.Close()
- }()
- if w.Header().Get("Content-Encoding") == "" {
- w.Header().Set("Content-Length", strconv.FormatInt(sendSize, 10))
- }
- w.WriteHeader(http.StatusPartialContent)
- _, e = io.CopyN(w, sendContent, sendSize)
- return e
+ })
+ return nil
}
diff --git a/weed/server/volume_server_handlers_ui.go b/weed/server/volume_server_handlers_ui.go
index 852f0b751..8d35c9c8b 100644
--- a/weed/server/volume_server_handlers_ui.go
+++ b/weed/server/volume_server_handlers_ui.go
@@ -8,6 +8,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
ui "github.com/chrislusf/seaweedfs/weed/server/volume_server_ui"
"github.com/chrislusf/seaweedfs/weed/stats"
+ "github.com/chrislusf/seaweedfs/weed/storage"
"github.com/chrislusf/seaweedfs/weed/util"
)
@@ -20,19 +21,30 @@ func (vs *VolumeServer) uiStatusHandler(w http.ResponseWriter, r *http.Request)
ds = append(ds, stats.NewDiskStatus(dir))
}
}
+ volumeInfos := vs.store.VolumeInfos()
+ var normalVolumeInfos, remoteVolumeInfos []*storage.VolumeInfo
+ for _, vinfo := range volumeInfos {
+ if vinfo.IsRemote() {
+ remoteVolumeInfos = append(remoteVolumeInfos, vinfo)
+ } else {
+ normalVolumeInfos = append(normalVolumeInfos, vinfo)
+ }
+ }
args := struct {
- Version string
- Masters []string
- Volumes interface{}
- EcVolumes interface{}
- DiskStatuses interface{}
- Stats interface{}
- Counters *stats.ServerStats
+ Version string
+ Masters []string
+ Volumes interface{}
+ EcVolumes interface{}
+ RemoteVolumes interface{}
+ DiskStatuses interface{}
+ Stats interface{}
+ Counters *stats.ServerStats
}{
util.VERSION,
vs.SeedMasterNodes,
- vs.store.Status(),
+ normalVolumeInfos,
vs.store.EcVolumes(),
+ remoteVolumeInfos,
ds,
infos,
serverStats,
diff --git a/weed/server/volume_server_handlers_write.go b/weed/server/volume_server_handlers_write.go
index db8fcb555..101be4c43 100644
--- a/weed/server/volume_server_handlers_write.go
+++ b/weed/server/volume_server_handlers_write.go
@@ -1,7 +1,6 @@
package weed_server
import (
- "context"
"errors"
"fmt"
"net/http"
@@ -43,7 +42,7 @@ func (vs *VolumeServer) PostHandler(w http.ResponseWriter, r *http.Request) {
return
}
- needle, originalSize, ne := needle.CreateNeedleFromRequest(r, vs.FixJpgOrientation)
+ needle, originalSize, ne := needle.CreateNeedleFromRequest(r, vs.FixJpgOrientation, vs.fileSizeLimitBytes)
if ne != nil {
writeJsonError(w, r, http.StatusBadRequest, ne)
return
@@ -51,10 +50,15 @@ func (vs *VolumeServer) PostHandler(w http.ResponseWriter, r *http.Request) {
ret := operation.UploadResult{}
_, isUnchanged, writeError := topology.ReplicatedWrite(vs.GetMaster(), vs.store, volumeId, needle, r)
- httpStatus := http.StatusCreated
- if isUnchanged {
- httpStatus = http.StatusNotModified
+
+ // http 204 status code does not allow body
+ if writeError == nil && isUnchanged {
+ setEtag(w, needle.Etag())
+ w.WriteHeader(http.StatusNoContent)
+ return
}
+
+ httpStatus := http.StatusCreated
if writeError != nil {
httpStatus = http.StatusInternalServerError
ret.Error = writeError.Error()
@@ -64,6 +68,7 @@ func (vs *VolumeServer) PostHandler(w http.ResponseWriter, r *http.Request) {
}
ret.Size = uint32(originalSize)
ret.ETag = needle.Etag()
+ ret.Mime = string(needle.Mime)
setEtag(w, ret.ETag)
writeJsonQuiet(w, r, httpStatus, ret)
}
@@ -93,7 +98,7 @@ func (vs *VolumeServer) DeleteHandler(w http.ResponseWriter, r *http.Request) {
ecVolume, hasEcVolume := vs.store.FindEcVolume(volumeId)
if hasEcVolume {
- count, err := vs.store.DeleteEcShardNeedle(context.Background(), ecVolume, n, cookie)
+ count, err := vs.store.DeleteEcShardNeedle(ecVolume, n, cookie)
writeDeleteResult(err, count, w, r)
return
}
diff --git a/weed/server/volume_server_ui/templates.go b/weed/server/volume_server_ui/templates.go
index eafc0aaeb..1c1394369 100644
--- a/weed/server/volume_server_ui/templates.go
+++ b/weed/server/volume_server_ui/templates.go
@@ -1,11 +1,29 @@
package master_ui
import (
+ "fmt"
"html/template"
"strconv"
"strings"
)
+func bytesToHumanReadble(b uint64) string {
+ const unit = 1024
+ if b < unit {
+ return fmt.Sprintf("%d B", b)
+ }
+ div, exp := uint64(unit), 0
+ for n := b / unit; n >= unit; n /= unit {
+ div *= unit
+ exp++
+ }
+ return fmt.Sprintf("%.2f %ciB", float64(b)/float64(div), "KMGTPE"[exp])
+}
+
+func percentFrom(total uint64, part_of uint64) string {
+ return fmt.Sprintf("%.2f", (float64(part_of)/float64(total))*100)
+}
+
func join(data []int64) string {
var ret []string
for _, d := range data {
@@ -15,7 +33,9 @@ func join(data []int64) string {
}
var funcMap = template.FuncMap{
- "join": join,
+ "join": join,
+ "bytesToHumanReadble": bytesToHumanReadble,
+ "percentFrom": percentFrom,
}
var StatusTpl = template.Must(template.New("status").Funcs(funcMap).Parse(`<!DOCTYPE html>
@@ -57,13 +77,25 @@ var StatusTpl = template.Must(template.New("status").Funcs(funcMap).Parse(`<!DOC
<div class="row">
<div class="col-sm-6">
<h2>Disk Stats</h2>
- <table class="table table-condensed table-striped">
+ <table class="table table-striped">
+ <thead>
+ <tr>
+ <th>Path</th>
+ <th>Total</th>
+ <th>Free</th>
+ <th>Usage</th>
+ </tr>
+ </thead>
+ <tbody>
{{ range .DiskStatuses }}
<tr>
- <th>{{ .Dir }}</th>
- <td>{{ .Free }} Bytes Free</td>
+ <td>{{ .Dir }}</td>
+ <td>{{ bytesToHumanReadble .All }}</td>
+ <td>{{ bytesToHumanReadble .Free }}</td>
+ <td>{{ percentFrom .All .Used}}%</td>
</tr>
{{ end }}
+ </tbody>
</table>
</div>
@@ -107,10 +139,11 @@ var StatusTpl = template.Must(template.New("status").Funcs(funcMap).Parse(`<!DOC
<tr>
<th>Id</th>
<th>Collection</th>
- <th>Size</th>
+ <th>Data Size</th>
<th>Files</th>
<th>Trash</th>
<th>TTL</th>
+ <th>ReadOnly</th>
</tr>
</thead>
<tbody>
@@ -122,6 +155,37 @@ var StatusTpl = template.Must(template.New("status").Funcs(funcMap).Parse(`<!DOC
<td>{{ .FileCount }}</td>
<td>{{ .DeleteCount }} / {{.DeletedByteCount}} Bytes</td>
<td>{{ .Ttl }}</td>
+ <td>{{ .ReadOnly }}</td>
+ </tr>
+ {{ end }}
+ </tbody>
+ </table>
+ </div>
+
+ <div class="row">
+ <h2>Remote Volumes</h2>
+ <table class="table table-striped">
+ <thead>
+ <tr>
+ <th>Id</th>
+ <th>Collection</th>
+ <th>Size</th>
+ <th>Files</th>
+ <th>Trash</th>
+ <th>Remote</th>
+ <th>Key</th>
+ </tr>
+ </thead>
+ <tbody>
+ {{ range .RemoteVolumes }}
+ <tr>
+ <td><code>{{ .Id }}</code></td>
+ <td>{{ .Collection }}</td>
+ <td>{{ .Size }} Bytes</td>
+ <td>{{ .FileCount }}</td>
+ <td>{{ .DeleteCount }} / {{.DeletedByteCount}} Bytes</td>
+ <td>{{ .RemoteStorageName }}</td>
+ <td>{{ .RemoteStorageKey }}</td>
</tr>
{{ end }}
</tbody>
diff --git a/weed/server/webdav_server.go b/weed/server/webdav_server.go
index 151b48a78..1fb0912c5 100644
--- a/weed/server/webdav_server.go
+++ b/weed/server/webdav_server.go
@@ -1,7 +1,6 @@
package weed_server
import (
- "bytes"
"context"
"fmt"
"io"
@@ -10,16 +9,17 @@ import (
"strings"
"time"
+ "golang.org/x/net/webdav"
+ "google.golang.org/grpc"
+
"github.com/chrislusf/seaweedfs/weed/operation"
+ "github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/util"
- "golang.org/x/net/webdav"
- "google.golang.org/grpc"
"github.com/chrislusf/seaweedfs/weed/filer2"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/security"
- "github.com/spf13/viper"
)
type WebDavOption struct {
@@ -31,6 +31,7 @@ type WebDavOption struct {
Collection string
Uid uint32
Gid uint32
+ Cipher bool
}
type WebDavServer struct {
@@ -47,7 +48,7 @@ func NewWebDavServer(option *WebDavOption) (ws *WebDavServer, err error) {
ws = &WebDavServer{
option: option,
- grpcDialOption: security.LoadClientTLS(viper.Sub("grpc"), "filer"),
+ grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.filer"),
Handler: &webdav.Handler{
FileSystem: fs,
LockSystem: webdav.NewMemLS(),
@@ -96,14 +97,17 @@ func NewWebDavFileSystem(option *WebDavOption) (webdav.FileSystem, error) {
}, nil
}
-func (fs *WebDavFileSystem) WithFilerClient(ctx context.Context, fn func(filer_pb.SeaweedFilerClient) error) error {
+func (fs *WebDavFileSystem) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error {
- return util.WithCachedGrpcClient(ctx, func(grpcConnection *grpc.ClientConn) error {
+ return pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
client := filer_pb.NewSeaweedFilerClient(grpcConnection)
return fn(client)
}, fs.option.FilerGrpcAddress, fs.option.GrpcDialOption)
}
+func (fs *WebDavFileSystem) AdjustedUrl(hostAndPort string) string {
+ return hostAndPort
+}
func clearName(name string) (string, error) {
slashed := strings.HasSuffix(name, "/")
@@ -135,7 +139,7 @@ func (fs *WebDavFileSystem) Mkdir(ctx context.Context, fullDirPath string, perm
return os.ErrExist
}
- return fs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
+ return fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
dir, name := filer2.FullPath(fullDirPath).DirAndName()
request := &filer_pb.CreateEntryRequest{
Directory: dir,
@@ -153,7 +157,7 @@ func (fs *WebDavFileSystem) Mkdir(ctx context.Context, fullDirPath string, perm
}
glog.V(1).Infof("mkdir: %v", request)
- if _, err := client.CreateEntry(ctx, request); err != nil {
+ if err := filer_pb.CreateEntry(client, request); err != nil {
return fmt.Errorf("mkdir %s/%s: %v", dir, name, err)
}
@@ -163,7 +167,7 @@ func (fs *WebDavFileSystem) Mkdir(ctx context.Context, fullDirPath string, perm
func (fs *WebDavFileSystem) OpenFile(ctx context.Context, fullFilePath string, flag int, perm os.FileMode) (webdav.File, error) {
- glog.V(2).Infof("WebDavFileSystem.OpenFile %v", fullFilePath)
+ glog.V(2).Infof("WebDavFileSystem.OpenFile %v %x", fullFilePath, flag)
var err error
if fullFilePath, err = clearName(fullFilePath); err != nil {
@@ -175,12 +179,6 @@ func (fs *WebDavFileSystem) OpenFile(ctx context.Context, fullFilePath string, f
if strings.HasSuffix(fullFilePath, "/") {
return nil, os.ErrInvalid
}
- // based directory should be exists.
- dir, _ := path.Split(fullFilePath)
- _, err := fs.stat(ctx, dir)
- if err != nil {
- return nil, os.ErrInvalid
- }
_, err = fs.stat(ctx, fullFilePath)
if err == nil {
if flag&os.O_EXCL != 0 {
@@ -190,8 +188,8 @@ func (fs *WebDavFileSystem) OpenFile(ctx context.Context, fullFilePath string, f
}
dir, name := filer2.FullPath(fullFilePath).DirAndName()
- err = fs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
- if _, err := client.CreateEntry(ctx, &filer_pb.CreateEntryRequest{
+ err = fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ if err := filer_pb.CreateEntry(client, &filer_pb.CreateEntryRequest{
Directory: dir,
Entry: &filer_pb.Entry{
Name: name,
@@ -255,7 +253,7 @@ func (fs *WebDavFileSystem) removeAll(ctx context.Context, fullFilePath string)
//_, err = fs.db.Exec(`delete from filesystem where fullFilePath = ?`, fullFilePath)
}
dir, name := filer2.FullPath(fullFilePath).DirAndName()
- err = fs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
+ err = fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.DeleteEntryRequest{
Directory: dir,
@@ -314,7 +312,7 @@ func (fs *WebDavFileSystem) Rename(ctx context.Context, oldName, newName string)
oldDir, oldBaseName := filer2.FullPath(oldName).DirAndName()
newDir, newBaseName := filer2.FullPath(newName).DirAndName()
- return fs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
+ return fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.AtomicRenameEntryRequest{
OldDirectory: oldDir,
@@ -339,8 +337,10 @@ func (fs *WebDavFileSystem) stat(ctx context.Context, fullFilePath string) (os.F
return nil, err
}
+ fullpath := filer2.FullPath(fullFilePath)
+
var fi FileInfo
- entry, err := filer2.GetEntry(ctx, fs, fullFilePath)
+ entry, err := filer2.GetEntry(fs, fullpath)
if entry == nil {
return nil, os.ErrNotExist
}
@@ -348,14 +348,12 @@ func (fs *WebDavFileSystem) stat(ctx context.Context, fullFilePath string) (os.F
return nil, err
}
fi.size = int64(filer2.TotalSize(entry.GetChunks()))
- fi.name = fullFilePath
+ fi.name = string(fullpath)
fi.mode = os.FileMode(entry.Attributes.FileMode)
fi.modifiledTime = time.Unix(entry.Attributes.Mtime, 0)
fi.isDirectory = entry.IsDirectory
- _, fi.name = path.Split(path.Clean(fi.name))
- if fi.name == "" {
- fi.name = "/"
+ if fi.name == "/" {
fi.modifiledTime = time.Now()
fi.isDirectory = true
}
@@ -373,10 +371,12 @@ func (f *WebDavFile) Write(buf []byte) (int, error) {
glog.V(2).Infof("WebDavFileSystem.Write %v", f.name)
+ dir, _ := filer2.FullPath(f.name).DirAndName()
+
var err error
ctx := context.Background()
if f.entry == nil {
- f.entry, err = filer2.GetEntry(ctx, f.fs, f.name)
+ f.entry, err = filer2.GetEntry(f.fs, filer2.FullPath(f.name))
}
if f.entry == nil {
@@ -388,13 +388,15 @@ func (f *WebDavFile) Write(buf []byte) (int, error) {
var fileId, host string
var auth security.EncodedJwt
+ var collection, replication string
- if err = f.fs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
+ if err = f.fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.AssignVolumeRequest{
Count: 1,
- Replication: "000",
+ Replication: "",
Collection: f.fs.option.Collection,
+ ParentPath: dir,
}
resp, err := client.AssignVolume(ctx, request)
@@ -402,8 +404,12 @@ func (f *WebDavFile) Write(buf []byte) (int, error) {
glog.V(0).Infof("assign volume failure %v: %v", request, err)
return err
}
+ if resp.Error != "" {
+ return fmt.Errorf("assign volume failure %v: %v", request, resp.Error)
+ }
fileId, host, auth = resp.FileId, resp.Url, security.EncodedJwt(resp.Auth)
+ collection, replication = resp.Collection, resp.Replication
return nil
}); err != nil {
@@ -411,8 +417,7 @@ func (f *WebDavFile) Write(buf []byte) (int, error) {
}
fileUrl := fmt.Sprintf("http://%s/%s", host, fileId)
- bufReader := bytes.NewReader(buf)
- uploadResult, err := operation.Upload(fileUrl, f.name, bufReader, false, "application/octet-stream", nil, auth)
+ uploadResult, err := operation.UploadData(fileUrl, f.name, f.fs.option.Cipher, buf, false, "", nil, auth)
if err != nil {
glog.V(0).Infof("upload data %v to %s: %v", f.name, fileUrl, err)
return 0, fmt.Errorf("upload data: %v", err)
@@ -423,18 +428,21 @@ func (f *WebDavFile) Write(buf []byte) (int, error) {
}
chunk := &filer_pb.FileChunk{
- FileId: fileId,
- Offset: f.off,
- Size: uint64(len(buf)),
- Mtime: time.Now().UnixNano(),
- ETag: uploadResult.ETag,
+ FileId: fileId,
+ Offset: f.off,
+ Size: uint64(len(buf)),
+ Mtime: time.Now().UnixNano(),
+ ETag: uploadResult.ETag,
+ CipherKey: uploadResult.CipherKey,
+ IsGzipped: uploadResult.Gzip > 0,
}
f.entry.Chunks = append(f.entry.Chunks, chunk)
- dir, _ := filer2.FullPath(f.name).DirAndName()
- err = f.fs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
+ err = f.fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
f.entry.Attributes.Mtime = time.Now().Unix()
+ f.entry.Attributes.Collection = collection
+ f.entry.Attributes.Replication = replication
request := &filer_pb.UpdateEntryRequest{
Directory: dir,
@@ -448,9 +456,11 @@ func (f *WebDavFile) Write(buf []byte) (int, error) {
return nil
})
- if err != nil {
+ if err == nil {
+ glog.V(3).Infof("WebDavFileSystem.Write %v: written [%d,%d)", f.name, f.off, f.off+int64(len(buf)))
f.off += int64(len(buf))
}
+
return len(buf), err
}
@@ -469,10 +479,9 @@ func (f *WebDavFile) Close() error {
func (f *WebDavFile) Read(p []byte) (readSize int, err error) {
glog.V(2).Infof("WebDavFileSystem.Read %v", f.name)
- ctx := context.Background()
if f.entry == nil {
- f.entry, err = filer2.GetEntry(ctx, f.fs, f.name)
+ f.entry, err = filer2.GetEntry(f.fs, filer2.FullPath(f.name))
}
if f.entry == nil {
return 0, err
@@ -488,30 +497,29 @@ func (f *WebDavFile) Read(p []byte) (readSize int, err error) {
}
chunkViews := filer2.ViewFromVisibleIntervals(f.entryViewCache, f.off, len(p))
- totalRead, err := filer2.ReadIntoBuffer(ctx, f.fs, f.name, p, chunkViews, f.off)
+ totalRead, err := filer2.ReadIntoBuffer(f.fs, filer2.FullPath(f.name), p, chunkViews, f.off)
if err != nil {
return 0, err
}
readSize = int(totalRead)
+ glog.V(3).Infof("WebDavFileSystem.Read %v: [%d,%d)", f.name, f.off, f.off+totalRead)
+
f.off += totalRead
if readSize == 0 {
return 0, io.EOF
}
+
return
}
func (f *WebDavFile) Readdir(count int) (ret []os.FileInfo, err error) {
glog.V(2).Infof("WebDavFileSystem.Readdir %v count %d", f.name, count)
- ctx := context.Background()
- dir := f.name
- if dir != "/" && strings.HasSuffix(dir, "/") {
- dir = dir[:len(dir)-1]
- }
+ dir, _ := filer2.FullPath(f.name).DirAndName()
- err = filer2.ReadDirAllEntries(ctx, f.fs, dir, func(entry *filer_pb.Entry) {
+ err = filer2.ReadDirAllEntries(f.fs, filer2.FullPath(dir), "", func(entry *filer_pb.Entry, isLast bool) {
fi := FileInfo{
size: int64(filer2.TotalSize(entry.GetChunks())),
name: entry.Name,