Diffstat (limited to 'weed/server')
-rw-r--r--  weed/server/common.go                                  14
-rw-r--r--  weed/server/common_test.go                             31
-rw-r--r--  weed/server/filer_grpc_server.go                       52
-rw-r--r--  weed/server/filer_grpc_server_rename.go               130
-rw-r--r--  weed/server/filer_server.go                            95
-rw-r--r--  weed/server/filer_server_handlers.go                   19
-rw-r--r--  weed/server/filer_server_handlers_read.go              58
-rw-r--r--  weed/server/filer_server_handlers_read_dir.go           7
-rw-r--r--  weed/server/filer_server_handlers_write.go            200
-rw-r--r--  weed/server/filer_server_handlers_write_autochunk.go   42
-rw-r--r--  weed/server/filer_ui/breadcrumb.go                      2
-rw-r--r--  weed/server/filer_ui/templates.go                       2
-rw-r--r--  weed/server/master_grpc_server.go                      80
-rw-r--r--  weed/server/master_grpc_server_collection.go           94
-rw-r--r--  weed/server/master_grpc_server_volume.go               72
-rw-r--r--  weed/server/master_server.go                          181
-rw-r--r--  weed/server/master_server_handlers.go                  45
-rw-r--r--  weed/server/master_server_handlers_admin.go            33
-rw-r--r--  weed/server/master_ui/templates.go                      4
-rw-r--r--  weed/server/raft_server.go                             55
-rw-r--r--  weed/server/raft_server_handlers.go                    11
-rw-r--r--  weed/server/volume_grpc_admin.go                       30
-rw-r--r--  weed/server/volume_grpc_batch_delete.go                 6
-rw-r--r--  weed/server/volume_grpc_client_to_master.go           110
-rw-r--r--  weed/server/volume_grpc_copy.go                       263
-rw-r--r--  weed/server/volume_grpc_copy_incremental.go            66
-rw-r--r--  weed/server/volume_grpc_erasure_coding.go             313
-rw-r--r--  weed/server/volume_grpc_sync.go                       101
-rw-r--r--  weed/server/volume_grpc_tail.go                       117
-rw-r--r--  weed/server/volume_grpc_vacuum.go                      24
-rw-r--r--  weed/server/volume_server.go                           86
-rw-r--r--  weed/server/volume_server_handlers.go                  47
-rw-r--r--  weed/server/volume_server_handlers_read.go             41
-rw-r--r--  weed/server/volume_server_handlers_ui.go                4
-rw-r--r--  weed/server/volume_server_handlers_write.go            66
-rw-r--r--  weed/server/volume_server_ui/templates.go              26
-rw-r--r--  weed/server/webdav_server.go                          578
37 files changed, 2601 insertions, 504 deletions
diff --git a/weed/server/common.go b/weed/server/common.go
index d88abfdc8..e02ab38a6 100644
--- a/weed/server/common.go
+++ b/weed/server/common.go
@@ -11,11 +11,12 @@ import (
"strings"
"time"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "google.golang.org/grpc"
+
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/operation"
- "github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/stats"
- "github.com/chrislusf/seaweedfs/weed/storage"
"github.com/chrislusf/seaweedfs/weed/util"
_ "github.com/chrislusf/seaweedfs/weed/statik"
@@ -82,8 +83,7 @@ func debug(params ...interface{}) {
glog.V(4).Infoln(params...)
}
-func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterUrl string) {
- jwt := security.GetJwt(r)
+func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterUrl string, grpcDialOption grpc.DialOption) {
m := make(map[string]interface{})
if r.Method != "POST" {
writeJsonError(w, r, http.StatusMethodNotAllowed, errors.New("Only submit via POST!"))
@@ -91,7 +91,7 @@ func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterUrl st
}
debug("parsing upload file...")
- fname, data, mimeType, pairMap, isGzipped, originalDataSize, lastModified, _, _, pe := storage.ParseUpload(r)
+ fname, data, mimeType, pairMap, isGzipped, originalDataSize, lastModified, _, _, pe := needle.ParseUpload(r)
if pe != nil {
writeJsonError(w, r, http.StatusBadRequest, pe)
return
@@ -113,7 +113,7 @@ func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterUrl st
Collection: r.FormValue("collection"),
Ttl: r.FormValue("ttl"),
}
- assignResult, ae := operation.Assign(masterUrl, ar)
+ assignResult, ae := operation.Assign(masterUrl, grpcDialOption, ar)
if ae != nil {
writeJsonError(w, r, http.StatusInternalServerError, ae)
return
@@ -125,7 +125,7 @@ func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterUrl st
}
debug("upload file to store", url)
- uploadResult, err := operation.Upload(url, fname, bytes.NewReader(data), isGzipped, mimeType, pairMap, jwt)
+ uploadResult, err := operation.Upload(url, fname, bytes.NewReader(data), isGzipped, mimeType, pairMap, assignResult.Auth)
if err != nil {
writeJsonError(w, r, http.StatusInternalServerError, err)
return
diff --git a/weed/server/common_test.go b/weed/server/common_test.go
new file mode 100644
index 000000000..2e6c70bfe
--- /dev/null
+++ b/weed/server/common_test.go
@@ -0,0 +1,31 @@
+package weed_server
+
+import (
+ "strings"
+ "testing"
+)
+
+func TestParseURL(t *testing.T) {
+ if vid, fid, _, _, _ := parseURLPath("/1,06dfa8a684"); true {
+ if vid != "1" {
+ t.Errorf("fail to parse vid: %s", vid)
+ }
+ if fid != "06dfa8a684" {
+ t.Errorf("fail to parse fid: %s", fid)
+ }
+ }
+ if vid, fid, _, _, _ := parseURLPath("/1,06dfa8a684_1"); true {
+ if vid != "1" {
+ t.Errorf("fail to parse vid: %s", vid)
+ }
+ if fid != "06dfa8a684_1" {
+ t.Errorf("fail to parse fid: %s", fid)
+ }
+ if sepIndex := strings.LastIndex(fid, "_"); sepIndex > 0 {
+ fid = fid[:sepIndex]
+ }
+ if fid != "06dfa8a684" {
+ t.Errorf("fail to parse fid: %s", fid)
+ }
+ }
+}
diff --git a/weed/server/filer_grpc_server.go b/weed/server/filer_grpc_server.go
index 06589e3c6..8eea2441e 100644
--- a/weed/server/filer_grpc_server.go
+++ b/weed/server/filer_grpc_server.go
@@ -14,12 +14,11 @@ import (
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
- "github.com/chrislusf/seaweedfs/weed/util"
)
func (fs *FilerServer) LookupDirectoryEntry(ctx context.Context, req *filer_pb.LookupDirectoryEntryRequest) (*filer_pb.LookupDirectoryEntryResponse, error) {
- entry, err := fs.filer.FindEntry(filer2.FullPath(filepath.Join(req.Directory, req.Name)))
+ entry, err := fs.filer.FindEntry(ctx, filer2.FullPath(filepath.ToSlash(filepath.Join(req.Directory, req.Name))))
if err != nil {
return nil, fmt.Errorf("%s not found under %s: %v", req.Name, req.Directory, err)
}
@@ -45,7 +44,7 @@ func (fs *FilerServer) ListEntries(ctx context.Context, req *filer_pb.ListEntrie
lastFileName := req.StartFromFileName
includeLastFile := req.InclusiveStartFrom
for limit > 0 {
- entries, err := fs.filer.ListDirectoryEntries(filer2.FullPath(req.Directory), lastFileName, includeLastFile, 1024)
+ entries, err := fs.filer.ListDirectoryEntries(ctx, filer2.FullPath(req.Directory), lastFileName, includeLastFile, 1024)
if err != nil {
return nil, err
}
@@ -112,22 +111,21 @@ func (fs *FilerServer) LookupVolume(ctx context.Context, req *filer_pb.LookupVol
func (fs *FilerServer) CreateEntry(ctx context.Context, req *filer_pb.CreateEntryRequest) (resp *filer_pb.CreateEntryResponse, err error) {
- fullpath := filer2.FullPath(filepath.Join(req.Directory, req.Entry.Name))
+ fullpath := filer2.FullPath(filepath.ToSlash(filepath.Join(req.Directory, req.Entry.Name)))
chunks, garbages := filer2.CompactFileChunks(req.Entry.Chunks)
- fs.filer.DeleteChunks(garbages)
-
if req.Entry.Attributes == nil {
return nil, fmt.Errorf("can not create entry with empty attributes")
}
- err = fs.filer.CreateEntry(&filer2.Entry{
+ err = fs.filer.CreateEntry(ctx, &filer2.Entry{
FullPath: fullpath,
Attr: filer2.PbToEntryAttribute(req.Entry.Attributes),
Chunks: chunks,
})
if err == nil {
+ fs.filer.DeleteChunks(fullpath, garbages)
}
return &filer_pb.CreateEntryResponse{}, err
@@ -135,19 +133,19 @@ func (fs *FilerServer) CreateEntry(ctx context.Context, req *filer_pb.CreateEntr
func (fs *FilerServer) UpdateEntry(ctx context.Context, req *filer_pb.UpdateEntryRequest) (*filer_pb.UpdateEntryResponse, error) {
- fullpath := filepath.Join(req.Directory, req.Entry.Name)
- entry, err := fs.filer.FindEntry(filer2.FullPath(fullpath))
+ fullpath := filepath.ToSlash(filepath.Join(req.Directory, req.Entry.Name))
+ entry, err := fs.filer.FindEntry(ctx, filer2.FullPath(fullpath))
if err != nil {
return &filer_pb.UpdateEntryResponse{}, fmt.Errorf("not found %s: %v", fullpath, err)
}
// remove old chunks if not included in the new ones
- unusedChunks := filer2.FindUnusedFileChunks(entry.Chunks, req.Entry.Chunks)
+ unusedChunks := filer2.MinusChunks(entry.Chunks, req.Entry.Chunks)
chunks, garbages := filer2.CompactFileChunks(req.Entry.Chunks)
newEntry := &filer2.Entry{
- FullPath: filer2.FullPath(filepath.Join(req.Directory, req.Entry.Name)),
+ FullPath: filer2.FullPath(filepath.ToSlash(filepath.Join(req.Directory, req.Entry.Name))),
Attr: entry.Attr,
Chunks: chunks,
}
@@ -175,9 +173,9 @@ func (fs *FilerServer) UpdateEntry(ctx context.Context, req *filer_pb.UpdateEntr
return &filer_pb.UpdateEntryResponse{}, err
}
- if err = fs.filer.UpdateEntry(entry, newEntry); err == nil {
- fs.filer.DeleteChunks(unusedChunks)
- fs.filer.DeleteChunks(garbages)
+ if err = fs.filer.UpdateEntry(ctx, entry, newEntry); err == nil {
+ fs.filer.DeleteChunks(entry.FullPath, unusedChunks)
+ fs.filer.DeleteChunks(entry.FullPath, garbages)
}
fs.filer.NotifyUpdateEvent(entry, newEntry, true)
@@ -186,7 +184,7 @@ func (fs *FilerServer) UpdateEntry(ctx context.Context, req *filer_pb.UpdateEntr
}
func (fs *FilerServer) DeleteEntry(ctx context.Context, req *filer_pb.DeleteEntryRequest) (resp *filer_pb.DeleteEntryResponse, err error) {
- err = fs.filer.DeleteEntryMetaAndData(filer2.FullPath(filepath.Join(req.Directory, req.Name)), req.IsRecursive, req.IsDeleteData)
+ err = fs.filer.DeleteEntryMetaAndData(ctx, filer2.FullPath(filepath.ToSlash(filepath.Join(req.Directory, req.Name))), req.IsRecursive, req.IsDeleteData)
return &filer_pb.DeleteEntryResponse{}, err
}
@@ -220,7 +218,7 @@ func (fs *FilerServer) AssignVolume(ctx context.Context, req *filer_pb.AssignVol
DataCenter: "",
}
}
- assignResult, err := operation.Assign(fs.filer.GetMaster(), assignRequest, altRequest)
+ assignResult, err := operation.Assign(fs.filer.GetMaster(), fs.grpcDialOption, assignRequest, altRequest)
if err != nil {
return nil, fmt.Errorf("assign volume: %v", err)
}
@@ -233,14 +231,18 @@ func (fs *FilerServer) AssignVolume(ctx context.Context, req *filer_pb.AssignVol
Count: int32(assignResult.Count),
Url: assignResult.Url,
PublicUrl: assignResult.PublicUrl,
+ Auth: string(assignResult.Auth),
}, err
}
func (fs *FilerServer) DeleteCollection(ctx context.Context, req *filer_pb.DeleteCollectionRequest) (resp *filer_pb.DeleteCollectionResponse, err error) {
- for _, master := range fs.option.Masters {
- _, err = util.Get(fmt.Sprintf("http://%s/col/delete?collection=%s", master, req.Collection))
- }
+ err = fs.filer.MasterClient.WithClient(ctx, func(client master_pb.SeaweedClient) error {
+ _, err := client.CollectionDelete(ctx, &master_pb.CollectionDeleteRequest{
+ Name: req.GetCollection(),
+ })
+ return err
+ })
return &filer_pb.DeleteCollectionResponse{}, err
}
@@ -253,7 +255,7 @@ func (fs *FilerServer) Statistics(ctx context.Context, req *filer_pb.StatisticsR
Ttl: req.Ttl,
}
- output, err := operation.Statistics(fs.filer.GetMaster(), input)
+ output, err := operation.Statistics(fs.filer.GetMaster(), fs.grpcDialOption, input)
if err != nil {
return nil, err
}
@@ -264,3 +266,13 @@ func (fs *FilerServer) Statistics(ctx context.Context, req *filer_pb.StatisticsR
FileCount: output.FileCount,
}, nil
}
+
+func (fs *FilerServer) GetFilerConfiguration(ctx context.Context, req *filer_pb.GetFilerConfigurationRequest) (resp *filer_pb.GetFilerConfigurationResponse, err error) {
+
+ return &filer_pb.GetFilerConfigurationResponse{
+ Masters: fs.option.Masters,
+ Collection: fs.option.Collection,
+ Replication: fs.option.DefaultReplication,
+ MaxMb: uint32(fs.option.MaxMB),
+ }, nil
+}
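
The new GetFilerConfiguration RPC exposes the filer's masters, default collection, replication, and chunk size limit over gRPC. A minimal client-side sketch, assuming the generated filer_pb bindings; the address and port are placeholders, not values from this diff:

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
	"google.golang.org/grpc"
)

func main() {
	// dial the filer's gRPC port (placeholder address)
	conn, err := grpc.Dial("localhost:18888", grpc.WithInsecure())
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	client := filer_pb.NewSeaweedFilerClient(conn)
	resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("masters:", resp.Masters, "collection:", resp.Collection,
		"replication:", resp.Replication, "maxMB:", resp.MaxMb)
}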
diff --git a/weed/server/filer_grpc_server_rename.go b/weed/server/filer_grpc_server_rename.go
new file mode 100644
index 000000000..7142f7606
--- /dev/null
+++ b/weed/server/filer_grpc_server_rename.go
@@ -0,0 +1,130 @@
+package weed_server
+
+import (
+ "context"
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/filer2"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "path/filepath"
+)
+
+func (fs *FilerServer) AtomicRenameEntry(ctx context.Context, req *filer_pb.AtomicRenameEntryRequest) (*filer_pb.AtomicRenameEntryResponse, error) {
+
+ glog.V(1).Infof("AtomicRenameEntry %v", req)
+
+ ctx, err := fs.filer.BeginTransaction(ctx)
+ if err != nil {
+ return nil, err
+ }
+
+ oldParent := filer2.FullPath(filepath.ToSlash(req.OldDirectory))
+
+ oldEntry, err := fs.filer.FindEntry(ctx, oldParent.Child(req.OldName))
+ if err != nil {
+ fs.filer.RollbackTransaction(ctx)
+ return nil, fmt.Errorf("%s/%s not found: %v", req.OldDirectory, req.OldName, err)
+ }
+
+ var events MoveEvents
+ moveErr := fs.moveEntry(ctx, oldParent, oldEntry, filer2.FullPath(filepath.ToSlash(req.NewDirectory)), req.NewName, &events)
+ if moveErr != nil {
+ fs.filer.RollbackTransaction(ctx)
+ return nil, fmt.Errorf("%s/%s move error: %v", req.OldDirectory, req.OldName, err)
+ } else {
+ if commitError := fs.filer.CommitTransaction(ctx); commitError != nil {
+ fs.filer.RollbackTransaction(ctx)
+ return nil, fmt.Errorf("%s/%s move commit error: %v", req.OldDirectory, req.OldName, err)
+ }
+ }
+
+ for _, entry := range events.newEntries {
+ fs.filer.NotifyUpdateEvent(nil, entry, false)
+ }
+ for _, entry := range events.oldEntries {
+ fs.filer.NotifyUpdateEvent(entry, nil, false)
+ }
+
+ return &filer_pb.AtomicRenameEntryResponse{}, nil
+}
+
+func (fs *FilerServer) moveEntry(ctx context.Context, oldParent filer2.FullPath, entry *filer2.Entry, newParent filer2.FullPath, newName string, events *MoveEvents) error {
+ if entry.IsDirectory() {
+ if err := fs.moveFolderSubEntries(ctx, oldParent, entry, newParent, newName, events); err != nil {
+ return err
+ }
+ }
+ return fs.moveSelfEntry(ctx, oldParent, entry, newParent, newName, events)
+}
+
+func (fs *FilerServer) moveFolderSubEntries(ctx context.Context, oldParent filer2.FullPath, entry *filer2.Entry, newParent filer2.FullPath, newName string, events *MoveEvents) error {
+
+ currentDirPath := oldParent.Child(entry.Name())
+ newDirPath := newParent.Child(newName)
+
+ glog.V(1).Infof("moving folder %s => %s", currentDirPath, newDirPath)
+
+ lastFileName := ""
+ includeLastFile := false
+ for {
+
+ entries, err := fs.filer.ListDirectoryEntries(ctx, currentDirPath, lastFileName, includeLastFile, 1024)
+ if err != nil {
+ return err
+ }
+
+ println("found", len(entries), "entries under", currentDirPath)
+
+ for _, item := range entries {
+ lastFileName = item.Name()
+ println("processing", lastFileName)
+ err := fs.moveEntry(ctx, currentDirPath, item, newDirPath, item.Name(), events)
+ if err != nil {
+ return err
+ }
+ }
+ if len(entries) < 1024 {
+ break
+ }
+ }
+ return nil
+}
+
+func (fs *FilerServer) moveSelfEntry(ctx context.Context, oldParent filer2.FullPath, entry *filer2.Entry, newParent filer2.FullPath, newName string, events *MoveEvents) error {
+
+ oldPath, newPath := oldParent.Child(entry.Name()), newParent.Child(newName)
+
+ glog.V(1).Infof("moving entry %s => %s", oldPath, newPath)
+
+ if oldPath == newPath {
+ glog.V(1).Infof("skip moving entry %s => %s", oldPath, newPath)
+ return nil
+ }
+
+ // add to new directory
+ newEntry := &filer2.Entry{
+ FullPath: newPath,
+ Attr: entry.Attr,
+ Chunks: entry.Chunks,
+ }
+ createErr := fs.filer.CreateEntry(ctx, newEntry)
+ if createErr != nil {
+ return createErr
+ }
+
+ // delete old entry
+ deleteErr := fs.filer.DeleteEntryMetaAndData(ctx, oldPath, false, false)
+ if deleteErr != nil {
+ return deleteErr
+ }
+
+ events.oldEntries = append(events.oldEntries, entry)
+ events.newEntries = append(events.newEntries, newEntry)
+ return nil
+
+}
+
+type MoveEvents struct {
+ oldEntries []*filer2.Entry
+ newEntries []*filer2.Entry
+}
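
AtomicRenameEntry wraps the whole move, including a recursive walk of directory trees, in a single filer-store transaction, so clients get all-or-nothing semantics. A hedged sketch of invoking it, again assuming the generated filer_pb client from the previous example; the paths are placeholders:

// renameEntry asks the filer to atomically move /dir/old to /dir/new.
// Sketch only; connection setup as in the earlier example.
func renameEntry(ctx context.Context, client filer_pb.SeaweedFilerClient) error {
	_, err := client.AtomicRenameEntry(ctx, &filer_pb.AtomicRenameEntryRequest{
		OldDirectory: "/dir",
		OldName:      "old",
		NewDirectory: "/dir",
		NewName:      "new",
	})
	return err
}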
diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go
index 9d70e4dac..b9e6aa23d 100644
--- a/weed/server/filer_server.go
+++ b/weed/server/filer_server.go
@@ -1,12 +1,22 @@
package weed_server
import (
+ "context"
+ "fmt"
"net/http"
"os"
+ "time"
+
+ "github.com/chrislusf/seaweedfs/weed/operation"
+ "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
+ "github.com/chrislusf/seaweedfs/weed/stats"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "google.golang.org/grpc"
"github.com/chrislusf/seaweedfs/weed/filer2"
_ "github.com/chrislusf/seaweedfs/weed/filer2/cassandra"
_ "github.com/chrislusf/seaweedfs/weed/filer2/leveldb"
+ _ "github.com/chrislusf/seaweedfs/weed/filer2/leveldb2"
_ "github.com/chrislusf/seaweedfs/weed/filer2/memdb"
_ "github.com/chrislusf/seaweedfs/weed/filer2/mysql"
_ "github.com/chrislusf/seaweedfs/weed/filer2/postgres"
@@ -14,6 +24,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/notification"
_ "github.com/chrislusf/seaweedfs/weed/notification/aws_sqs"
+ _ "github.com/chrislusf/seaweedfs/weed/notification/gocdk_pub_sub"
_ "github.com/chrislusf/seaweedfs/weed/notification/google_pub_sub"
_ "github.com/chrislusf/seaweedfs/weed/notification/kafka"
_ "github.com/chrislusf/seaweedfs/weed/notification/log"
@@ -28,85 +39,93 @@ type FilerOption struct {
RedirectOnRead bool
DisableDirListing bool
MaxMB int
- SecretKey string
DirListingLimit int
DataCenter string
DefaultLevelDbDir string
+ DisableHttp bool
+ Port int
}
type FilerServer struct {
- option *FilerOption
- secret security.Secret
- filer *filer2.Filer
+ option *FilerOption
+ secret security.SigningKey
+ filer *filer2.Filer
+ grpcDialOption grpc.DialOption
}
func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) (fs *FilerServer, err error) {
fs = &FilerServer{
- option: option,
+ option: option,
+ grpcDialOption: security.LoadClientTLS(viper.Sub("grpc"), "filer"),
}
if len(option.Masters) == 0 {
glog.Fatal("master list is required!")
}
- fs.filer = filer2.NewFiler(option.Masters)
+ fs.filer = filer2.NewFiler(option.Masters, fs.grpcDialOption)
go fs.filer.KeepConnectedToMaster()
v := viper.GetViper()
- if !LoadConfiguration("filer", false) {
- v.Set("leveldb.enabled", true)
- v.Set("leveldb.dir", option.DefaultLevelDbDir)
+ if !util.LoadConfiguration("filer", false) {
+ v.Set("leveldb2.enabled", true)
+ v.Set("leveldb2.dir", option.DefaultLevelDbDir)
_, err := os.Stat(option.DefaultLevelDbDir)
if os.IsNotExist(err) {
os.MkdirAll(option.DefaultLevelDbDir, 0755)
}
}
- LoadConfiguration("notification", false)
+ util.LoadConfiguration("notification", false)
fs.filer.LoadConfiguration(v)
notification.LoadConfiguration(v.Sub("notification"))
handleStaticResources(defaultMux)
- defaultMux.HandleFunc("/", fs.filerHandler)
+ if !option.DisableHttp {
+ defaultMux.HandleFunc("/", fs.filerHandler)
+ }
if defaultMux != readonlyMux {
readonlyMux.HandleFunc("/", fs.readonlyFilerHandler)
}
- return fs, nil
-}
+ maybeStartMetrics(fs, option)
-func (fs *FilerServer) jwt(fileId string) security.EncodedJwt {
- return security.GenJwt(fs.secret, fileId)
+ return fs, nil
}
-func LoadConfiguration(configFileName string, required bool) (loaded bool) {
-
- // find a filer store
- viper.SetConfigName(configFileName) // name of config file (without extension)
- viper.AddConfigPath(".") // optionally look for config in the working directory
- viper.AddConfigPath("$HOME/.seaweedfs") // call multiple times to add many search paths
- viper.AddConfigPath("/etc/seaweedfs/") // path to look for the config file in
-
- glog.V(0).Infof("Reading %s.toml from %s", configFileName, viper.ConfigFileUsed())
-
- if err := viper.MergeInConfig(); err != nil { // Handle errors reading the config file
- glog.V(0).Infof("Reading %s: %v", viper.ConfigFileUsed(), err)
- if required {
- glog.Fatalf("Failed to load %s.toml file from current directory, or $HOME/.seaweedfs/, or /etc/seaweedfs/"+
- "\n\nPlease follow this example and add a filer.toml file to "+
- "current directory, or $HOME/.seaweedfs/, or /etc/seaweedfs/:\n"+
- " https://github.com/chrislusf/seaweedfs/blob/master/weed/%s.toml\n"+
- "\nOr use this command to generate the default toml file\n"+
- " weed scaffold -config=%s -output=.\n\n\n",
- configFileName, configFileName, configFileName)
+func maybeStartMetrics(fs *FilerServer, option *FilerOption) {
+ isConnected := false
+ var metricsAddress string
+ var metricsIntervalSec int
+ var readErr error
+ for !isConnected {
+ metricsAddress, metricsIntervalSec, readErr = readFilerConfiguration(fs.grpcDialOption, option.Masters[0])
+ if readErr == nil {
+ isConnected = true
} else {
- return false
+ time.Sleep(7 * time.Second)
}
}
+ if metricsAddress == "" && metricsIntervalSec <= 0 {
+ return
+ }
+ go stats.LoopPushingMetric("filer", stats.SourceName(option.Port), stats.FilerGather,
+ func() (addr string, intervalSeconds int) {
+ return metricsAddress, metricsIntervalSec
+ })
+}
- return true
-
+func readFilerConfiguration(grpcDialOption grpc.DialOption, masterGrpcAddress string) (metricsAddress string, metricsIntervalSec int, err error) {
+ err = operation.WithMasterServerClient(masterGrpcAddress, grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
+ resp, err := masterClient.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{})
+ if err != nil {
+ return fmt.Errorf("get master %s configuration: %v", masterGrpcAddress, err)
+ }
+ metricsAddress, metricsIntervalSec = resp.MetricsAddress, int(resp.MetricsIntervalSeconds)
+ return nil
+ })
+ return
}
diff --git a/weed/server/filer_server_handlers.go b/weed/server/filer_server_handlers.go
index d76d7df8c..b6bfc3b04 100644
--- a/weed/server/filer_server_handlers.go
+++ b/weed/server/filer_server_handlers.go
@@ -2,28 +2,47 @@ package weed_server
import (
"net/http"
+ "time"
+
+ "github.com/chrislusf/seaweedfs/weed/stats"
)
func (fs *FilerServer) filerHandler(w http.ResponseWriter, r *http.Request) {
+ start := time.Now()
switch r.Method {
case "GET":
+ stats.FilerRequestCounter.WithLabelValues("get").Inc()
fs.GetOrHeadHandler(w, r, true)
+ stats.FilerRequestHistogram.WithLabelValues("get").Observe(time.Since(start).Seconds())
case "HEAD":
+ stats.FilerRequestCounter.WithLabelValues("head").Inc()
fs.GetOrHeadHandler(w, r, false)
+ stats.FilerRequestHistogram.WithLabelValues("head").Observe(time.Since(start).Seconds())
case "DELETE":
+ stats.FilerRequestCounter.WithLabelValues("delete").Inc()
fs.DeleteHandler(w, r)
+ stats.FilerRequestHistogram.WithLabelValues("delete").Observe(time.Since(start).Seconds())
case "PUT":
+ stats.FilerRequestCounter.WithLabelValues("put").Inc()
fs.PostHandler(w, r)
+ stats.FilerRequestHistogram.WithLabelValues("put").Observe(time.Since(start).Seconds())
case "POST":
+ stats.FilerRequestCounter.WithLabelValues("post").Inc()
fs.PostHandler(w, r)
+ stats.FilerRequestHistogram.WithLabelValues("post").Observe(time.Since(start).Seconds())
}
}
func (fs *FilerServer) readonlyFilerHandler(w http.ResponseWriter, r *http.Request) {
+ start := time.Now()
switch r.Method {
case "GET":
+ stats.FilerRequestCounter.WithLabelValues("get").Inc()
fs.GetOrHeadHandler(w, r, true)
+ stats.FilerRequestHistogram.WithLabelValues("get").Observe(time.Since(start).Seconds())
case "HEAD":
+ stats.FilerRequestCounter.WithLabelValues("head").Inc()
fs.GetOrHeadHandler(w, r, false)
+ stats.FilerRequestHistogram.WithLabelValues("head").Observe(time.Since(start).Seconds())
}
}
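
Each case above repeats the same counter-and-histogram bookkeeping. A generic wrapper could factor that out; a sketch using the same stats collectors seen in the diff (the wrapper itself is hypothetical, not part of this change; assumed imports: net/http, time, and the weed/stats package):

// instrument wraps a handler with the per-method request counter and
// latency histogram used above. Hypothetical helper, not in the diff.
func instrument(label string, h http.HandlerFunc) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		start := time.Now()
		stats.FilerRequestCounter.WithLabelValues(label).Inc()
		defer func() {
			stats.FilerRequestHistogram.WithLabelValues(label).Observe(time.Since(start).Seconds())
		}()
		h(w, r)
	}
}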
diff --git a/weed/server/filer_server_handlers_read.go b/weed/server/filer_server_handlers_read.go
index 226de640c..0edf501a8 100644
--- a/weed/server/filer_server_handlers_read.go
+++ b/weed/server/filer_server_handlers_read.go
@@ -1,7 +1,9 @@
package weed_server
import (
+ "context"
"io"
+ "io/ioutil"
"mime"
"mime/multipart"
"net/http"
@@ -12,22 +14,27 @@ import (
"github.com/chrislusf/seaweedfs/weed/filer2"
"github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/stats"
"github.com/chrislusf/seaweedfs/weed/util"
)
func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request, isGetMethod bool) {
+
path := r.URL.Path
- if strings.HasSuffix(path, "/") && len(path) > 1 {
+ isForDirectory := strings.HasSuffix(path, "/")
+ if isForDirectory && len(path) > 1 {
path = path[:len(path)-1]
}
- entry, err := fs.filer.FindEntry(filer2.FullPath(path))
+ entry, err := fs.filer.FindEntry(context.Background(), filer2.FullPath(path))
if err != nil {
if path == "/" {
fs.listDirectoryHandler(w, r)
return
}
glog.V(1).Infof("Not found %s: %v", path, err)
+
+ stats.FilerRequestCounter.WithLabelValues("read.notfound").Inc()
w.WriteHeader(http.StatusNotFound)
return
}
@@ -41,8 +48,14 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request,
return
}
+ if isForDirectory {
+ w.WriteHeader(http.StatusNotFound)
+ return
+ }
+
if len(entry.Chunks) == 0 {
glog.V(1).Infof("no file chunks for %s, attr=%+v", path, entry.Attr)
+ stats.FilerRequestCounter.WithLabelValues("read.nocontent").Inc()
w.WriteHeader(http.StatusNoContent)
return
}
@@ -51,6 +64,7 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request,
if r.Method == "HEAD" {
w.Header().Set("Content-Length", strconv.FormatInt(int64(filer2.TotalSize(entry.Chunks)), 10))
w.Header().Set("Last-Modified", entry.Attr.Mtime.Format(http.TimeFormat))
+ setEtag(w, filer2.ETag(entry.Chunks))
return
}
@@ -65,7 +79,7 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request,
func (fs *FilerServer) handleSingleChunk(w http.ResponseWriter, r *http.Request, entry *filer2.Entry) {
- fileId := entry.Chunks[0].FileId
+ fileId := entry.Chunks[0].GetFileIdString()
urlString, err := fs.filer.MasterClient.LookupFileId(fileId)
if err != nil {
@@ -75,6 +89,7 @@ func (fs *FilerServer) handleSingleChunk(w http.ResponseWriter, r *http.Request,
}
if fs.option.RedirectOnRead {
+ stats.FilerRequestCounter.WithLabelValues("redirect").Inc()
http.Redirect(w, r, urlString, http.StatusFound)
return
}
@@ -105,17 +120,23 @@ func (fs *FilerServer) handleSingleChunk(w http.ResponseWriter, r *http.Request,
writeJsonError(w, r, http.StatusInternalServerError, do_err)
return
}
- defer resp.Body.Close()
+ defer func() {
+ io.Copy(ioutil.Discard, resp.Body)
+ resp.Body.Close()
+ }()
for k, v := range resp.Header {
w.Header()[k] = v
}
+ if entry.Attr.Mime != "" {
+ w.Header().Set("Content-Type", entry.Attr.Mime)
+ }
w.WriteHeader(resp.StatusCode)
io.Copy(w, resp.Body)
}
func (fs *FilerServer) handleMultipleChunks(w http.ResponseWriter, r *http.Request, entry *filer2.Entry) {
- mimeType := entry.Mime
+ mimeType := entry.Attr.Mime
if mimeType == "" {
if ext := path.Ext(entry.Name()); ext != "" {
mimeType = mime.TypeByExtension(ext)
@@ -222,31 +243,6 @@ func (fs *FilerServer) handleMultipleChunks(w http.ResponseWriter, r *http.Reque
func (fs *FilerServer) writeContent(w io.Writer, entry *filer2.Entry, offset int64, size int) error {
- chunkViews := filer2.ViewFromChunks(entry.Chunks, offset, size)
-
- fileId2Url := make(map[string]string)
-
- for _, chunkView := range chunkViews {
-
- urlString, err := fs.filer.MasterClient.LookupFileId(chunkView.FileId)
- if err != nil {
- glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err)
- return err
- }
- fileId2Url[chunkView.FileId] = urlString
- }
-
- for _, chunkView := range chunkViews {
- urlString := fileId2Url[chunkView.FileId]
- _, err := util.ReadUrlAsStream(urlString, chunkView.Offset, int(chunkView.Size), func(data []byte) {
- w.Write(data)
- })
- if err != nil {
- glog.V(1).Infof("read %s failed, err: %v", chunkView.FileId, err)
- return err
- }
- }
-
- return nil
+ return filer2.StreamContent(fs.filer.MasterClient, w, entry.Chunks, offset, size)
}
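
The inlined loop is replaced by filer2.StreamContent, which performs the same two passes: resolve every chunk's volume URL first, then stream each byte range to the writer in order. A sketch of that pattern, following the removed code above (this is not the actual filer2 implementation; assumed imports: io, weed/filer2, weed/util):

// streamChunks mirrors the removed writeContent body: resolve all chunk
// locations up front so a lookup failure aborts before any bytes are
// written, then copy each chunk's range to w.
func streamChunks(lookup func(fileId string) (url string, err error),
	w io.Writer, chunkViews []*filer2.ChunkView) error {
	urls := make(map[string]string)
	for _, cv := range chunkViews {
		u, err := lookup(cv.FileId)
		if err != nil {
			return err
		}
		urls[cv.FileId] = u
	}
	for _, cv := range chunkViews {
		if _, err := util.ReadUrlAsStream(urls[cv.FileId], cv.Offset, int(cv.Size),
			func(data []byte) { w.Write(data) }); err != nil {
			return err
		}
	}
	return nil
}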
diff --git a/weed/server/filer_server_handlers_read_dir.go b/weed/server/filer_server_handlers_read_dir.go
index bcf7f0eb5..87e864559 100644
--- a/weed/server/filer_server_handlers_read_dir.go
+++ b/weed/server/filer_server_handlers_read_dir.go
@@ -1,6 +1,7 @@
package weed_server
import (
+ "context"
"net/http"
"strconv"
"strings"
@@ -8,6 +9,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/filer2"
"github.com/chrislusf/seaweedfs/weed/glog"
ui "github.com/chrislusf/seaweedfs/weed/server/filer_ui"
+ "github.com/chrislusf/seaweedfs/weed/stats"
)
// listDirectoryHandler lists directories and folders under a directory
@@ -15,6 +17,9 @@ import (
// sub directories are listed on the first page, when "lastFileName"
// is empty.
func (fs *FilerServer) listDirectoryHandler(w http.ResponseWriter, r *http.Request) {
+
+ stats.FilerRequestCounter.WithLabelValues("list").Inc()
+
path := r.URL.Path
if strings.HasSuffix(path, "/") && len(path) > 1 {
path = path[:len(path)-1]
@@ -27,7 +32,7 @@ func (fs *FilerServer) listDirectoryHandler(w http.ResponseWriter, r *http.Reque
lastFileName := r.FormValue("lastFileName")
- entries, err := fs.filer.ListDirectoryEntries(filer2.FullPath(path), lastFileName, false, limit)
+ entries, err := fs.filer.ListDirectoryEntries(context.Background(), filer2.FullPath(path), lastFileName, false, limit)
if err != nil {
glog.V(0).Infof("listDirectory %s %s %d: %s", path, lastFileName, limit, err)
diff --git a/weed/server/filer_server_handlers_write.go b/weed/server/filer_server_handlers_write.go
index 32f481e74..0bf1218ce 100644
--- a/weed/server/filer_server_handlers_write.go
+++ b/weed/server/filer_server_handlers_write.go
@@ -1,11 +1,17 @@
package weed_server
import (
+ "context"
"encoding/json"
"errors"
+ "fmt"
+ "io"
"io/ioutil"
+ "mime"
"net/http"
"net/url"
+ "os"
+ filenamePath "path"
"strconv"
"strings"
"time"
@@ -14,8 +20,9 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/security"
+ "github.com/chrislusf/seaweedfs/weed/stats"
"github.com/chrislusf/seaweedfs/weed/util"
- "os"
)
var (
@@ -31,7 +38,12 @@ type FilerPostResult struct {
Url string `json:"url,omitempty"`
}
-func (fs *FilerServer) assignNewFileInfo(w http.ResponseWriter, r *http.Request, replication, collection string, dataCenter string) (fileId, urlLocation string, err error) {
+func (fs *FilerServer) assignNewFileInfo(w http.ResponseWriter, r *http.Request, replication, collection string, dataCenter string) (fileId, urlLocation string, auth security.EncodedJwt, err error) {
+
+ stats.FilerRequestCounter.WithLabelValues("assign").Inc()
+ start := time.Now()
+ defer func() { stats.FilerRequestHistogram.WithLabelValues("assign").Observe(time.Since(start).Seconds()) }()
+
ar := &operation.VolumeAssignRequest{
Count: 1,
Replication: replication,
@@ -50,7 +62,7 @@ func (fs *FilerServer) assignNewFileInfo(w http.ResponseWriter, r *http.Request,
}
}
- assignResult, ae := operation.Assign(fs.filer.GetMaster(), ar, altRequest)
+ assignResult, ae := operation.Assign(fs.filer.GetMaster(), fs.grpcDialOption, ar, altRequest)
if ae != nil {
glog.Errorf("failing to assign a file id: %v", ae)
writeJsonError(w, r, http.StatusInternalServerError, ae)
@@ -59,11 +71,14 @@ func (fs *FilerServer) assignNewFileInfo(w http.ResponseWriter, r *http.Request,
}
fileId = assignResult.Fid
urlLocation = "http://" + assignResult.Url + "/" + assignResult.Fid
+ auth = assignResult.Auth
return
}
func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
+ ctx := context.Background()
+
query := r.URL.Query()
replication := query.Get("replication")
if replication == "" {
@@ -78,11 +93,11 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
dataCenter = fs.option.DataCenter
}
- if autoChunked := fs.autoChunk(w, r, replication, collection, dataCenter); autoChunked {
+ if autoChunked := fs.autoChunk(ctx, w, r, replication, collection, dataCenter); autoChunked {
return
}
- fileId, urlLocation, err := fs.assignNewFileInfo(w, r, replication, collection, dataCenter)
+ fileId, urlLocation, auth, err := fs.assignNewFileInfo(w, r, replication, collection, dataCenter)
if err != nil || fileId == "" || urlLocation == "" {
glog.V(0).Infof("fail to allocate volume for %s, collection:%s, datacenter:%s", r.URL.Path, collection, dataCenter)
@@ -103,70 +118,47 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
}
glog.V(4).Infoln("post to", u)
- // send request to volume server
- request := &http.Request{
- Method: r.Method,
- URL: u,
- Proto: r.Proto,
- ProtoMajor: r.ProtoMajor,
- ProtoMinor: r.ProtoMinor,
- Header: r.Header,
- Body: r.Body,
- Host: r.Host,
- ContentLength: r.ContentLength,
- }
- resp, do_err := util.Do(request)
- if do_err != nil {
- glog.Errorf("failing to connect to volume server %s: %v, %+v", r.RequestURI, do_err, r.Method)
- writeJsonError(w, r, http.StatusInternalServerError, do_err)
- return
- }
- defer resp.Body.Close()
- etag := resp.Header.Get("ETag")
- resp_body, ra_err := ioutil.ReadAll(resp.Body)
- if ra_err != nil {
- glog.V(0).Infoln("failing to upload to volume server", r.RequestURI, ra_err.Error())
- writeJsonError(w, r, http.StatusInternalServerError, ra_err)
+ ret, err := fs.uploadToVolumeServer(r, u, auth, w, fileId)
+ if err != nil {
return
}
- glog.V(4).Infoln("post result", string(resp_body))
- var ret operation.UploadResult
- unmarshal_err := json.Unmarshal(resp_body, &ret)
- if unmarshal_err != nil {
- glog.V(0).Infoln("failing to read upload resonse", r.RequestURI, string(resp_body))
- writeJsonError(w, r, http.StatusInternalServerError, unmarshal_err)
+
+ if err = fs.updateFilerStore(ctx, r, w, replication, collection, ret, fileId); err != nil {
return
}
- if ret.Error != "" {
- glog.V(0).Infoln("failing to post to volume server", r.RequestURI, ret.Error)
- writeJsonError(w, r, http.StatusInternalServerError, errors.New(ret.Error))
- return
+
+ // send back post result
+ reply := FilerPostResult{
+ Name: ret.Name,
+ Size: ret.Size,
+ Error: ret.Error,
+ Fid: fileId,
+ Url: urlLocation,
}
+ setEtag(w, ret.ETag)
+ writeJsonQuiet(w, r, http.StatusCreated, reply)
+}
+
+// update metadata in filer store
+func (fs *FilerServer) updateFilerStore(ctx context.Context, r *http.Request, w http.ResponseWriter,
+ replication string, collection string, ret operation.UploadResult, fileId string) (err error) {
+
+ stats.FilerRequestCounter.WithLabelValues("postStoreWrite").Inc()
+ start := time.Now()
+ defer func() {
+ stats.FilerRequestHistogram.WithLabelValues("postStoreWrite").Observe(time.Since(start).Seconds())
+ }()
- // find correct final path
path := r.URL.Path
if strings.HasSuffix(path, "/") {
if ret.Name != "" {
path += ret.Name
- } else {
- fs.filer.DeleteFileByFileId(fileId)
- glog.V(0).Infoln("Can not to write to folder", path, "without a file name!")
- writeJsonError(w, r, http.StatusInternalServerError,
- errors.New("Can not to write to folder "+path+" without a file name"))
- return
}
}
-
- // update metadata in filer store
- existingEntry, err := fs.filer.FindEntry(filer2.FullPath(path))
+ existingEntry, err := fs.filer.FindEntry(ctx, filer2.FullPath(path))
crTime := time.Now()
if err == nil && existingEntry != nil {
- // glog.V(4).Infof("existing %s => %+v", path, existingEntry)
- if existingEntry.IsDirectory() {
- path += "/" + ret.Name
- } else {
- crTime = existingEntry.Crtime
- }
+ crTime = existingEntry.Crtime
}
entry := &filer2.Entry{
FullPath: filer2.FullPath(path),
@@ -184,27 +176,95 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
FileId: fileId,
Size: uint64(ret.Size),
Mtime: time.Now().UnixNano(),
- ETag: etag,
+ ETag: ret.ETag,
}},
}
+ if ext := filenamePath.Ext(path); ext != "" {
+ entry.Attr.Mime = mime.TypeByExtension(ext)
+ }
// glog.V(4).Infof("saving %s => %+v", path, entry)
- if db_err := fs.filer.CreateEntry(entry); db_err != nil {
- fs.filer.DeleteFileByFileId(fileId)
- glog.V(0).Infof("failing to write %s to filer server : %v", path, db_err)
- writeJsonError(w, r, http.StatusInternalServerError, db_err)
+ if dbErr := fs.filer.CreateEntry(ctx, entry); dbErr != nil {
+ fs.filer.DeleteChunks(entry.FullPath, entry.Chunks)
+ glog.V(0).Infof("failing to write %s to filer server : %v", path, dbErr)
+ writeJsonError(w, r, http.StatusInternalServerError, dbErr)
+ err = dbErr
return
}
- // send back post result
- reply := FilerPostResult{
- Name: ret.Name,
- Size: ret.Size,
- Error: ret.Error,
- Fid: fileId,
- Url: urlLocation,
+ return nil
+}
+
+// send request to volume server
+func (fs *FilerServer) uploadToVolumeServer(r *http.Request, u *url.URL, auth security.EncodedJwt, w http.ResponseWriter, fileId string) (ret operation.UploadResult, err error) {
+
+ stats.FilerRequestCounter.WithLabelValues("postUpload").Inc()
+ start := time.Now()
+ defer func() { stats.FilerRequestHistogram.WithLabelValues("postUpload").Observe(time.Since(start).Seconds()) }()
+
+ request := &http.Request{
+ Method: r.Method,
+ URL: u,
+ Proto: r.Proto,
+ ProtoMajor: r.ProtoMajor,
+ ProtoMinor: r.ProtoMinor,
+ Header: r.Header,
+ Body: r.Body,
+ Host: r.Host,
+ ContentLength: r.ContentLength,
}
- setEtag(w, etag)
- writeJsonQuiet(w, r, http.StatusCreated, reply)
+ if auth != "" {
+ request.Header.Set("Authorization", "BEARER "+string(auth))
+ }
+ resp, doErr := util.Do(request)
+ if doErr != nil {
+ glog.Errorf("failing to connect to volume server %s: %v, %+v", r.RequestURI, doErr, r.Method)
+ writeJsonError(w, r, http.StatusInternalServerError, doErr)
+ err = doErr
+ return
+ }
+ defer func() {
+ io.Copy(ioutil.Discard, resp.Body)
+ resp.Body.Close()
+ }()
+ etag := resp.Header.Get("ETag")
+ respBody, raErr := ioutil.ReadAll(resp.Body)
+ if raErr != nil {
+ glog.V(0).Infoln("failing to upload to volume server", r.RequestURI, raErr.Error())
+ writeJsonError(w, r, http.StatusInternalServerError, raErr)
+ err = raErr
+ return
+ }
+ glog.V(4).Infoln("post result", string(respBody))
+ unmarshalErr := json.Unmarshal(respBody, &ret)
+ if unmarshalErr != nil {
+ glog.V(0).Infoln("failing to read upload resonse", r.RequestURI, string(respBody))
+ writeJsonError(w, r, http.StatusInternalServerError, unmarshalErr)
+ err = unmarshalErr
+ return
+ }
+ if ret.Error != "" {
+ err = errors.New(ret.Error)
+ glog.V(0).Infoln("failing to post to volume server", r.RequestURI, ret.Error)
+ writeJsonError(w, r, http.StatusInternalServerError, err)
+ return
+ }
+ // find correct final path
+ path := r.URL.Path
+ if strings.HasSuffix(path, "/") {
+ if ret.Name != "" {
+ path += ret.Name
+ } else {
+ err = fmt.Errorf("can not to write to folder %s without a file name", path)
+ fs.filer.DeleteFileByFileId(fileId)
+ glog.V(0).Infoln("Can not to write to folder", path, "without a file name!")
+ writeJsonError(w, r, http.StatusInternalServerError, err)
+ return
+ }
+ }
+ if etag != "" {
+ ret.ETag = etag
+ }
+ return
}
// curl -X DELETE http://localhost:8888/path/to
@@ -213,7 +273,7 @@ func (fs *FilerServer) DeleteHandler(w http.ResponseWriter, r *http.Request) {
isRecursive := r.FormValue("recursive") == "true"
- err := fs.filer.DeleteEntryMetaAndData(filer2.FullPath(r.URL.Path), isRecursive, true)
+ err := fs.filer.DeleteEntryMetaAndData(context.Background(), filer2.FullPath(r.URL.Path), isRecursive, true)
if err != nil {
glog.V(1).Infoln("deleting", r.URL.Path, ":", err.Error())
writeJsonError(w, r, http.StatusInternalServerError, err)
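
Write authorization now flows with the assignment: the master mints a JWT for the assigned fid (the Auth field on the assign result), and the filer forwards it to the volume server as a bearer token, as uploadToVolumeServer does above. A minimal sketch of what any client uploading directly to a volume server would do with that token (standard net/http; urlLocation is whatever the assign call returned; assumed imports: io, net/http, weed/security):

// uploadWithJwt POSTs body to the assigned volume server location,
// presenting the JWT returned by the assign call. Sketch only.
func uploadWithJwt(urlLocation string, auth security.EncodedJwt, body io.Reader) (*http.Response, error) {
	req, err := http.NewRequest("POST", urlLocation, body)
	if err != nil {
		return nil, err
	}
	if auth != "" {
		// same header the filer sets above
		req.Header.Set("Authorization", "BEARER "+string(auth))
	}
	return http.DefaultClient.Do(req)
}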
diff --git a/weed/server/filer_server_handlers_write_autochunk.go b/weed/server/filer_server_handlers_write_autochunk.go
index 4b1745aaa..492b55943 100644
--- a/weed/server/filer_server_handlers_write_autochunk.go
+++ b/weed/server/filer_server_handlers_write_autochunk.go
@@ -2,6 +2,7 @@ package weed_server
import (
"bytes"
+ "context"
"io"
"io/ioutil"
"net/http"
@@ -14,10 +15,13 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/security"
+ "github.com/chrislusf/seaweedfs/weed/stats"
"github.com/chrislusf/seaweedfs/weed/util"
)
-func (fs *FilerServer) autoChunk(w http.ResponseWriter, r *http.Request, replication string, collection string, dataCenter string) bool {
+func (fs *FilerServer) autoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request,
+ replication string, collection string, dataCenter string) bool {
if r.Method != "POST" {
glog.V(4).Infoln("AutoChunking not supported for method", r.Method)
return false
@@ -53,7 +57,7 @@ func (fs *FilerServer) autoChunk(w http.ResponseWriter, r *http.Request, replica
return false
}
- reply, err := fs.doAutoChunk(w, r, contentLength, chunkSize, replication, collection, dataCenter)
+ reply, err := fs.doAutoChunk(ctx, w, r, contentLength, chunkSize, replication, collection, dataCenter)
if err != nil {
writeJsonError(w, r, http.StatusInternalServerError, err)
} else if reply != nil {
@@ -62,7 +66,14 @@ func (fs *FilerServer) autoChunk(w http.ResponseWriter, r *http.Request, replica
return true
}
-func (fs *FilerServer) doAutoChunk(w http.ResponseWriter, r *http.Request, contentLength int64, chunkSize int32, replication string, collection string, dataCenter string) (filerResult *FilerPostResult, replyerr error) {
+func (fs *FilerServer) doAutoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request,
+ contentLength int64, chunkSize int32, replication string, collection string, dataCenter string) (filerResult *FilerPostResult, replyerr error) {
+
+ stats.FilerRequestCounter.WithLabelValues("postAutoChunk").Inc()
+ start := time.Now()
+ defer func() {
+ stats.FilerRequestHistogram.WithLabelValues("postAutoChunk").Observe(time.Since(start).Seconds())
+ }()
multipartReader, multipartReaderErr := r.MultipartReader()
if multipartReaderErr != nil {
@@ -105,14 +116,14 @@ func (fs *FilerServer) doAutoChunk(w http.ResponseWriter, r *http.Request, conte
if chunkBufOffset >= chunkSize || readFully || (chunkBufOffset > 0 && bytesRead == 0) {
writtenChunks = writtenChunks + 1
- fileId, urlLocation, assignErr := fs.assignNewFileInfo(w, r, replication, collection, dataCenter)
+ fileId, urlLocation, auth, assignErr := fs.assignNewFileInfo(w, r, replication, collection, dataCenter)
if assignErr != nil {
return nil, assignErr
}
// upload the chunk to the volume server
chunkName := fileName + "_chunk_" + strconv.FormatInt(int64(len(fileChunks)+1), 10)
- uploadErr := fs.doUpload(urlLocation, w, r, chunkBuf[0:chunkBufOffset], chunkName, "application/octet-stream", fileId)
+ uploadErr := fs.doUpload(urlLocation, w, r, chunkBuf[0:chunkBufOffset], chunkName, "application/octet-stream", fileId, auth)
if uploadErr != nil {
return nil, uploadErr
}
@@ -165,21 +176,28 @@ func (fs *FilerServer) doAutoChunk(w http.ResponseWriter, r *http.Request, conte
},
Chunks: fileChunks,
}
- if db_err := fs.filer.CreateEntry(entry); db_err != nil {
- replyerr = db_err
- filerResult.Error = db_err.Error()
- glog.V(0).Infof("failing to write %s to filer server : %v", path, db_err)
+ if dbErr := fs.filer.CreateEntry(ctx, entry); dbErr != nil {
+ fs.filer.DeleteChunks(entry.FullPath, entry.Chunks)
+ replyerr = dbErr
+ filerResult.Error = dbErr.Error()
+ glog.V(0).Infof("failing to write %s to filer server : %v", path, dbErr)
return
}
return
}
-func (fs *FilerServer) doUpload(urlLocation string, w http.ResponseWriter, r *http.Request, chunkBuf []byte, fileName string, contentType string, fileId string) (err error) {
- err = nil
+func (fs *FilerServer) doUpload(urlLocation string, w http.ResponseWriter, r *http.Request,
+ chunkBuf []byte, fileName string, contentType string, fileId string, auth security.EncodedJwt) (err error) {
+
+ stats.FilerRequestCounter.WithLabelValues("postAutoChunkUpload").Inc()
+ start := time.Now()
+ defer func() {
+ stats.FilerRequestHistogram.WithLabelValues("postAutoChunkUpload").Observe(time.Since(start).Seconds())
+ }()
ioReader := ioutil.NopCloser(bytes.NewBuffer(chunkBuf))
- uploadResult, uploadError := operation.Upload(urlLocation, fileName, ioReader, false, contentType, nil, fs.jwt(fileId))
+ uploadResult, uploadError := operation.Upload(urlLocation, fileName, ioReader, false, contentType, nil, auth)
if uploadResult != nil {
glog.V(0).Infoln("Chunk upload result. Name:", uploadResult.Name, "Fid:", fileId, "Size:", uploadResult.Size)
}
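
The auto-chunking path above fills a fixed-size buffer, assigns a fresh fid per chunk, uploads the bytes with the per-chunk JWT, and records a FileChunk at a running offset. The core loop reduced to a sketch; assign and upload stand in for the assignNewFileInfo and doUpload calls above (assumed imports: io, time, weed/pb/filer_pb, weed/security):

// chunkAndUpload reads r in chunkSize pieces; for each piece it assigns a
// new fid, uploads the bytes, and appends a FileChunk. Sketch of doAutoChunk's loop.
func chunkAndUpload(r io.Reader, chunkSize int32,
	assign func() (fileId, urlLocation string, auth security.EncodedJwt, err error),
	upload func(urlLocation, fileId string, auth security.EncodedJwt, data []byte) error,
) (chunks []*filer_pb.FileChunk, err error) {
	buf := make([]byte, chunkSize)
	var offset int64
	for {
		n, readErr := io.ReadFull(r, buf)
		if n > 0 {
			fileId, urlLocation, auth, assignErr := assign()
			if assignErr != nil {
				return nil, assignErr
			}
			if uploadErr := upload(urlLocation, fileId, auth, buf[:n]); uploadErr != nil {
				return nil, uploadErr
			}
			chunks = append(chunks, &filer_pb.FileChunk{
				FileId: fileId,
				Offset: offset,
				Size:   uint64(n),
				Mtime:  time.Now().UnixNano(),
			})
			offset += int64(n)
		}
		if readErr == io.EOF || readErr == io.ErrUnexpectedEOF {
			return chunks, nil // short or empty final read: done
		}
		if readErr != nil {
			return nil, readErr
		}
	}
}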
diff --git a/weed/server/filer_ui/breadcrumb.go b/weed/server/filer_ui/breadcrumb.go
index d056a4b25..55a1909a8 100644
--- a/weed/server/filer_ui/breadcrumb.go
+++ b/weed/server/filer_ui/breadcrumb.go
@@ -16,7 +16,7 @@ func ToBreadcrumb(fullpath string) (crumbs []Breadcrumb) {
for i := 0; i < len(parts); i++ {
crumbs = append(crumbs, Breadcrumb{
Name: parts[i] + "/",
- Link: "/" + filepath.Join(parts[0:i+1]...),
+ Link: "/" + filepath.ToSlash(filepath.Join(parts[0:i+1]...)),
})
}
diff --git a/weed/server/filer_ui/templates.go b/weed/server/filer_ui/templates.go
index e31685ea0..884798936 100644
--- a/weed/server/filer_ui/templates.go
+++ b/weed/server/filer_ui/templates.go
@@ -162,7 +162,7 @@ function uploadFile(file, i) {
var url = window.location.href
var xhr = new XMLHttpRequest()
var formData = new FormData()
- xhr.open('POST', url, true)
+ xhr.open('POST', url, false)
formData.append('file', file)
xhr.send(formData)
diff --git a/weed/server/master_grpc_server.go b/weed/server/master_grpc_server.go
index 93dce59d8..1a17327a0 100644
--- a/weed/server/master_grpc_server.go
+++ b/weed/server/master_grpc_server.go
@@ -9,6 +9,7 @@ import (
"github.com/chrislusf/raft"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/topology"
"google.golang.org/grpc/peer"
)
@@ -30,6 +31,9 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
for _, v := range dn.GetVolumes() {
message.DeletedVids = append(message.DeletedVids, uint32(v.Id))
}
+ for _, s := range dn.GetEcShards() {
+ message.DeletedVids = append(message.DeletedVids, uint32(s.VolumeId))
+ }
if len(message.DeletedVids) > 0 {
ms.clientChansLock.RLock()
@@ -63,39 +67,84 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
rack := dc.GetOrCreateRack(rackName)
dn = rack.GetOrCreateDataNode(heartbeat.Ip,
int(heartbeat.Port), heartbeat.PublicUrl,
- int(heartbeat.MaxVolumeCount))
+ int64(heartbeat.MaxVolumeCount))
glog.V(0).Infof("added volume server %v:%d", heartbeat.GetIp(), heartbeat.GetPort())
if err := stream.Send(&master_pb.HeartbeatResponse{
- VolumeSizeLimit: uint64(ms.volumeSizeLimitMB) * 1024 * 1024,
- SecretKey: string(ms.guard.SecretKey),
+ VolumeSizeLimit: uint64(ms.option.VolumeSizeLimitMB) * 1024 * 1024,
}); err != nil {
return err
}
}
+ glog.V(4).Infof("master received heartbeat %s", heartbeat.String())
message := &master_pb.VolumeLocation{
Url: dn.Url(),
PublicUrl: dn.PublicUrl,
}
- if len(heartbeat.NewVids) > 0 || len(heartbeat.DeletedVids) > 0 {
+ if len(heartbeat.NewVolumes) > 0 || len(heartbeat.DeletedVolumes) > 0 {
// process delta volume ids if exists for fast volume id updates
- message.NewVids = append(message.NewVids, heartbeat.NewVids...)
- message.DeletedVids = append(message.DeletedVids, heartbeat.DeletedVids...)
- } else {
+ for _, volInfo := range heartbeat.NewVolumes {
+ message.NewVids = append(message.NewVids, volInfo.Id)
+ }
+ for _, volInfo := range heartbeat.DeletedVolumes {
+ message.DeletedVids = append(message.DeletedVids, volInfo.Id)
+ }
+ // update master internal volume layouts
+ t.IncrementalSyncDataNodeRegistration(heartbeat.NewVolumes, heartbeat.DeletedVolumes, dn)
+ }
+
+ if len(heartbeat.Volumes) > 0 || heartbeat.HasNoVolumes {
// process heartbeat.Volumes
newVolumes, deletedVolumes := t.SyncDataNodeRegistration(heartbeat.Volumes, dn)
for _, v := range newVolumes {
+ glog.V(0).Infof("master see new volume %d from %s", uint32(v.Id), dn.Url())
message.NewVids = append(message.NewVids, uint32(v.Id))
}
for _, v := range deletedVolumes {
+ glog.V(0).Infof("master see deleted volume %d from %s", uint32(v.Id), dn.Url())
message.DeletedVids = append(message.DeletedVids, uint32(v.Id))
}
}
+ if len(heartbeat.NewEcShards) > 0 || len(heartbeat.DeletedEcShards) > 0 {
+
+ // update master internal volume layouts
+ t.IncrementalSyncDataNodeEcShards(heartbeat.NewEcShards, heartbeat.DeletedEcShards, dn)
+
+ for _, s := range heartbeat.NewEcShards {
+ message.NewVids = append(message.NewVids, s.Id)
+ }
+ for _, s := range heartbeat.DeletedEcShards {
+ if dn.HasVolumesById(needle.VolumeId(s.Id)) {
+ continue
+ }
+ message.DeletedVids = append(message.DeletedVids, s.Id)
+ }
+
+ }
+
+ if len(heartbeat.EcShards) > 0 || heartbeat.HasNoEcShards {
+ glog.V(1).Infof("master recieved ec shards from %s: %+v", dn.Url(), heartbeat.EcShards)
+ newShards, deletedShards := t.SyncDataNodeEcShards(heartbeat.EcShards, dn)
+
+ // broadcast the ec vid changes to master clients
+ for _, s := range newShards {
+ message.NewVids = append(message.NewVids, uint32(s.VolumeId))
+ }
+ for _, s := range deletedShards {
+ if dn.HasVolumesById(s.VolumeId) {
+ continue
+ }
+ message.DeletedVids = append(message.DeletedVids, uint32(s.VolumeId))
+ }
+
+ }
+
if len(message.NewVids) > 0 || len(message.DeletedVids) > 0 {
ms.clientChansLock.RLock()
- for _, ch := range ms.clientChans {
+ for host, ch := range ms.clientChans {
+ glog.V(0).Infof("master send to %s: %s", host, message.String())
ch <- message
}
ms.clientChansLock.RUnlock()
@@ -103,12 +152,15 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
// tell the volume servers about the leader
newLeader, err := t.Leader()
- if err == nil {
- if err := stream.Send(&master_pb.HeartbeatResponse{
- Leader: newLeader,
- }); err != nil {
- return err
- }
+ if err != nil {
+ return err
+ }
+ if err := stream.Send(&master_pb.HeartbeatResponse{
+ Leader: newLeader,
+ MetricsAddress: ms.option.MetricsAddress,
+ MetricsIntervalSeconds: uint32(ms.option.MetricsIntervalSec),
+ }); err != nil {
+ return err
}
}
}
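
The heartbeat handler now distinguishes delta reports (NewVolumes/DeletedVolumes) from full reports (Volumes, with HasNoVolumes as an explicit empty marker), and the same shape repeats for EC shards. A full sync ultimately boils down to a set difference between what the master knows and what the node reports; a generic sketch of that diff (SyncDataNodeRegistration's real logic also updates the topology tree):

// diffVolumeIds returns which reported ids are new relative to known,
// and which known ids disappeared. Generic sketch of the full-sync diff.
func diffVolumeIds(known, reported []uint32) (newIds, deletedIds []uint32) {
	knownSet := make(map[uint32]bool, len(known))
	for _, id := range known {
		knownSet[id] = true
	}
	for _, id := range reported {
		if !knownSet[id] {
			newIds = append(newIds, id)
		}
		delete(knownSet, id) // whatever remains was not reported
	}
	for id := range knownSet {
		deletedIds = append(deletedIds, id)
	}
	return
}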
diff --git a/weed/server/master_grpc_server_collection.go b/weed/server/master_grpc_server_collection.go
new file mode 100644
index 000000000..a50cfa192
--- /dev/null
+++ b/weed/server/master_grpc_server_collection.go
@@ -0,0 +1,94 @@
+package weed_server
+
+import (
+ "context"
+
+ "github.com/chrislusf/raft"
+ "github.com/chrislusf/seaweedfs/weed/operation"
+ "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
+)
+
+func (ms *MasterServer) CollectionList(ctx context.Context, req *master_pb.CollectionListRequest) (*master_pb.CollectionListResponse, error) {
+
+ if !ms.Topo.IsLeader() {
+ return nil, raft.NotLeaderError
+ }
+
+ resp := &master_pb.CollectionListResponse{}
+ collections := ms.Topo.ListCollections(req.IncludeNormalVolumes, req.IncludeEcVolumes)
+ for _, c := range collections {
+ resp.Collections = append(resp.Collections, &master_pb.Collection{
+ Name: c,
+ })
+ }
+
+ return resp, nil
+}
+
+func (ms *MasterServer) CollectionDelete(ctx context.Context, req *master_pb.CollectionDeleteRequest) (*master_pb.CollectionDeleteResponse, error) {
+
+ if !ms.Topo.IsLeader() {
+ return nil, raft.NotLeaderError
+ }
+
+ resp := &master_pb.CollectionDeleteResponse{}
+
+ err := ms.doDeleteNormalCollection(req.Name)
+
+ if err != nil {
+ return nil, err
+ }
+
+ err = ms.doDeleteEcCollection(req.Name)
+
+ if err != nil {
+ return nil, err
+ }
+
+ return resp, nil
+}
+
+func (ms *MasterServer) doDeleteNormalCollection(collectionName string) error {
+
+ collection, ok := ms.Topo.FindCollection(collectionName)
+ if !ok {
+ return nil
+ }
+
+ for _, server := range collection.ListVolumeServers() {
+ err := operation.WithVolumeServerClient(server.Url(), ms.grpcDialOpiton, func(client volume_server_pb.VolumeServerClient) error {
+ _, deleteErr := client.DeleteCollection(context.Background(), &volume_server_pb.DeleteCollectionRequest{
+ Collection: collectionName,
+ })
+ return deleteErr
+ })
+ if err != nil {
+ return err
+ }
+ }
+ ms.Topo.DeleteCollection(collectionName)
+
+ return nil
+}
+
+func (ms *MasterServer) doDeleteEcCollection(collectionName string) error {
+
+ listOfEcServers := ms.Topo.ListEcServersByCollection(collectionName)
+
+ for _, server := range listOfEcServers {
+ err := operation.WithVolumeServerClient(server, ms.grpcDialOpiton, func(client volume_server_pb.VolumeServerClient) error {
+ _, deleteErr := client.DeleteCollection(context.Background(), &volume_server_pb.DeleteCollectionRequest{
+ Collection: collectionName,
+ })
+ return deleteErr
+ })
+ if err != nil {
+ return err
+ }
+ }
+
+ ms.Topo.DeleteEcCollection(collectionName)
+
+ return nil
+}
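
With collection deletion moved from the /col/delete HTTP endpoint to gRPC (compare the filer change above), the server side now covers both normal and EC volumes. A sketch of the client call, assuming the generated master_pb bindings:

// deleteCollection drops a collection via the master's gRPC API. Sketch only.
func deleteCollection(ctx context.Context, client master_pb.SeaweedClient, name string) error {
	_, err := client.CollectionDelete(ctx, &master_pb.CollectionDeleteRequest{
		Name: name,
	})
	return err
}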
diff --git a/weed/server/master_grpc_server_volume.go b/weed/server/master_grpc_server_volume.go
index ae0819d2d..19064bcde 100644
--- a/weed/server/master_grpc_server_volume.go
+++ b/weed/server/master_grpc_server_volume.go
@@ -6,7 +6,9 @@ import (
"github.com/chrislusf/raft"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
+ "github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/storage"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/topology"
)
@@ -48,13 +50,13 @@ func (ms *MasterServer) Assign(ctx context.Context, req *master_pb.AssignRequest
}
if req.Replication == "" {
- req.Replication = ms.defaultReplicaPlacement
+ req.Replication = ms.option.DefaultReplicaPlacement
}
replicaPlacement, err := storage.NewReplicaPlacementFromString(req.Replication)
if err != nil {
return nil, err
}
- ttl, err := storage.ReadTTL(req.Ttl)
+ ttl, err := needle.ReadTTL(req.Ttl)
if err != nil {
return nil, err
}
@@ -63,7 +65,7 @@ func (ms *MasterServer) Assign(ctx context.Context, req *master_pb.AssignRequest
Collection: req.Collection,
ReplicaPlacement: replicaPlacement,
Ttl: ttl,
- Prealloacte: ms.preallocate,
+ Prealloacte: ms.preallocateSize,
DataCenter: req.DataCenter,
Rack: req.Rack,
DataNode: req.DataNode,
@@ -75,7 +77,7 @@ func (ms *MasterServer) Assign(ctx context.Context, req *master_pb.AssignRequest
}
ms.vgLock.Lock()
if !ms.Topo.HasWritableVolume(option) {
- if _, err = ms.vg.AutomaticGrowByType(option, ms.Topo); err != nil {
+ if _, err = ms.vg.AutomaticGrowByType(option, ms.grpcDialOpiton, ms.Topo); err != nil {
ms.vgLock.Unlock()
return nil, fmt.Errorf("Cannot grow volume group! %v", err)
}
@@ -92,6 +94,7 @@ func (ms *MasterServer) Assign(ctx context.Context, req *master_pb.AssignRequest
Url: dn.Url(),
PublicUrl: dn.PublicUrl,
Count: count,
+ Auth: string(security.GenJwt(ms.guard.SigningKey, ms.guard.ExpiresAfterSec, fid)),
}, nil
}
@@ -102,13 +105,13 @@ func (ms *MasterServer) Statistics(ctx context.Context, req *master_pb.Statistic
}
if req.Replication == "" {
- req.Replication = ms.defaultReplicaPlacement
+ req.Replication = ms.option.DefaultReplicaPlacement
}
replicaPlacement, err := storage.NewReplicaPlacementFromString(req.Replication)
if err != nil {
return nil, err
}
- ttl, err := storage.ReadTTL(req.Ttl)
+ ttl, err := needle.ReadTTL(req.Ttl)
if err != nil {
return nil, err
}
@@ -124,3 +127,60 @@ func (ms *MasterServer) Statistics(ctx context.Context, req *master_pb.Statistic
return resp, nil
}
+
+func (ms *MasterServer) VolumeList(ctx context.Context, req *master_pb.VolumeListRequest) (*master_pb.VolumeListResponse, error) {
+
+ if !ms.Topo.IsLeader() {
+ return nil, raft.NotLeaderError
+ }
+
+ resp := &master_pb.VolumeListResponse{
+ TopologyInfo: ms.Topo.ToTopologyInfo(),
+ VolumeSizeLimitMb: uint64(ms.option.VolumeSizeLimitMB),
+ }
+
+ return resp, nil
+}
+
+func (ms *MasterServer) LookupEcVolume(ctx context.Context, req *master_pb.LookupEcVolumeRequest) (*master_pb.LookupEcVolumeResponse, error) {
+
+ if !ms.Topo.IsLeader() {
+ return nil, raft.NotLeaderError
+ }
+
+ resp := &master_pb.LookupEcVolumeResponse{}
+
+ ecLocations, found := ms.Topo.LookupEcShards(needle.VolumeId(req.VolumeId))
+
+ if !found {
+ return resp, fmt.Errorf("ec volume %d not found", req.VolumeId)
+ }
+
+ resp.VolumeId = req.VolumeId
+
+ for shardId, shardLocations := range ecLocations.Locations {
+ var locations []*master_pb.Location
+ for _, dn := range shardLocations {
+ locations = append(locations, &master_pb.Location{
+ Url: string(dn.Id()),
+ PublicUrl: dn.PublicUrl,
+ })
+ }
+ resp.ShardIdLocations = append(resp.ShardIdLocations, &master_pb.LookupEcVolumeResponse_EcShardIdLocation{
+ ShardId: uint32(shardId),
+ Locations: locations,
+ })
+ }
+
+ return resp, nil
+}
+
+func (ms *MasterServer) GetMasterConfiguration(ctx context.Context, req *master_pb.GetMasterConfigurationRequest) (*master_pb.GetMasterConfigurationResponse, error) {
+
+ resp := &master_pb.GetMasterConfigurationResponse{
+ MetricsAddress: ms.option.MetricsAddress,
+ MetricsIntervalSeconds: uint32(ms.option.MetricsIntervalSec),
+ }
+
+ return resp, nil
+}
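
LookupEcVolume returns one location list per shard id; a reader reconstructing an EC volume would fan out over those locations. A hedged sketch of consuming the response:

// shardUrls flattens a LookupEcVolumeResponse into shardId -> urls.
// Sketch of consuming the new RPC's response.
func shardUrls(resp *master_pb.LookupEcVolumeResponse) map[uint32][]string {
	m := make(map[uint32][]string)
	for _, sl := range resp.ShardIdLocations {
		for _, loc := range sl.Locations {
			m[sl.ShardId] = append(m[sl.ShardId], loc.Url)
		}
	}
	return m
}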
diff --git a/weed/server/master_server.go b/weed/server/master_server.go
index f22925e56..3689b5495 100644
--- a/weed/server/master_server.go
+++ b/weed/server/master_server.go
@@ -2,10 +2,17 @@ package weed_server
import (
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/shell"
+ "google.golang.org/grpc"
"net/http"
"net/http/httputil"
"net/url"
+ "os"
+ "regexp"
+ "strconv"
+ "strings"
"sync"
+ "time"
"github.com/chrislusf/raft"
"github.com/chrislusf/seaweedfs/weed/glog"
@@ -15,17 +22,28 @@ import (
"github.com/chrislusf/seaweedfs/weed/topology"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/gorilla/mux"
+ "github.com/spf13/viper"
)
+type MasterOption struct {
+ Port int
+ MetaFolder string
+ VolumeSizeLimitMB uint
+ VolumePreallocate bool
+ PulseSeconds int
+ DefaultReplicaPlacement string
+ GarbageThreshold float64
+ WhiteList []string
+ DisableHttp bool
+ MetricsAddress string
+ MetricsIntervalSec int
+}
+
type MasterServer struct {
- port int
- metaFolder string
- volumeSizeLimitMB uint
- preallocate int64
- pulseSeconds int
- defaultReplicaPlacement string
- garbageThreshold float64
- guard *security.Guard
+ option *MasterOption
+ guard *security.Guard
+
+ preallocateSize int64
Topo *topology.Topology
vg *topology.VolumeGrowth
@@ -36,56 +54,60 @@ type MasterServer struct {
// notifying clients
clientChansLock sync.RWMutex
clientChans map[string]chan *master_pb.VolumeLocation
+
+ grpcDialOpiton grpc.DialOption
}
-func NewMasterServer(r *mux.Router, port int, metaFolder string,
- volumeSizeLimitMB uint,
- preallocate bool,
- pulseSeconds int,
- defaultReplicaPlacement string,
- garbageThreshold float64,
- whiteList []string,
- secureKey string,
-) *MasterServer {
+func NewMasterServer(r *mux.Router, option *MasterOption) *MasterServer {
+
+ v := viper.GetViper()
+ signingKey := v.GetString("jwt.signing.key")
+ v.SetDefault("jwt.signing.expires_after_seconds", 10)
+ expiresAfterSec := v.GetInt("jwt.signing.expires_after_seconds")
+
+ readSigningKey := v.GetString("jwt.signing.read.key")
+ v.SetDefault("jwt.signing.read.expires_after_seconds", 60)
+ readExpiresAfterSec := v.GetInt("jwt.signing.read.expires_after_seconds")
var preallocateSize int64
- if preallocate {
- preallocateSize = int64(volumeSizeLimitMB) * (1 << 20)
+ if option.VolumePreallocate {
+ preallocateSize = int64(option.VolumeSizeLimitMB) * (1 << 20)
}
ms := &MasterServer{
- port: port,
- volumeSizeLimitMB: volumeSizeLimitMB,
- preallocate: preallocateSize,
- pulseSeconds: pulseSeconds,
- defaultReplicaPlacement: defaultReplicaPlacement,
- garbageThreshold: garbageThreshold,
- clientChans: make(map[string]chan *master_pb.VolumeLocation),
+ option: option,
+ preallocateSize: preallocateSize,
+ clientChans: make(map[string]chan *master_pb.VolumeLocation),
+ grpcDialOpiton: security.LoadClientTLS(v.Sub("grpc"), "master"),
}
ms.bounedLeaderChan = make(chan int, 16)
seq := sequence.NewMemorySequencer()
- ms.Topo = topology.NewTopology("topo", seq, uint64(volumeSizeLimitMB)*1024*1024, pulseSeconds)
+ ms.Topo = topology.NewTopology("topo", seq, uint64(ms.option.VolumeSizeLimitMB)*1024*1024, ms.option.PulseSeconds)
ms.vg = topology.NewDefaultVolumeGrowth()
- glog.V(0).Infoln("Volume Size Limit is", volumeSizeLimitMB, "MB")
-
- ms.guard = security.NewGuard(whiteList, secureKey)
-
- handleStaticResources2(r)
- r.HandleFunc("/", ms.uiStatusHandler)
- r.HandleFunc("/ui/index.html", ms.uiStatusHandler)
- r.HandleFunc("/dir/assign", ms.proxyToLeader(ms.guard.WhiteList(ms.dirAssignHandler)))
- r.HandleFunc("/dir/lookup", ms.proxyToLeader(ms.guard.WhiteList(ms.dirLookupHandler)))
- r.HandleFunc("/dir/status", ms.proxyToLeader(ms.guard.WhiteList(ms.dirStatusHandler)))
- r.HandleFunc("/col/delete", ms.proxyToLeader(ms.guard.WhiteList(ms.collectionDeleteHandler)))
- r.HandleFunc("/vol/grow", ms.proxyToLeader(ms.guard.WhiteList(ms.volumeGrowHandler)))
- r.HandleFunc("/vol/status", ms.proxyToLeader(ms.guard.WhiteList(ms.volumeStatusHandler)))
- r.HandleFunc("/vol/vacuum", ms.proxyToLeader(ms.guard.WhiteList(ms.volumeVacuumHandler)))
- r.HandleFunc("/submit", ms.guard.WhiteList(ms.submitFromMasterServerHandler))
- r.HandleFunc("/stats/health", ms.guard.WhiteList(statsHealthHandler))
- r.HandleFunc("/stats/counter", ms.guard.WhiteList(statsCounterHandler))
- r.HandleFunc("/stats/memory", ms.guard.WhiteList(statsMemoryHandler))
- r.HandleFunc("/{fileId}", ms.proxyToLeader(ms.redirectHandler))
-
- ms.Topo.StartRefreshWritableVolumes(garbageThreshold, ms.preallocate)
+ glog.V(0).Infoln("Volume Size Limit is", ms.option.VolumeSizeLimitMB, "MB")
+
+ ms.guard = security.NewGuard(ms.option.WhiteList, signingKey, expiresAfterSec, readSigningKey, readExpiresAfterSec)
+
+ if !ms.option.DisableHttp {
+ handleStaticResources2(r)
+ r.HandleFunc("/", ms.proxyToLeader(ms.uiStatusHandler))
+ r.HandleFunc("/ui/index.html", ms.uiStatusHandler)
+ r.HandleFunc("/dir/assign", ms.proxyToLeader(ms.guard.WhiteList(ms.dirAssignHandler)))
+ r.HandleFunc("/dir/lookup", ms.proxyToLeader(ms.guard.WhiteList(ms.dirLookupHandler)))
+ r.HandleFunc("/dir/status", ms.proxyToLeader(ms.guard.WhiteList(ms.dirStatusHandler)))
+ r.HandleFunc("/col/delete", ms.proxyToLeader(ms.guard.WhiteList(ms.collectionDeleteHandler)))
+ r.HandleFunc("/vol/grow", ms.proxyToLeader(ms.guard.WhiteList(ms.volumeGrowHandler)))
+ r.HandleFunc("/vol/status", ms.proxyToLeader(ms.guard.WhiteList(ms.volumeStatusHandler)))
+ r.HandleFunc("/vol/vacuum", ms.proxyToLeader(ms.guard.WhiteList(ms.volumeVacuumHandler)))
+ r.HandleFunc("/submit", ms.guard.WhiteList(ms.submitFromMasterServerHandler))
+ r.HandleFunc("/stats/health", ms.guard.WhiteList(statsHealthHandler))
+ r.HandleFunc("/stats/counter", ms.guard.WhiteList(statsCounterHandler))
+ r.HandleFunc("/stats/memory", ms.guard.WhiteList(statsMemoryHandler))
+ r.HandleFunc("/{fileId}", ms.proxyToLeader(ms.redirectHandler))
+ }
+
+ ms.Topo.StartRefreshWritableVolumes(ms.grpcDialOpiton, ms.option.GarbageThreshold, ms.preallocateSize)
+
+ ms.startAdminScripts()
return ms
}
@@ -98,6 +120,9 @@ func (ms *MasterServer) SetRaftServer(raftServer *RaftServer) {
glog.V(0).Infoln("[", ms.Topo.RaftServer.Name(), "]", ms.Topo.RaftServer.Leader(), "becomes leader.")
}
})
+ ms.Topo.RaftServer.AddEventListener(raft.StateChangeEventType, func(e raft.Event) {
+ glog.V(0).Infof("state change: %+v", e)
+ })
if ms.Topo.IsLeader() {
glog.V(0).Infoln("[", ms.Topo.RaftServer.Name(), "]", "I am the leader!")
} else {
@@ -138,3 +163,63 @@ func (ms *MasterServer) proxyToLeader(f func(w http.ResponseWriter, r *http.Requ
}
}
}
+
+func (ms *MasterServer) startAdminScripts() {
+ v := viper.GetViper()
+ adminScripts := v.GetString("master.maintenance.scripts")
+ v.SetDefault("master.maintenance.sleep_minutes", 17)
+ sleepMinutes := v.GetInt("master.maintenance.sleep_minutes")
+
+ glog.V(0).Infof("adminScripts:\n%v", adminScripts)
+ if adminScripts == "" {
+ return
+ }
+
+ scriptLines := strings.Split(adminScripts, "\n")
+
+ masterAddress := "localhost:" + strconv.Itoa(ms.option.Port)
+
+ var shellOptions shell.ShellOptions
+ shellOptions.GrpcDialOption = security.LoadClientTLS(viper.Sub("grpc"), "master")
+ shellOptions.Masters = &masterAddress
+ shellOptions.FilerHost = "localhost"
+ shellOptions.FilerPort = 8888
+ shellOptions.Directory = "/"
+
+ commandEnv := shell.NewCommandEnv(shellOptions)
+
+ reg, _ := regexp.Compile(`'.*?'|".*?"|\S+`)
+
+ go commandEnv.MasterClient.KeepConnectedToMaster()
+
+ go func() {
+ commandEnv.MasterClient.WaitUntilConnected()
+
+ c := time.Tick(time.Duration(sleepMinutes) * time.Minute)
+		for range c {
+ if ms.Topo.IsLeader() {
+ for _, line := range scriptLines {
+
+ cmds := reg.FindAllString(line, -1)
+ if len(cmds) == 0 {
+ continue
+ }
+ args := make([]string, len(cmds[1:]))
+ for i := range args {
+						args[i] = strings.Trim(cmds[1+i], "\"'")
+ }
+ cmd := strings.ToLower(cmds[0])
+
+ for _, c := range shell.Commands {
+ if c.Name() == cmd {
+ glog.V(0).Infof("executing: %s %v", cmd, args)
+ if err := c.Do(args, commandEnv, os.Stdout); err != nil {
+ glog.V(0).Infof("error: %v", err)
+ }
+ }
+ }
+ }
+ }
+ }
+ }()
+}
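
startAdminScripts turns master.maintenance.scripts (read from the same viper config that holds the jwt keys) into a periodic, leader-only shell run: the value is split by newline and each line is tokenized with the `'.*?'|".*?"|\S+` regex so quoted arguments survive the split. A self-contained sketch of that tokenization step, with a hypothetical script value (the ec.encode and ec.balance command names here are examples, not a documented command set):

package main

import (
	"fmt"
	"regexp"
	"strings"
)

func main() {
	// hypothetical value of master.maintenance.scripts
	adminScripts := `
ec.encode -collection t -quietFor "1h"
ec.balance -force
`
	reg := regexp.MustCompile(`'.*?'|".*?"|\S+`)
	for _, line := range strings.Split(adminScripts, "\n") {
		cmds := reg.FindAllString(line, -1)
		if len(cmds) == 0 {
			continue
		}
		args := make([]string, len(cmds)-1)
		for i := range args {
			args[i] = strings.Trim(cmds[1+i], `"'`)
		}
		fmt.Printf("command=%q args=%v\n", strings.ToLower(cmds[0]), args)
	}
	// prints:
	// command="ec.encode" args=[-collection t -quietFor 1h]
	// command="ec.balance" args=[-force]
}
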
diff --git a/weed/server/master_server_handlers.go b/weed/server/master_server_handlers.go
index a797dddfc..5c7ff41cf 100644
--- a/weed/server/master_server_handlers.go
+++ b/weed/server/master_server_handlers.go
@@ -7,8 +7,9 @@ import (
"strings"
"github.com/chrislusf/seaweedfs/weed/operation"
+ "github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/stats"
- "github.com/chrislusf/seaweedfs/weed/storage"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
)
func (ms *MasterServer) lookupVolumeId(vids []string, collection string) (volumeLocations map[string]operation.LookupResult) {
@@ -21,7 +22,7 @@ func (ms *MasterServer) lookupVolumeId(vids []string, collection string) (volume
if _, ok := volumeLocations[vid]; ok {
continue
}
- volumeId, err := storage.NewVolumeId(vid)
+ volumeId, err := needle.NewVolumeId(vid)
if err == nil {
machines := ms.Topo.Lookup(collection, volumeId)
if machines != nil {
@@ -40,12 +41,23 @@ func (ms *MasterServer) lookupVolumeId(vids []string, collection string) (volume
return
}
-// Takes one volumeId only, can not do batch lookup
+// If "fileId" is provided, this returns the fileId location and a JWT to update or delete the file.
+// If "volumeId" is provided, this only returns the volumeId location
func (ms *MasterServer) dirLookupHandler(w http.ResponseWriter, r *http.Request) {
vid := r.FormValue("volumeId")
- commaSep := strings.Index(vid, ",")
- if commaSep > 0 {
- vid = vid[0:commaSep]
+ if vid != "" {
+ // backward compatible
+ commaSep := strings.Index(vid, ",")
+ if commaSep > 0 {
+ vid = vid[0:commaSep]
+ }
+ }
+ fileId := r.FormValue("fileId")
+ if fileId != "" {
+ commaSep := strings.Index(fileId, ",")
+ if commaSep > 0 {
+ vid = fileId[0:commaSep]
+ }
}
vids := []string{vid}
collection := r.FormValue("collection") //optional, but can be faster if too many collections
@@ -54,6 +66,10 @@ func (ms *MasterServer) dirLookupHandler(w http.ResponseWriter, r *http.Request)
httpStatus := http.StatusOK
if location.Error != "" {
httpStatus = http.StatusNotFound
+ } else {
+ forRead := r.FormValue("read")
+ isRead := forRead == "yes"
+ ms.maybeAddJwtAuthorization(w, fileId, !isRead)
}
writeJsonQuiet(w, r, httpStatus, location)
}
@@ -79,7 +95,7 @@ func (ms *MasterServer) dirAssignHandler(w http.ResponseWriter, r *http.Request)
ms.vgLock.Lock()
defer ms.vgLock.Unlock()
if !ms.Topo.HasWritableVolume(option) {
- if _, err = ms.vg.AutomaticGrowByType(option, ms.Topo); err != nil {
+ if _, err = ms.vg.AutomaticGrowByType(option, ms.grpcDialOpiton, ms.Topo); err != nil {
writeJsonError(w, r, http.StatusInternalServerError,
fmt.Errorf("Cannot grow volume group! %v", err))
return
@@ -88,8 +104,23 @@ func (ms *MasterServer) dirAssignHandler(w http.ResponseWriter, r *http.Request)
}
fid, count, dn, err := ms.Topo.PickForWrite(requestedCount, option)
if err == nil {
+ ms.maybeAddJwtAuthorization(w, fid, true)
writeJsonQuiet(w, r, http.StatusOK, operation.AssignResult{Fid: fid, Url: dn.Url(), PublicUrl: dn.PublicUrl, Count: count})
} else {
writeJsonQuiet(w, r, http.StatusNotAcceptable, operation.AssignResult{Error: err.Error()})
}
}
+
+func (ms *MasterServer) maybeAddJwtAuthorization(w http.ResponseWriter, fileId string, isWrite bool) {
+ var encodedJwt security.EncodedJwt
+ if isWrite {
+ encodedJwt = security.GenJwt(ms.guard.SigningKey, ms.guard.ExpiresAfterSec, fileId)
+ } else {
+ encodedJwt = security.GenJwt(ms.guard.ReadSigningKey, ms.guard.ReadExpiresAfterSec, fileId)
+ }
+ if encodedJwt == "" {
+ return
+ }
+
+ w.Header().Set("Authorization", "BEARER "+string(encodedJwt))
+}
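
On the HTTP side the JWT rides in the Authorization response header with a "BEARER " prefix, exactly as maybeAddJwtAuthorization sets it, so a caller of /dir/assign or /dir/lookup?fileId=... can capture the token and replay it in the same header on the subsequent volume server request. A minimal sketch, assuming only a default master on localhost:9333:

package main

import (
	"fmt"
	"log"
	"net/http"
	"strings"
)

func main() {
	resp, err := http.Get("http://localhost:9333/dir/assign")
	if err != nil {
		log.Fatal(err)
	}
	defer resp.Body.Close()

	// maybeAddJwtAuthorization sets "Authorization: BEARER <jwt>" on the response;
	// replay the same header when writing to the assigned volume server
	jwt := strings.TrimPrefix(resp.Header.Get("Authorization"), "BEARER ")
	fmt.Println("write jwt:", jwt)
}
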
diff --git a/weed/server/master_server_handlers_admin.go b/weed/server/master_server_handlers_admin.go
index 3a2662908..343bcb8da 100644
--- a/weed/server/master_server_handlers_admin.go
+++ b/weed/server/master_server_handlers_admin.go
@@ -7,12 +7,12 @@ import (
"math/rand"
"net/http"
"strconv"
- "time"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
"github.com/chrislusf/seaweedfs/weed/storage"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/topology"
"github.com/chrislusf/seaweedfs/weed/util"
)
@@ -24,11 +24,8 @@ func (ms *MasterServer) collectionDeleteHandler(w http.ResponseWriter, r *http.R
return
}
for _, server := range collection.ListVolumeServers() {
- err := operation.WithVolumeServerClient(server.Url(), func(client volume_server_pb.VolumeServerClient) error {
- ctx, cancel := context.WithTimeout(context.Background(), time.Duration(5*time.Second))
- defer cancel()
-
- _, deleteErr := client.DeleteCollection(ctx, &volume_server_pb.DeleteCollectionRequest{
+ err := operation.WithVolumeServerClient(server.Url(), ms.grpcDialOpiton, func(client volume_server_pb.VolumeServerClient) error {
+ _, deleteErr := client.DeleteCollection(context.Background(), &volume_server_pb.DeleteCollectionRequest{
Collection: collection.Name,
})
return deleteErr
@@ -50,7 +47,7 @@ func (ms *MasterServer) dirStatusHandler(w http.ResponseWriter, r *http.Request)
func (ms *MasterServer) volumeVacuumHandler(w http.ResponseWriter, r *http.Request) {
gcString := r.FormValue("garbageThreshold")
- gcThreshold := ms.garbageThreshold
+ gcThreshold := ms.option.GarbageThreshold
if gcString != "" {
var err error
gcThreshold, err = strconv.ParseFloat(gcString, 32)
@@ -60,7 +57,7 @@ func (ms *MasterServer) volumeVacuumHandler(w http.ResponseWriter, r *http.Reque
}
}
glog.Infoln("garbageThreshold =", gcThreshold)
- ms.Topo.Vacuum(gcThreshold, ms.preallocate)
+ ms.Topo.Vacuum(ms.grpcDialOpiton, gcThreshold, ms.preallocateSize)
ms.dirStatusHandler(w, r)
}
@@ -73,10 +70,10 @@ func (ms *MasterServer) volumeGrowHandler(w http.ResponseWriter, r *http.Request
}
if err == nil {
if count, err = strconv.Atoi(r.FormValue("count")); err == nil {
- if ms.Topo.FreeSpace() < count*option.ReplicaPlacement.GetCopyCount() {
- err = errors.New("Only " + strconv.Itoa(ms.Topo.FreeSpace()) + " volumes left! Not enough for " + strconv.Itoa(count*option.ReplicaPlacement.GetCopyCount()))
+ if ms.Topo.FreeSpace() < int64(count*option.ReplicaPlacement.GetCopyCount()) {
+ err = fmt.Errorf("only %d volumes left, not enough for %d", ms.Topo.FreeSpace(), count*option.ReplicaPlacement.GetCopyCount())
} else {
- count, err = ms.vg.GrowByCountAndType(count, option, ms.Topo)
+ count, err = ms.vg.GrowByCountAndType(ms.grpcDialOpiton, count, option, ms.Topo)
}
} else {
err = errors.New("parameter count is not found")
@@ -98,7 +95,7 @@ func (ms *MasterServer) volumeStatusHandler(w http.ResponseWriter, r *http.Reque
func (ms *MasterServer) redirectHandler(w http.ResponseWriter, r *http.Request) {
vid, _, _, _, _ := parseURLPath(r.URL.Path)
- volumeId, err := storage.NewVolumeId(vid)
+ volumeId, err := needle.NewVolumeId(vid)
if err != nil {
debug("parsing error:", err, r.URL.Path)
return
@@ -122,17 +119,17 @@ func (ms *MasterServer) selfUrl(r *http.Request) string {
if r.Host != "" {
return r.Host
}
- return "localhost:" + strconv.Itoa(ms.port)
+ return "localhost:" + strconv.Itoa(ms.option.Port)
}
func (ms *MasterServer) submitFromMasterServerHandler(w http.ResponseWriter, r *http.Request) {
if ms.Topo.IsLeader() {
- submitForClientHandler(w, r, ms.selfUrl(r))
+ submitForClientHandler(w, r, ms.selfUrl(r), ms.grpcDialOpiton)
} else {
masterUrl, err := ms.Topo.Leader()
if err != nil {
writeJsonError(w, r, http.StatusInternalServerError, err)
} else {
- submitForClientHandler(w, r, masterUrl)
+ submitForClientHandler(w, r, masterUrl, ms.grpcDialOpiton)
}
}
}
@@ -145,17 +142,17 @@ func (ms *MasterServer) HasWritableVolume(option *topology.VolumeGrowOption) boo
func (ms *MasterServer) getVolumeGrowOption(r *http.Request) (*topology.VolumeGrowOption, error) {
replicationString := r.FormValue("replication")
if replicationString == "" {
- replicationString = ms.defaultReplicaPlacement
+ replicationString = ms.option.DefaultReplicaPlacement
}
replicaPlacement, err := storage.NewReplicaPlacementFromString(replicationString)
if err != nil {
return nil, err
}
- ttl, err := storage.ReadTTL(r.FormValue("ttl"))
+ ttl, err := needle.ReadTTL(r.FormValue("ttl"))
if err != nil {
return nil, err
}
- preallocate := ms.preallocate
+ preallocate := ms.preallocateSize
if r.FormValue("preallocate") != "" {
preallocate, err = strconv.ParseInt(r.FormValue("preallocate"), 10, 64)
if err != nil {
diff --git a/weed/server/master_ui/templates.go b/weed/server/master_ui/templates.go
index f32e8e61b..b674e3f82 100644
--- a/weed/server/master_ui/templates.go
+++ b/weed/server/master_ui/templates.go
@@ -41,7 +41,7 @@ var StatusTpl = template.Must(template.New("status").Parse(`<!DOCTYPE html>
<td class="col-sm-2 field-label"><label>Other Masters:</label></td>
<td class="col-sm-10"><ul class="list-unstyled">
{{ range $k, $p := .Peers }}
- <li><a href="{{ $p.ConnectionString }}">{{ $p.Name }}</a></li>
+ <li><a href="http://{{ $p.Name }}/ui/index.html">{{ $p.Name }}</a></li>
{{ end }}
</ul></td>
</tr>
@@ -76,6 +76,7 @@ var StatusTpl = template.Must(template.New("status").Parse(`<!DOCTYPE html>
<th>Rack</th>
<th>RemoteAddr</th>
<th>#Volumes</th>
+ <th>#ErasureCodingShards</th>
<th>Max</th>
</tr>
</thead>
@@ -88,6 +89,7 @@ var StatusTpl = template.Must(template.New("status").Parse(`<!DOCTYPE html>
<td>{{ $rack.Id }}</td>
<td><a href="http://{{ $dn.Url }}/ui/index.html">{{ $dn.Url }}</a></td>
<td>{{ $dn.Volumes }}</td>
+ <td>{{ $dn.EcShards }}</td>
<td>{{ $dn.Max }}</td>
</tr>
{{ end }}
diff --git a/weed/server/raft_server.go b/weed/server/raft_server.go
index 68042da54..88320ed98 100644
--- a/weed/server/raft_server.go
+++ b/weed/server/raft_server.go
@@ -2,37 +2,35 @@ package weed_server
import (
"encoding/json"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "google.golang.org/grpc"
"io/ioutil"
- "math/rand"
"os"
"path"
"reflect"
"sort"
- "strings"
"time"
"github.com/chrislusf/raft"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/topology"
- "github.com/gorilla/mux"
)
type RaftServer struct {
peers []string // initial peers to join with
raftServer raft.Server
dataDir string
- httpAddr string
- router *mux.Router
+ serverAddr string
topo *topology.Topology
+ *raft.GrpcServer
}
-func NewRaftServer(r *mux.Router, peers []string, httpAddr string, dataDir string, topo *topology.Topology, pulseSeconds int) *RaftServer {
+func NewRaftServer(grpcDialOption grpc.DialOption, peers []string, serverAddr string, dataDir string, topo *topology.Topology, pulseSeconds int) *RaftServer {
s := &RaftServer{
- peers: peers,
- httpAddr: httpAddr,
- dataDir: dataDir,
- router: r,
- topo: topo,
+ peers: peers,
+ serverAddr: serverAddr,
+ dataDir: dataDir,
+ topo: topo,
}
if glog.V(4) {
@@ -42,42 +40,39 @@ func NewRaftServer(r *mux.Router, peers []string, httpAddr string, dataDir strin
raft.RegisterCommand(&topology.MaxVolumeIdCommand{})
var err error
- transporter := raft.NewHTTPTransporter("/cluster", time.Second)
- transporter.Transport.MaxIdleConnsPerHost = 1024
- transporter.Transport.IdleConnTimeout = time.Second
- glog.V(0).Infof("Starting RaftServer with %v", httpAddr)
+ transporter := raft.NewGrpcTransporter(grpcDialOption)
+ glog.V(0).Infof("Starting RaftServer with %v", serverAddr)
// Clear old cluster configurations if peers are changed
- if oldPeers, changed := isPeersChanged(s.dataDir, httpAddr, s.peers); changed {
+ if oldPeers, changed := isPeersChanged(s.dataDir, serverAddr, s.peers); changed {
glog.V(0).Infof("Peers Change: %v => %v", oldPeers, s.peers)
os.RemoveAll(path.Join(s.dataDir, "conf"))
os.RemoveAll(path.Join(s.dataDir, "log"))
os.RemoveAll(path.Join(s.dataDir, "snapshot"))
}
- s.raftServer, err = raft.NewServer(s.httpAddr, s.dataDir, transporter, nil, topo, "")
+ s.raftServer, err = raft.NewServer(s.serverAddr, s.dataDir, transporter, nil, topo, "")
if err != nil {
glog.V(0).Infoln(err)
return nil
}
- transporter.Install(s.raftServer, s)
s.raftServer.SetHeartbeatInterval(500 * time.Millisecond)
s.raftServer.SetElectionTimeout(time.Duration(pulseSeconds) * 500 * time.Millisecond)
s.raftServer.Start()
- s.router.HandleFunc("/cluster/status", s.statusHandler).Methods("GET")
-
for _, peer := range s.peers {
- s.raftServer.AddPeer(peer, "http://"+peer)
+ s.raftServer.AddPeer(peer, util.ServerToGrpcAddress(peer))
}
- time.Sleep(time.Duration(1000+rand.Int31n(3000)) * time.Millisecond)
- if s.raftServer.IsLogEmpty() {
+
+ s.GrpcServer = raft.NewGrpcServer(s.raftServer)
+
+ if s.raftServer.IsLogEmpty() && isTheFirstOne(serverAddr, s.peers) {
// Initialize the server by joining itself.
glog.V(0).Infoln("Initializing new cluster")
_, err := s.raftServer.Do(&raft.DefaultJoinCommand{
Name: s.raftServer.Name(),
- ConnectionString: "http://" + s.httpAddr,
+ ConnectionString: util.ServerToGrpcAddress(s.serverAddr),
})
if err != nil {
@@ -95,7 +90,7 @@ func (s *RaftServer) Peers() (members []string) {
peers := s.raftServer.Peers()
for _, p := range peers {
- members = append(members, strings.TrimPrefix(p.ConnectionString, "http://"))
+ members = append(members, p.Name)
}
return
@@ -114,7 +109,7 @@ func isPeersChanged(dir string, self string, peers []string) (oldPeers []string,
}
for _, p := range conf.Peers {
- oldPeers = append(oldPeers, strings.TrimPrefix(p.ConnectionString, "http://"))
+ oldPeers = append(oldPeers, p.Name)
}
oldPeers = append(oldPeers, self)
@@ -128,3 +123,11 @@ func isPeersChanged(dir string, self string, peers []string) (oldPeers []string,
return oldPeers, !reflect.DeepEqual(peers, oldPeers)
}
+
+func isTheFirstOne(self string, peers []string) bool {
+ sort.Strings(peers)
+	if len(peers) == 0 {
+ return true
+ }
+ return self == peers[0]
+}
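
Sorting the peer list first makes cluster bootstrap deterministic: every master computes the same ordering, so exactly one of them, the one with the lexicographically smallest address, self-joins its empty log, while the rest skip the DefaultJoinCommand and join that node instead. This avoids several masters each founding a one-node cluster on first start.
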
diff --git a/weed/server/raft_server_handlers.go b/weed/server/raft_server_handlers.go
index 627fe354e..fd38cb977 100644
--- a/weed/server/raft_server_handlers.go
+++ b/weed/server/raft_server_handlers.go
@@ -1,16 +1,17 @@
package weed_server
import (
- "github.com/chrislusf/seaweedfs/weed/operation"
"net/http"
)
-func (s *RaftServer) HandleFunc(pattern string, handler func(http.ResponseWriter, *http.Request)) {
- s.router.HandleFunc(pattern, handler)
+type ClusterStatusResult struct {
+ IsLeader bool `json:"IsLeader,omitempty"`
+ Leader string `json:"Leader,omitempty"`
+ Peers []string `json:"Peers,omitempty"`
}
-func (s *RaftServer) statusHandler(w http.ResponseWriter, r *http.Request) {
- ret := operation.ClusterStatusResult{
+func (s *RaftServer) StatusHandler(w http.ResponseWriter, r *http.Request) {
+ ret := ClusterStatusResult{
IsLeader: s.topo.IsLeader(),
Peers: s.Peers(),
}
diff --git a/weed/server/volume_grpc_admin.go b/weed/server/volume_grpc_admin.go
index 429ca9b68..35c2508a6 100644
--- a/weed/server/volume_grpc_admin.go
+++ b/weed/server/volume_grpc_admin.go
@@ -5,7 +5,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
- "github.com/chrislusf/seaweedfs/weed/storage"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
)
func (vs *VolumeServer) DeleteCollection(ctx context.Context, req *volume_server_pb.DeleteCollectionRequest) (*volume_server_pb.DeleteCollectionResponse, error) {
@@ -24,12 +24,12 @@ func (vs *VolumeServer) DeleteCollection(ctx context.Context, req *volume_server
}
-func (vs *VolumeServer) AssignVolume(ctx context.Context, req *volume_server_pb.AssignVolumeRequest) (*volume_server_pb.AssignVolumeResponse, error) {
+func (vs *VolumeServer) AllocateVolume(ctx context.Context, req *volume_server_pb.AllocateVolumeRequest) (*volume_server_pb.AllocateVolumeResponse, error) {
- resp := &volume_server_pb.AssignVolumeResponse{}
+ resp := &volume_server_pb.AllocateVolumeResponse{}
err := vs.store.AddVolume(
- storage.VolumeId(req.VolumdId),
+ needle.VolumeId(req.VolumeId),
req.Collection,
vs.needleMapKind,
req.Replication,
@@ -51,7 +51,7 @@ func (vs *VolumeServer) VolumeMount(ctx context.Context, req *volume_server_pb.V
resp := &volume_server_pb.VolumeMountResponse{}
- err := vs.store.MountVolume(storage.VolumeId(req.VolumdId))
+ err := vs.store.MountVolume(needle.VolumeId(req.VolumeId))
if err != nil {
glog.Errorf("volume mount %v: %v", req, err)
@@ -67,7 +67,7 @@ func (vs *VolumeServer) VolumeUnmount(ctx context.Context, req *volume_server_pb
resp := &volume_server_pb.VolumeUnmountResponse{}
- err := vs.store.UnmountVolume(storage.VolumeId(req.VolumdId))
+ err := vs.store.UnmountVolume(needle.VolumeId(req.VolumeId))
if err != nil {
glog.Errorf("volume unmount %v: %v", req, err)
@@ -83,7 +83,7 @@ func (vs *VolumeServer) VolumeDelete(ctx context.Context, req *volume_server_pb.
resp := &volume_server_pb.VolumeDeleteResponse{}
- err := vs.store.DeleteVolume(storage.VolumeId(req.VolumdId))
+ err := vs.store.DeleteVolume(needle.VolumeId(req.VolumeId))
if err != nil {
glog.Errorf("volume delete %v: %v", req, err)
@@ -94,3 +94,19 @@ func (vs *VolumeServer) VolumeDelete(ctx context.Context, req *volume_server_pb.
return resp, err
}
+
+func (vs *VolumeServer) VolumeMarkReadonly(ctx context.Context, req *volume_server_pb.VolumeMarkReadonlyRequest) (*volume_server_pb.VolumeMarkReadonlyResponse, error) {
+
+ resp := &volume_server_pb.VolumeMarkReadonlyResponse{}
+
+ err := vs.store.MarkVolumeReadonly(needle.VolumeId(req.VolumeId))
+
+ if err != nil {
+ glog.Errorf("volume mark readonly %v: %v", req, err)
+ } else {
+ glog.V(2).Infof("volume mark readonly %v", req)
+ }
+
+ return resp, err
+
+}
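
Marking a volume read-only is the usual precondition for the VolumeCopy and erasure coding RPCs below, since the files can then be copied without racing a compaction. A client-side sketch using the operation helper visible elsewhere in this change; the volume server address and TLS setup are assumptions:

package main

import (
	"context"
	"log"

	"github.com/chrislusf/seaweedfs/weed/operation"
	"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
	"google.golang.org/grpc"
)

func main() {
	var dialOption grpc.DialOption = grpc.WithInsecure() // TLS config elided for the sketch
	// "vol1:8080" is a hypothetical volume server address
	err := operation.WithVolumeServerClient("vol1:8080", dialOption, func(client volume_server_pb.VolumeServerClient) error {
		_, err := client.VolumeMarkReadonly(context.Background(), &volume_server_pb.VolumeMarkReadonlyRequest{
			VolumeId: 7,
		})
		return err
	})
	if err != nil {
		log.Fatal(err)
	}
}
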
diff --git a/weed/server/volume_grpc_batch_delete.go b/weed/server/volume_grpc_batch_delete.go
index 3554d97ae..d7fbb6edf 100644
--- a/weed/server/volume_grpc_batch_delete.go
+++ b/weed/server/volume_grpc_batch_delete.go
@@ -7,7 +7,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
- "github.com/chrislusf/seaweedfs/weed/storage"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
)
func (vs *VolumeServer) BatchDelete(ctx context.Context, req *volume_server_pb.BatchDeleteRequest) (*volume_server_pb.BatchDeleteResponse, error) {
@@ -26,8 +26,8 @@ func (vs *VolumeServer) BatchDelete(ctx context.Context, req *volume_server_pb.B
continue
}
- n := new(storage.Needle)
- volumeId, _ := storage.NewVolumeId(vid)
+ n := new(needle.Needle)
+ volumeId, _ := needle.NewVolumeId(vid)
n.ParsePath(id_cookie)
cookie := n.Cookie
diff --git a/weed/server/volume_grpc_client_to_master.go b/weed/server/volume_grpc_client_to_master.go
index bd3ffd7b3..731675b48 100644
--- a/weed/server/volume_grpc_client_to_master.go
+++ b/weed/server/volume_grpc_client_to_master.go
@@ -2,11 +2,16 @@ package weed_server
import (
"fmt"
+ "net"
"time"
+ "github.com/chrislusf/seaweedfs/weed/security"
+ "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
+ "github.com/spf13/viper"
+ "google.golang.org/grpc"
+
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
- "github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/util"
"golang.org/x/net/context"
)
@@ -16,41 +21,46 @@ func (vs *VolumeServer) GetMaster() string {
}
func (vs *VolumeServer) heartbeat() {
- glog.V(0).Infof("Volume server start with masters: %v", vs.MasterNodes)
+	glog.V(0).Infof("Volume server starts with seed master nodes: %v", vs.SeedMasterNodes)
vs.store.SetDataCenter(vs.dataCenter)
vs.store.SetRack(vs.rack)
+ grpcDialOption := security.LoadClientTLS(viper.Sub("grpc"), "volume")
+
var err error
var newLeader string
for {
- for _, master := range vs.MasterNodes {
+ for _, master := range vs.SeedMasterNodes {
if newLeader != "" {
master = newLeader
}
- masterGrpcAddress, parseErr := util.ParseServerToGrpcAddress(master, 0)
+ masterGrpcAddress, parseErr := util.ParseServerToGrpcAddress(master)
if parseErr != nil {
- glog.V(0).Infof("failed to parse master grpc %v", masterGrpcAddress)
+ glog.V(0).Infof("failed to parse master grpc %v: %v", masterGrpcAddress, parseErr)
continue
}
- newLeader, err = vs.doHeartbeat(master, masterGrpcAddress, time.Duration(vs.pulseSeconds)*time.Second)
+ vs.store.MasterAddress = master
+ newLeader, err = vs.doHeartbeat(context.Background(), master, masterGrpcAddress, grpcDialOption, time.Duration(vs.pulseSeconds)*time.Second)
if err != nil {
glog.V(0).Infof("heartbeat error: %v", err)
time.Sleep(time.Duration(vs.pulseSeconds) * time.Second)
+ newLeader = ""
+ vs.store.MasterAddress = ""
}
}
}
}
-func (vs *VolumeServer) doHeartbeat(masterNode, masterGrpcAddress string, sleepInterval time.Duration) (newLeader string, err error) {
+func (vs *VolumeServer) doHeartbeat(ctx context.Context, masterNode, masterGrpcAddress string, grpcDialOption grpc.DialOption, sleepInterval time.Duration) (newLeader string, err error) {
- grpcConection, err := util.GrpcDial(masterGrpcAddress)
+ grpcConection, err := util.GrpcDial(ctx, masterGrpcAddress, grpcDialOption)
if err != nil {
return "", fmt.Errorf("fail to dial %s : %v", masterNode, err)
}
defer grpcConection.Close()
client := master_pb.NewSeaweedClient(grpcConection)
- stream, err := client.SendHeartbeat(context.Background())
+ stream, err := client.SendHeartbeat(ctx)
if err != nil {
glog.V(0).Infof("SendHeartbeat to %s: %v", masterNode, err)
return "", err
@@ -58,9 +68,6 @@ func (vs *VolumeServer) doHeartbeat(masterNode, masterGrpcAddress string, sleepI
glog.V(0).Infof("Heartbeat to: %v", masterNode)
vs.currentMaster = masterNode
- vs.store.Client = stream
- defer func() { vs.store.Client = nil }()
-
doneChan := make(chan error, 1)
go func() {
@@ -71,17 +78,18 @@ func (vs *VolumeServer) doHeartbeat(masterNode, masterGrpcAddress string, sleepI
return
}
if in.GetVolumeSizeLimit() != 0 {
- vs.store.VolumeSizeLimit = in.GetVolumeSizeLimit()
- }
- if in.GetSecretKey() != "" {
- vs.guard.SecretKey = security.Secret(in.GetSecretKey())
+ vs.store.SetVolumeSizeLimit(in.GetVolumeSizeLimit())
}
- if in.GetLeader() != "" && masterNode != in.GetLeader() {
+ if in.GetLeader() != "" && masterNode != in.GetLeader() && !isSameIP(in.GetLeader(), masterNode) {
glog.V(0).Infof("Volume Server found a new master newLeader: %v instead of %v", in.GetLeader(), masterNode)
newLeader = in.GetLeader()
doneChan <- nil
return
}
+ if in.GetMetricsAddress() != "" && vs.MetricsAddress != in.GetMetricsAddress() {
+ vs.MetricsAddress = in.GetMetricsAddress()
+ vs.MetricsIntervalSec = int(in.GetMetricsIntervalSeconds())
+ }
}
}()
@@ -90,33 +98,89 @@ func (vs *VolumeServer) doHeartbeat(masterNode, masterGrpcAddress string, sleepI
return "", err
}
- tickChan := time.Tick(sleepInterval)
+ if err = stream.Send(vs.store.CollectErasureCodingHeartbeat()); err != nil {
+ glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", masterNode, err)
+ return "", err
+ }
+
+ volumeTickChan := time.Tick(sleepInterval)
+ ecShardTickChan := time.Tick(17 * sleepInterval)
for {
select {
- case vid := <-vs.store.NewVolumeIdChan:
+ case volumeMessage := <-vs.store.NewVolumesChan:
+ deltaBeat := &master_pb.Heartbeat{
+ NewVolumes: []*master_pb.VolumeShortInformationMessage{
+ &volumeMessage,
+ },
+ }
+ glog.V(1).Infof("volume server %s:%d adds volume %d", vs.store.Ip, vs.store.Port, volumeMessage.Id)
+ if err = stream.Send(deltaBeat); err != nil {
+ glog.V(0).Infof("Volume Server Failed to update to master %s: %v", masterNode, err)
+ return "", err
+ }
+ case ecShardMessage := <-vs.store.NewEcShardsChan:
+ deltaBeat := &master_pb.Heartbeat{
+ NewEcShards: []*master_pb.VolumeEcShardInformationMessage{
+ &ecShardMessage,
+ },
+ }
+ glog.V(1).Infof("volume server %s:%d adds ec shard %d:%d", vs.store.Ip, vs.store.Port, ecShardMessage.Id,
+ erasure_coding.ShardBits(ecShardMessage.EcIndexBits).ShardIds())
+ if err = stream.Send(deltaBeat); err != nil {
+ glog.V(0).Infof("Volume Server Failed to update to master %s: %v", masterNode, err)
+ return "", err
+ }
+ case volumeMessage := <-vs.store.DeletedVolumesChan:
deltaBeat := &master_pb.Heartbeat{
- NewVids: []uint32{uint32(vid)},
+ DeletedVolumes: []*master_pb.VolumeShortInformationMessage{
+ &volumeMessage,
+ },
}
+ glog.V(1).Infof("volume server %s:%d deletes volume %d", vs.store.Ip, vs.store.Port, volumeMessage.Id)
if err = stream.Send(deltaBeat); err != nil {
glog.V(0).Infof("Volume Server Failed to update to master %s: %v", masterNode, err)
return "", err
}
- case vid := <-vs.store.DeletedVolumeIdChan:
+ case ecShardMessage := <-vs.store.DeletedEcShardsChan:
deltaBeat := &master_pb.Heartbeat{
- DeletedVids: []uint32{uint32(vid)},
+ DeletedEcShards: []*master_pb.VolumeEcShardInformationMessage{
+ &ecShardMessage,
+ },
}
+ glog.V(1).Infof("volume server %s:%d deletes ec shard %d:%d", vs.store.Ip, vs.store.Port, ecShardMessage.Id,
+ erasure_coding.ShardBits(ecShardMessage.EcIndexBits).ShardIds())
if err = stream.Send(deltaBeat); err != nil {
glog.V(0).Infof("Volume Server Failed to update to master %s: %v", masterNode, err)
return "", err
}
- case <-tickChan:
+ case <-volumeTickChan:
+ glog.V(4).Infof("volume server %s:%d heartbeat", vs.store.Ip, vs.store.Port)
if err = stream.Send(vs.store.CollectHeartbeat()); err != nil {
glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", masterNode, err)
return "", err
}
+ case <-ecShardTickChan:
+ glog.V(4).Infof("volume server %s:%d ec heartbeat", vs.store.Ip, vs.store.Port)
+ if err = stream.Send(vs.store.CollectErasureCodingHeartbeat()); err != nil {
+ glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", masterNode, err)
+ return "", err
+ }
case err = <-doneChan:
return
}
}
}
+
+func isSameIP(ip string, host string) bool {
+ ips, err := net.LookupIP(host)
+ if err != nil {
+ return false
+ }
+ for _, t := range ips {
+ if ip == t.String() {
+ return true
+ }
+ }
+ return false
+}
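
The heartbeat loop now distinguishes delta beats from full reports: volume and ec shard additions or deletions are pushed to the master immediately over the same stream, while the periodic CollectHeartbeat tick resends the full volume list and the much rarer CollectErasureCodingHeartbeat tick (17x the pulse interval) resends the ec shard state. This keeps the master's view current without shipping the whole inventory on every change.
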
diff --git a/weed/server/volume_grpc_copy.go b/weed/server/volume_grpc_copy.go
new file mode 100644
index 000000000..8b39146ee
--- /dev/null
+++ b/weed/server/volume_grpc_copy.go
@@ -0,0 +1,263 @@
+package weed_server
+
+import (
+ "context"
+ "fmt"
+ "io"
+ "math"
+ "os"
+ "path"
+ "time"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/operation"
+ "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
+ "github.com/chrislusf/seaweedfs/weed/storage"
+ "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+const BufferSizeLimit = 1024 * 1024 * 2
+
+// VolumeCopy copies the .idx and .dat files, then mounts the volume
+func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.VolumeCopyRequest) (*volume_server_pb.VolumeCopyResponse, error) {
+
+ v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
+ if v != nil {
+ return nil, fmt.Errorf("volume %d already exists", req.VolumeId)
+ }
+
+ location := vs.store.FindFreeLocation()
+ if location == nil {
+ return nil, fmt.Errorf("no space left")
+ }
+
+ // the master will not start compaction for read-only volumes, so it is safe to just copy files directly
+ // copy .dat and .idx files
+ // read .idx .dat file size and timestamp
+ // send .idx file
+ // send .dat file
+ // confirm size and timestamp
+ var volFileInfoResp *volume_server_pb.ReadVolumeFileStatusResponse
+ var volumeFileName, idxFileName, datFileName string
+ err := operation.WithVolumeServerClient(req.SourceDataNode, vs.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
+ var err error
+ volFileInfoResp, err = client.ReadVolumeFileStatus(ctx,
+ &volume_server_pb.ReadVolumeFileStatusRequest{
+ VolumeId: req.VolumeId,
+ })
+ if nil != err {
+ return fmt.Errorf("read volume file status failed, %v", err)
+ }
+
+ volumeFileName = storage.VolumeFileName(location.Directory, volFileInfoResp.Collection, int(req.VolumeId))
+
+ // println("source:", volFileInfoResp.String())
+		// copy the .idx file, then the .dat file
+ if err := vs.doCopyFile(ctx, client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.IdxFileSize, volumeFileName, ".idx", false); err != nil {
+ return err
+ }
+
+ if err := vs.doCopyFile(ctx, client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.DatFileSize, volumeFileName, ".dat", false); err != nil {
+ return err
+ }
+
+ return nil
+ })
+
+ idxFileName = volumeFileName + ".idx"
+ datFileName = volumeFileName + ".dat"
+
+ if err != nil && volumeFileName != "" {
+ if idxFileName != "" {
+ os.Remove(idxFileName)
+ }
+ if datFileName != "" {
+ os.Remove(datFileName)
+ }
+ return nil, err
+ }
+
+ if err = checkCopyFiles(volFileInfoResp, idxFileName, datFileName); err != nil { // added by panyc16
+ return nil, err
+ }
+
+ // mount the volume
+ err = vs.store.MountVolume(needle.VolumeId(req.VolumeId))
+ if err != nil {
+ return nil, fmt.Errorf("failed to mount volume %d: %v", req.VolumeId, err)
+ }
+
+ return &volume_server_pb.VolumeCopyResponse{
+ LastAppendAtNs: volFileInfoResp.DatFileTimestampSeconds * uint64(time.Second),
+ }, err
+}
+
+func (vs *VolumeServer) doCopyFile(ctx context.Context, client volume_server_pb.VolumeServerClient, isEcVolume bool, collection string, vid uint32,
+ compactRevision uint32, stopOffset uint64, baseFileName, ext string, isAppend bool) error {
+
+ copyFileClient, err := client.CopyFile(ctx, &volume_server_pb.CopyFileRequest{
+ VolumeId: vid,
+ Ext: ext,
+ CompactionRevision: compactRevision,
+ StopOffset: stopOffset,
+ Collection: collection,
+ IsEcVolume: isEcVolume,
+ })
+ if err != nil {
+ return fmt.Errorf("failed to start copying volume %d %s file: %v", vid, ext, err)
+ }
+
+ err = writeToFile(copyFileClient, baseFileName+ext, util.NewWriteThrottler(vs.compactionBytePerSecond), isAppend)
+ if err != nil {
+ return fmt.Errorf("failed to copy %s file: %v", baseFileName+ext, err)
+ }
+
+ return nil
+
+}
+
+// checkCopyFiles only verifies that the copied file sizes match the source.
+// TODO: maybe also check the received count and deleted count of the volume.
+func checkCopyFiles(originFileInf *volume_server_pb.ReadVolumeFileStatusResponse, idxFileName, datFileName string) error {
+ stat, err := os.Stat(idxFileName)
+ if err != nil {
+ return fmt.Errorf("stat idx file %s failed, %v", idxFileName, err)
+ }
+ if originFileInf.IdxFileSize != uint64(stat.Size()) {
+ return fmt.Errorf("idx file %s size [%v] is not same as origin file size [%v]",
+ idxFileName, stat.Size(), originFileInf.IdxFileSize)
+ }
+
+ stat, err = os.Stat(datFileName)
+ if err != nil {
+ return fmt.Errorf("get dat file info failed, %v", err)
+ }
+ if originFileInf.DatFileSize != uint64(stat.Size()) {
+ return fmt.Errorf("the dat file size [%v] is not same as origin file size [%v]",
+ stat.Size(), originFileInf.DatFileSize)
+ }
+ return nil
+}
+
+func writeToFile(client volume_server_pb.VolumeServer_CopyFileClient, fileName string, wt *util.WriteThrottler, isAppend bool) error {
+ glog.V(4).Infof("writing to %s", fileName)
+ flags := os.O_WRONLY | os.O_CREATE | os.O_TRUNC
+ if isAppend {
+ flags = os.O_WRONLY | os.O_CREATE
+ }
+ dst, err := os.OpenFile(fileName, flags, 0644)
+ if err != nil {
+		return fmt.Errorf("open file %s: %v", fileName, err)
+ }
+ defer dst.Close()
+
+ for {
+ resp, receiveErr := client.Recv()
+ if receiveErr == io.EOF {
+ break
+ }
+ if receiveErr != nil {
+ return fmt.Errorf("receiving %s: %v", fileName, receiveErr)
+ }
+		if _, writeErr := dst.Write(resp.FileContent); writeErr != nil {
+			return fmt.Errorf("writing %s: %v", fileName, writeErr)
+		}
+ wt.MaybeSlowdown(int64(len(resp.FileContent)))
+ }
+ return nil
+}
+
+func (vs *VolumeServer) ReadVolumeFileStatus(ctx context.Context, req *volume_server_pb.ReadVolumeFileStatusRequest) (*volume_server_pb.ReadVolumeFileStatusResponse, error) {
+ resp := &volume_server_pb.ReadVolumeFileStatusResponse{}
+ v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
+ if v == nil {
+ return nil, fmt.Errorf("not found volume id %d", req.VolumeId)
+ }
+
+ resp.VolumeId = req.VolumeId
+ datSize, idxSize, modTime := v.FileStat()
+ resp.DatFileSize = datSize
+ resp.IdxFileSize = idxSize
+ resp.DatFileTimestampSeconds = uint64(modTime.Unix())
+ resp.IdxFileTimestampSeconds = uint64(modTime.Unix())
+ resp.FileCount = v.FileCount()
+ resp.CompactionRevision = uint32(v.CompactionRevision)
+ resp.Collection = v.Collection
+ return resp, nil
+}
+
+// CopyFile lets a client pull a volume-related file from this source server.
+// If req.CompactionRevision != math.MaxUint32, it ensures the compaction revision is as expected.
+// Copying stops at req.StopOffset; set it to math.MaxUint64 to read all data.
+func (vs *VolumeServer) CopyFile(req *volume_server_pb.CopyFileRequest, stream volume_server_pb.VolumeServer_CopyFileServer) error {
+
+ var fileName string
+ if !req.IsEcVolume {
+ v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
+ if v == nil {
+ return fmt.Errorf("not found volume id %d", req.VolumeId)
+ }
+
+ if uint32(v.CompactionRevision) != req.CompactionRevision && req.CompactionRevision != math.MaxUint32 {
+ return fmt.Errorf("volume %d is compacted", req.VolumeId)
+ }
+ fileName = v.FileName() + req.Ext
+ } else {
+ baseFileName := erasure_coding.EcShardBaseFileName(req.Collection, int(req.VolumeId)) + req.Ext
+ for _, location := range vs.store.Locations {
+ tName := path.Join(location.Directory, baseFileName)
+ if util.FileExists(tName) {
+ fileName = tName
+ }
+ }
+ if fileName == "" {
+ return fmt.Errorf("CopyFile not found ec volume id %d", req.VolumeId)
+ }
+ }
+
+ bytesToRead := int64(req.StopOffset)
+
+ file, err := os.Open(fileName)
+ if err != nil {
+ return err
+ }
+ defer file.Close()
+
+ buffer := make([]byte, BufferSizeLimit)
+
+ for bytesToRead > 0 {
+ bytesread, err := file.Read(buffer)
+
+ // println(fileName, "read", bytesread, "bytes, with target", bytesToRead)
+
+ if err != nil {
+ if err != io.EOF {
+ return err
+ }
+ // println(fileName, "read", bytesread, "bytes, with target", bytesToRead, "err", err.Error())
+ break
+ }
+
+ if int64(bytesread) > bytesToRead {
+ bytesread = int(bytesToRead)
+ }
+ err = stream.Send(&volume_server_pb.CopyFileResponse{
+ FileContent: buffer[:bytesread],
+ })
+ if err != nil {
+ // println("sending", bytesread, "bytes err", err.Error())
+ return err
+ }
+
+ bytesToRead -= int64(bytesread)
+
+ }
+
+ return nil
+}
+
+func (vs *VolumeServer) findVolumeOrEcVolumeLocation(volumeId needle.VolumeId) {
+
+}
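
From the caller's side, replicating a volume is a single VolumeCopy RPC to the destination server, which then pulls the .idx and .dat files from SourceDataNode via CopyFile as implemented above. A hedged sketch of that orchestration call; the server addresses and TLS setup are assumptions:

package main

import (
	"context"
	"fmt"

	"github.com/chrislusf/seaweedfs/weed/operation"
	"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
	"google.golang.org/grpc"
)

func main() {
	var dialOption grpc.DialOption = grpc.WithInsecure() // TLS config elided for the sketch
	// ask the destination volume server to pull volume 7 from the source server
	err := operation.WithVolumeServerClient("dst:8080", dialOption, func(client volume_server_pb.VolumeServerClient) error {
		resp, err := client.VolumeCopy(context.Background(), &volume_server_pb.VolumeCopyRequest{
			VolumeId:       7,
			SourceDataNode: "src:8080",
		})
		if err != nil {
			return err
		}
		fmt.Println("copied, last append at ns:", resp.LastAppendAtNs)
		return nil
	})
	if err != nil {
		fmt.Println("copy failed:", err)
	}
}
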
diff --git a/weed/server/volume_grpc_copy_incremental.go b/weed/server/volume_grpc_copy_incremental.go
new file mode 100644
index 000000000..f56fbeef4
--- /dev/null
+++ b/weed/server/volume_grpc_copy_incremental.go
@@ -0,0 +1,66 @@
+package weed_server
+
+import (
+ "context"
+ "fmt"
+ "io"
+ "os"
+
+ "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
+)
+
+func (vs *VolumeServer) VolumeIncrementalCopy(req *volume_server_pb.VolumeIncrementalCopyRequest, stream volume_server_pb.VolumeServer_VolumeIncrementalCopyServer) error {
+
+ v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
+ if v == nil {
+ return fmt.Errorf("not found volume id %d", req.VolumeId)
+ }
+
+ stopOffset, _, _ := v.FileStat()
+ foundOffset, isLastOne, err := v.BinarySearchByAppendAtNs(req.SinceNs)
+ if err != nil {
+ return fmt.Errorf("fail to locate by appendAtNs %d: %s", req.SinceNs, err)
+ }
+
+ if isLastOne {
+ return nil
+ }
+
+ startOffset := foundOffset.ToAcutalOffset()
+
+ buf := make([]byte, 1024*1024*2)
+ return sendFileContent(v.DataFile(), buf, startOffset, int64(stopOffset), stream)
+
+}
+
+func (vs *VolumeServer) VolumeSyncStatus(ctx context.Context, req *volume_server_pb.VolumeSyncStatusRequest) (*volume_server_pb.VolumeSyncStatusResponse, error) {
+
+ v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
+ if v == nil {
+ return nil, fmt.Errorf("not found volume id %d", req.VolumeId)
+ }
+
+ resp := v.GetVolumeSyncStatus()
+
+ return resp, nil
+
+}
+
+func sendFileContent(datFile *os.File, buf []byte, startOffset, stopOffset int64, stream volume_server_pb.VolumeServer_VolumeIncrementalCopyServer) error {
+ var blockSizeLimit = int64(len(buf))
+ for i := int64(0); i < stopOffset-startOffset; i += blockSizeLimit {
+ n, readErr := datFile.ReadAt(buf, startOffset+i)
+ if readErr == nil || readErr == io.EOF {
+ resp := &volume_server_pb.VolumeIncrementalCopyResponse{}
+ resp.FileContent = buf[:int64(n)]
+ sendErr := stream.Send(resp)
+ if sendErr != nil {
+ return sendErr
+ }
+ } else {
+ return readErr
+ }
+ }
+ return nil
+}
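
VolumeIncrementalCopy streams the raw .dat bytes appended since a given timestamp, so a catching-up replica only transfers the tail instead of the whole volume. A sketch of the client loop, with file handling simplified and the server address assumed:

package main

import (
	"context"
	"io"
	"log"
	"os"

	"github.com/chrislusf/seaweedfs/weed/operation"
	"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
	"google.golang.org/grpc"
)

func main() {
	var dialOption grpc.DialOption = grpc.WithInsecure() // TLS config elided for the sketch
	var sinceNs uint64 = 0                               // e.g. LastAppendAtNs from an earlier VolumeCopy

	dst, err := os.OpenFile("7.dat", os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644)
	if err != nil {
		log.Fatal(err)
	}
	defer dst.Close()

	err = operation.WithVolumeServerClient("src:8080", dialOption, func(client volume_server_pb.VolumeServerClient) error {
		stream, err := client.VolumeIncrementalCopy(context.Background(), &volume_server_pb.VolumeIncrementalCopyRequest{
			VolumeId: 7,
			SinceNs:  sinceNs,
		})
		if err != nil {
			return err
		}
		for {
			resp, recvErr := stream.Recv()
			if recvErr == io.EOF {
				return nil
			}
			if recvErr != nil {
				return recvErr
			}
			if _, writeErr := dst.Write(resp.FileContent); writeErr != nil {
				return writeErr
			}
		}
	})
	if err != nil {
		log.Fatal(err)
	}
}
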
diff --git a/weed/server/volume_grpc_erasure_coding.go b/weed/server/volume_grpc_erasure_coding.go
new file mode 100644
index 000000000..8140a06f6
--- /dev/null
+++ b/weed/server/volume_grpc_erasure_coding.go
@@ -0,0 +1,313 @@
+package weed_server
+
+import (
+ "context"
+ "fmt"
+ "io"
+ "io/ioutil"
+ "math"
+ "os"
+ "path"
+ "strings"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/operation"
+ "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
+ "github.com/chrislusf/seaweedfs/weed/storage"
+ "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "github.com/chrislusf/seaweedfs/weed/storage/types"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+/*
+
+Steps to apply erasure coding to .dat .idx files
+0. ensure the volume is readonly
+1. client calls VolumeEcShardsGenerate to generate the .ecx and .ec01~.ec14 files
+2. client asks the master for possible servers to hold the ec files, at least 4 servers
+3. client calls VolumeEcShardsCopy on the target servers to copy the ec files from the source server
+4. target servers report the new ec files to the master
+5. master stores vid -> [14]*DataNode
+6. client checks the master; when all 14 shards are ready, delete the original .dat and .idx files
+
+*/
+
+// VolumeEcShardsGenerate generates the .ecx and .ec01 ~ .ec14 files
+func (vs *VolumeServer) VolumeEcShardsGenerate(ctx context.Context, req *volume_server_pb.VolumeEcShardsGenerateRequest) (*volume_server_pb.VolumeEcShardsGenerateResponse, error) {
+
+ v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
+ if v == nil {
+ return nil, fmt.Errorf("volume %d not found", req.VolumeId)
+ }
+ baseFileName := v.FileName()
+
+ if v.Collection != req.Collection {
+ return nil, fmt.Errorf("existing collection:%v unexpected input: %v", v.Collection, req.Collection)
+ }
+
+ // write .ecx file
+ if err := erasure_coding.WriteSortedEcxFile(baseFileName); err != nil {
+ return nil, fmt.Errorf("WriteSortedEcxFile %s: %v", baseFileName, err)
+ }
+
+ // write .ec01 ~ .ec14 files
+ if err := erasure_coding.WriteEcFiles(baseFileName); err != nil {
+ return nil, fmt.Errorf("WriteEcFiles %s: %v", baseFileName, err)
+ }
+
+ return &volume_server_pb.VolumeEcShardsGenerateResponse{}, nil
+}
+
+// VolumeEcShardsRebuild regenerates any of the missing .ec01 ~ .ec14 files
+func (vs *VolumeServer) VolumeEcShardsRebuild(ctx context.Context, req *volume_server_pb.VolumeEcShardsRebuildRequest) (*volume_server_pb.VolumeEcShardsRebuildResponse, error) {
+
+ baseFileName := erasure_coding.EcShardBaseFileName(req.Collection, int(req.VolumeId))
+
+ var rebuiltShardIds []uint32
+
+ for _, location := range vs.store.Locations {
+ if util.FileExists(path.Join(location.Directory, baseFileName+".ecx")) {
+ // write .ec01 ~ .ec14 files
+ baseFileName = path.Join(location.Directory, baseFileName)
+ if generatedShardIds, err := erasure_coding.RebuildEcFiles(baseFileName); err != nil {
+ return nil, fmt.Errorf("RebuildEcFiles %s: %v", baseFileName, err)
+ } else {
+ rebuiltShardIds = generatedShardIds
+ }
+
+ if err := erasure_coding.RebuildEcxFile(baseFileName); err != nil {
+ return nil, fmt.Errorf("RebuildEcxFile %s: %v", baseFileName, err)
+ }
+
+ break
+ }
+ }
+
+ return &volume_server_pb.VolumeEcShardsRebuildResponse{
+ RebuiltShardIds: rebuiltShardIds,
+ }, nil
+}
+
+// VolumeEcShardsCopy copies the .ecx file and the requested ec data shards
+func (vs *VolumeServer) VolumeEcShardsCopy(ctx context.Context, req *volume_server_pb.VolumeEcShardsCopyRequest) (*volume_server_pb.VolumeEcShardsCopyResponse, error) {
+
+ location := vs.store.FindFreeLocation()
+ if location == nil {
+ return nil, fmt.Errorf("no space left")
+ }
+
+ baseFileName := storage.VolumeFileName(location.Directory, req.Collection, int(req.VolumeId))
+
+ err := operation.WithVolumeServerClient(req.SourceDataNode, vs.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
+
+ // copy ec data slices
+ for _, shardId := range req.ShardIds {
+ if err := vs.doCopyFile(ctx, client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, erasure_coding.ToExt(int(shardId)), false); err != nil {
+ return err
+ }
+ }
+
+ if !req.CopyEcxFile {
+ return nil
+ }
+
+ // copy ecx file
+ if err := vs.doCopyFile(ctx, client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, ".ecx", false); err != nil {
+ return err
+ }
+
+ // copy ecj file
+ if err := vs.doCopyFile(ctx, client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, ".ecj", true); err != nil {
+ return err
+ }
+
+ return nil
+ })
+ if err != nil {
+ return nil, fmt.Errorf("VolumeEcShardsCopy volume %d: %v", req.VolumeId, err)
+ }
+
+ return &volume_server_pb.VolumeEcShardsCopyResponse{}, nil
+}
+
+// VolumeEcShardsDelete deletes the local .ecx file and the given ec data shards when they are no longer needed.
+// The shards should not be mounted when calling this.
+func (vs *VolumeServer) VolumeEcShardsDelete(ctx context.Context, req *volume_server_pb.VolumeEcShardsDeleteRequest) (*volume_server_pb.VolumeEcShardsDeleteResponse, error) {
+
+ baseFilename := erasure_coding.EcShardBaseFileName(req.Collection, int(req.VolumeId))
+
+	found := false
+	var baseFilenameWithDir string
+	for _, location := range vs.store.Locations {
+		if util.FileExists(path.Join(location.Directory, baseFilename+".ecx")) {
+			found = true
+			// keep baseFilename as the bare name so the directory scan below can match on file names
+			baseFilenameWithDir = path.Join(location.Directory, baseFilename)
+			for _, shardId := range req.ShardIds {
+				os.Remove(baseFilenameWithDir + erasure_coding.ToExt(int(shardId)))
+			}
+			break
+		}
+	}
+
+ if !found {
+ return nil, nil
+ }
+
+ // check whether to delete the ecx file also
+ hasEcxFile := false
+ existingShardCount := 0
+
+ for _, location := range vs.store.Locations {
+ fileInfos, err := ioutil.ReadDir(location.Directory)
+ if err != nil {
+ continue
+ }
+ for _, fileInfo := range fileInfos {
+ if fileInfo.Name() == baseFilename+".ecx" {
+ hasEcxFile = true
+ continue
+ }
+ if strings.HasPrefix(fileInfo.Name(), baseFilename+".ec") {
+ existingShardCount++
+ }
+ }
+ }
+
+	if hasEcxFile && existingShardCount == 0 {
+		if err := os.Remove(baseFilenameWithDir + ".ecx"); err != nil {
+			return nil, err
+		}
+		if err := os.Remove(baseFilenameWithDir + ".ecj"); err != nil {
+			return nil, err
+		}
+	}
+
+ return &volume_server_pb.VolumeEcShardsDeleteResponse{}, nil
+}
+
+func (vs *VolumeServer) VolumeEcShardsMount(ctx context.Context, req *volume_server_pb.VolumeEcShardsMountRequest) (*volume_server_pb.VolumeEcShardsMountResponse, error) {
+
+	for _, shardId := range req.ShardIds {
+		err := vs.store.MountEcShards(req.Collection, needle.VolumeId(req.VolumeId), erasure_coding.ShardId(shardId))
+		if err != nil {
+			glog.Errorf("ec shard mount %v: %v", req, err)
+			return nil, fmt.Errorf("mount %d.%d: %v", req.VolumeId, shardId, err)
+		}
+		glog.V(2).Infof("ec shard mount %v", req)
+	}
+
+ return &volume_server_pb.VolumeEcShardsMountResponse{}, nil
+}
+
+func (vs *VolumeServer) VolumeEcShardsUnmount(ctx context.Context, req *volume_server_pb.VolumeEcShardsUnmountRequest) (*volume_server_pb.VolumeEcShardsUnmountResponse, error) {
+
+	for _, shardId := range req.ShardIds {
+		err := vs.store.UnmountEcShards(needle.VolumeId(req.VolumeId), erasure_coding.ShardId(shardId))
+		if err != nil {
+			glog.Errorf("ec shard unmount %v: %v", req, err)
+			return nil, fmt.Errorf("unmount %d.%d: %v", req.VolumeId, shardId, err)
+		}
+		glog.V(2).Infof("ec shard unmount %v", req)
+	}
+
+ return &volume_server_pb.VolumeEcShardsUnmountResponse{}, nil
+}
+
+func (vs *VolumeServer) VolumeEcShardRead(req *volume_server_pb.VolumeEcShardReadRequest, stream volume_server_pb.VolumeServer_VolumeEcShardReadServer) error {
+
+ ecVolume, found := vs.store.FindEcVolume(needle.VolumeId(req.VolumeId))
+ if !found {
+ return fmt.Errorf("VolumeEcShardRead not found ec volume id %d", req.VolumeId)
+ }
+ ecShard, found := ecVolume.FindEcVolumeShard(erasure_coding.ShardId(req.ShardId))
+ if !found {
+ return fmt.Errorf("not found ec shard %d.%d", req.VolumeId, req.ShardId)
+ }
+
+ if req.FileKey != 0 {
+ _, size, _ := ecVolume.FindNeedleFromEcx(types.Uint64ToNeedleId(req.FileKey))
+ if size == types.TombstoneFileSize {
+ return stream.Send(&volume_server_pb.VolumeEcShardReadResponse{
+ IsDeleted: true,
+ })
+ }
+ }
+
+ bufSize := req.Size
+ if bufSize > BufferSizeLimit {
+ bufSize = BufferSizeLimit
+ }
+ buffer := make([]byte, bufSize)
+
+ startOffset, bytesToRead := req.Offset, req.Size
+
+ for bytesToRead > 0 {
+ bytesread, err := ecShard.ReadAt(buffer, startOffset)
+
+ // println(fileName, "read", bytesread, "bytes, with target", bytesToRead)
+ if bytesread > 0 {
+
+ if int64(bytesread) > bytesToRead {
+ bytesread = int(bytesToRead)
+ }
+ err = stream.Send(&volume_server_pb.VolumeEcShardReadResponse{
+ Data: buffer[:bytesread],
+ })
+ if err != nil {
+ // println("sending", bytesread, "bytes err", err.Error())
+ return err
+ }
+
+ bytesToRead -= int64(bytesread)
+
+ }
+
+ if err != nil {
+ if err != io.EOF {
+ return err
+ }
+ return nil
+ }
+
+ }
+
+ return nil
+
+}
+
+func (vs *VolumeServer) VolumeEcBlobDelete(ctx context.Context, req *volume_server_pb.VolumeEcBlobDeleteRequest) (*volume_server_pb.VolumeEcBlobDeleteResponse, error) {
+
+ resp := &volume_server_pb.VolumeEcBlobDeleteResponse{}
+
+ for _, location := range vs.store.Locations {
+ if localEcVolume, found := location.FindEcVolume(needle.VolumeId(req.VolumeId)); found {
+
+ _, size, _, err := localEcVolume.LocateEcShardNeedle(types.NeedleId(req.FileKey), needle.Version(req.Version))
+ if err != nil {
+ return nil, fmt.Errorf("locate in local ec volume: %v", err)
+ }
+ if size == types.TombstoneFileSize {
+ return resp, nil
+ }
+
+ err = localEcVolume.DeleteNeedleFromEcx(types.NeedleId(req.FileKey))
+ if err != nil {
+ return nil, err
+ }
+
+ break
+ }
+ }
+
+ return resp, nil
+}
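
Putting the numbered steps from the comment block together, a coordinator would generate shards on the source server, fan VolumeEcShardsCopy out to the targets, then mount the copied shards. A hedged sketch of steps 1 and 3 for a single target; shard placement across at least 4 servers is elided, and the addresses and TLS setup are assumptions:

package main

import (
	"context"
	"log"

	"github.com/chrislusf/seaweedfs/weed/operation"
	"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
	"google.golang.org/grpc"
)

func main() {
	ctx := context.Background()
	var dialOption grpc.DialOption = grpc.WithInsecure() // TLS config elided for the sketch
	const vid, collection, source = 7, "", "src:8080"

	// step 1: generate .ecx and .ec01~.ec14 on the source server
	err := operation.WithVolumeServerClient(source, dialOption, func(client volume_server_pb.VolumeServerClient) error {
		_, genErr := client.VolumeEcShardsGenerate(ctx, &volume_server_pb.VolumeEcShardsGenerateRequest{
			VolumeId: vid, Collection: collection,
		})
		return genErr
	})
	if err != nil {
		log.Fatal(err)
	}

	// step 3: one target pulls a few shards plus the .ecx file, then mounts them
	shardIds := []uint32{0, 1, 2}
	err = operation.WithVolumeServerClient("dst:8080", dialOption, func(client volume_server_pb.VolumeServerClient) error {
		if _, copyErr := client.VolumeEcShardsCopy(ctx, &volume_server_pb.VolumeEcShardsCopyRequest{
			VolumeId: vid, Collection: collection, ShardIds: shardIds,
			CopyEcxFile: true, SourceDataNode: source,
		}); copyErr != nil {
			return copyErr
		}
		_, mountErr := client.VolumeEcShardsMount(ctx, &volume_server_pb.VolumeEcShardsMountRequest{
			VolumeId: vid, Collection: collection, ShardIds: shardIds,
		})
		return mountErr
	})
	if err != nil {
		log.Fatal(err)
	}
}
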
diff --git a/weed/server/volume_grpc_sync.go b/weed/server/volume_grpc_sync.go
deleted file mode 100644
index 5f56ec17d..000000000
--- a/weed/server/volume_grpc_sync.go
+++ /dev/null
@@ -1,101 +0,0 @@
-package weed_server
-
-import (
- "context"
- "fmt"
-
- "github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
- "github.com/chrislusf/seaweedfs/weed/storage"
- "github.com/chrislusf/seaweedfs/weed/storage/types"
-)
-
-func (vs *VolumeServer) VolumeSyncStatus(ctx context.Context, req *volume_server_pb.VolumeSyncStatusRequest) (*volume_server_pb.VolumeSyncStatusResponse, error) {
-
- v := vs.store.GetVolume(storage.VolumeId(req.VolumdId))
- if v == nil {
- return nil, fmt.Errorf("Not Found Volume Id %d", req.VolumdId)
- }
-
- resp := v.GetVolumeSyncStatus()
-
- glog.V(2).Infof("volume sync status %d", req.VolumdId)
-
- return resp, nil
-
-}
-
-func (vs *VolumeServer) VolumeSyncIndex(req *volume_server_pb.VolumeSyncIndexRequest, stream volume_server_pb.VolumeServer_VolumeSyncIndexServer) error {
-
- v := vs.store.GetVolume(storage.VolumeId(req.VolumdId))
- if v == nil {
- return fmt.Errorf("Not Found Volume Id %d", req.VolumdId)
- }
-
- content, err := v.IndexFileContent()
-
- if err != nil {
- glog.Errorf("sync volume %d index: %v", req.VolumdId, err)
- } else {
- glog.V(2).Infof("sync volume %d index", req.VolumdId)
- }
-
- const blockSizeLimit = 1024 * 1024 * 2
- for i := 0; i < len(content); i += blockSizeLimit {
- blockSize := len(content) - i
- if blockSize > blockSizeLimit {
- blockSize = blockSizeLimit
- }
- resp := &volume_server_pb.VolumeSyncIndexResponse{}
- resp.IndexFileContent = content[i : i+blockSize]
- stream.Send(resp)
- }
-
- return nil
-
-}
-
-func (vs *VolumeServer) VolumeSyncData(req *volume_server_pb.VolumeSyncDataRequest, stream volume_server_pb.VolumeServer_VolumeSyncDataServer) error {
-
- v := vs.store.GetVolume(storage.VolumeId(req.VolumdId))
- if v == nil {
- return fmt.Errorf("Not Found Volume Id %d", req.VolumdId)
- }
-
- if uint32(v.SuperBlock.CompactRevision) != req.Revision {
- return fmt.Errorf("Requested Volume Revision is %d, but current revision is %d", req.Revision, v.SuperBlock.CompactRevision)
- }
-
- content, err := storage.ReadNeedleBlob(v.DataFile(), int64(req.Offset)*types.NeedlePaddingSize, req.Size, v.Version())
- if err != nil {
- return fmt.Errorf("read offset:%d size:%d", req.Offset, req.Size)
- }
-
- id, err := types.ParseNeedleId(req.NeedleId)
- if err != nil {
- return fmt.Errorf("parsing needle id %s: %v", req.NeedleId, err)
- }
- n := new(storage.Needle)
- n.ParseNeedleHeader(content)
- if id != n.Id {
- return fmt.Errorf("Expected file entry id %d, but found %d", id, n.Id)
- }
-
- if err != nil {
- glog.Errorf("sync volume %d data: %v", req.VolumdId, err)
- }
-
- const blockSizeLimit = 1024 * 1024 * 2
- for i := 0; i < len(content); i += blockSizeLimit {
- blockSize := len(content) - i
- if blockSize > blockSizeLimit {
- blockSize = blockSizeLimit
- }
- resp := &volume_server_pb.VolumeSyncDataResponse{}
- resp.FileContent = content[i : i+blockSize]
- stream.Send(resp)
- }
-
- return nil
-
-}
diff --git a/weed/server/volume_grpc_tail.go b/weed/server/volume_grpc_tail.go
new file mode 100644
index 000000000..698bad5b8
--- /dev/null
+++ b/weed/server/volume_grpc_tail.go
@@ -0,0 +1,117 @@
+package weed_server
+
+import (
+ "context"
+ "fmt"
+ "time"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/operation"
+ "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
+ "github.com/chrislusf/seaweedfs/weed/storage"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
+)
+
+func (vs *VolumeServer) VolumeTailSender(req *volume_server_pb.VolumeTailSenderRequest, stream volume_server_pb.VolumeServer_VolumeTailSenderServer) error {
+
+ v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
+ if v == nil {
+ return fmt.Errorf("not found volume id %d", req.VolumeId)
+ }
+
+ defer glog.V(1).Infof("tailing volume %d finished", v.Id)
+
+ lastTimestampNs := req.SinceNs
+ drainingSeconds := req.IdleTimeoutSeconds
+
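+ // poll every 2 seconds; when no new needles arrive, count down the
+ // draining window and stop (IdleTimeoutSeconds == 0 means tail forever)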
+ for {
+ lastProcessedTimestampNs, err := sendNeedlesSince(stream, v, lastTimestampNs)
+ if err != nil {
+ glog.Infof("sendNeedlesSince: %v", err)
+ return fmt.Errorf("streamFollow: %v", err)
+ }
+ time.Sleep(2 * time.Second)
+
+ if req.IdleTimeoutSeconds == 0 {
+ lastTimestampNs = lastProcessedTimestampNs
+ continue
+ }
+ if lastProcessedTimestampNs == lastTimestampNs {
+ drainingSeconds--
+ if drainingSeconds <= 0 {
+ return nil
+ }
+ glog.V(1).Infof("tailing volume %d drains requests with %d seconds remaining", v.Id, drainingSeconds)
+ } else {
+ lastTimestampNs = lastProcessedTimestampNs
+ drainingSeconds = req.IdleTimeoutSeconds
+ glog.V(1).Infof("tailing volume %d resets draining wait time to %d seconds", v.Id, drainingSeconds)
+ }
+
+ }
+
+}
+
+func sendNeedlesSince(stream volume_server_pb.VolumeServer_VolumeTailSenderServer, v *storage.Volume, lastTimestampNs uint64) (lastProcessedTimestampNs uint64, err error) {
+
+ foundOffset, isLastOne, err := v.BinarySearchByAppendAtNs(lastTimestampNs)
+ if err != nil {
+ return 0, fmt.Errorf("fail to locate by appendAtNs %d: %s", lastTimestampNs, err)
+ }
+
+ // log.Printf("reading ts %d offset %d isLast %v", lastTimestampNs, foundOffset, isLastOne)
+
+ if isLastOne {
+ // heartbeat to the client to keep the connection healthy
+ sendErr := stream.Send(&volume_server_pb.VolumeTailSenderResponse{IsLastChunk: true})
+ return lastTimestampNs, sendErr
+ }
+
+ err = storage.ScanVolumeFileNeedleFrom(v.Version(), v.DataFile(), foundOffset.ToAcutalOffset(), func(needleHeader, needleBody []byte, needleAppendAtNs uint64) error {
+
+ isLastChunk := false
+
+ // need to send body by chunks
+ for i := 0; i < len(needleBody); i += BufferSizeLimit {
+ stopOffset := i + BufferSizeLimit
+ if stopOffset >= len(needleBody) {
+ isLastChunk = true
+ stopOffset = len(needleBody)
+ }
+
+ sendErr := stream.Send(&volume_server_pb.VolumeTailSenderResponse{
+ NeedleHeader: needleHeader,
+ NeedleBody: needleBody[i:stopOffset],
+ IsLastChunk: isLastChunk,
+ })
+ if sendErr != nil {
+ return sendErr
+ }
+ }
+
+ lastProcessedTimestampNs = needleAppendAtNs
+ return nil
+
+ })
+
+ return
+
+}
+
+func (vs *VolumeServer) VolumeTailReceiver(ctx context.Context, req *volume_server_pb.VolumeTailReceiverRequest) (*volume_server_pb.VolumeTailReceiverResponse, error) {
+
+ resp := &volume_server_pb.VolumeTailReceiverResponse{}
+
+ v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
+ if v == nil {
+ return resp, fmt.Errorf("receiver not found volume id %d", req.VolumeId)
+ }
+
+ defer glog.V(1).Infof("receive tailing volume %d finished", v.Id)
+
+ return resp, operation.TailVolumeFromSource(req.SourceVolumeServer, vs.grpcDialOption, v.Id, req.SinceNs, int(req.IdleTimeoutSeconds), func(n *needle.Needle) error {
+ _, _, err := vs.store.Write(v.Id, n)
+ return err
+ })
+
+}
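
VolumeTailReceiver is itself a thin wrapper, so any process can follow a volume with the same helper. A standalone tailer sketch, based only on the operation.TailVolumeFromSource call shown above (the address and volume id are placeholders):

package main

import (
	"log"

	"github.com/chrislusf/seaweedfs/weed/operation"
	"github.com/chrislusf/seaweedfs/weed/storage/needle"
	"google.golang.org/grpc"
)

func main() {
	err := operation.TailVolumeFromSource(
		"localhost:18080",   // source volume server (placeholder)
		grpc.WithInsecure(), // use the TLS dial option in secured clusters
		needle.VolumeId(7),  // volume to follow
		0,                   // sinceNs: 0 starts from the beginning of the volume
		30,                  // idle timeout in seconds; 0 would tail forever
		func(n *needle.Needle) error {
			log.Printf("needle %v: %d bytes", n.Id, len(n.Data))
			return nil
		})
	if err != nil {
		log.Fatal(err)
	}
}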
diff --git a/weed/server/volume_grpc_vacuum.go b/weed/server/volume_grpc_vacuum.go
index f0c87b582..24f982241 100644
--- a/weed/server/volume_grpc_vacuum.go
+++ b/weed/server/volume_grpc_vacuum.go
@@ -5,19 +5,19 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
- "github.com/chrislusf/seaweedfs/weed/storage"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
)
func (vs *VolumeServer) VacuumVolumeCheck(ctx context.Context, req *volume_server_pb.VacuumVolumeCheckRequest) (*volume_server_pb.VacuumVolumeCheckResponse, error) {
resp := &volume_server_pb.VacuumVolumeCheckResponse{}
- garbageRatio, err := vs.store.CheckCompactVolume(storage.VolumeId(req.VolumdId))
+ garbageRatio, err := vs.store.CheckCompactVolume(needle.VolumeId(req.VolumeId))
resp.GarbageRatio = garbageRatio
if err != nil {
- glog.V(3).Infof("check volume %d: %v", req.VolumdId, err)
+ glog.V(3).Infof("check volume %d: %v", req.VolumeId, err)
}
return resp, err
@@ -28,12 +28,12 @@ func (vs *VolumeServer) VacuumVolumeCompact(ctx context.Context, req *volume_ser
resp := &volume_server_pb.VacuumVolumeCompactResponse{}
- err := vs.store.CompactVolume(storage.VolumeId(req.VolumdId), req.Preallocate)
+ err := vs.store.CompactVolume(needle.VolumeId(req.VolumeId), req.Preallocate, vs.compactionBytePerSecond)
if err != nil {
- glog.Errorf("compact volume %d: %v", req.VolumdId, err)
+ glog.Errorf("compact volume %d: %v", req.VolumeId, err)
} else {
- glog.V(1).Infof("compact volume %d", req.VolumdId)
+ glog.V(1).Infof("compact volume %d", req.VolumeId)
}
return resp, err
@@ -44,12 +44,12 @@ func (vs *VolumeServer) VacuumVolumeCommit(ctx context.Context, req *volume_serv
resp := &volume_server_pb.VacuumVolumeCommitResponse{}
- err := vs.store.CommitCompactVolume(storage.VolumeId(req.VolumdId))
+ err := vs.store.CommitCompactVolume(needle.VolumeId(req.VolumeId))
if err != nil {
- glog.Errorf("commit volume %d: %v", req.VolumdId, err)
+ glog.Errorf("commit volume %d: %v", req.VolumeId, err)
} else {
- glog.V(1).Infof("commit volume %d", req.VolumdId)
+ glog.V(1).Infof("commit volume %d", req.VolumeId)
}
return resp, err
@@ -60,12 +60,12 @@ func (vs *VolumeServer) VacuumVolumeCleanup(ctx context.Context, req *volume_ser
resp := &volume_server_pb.VacuumVolumeCleanupResponse{}
- err := vs.store.CommitCleanupVolume(storage.VolumeId(req.VolumdId))
+ err := vs.store.CommitCleanupVolume(needle.VolumeId(req.VolumeId))
if err != nil {
- glog.Errorf("cleanup volume %d: %v", req.VolumdId, err)
+ glog.Errorf("cleanup volume %d: %v", req.VolumeId, err)
} else {
- glog.V(1).Infof("cleanup volume %d", req.VolumdId)
+ glog.V(1).Infof("cleanup volume %d", req.VolumeId)
}
return resp, err
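
The master drives these four RPCs as one vacuum cycle: check the garbage ratio, compact, then commit, with cleanup discarding a failed compaction. A condensed driver sketch, assuming only the request and response fields visible above (the 0.3 threshold is a placeholder; the real one comes from master configuration):

package main

import (
	"context"

	"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
)

// vacuumVolume runs one vacuum cycle for volume vid against a volume server client.
func vacuumVolume(ctx context.Context, client volume_server_pb.VolumeServerClient, vid uint32) error {
	check, err := client.VacuumVolumeCheck(ctx, &volume_server_pb.VacuumVolumeCheckRequest{VolumeId: vid})
	if err != nil {
		return err
	}
	if check.GarbageRatio < 0.3 { // not enough garbage to reclaim
		return nil
	}
	if _, err = client.VacuumVolumeCompact(ctx, &volume_server_pb.VacuumVolumeCompactRequest{VolumeId: vid}); err != nil {
		// compaction failed: ask the server to discard the partial compaction files
		client.VacuumVolumeCleanup(ctx, &volume_server_pb.VacuumVolumeCleanupRequest{VolumeId: vid})
		return err
	}
	_, err = client.VacuumVolumeCommit(ctx, &volume_server_pb.VacuumVolumeCommitRequest{VolumeId: vid})
	return err
}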
diff --git a/weed/server/volume_server.go b/weed/server/volume_server.go
index 0914e81b0..6cf654738 100644
--- a/weed/server/volume_server.go
+++ b/weed/server/volume_server.go
@@ -1,25 +1,34 @@
package weed_server
import (
+ "fmt"
"net/http"
+ "github.com/chrislusf/seaweedfs/weed/stats"
+ "google.golang.org/grpc"
+
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/storage"
+ "github.com/spf13/viper"
)
type VolumeServer struct {
- MasterNodes []string
- currentMaster string
- pulseSeconds int
- dataCenter string
- rack string
- store *storage.Store
- guard *security.Guard
+ SeedMasterNodes []string
+ currentMaster string
+ pulseSeconds int
+ dataCenter string
+ rack string
+ store *storage.Store
+ guard *security.Guard
+ grpcDialOption grpc.DialOption
- needleMapKind storage.NeedleMapType
- FixJpgOrientation bool
- ReadRedirect bool
+ needleMapKind storage.NeedleMapType
+ FixJpgOrientation bool
+ ReadRedirect bool
+ compactionBytePerSecond int64
+ MetricsAddress string
+ MetricsIntervalSec int
}
func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
@@ -30,26 +39,44 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
dataCenter string, rack string,
whiteList []string,
fixJpgOrientation bool,
- readRedirect bool) *VolumeServer {
+ readRedirect bool,
+ compactionMBPerSecond int,
+) *VolumeServer {
+
+ v := viper.GetViper()
+ signingKey := v.GetString("jwt.signing.key")
+ v.SetDefault("jwt.signing.expires_after_seconds", 10)
+ expiresAfterSec := v.GetInt("jwt.signing.expires_after_seconds")
+ enableUiAccess := v.GetBool("access.ui")
+
+ readSigningKey := v.GetString("jwt.signing.read.key")
+ v.SetDefault("jwt.signing.read.expires_after_seconds", 60)
+ readExpiresAfterSec := v.GetInt("jwt.signing.read.expires_after_seconds")
+
vs := &VolumeServer{
- pulseSeconds: pulseSeconds,
- dataCenter: dataCenter,
- rack: rack,
- needleMapKind: needleMapKind,
- FixJpgOrientation: fixJpgOrientation,
- ReadRedirect: readRedirect,
+ pulseSeconds: pulseSeconds,
+ dataCenter: dataCenter,
+ rack: rack,
+ needleMapKind: needleMapKind,
+ FixJpgOrientation: fixJpgOrientation,
+ ReadRedirect: readRedirect,
+ grpcDialOption: security.LoadClientTLS(viper.Sub("grpc"), "volume"),
+ compactionBytePerSecond: int64(compactionMBPerSecond) * 1024 * 1024,
}
- vs.MasterNodes = masterNodes
- vs.store = storage.NewStore(port, ip, publicUrl, folders, maxCounts, vs.needleMapKind)
+ vs.SeedMasterNodes = masterNodes
+ vs.store = storage.NewStore(vs.grpcDialOption, port, ip, publicUrl, folders, maxCounts, vs.needleMapKind)
- vs.guard = security.NewGuard(whiteList, "")
+ vs.guard = security.NewGuard(whiteList, signingKey, expiresAfterSec, readSigningKey, readExpiresAfterSec)
handleStaticResources(adminMux)
- adminMux.HandleFunc("/ui/index.html", vs.uiStatusHandler)
- adminMux.HandleFunc("/status", vs.guard.WhiteList(vs.statusHandler))
- adminMux.HandleFunc("/stats/counter", vs.guard.WhiteList(statsCounterHandler))
- adminMux.HandleFunc("/stats/memory", vs.guard.WhiteList(statsMemoryHandler))
- adminMux.HandleFunc("/stats/disk", vs.guard.WhiteList(vs.statsDiskHandler))
+ if signingKey == "" || enableUiAccess {
+ // only expose the volume server details for safe environments
+ adminMux.HandleFunc("/ui/index.html", vs.uiStatusHandler)
+ adminMux.HandleFunc("/status", vs.guard.WhiteList(vs.statusHandler))
+ adminMux.HandleFunc("/stats/counter", vs.guard.WhiteList(statsCounterHandler))
+ adminMux.HandleFunc("/stats/memory", vs.guard.WhiteList(statsMemoryHandler))
+ adminMux.HandleFunc("/stats/disk", vs.guard.WhiteList(vs.statsDiskHandler))
+ }
adminMux.HandleFunc("/", vs.privateStoreHandler)
if publicMux != adminMux {
// separated admin and public port
@@ -58,6 +85,11 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
}
go vs.heartbeat()
+ hostAddress := fmt.Sprintf("%s:%d", ip, port)
+ go stats.LoopPushingMetric("volumeServer", hostAddress, stats.VolumeServerGather,
+ func() (addr string, intervalSeconds int) {
+ return vs.MetricsAddress, vs.MetricsIntervalSec
+ })
return vs
}
@@ -67,7 +99,3 @@ func (vs *VolumeServer) Shutdown() {
vs.store.Close()
glog.V(0).Infoln("Shut down successfully!")
}
-
-func (vs *VolumeServer) jwt(fileId string) security.EncodedJwt {
- return security.GenJwt(vs.guard.SecretKey, fileId)
-}
diff --git a/weed/server/volume_server_handlers.go b/weed/server/volume_server_handlers.go
index 77b1274fd..14ad27d42 100644
--- a/weed/server/volume_server_handlers.go
+++ b/weed/server/volume_server_handlers.go
@@ -2,7 +2,10 @@ package weed_server
import (
"net/http"
+ "strings"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/stats"
)
@@ -45,3 +48,47 @@ func (vs *VolumeServer) publicReadOnlyHandler(w http.ResponseWriter, r *http.Req
vs.GetOrHeadHandler(w, r)
}
}
+
+func (vs *VolumeServer) maybeCheckJwtAuthorization(r *http.Request, vid, fid string, isWrite bool) bool {
+
+ var signingKey security.SigningKey
+
+ if isWrite {
+ if len(vs.guard.SigningKey) == 0 {
+ return true
+ } else {
+ signingKey = vs.guard.SigningKey
+ }
+ } else {
+ if len(vs.guard.ReadSigningKey) == 0 {
+ return true
+ } else {
+ signingKey = vs.guard.ReadSigningKey
+ }
+ }
+
+ tokenStr := security.GetJwt(r)
+ if tokenStr == "" {
+ glog.V(1).Infof("missing jwt from %s", r.RemoteAddr)
+ return false
+ }
+
+ token, err := security.DecodeJwt(signingKey, tokenStr)
+ if err != nil {
+ glog.V(1).Infof("jwt verification error from %s: %v", r.RemoteAddr, err)
+ return false
+ }
+ if !token.Valid {
+ glog.V(1).Infof("jwt invalid from %s: %v", r.RemoteAddr, tokenStr)
+ return false
+ }
+
+ if sc, ok := token.Claims.(*security.SeaweedFileIdClaims); ok {
+ if sepIndex := strings.LastIndex(fid, "_"); sepIndex > 0 {
+ fid = fid[:sepIndex]
+ }
+ return sc.Fid == vid+","+fid
+ }
+ glog.V(1).Infof("unexpected jwt from %s: %v", r.RemoteAddr, tokenStr)
+ return false
+}
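
The final comparison is the heart of the scheme: a JWT is scoped to exactly one file id. The matching rule in isolation, as a sketch:

package main

import "strings"

// matchesFidClaim reports whether a JWT whose Fid claim is claimFid
// authorizes access to volume vid, file fid, mirroring the check above.
func matchesFidClaim(claimFid, vid, fid string) bool {
	if sepIndex := strings.LastIndex(fid, "_"); sepIndex > 0 {
		fid = fid[:sepIndex] // drop a "_N" suffix addressing a derived copy of the needle
	}
	return claimFid == vid+","+fid
}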
diff --git a/weed/server/volume_server_handlers_read.go b/weed/server/volume_server_handlers_read.go
index 92c728141..f30ffefaf 100644
--- a/weed/server/volume_server_handlers_read.go
+++ b/weed/server/volume_server_handlers_read.go
@@ -2,6 +2,8 @@ package weed_server
import (
"bytes"
+ "context"
+ "errors"
"io"
"mime"
"mime/multipart"
@@ -17,16 +19,28 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/images"
"github.com/chrislusf/seaweedfs/weed/operation"
- "github.com/chrislusf/seaweedfs/weed/storage"
+ "github.com/chrislusf/seaweedfs/weed/stats"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/util"
)
var fileNameEscaper = strings.NewReplacer("\\", "\\\\", "\"", "\\\"")
func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) {
- n := new(storage.Needle)
+
+ stats.VolumeServerRequestCounter.WithLabelValues("get").Inc()
+ start := time.Now()
+ defer func() { stats.VolumeServerRequestHistogram.WithLabelValues("get").Observe(time.Since(start).Seconds()) }()
+
+ n := new(needle.Needle)
vid, fid, filename, ext, _ := parseURLPath(r.URL.Path)
- volumeId, err := storage.NewVolumeId(vid)
+
+ if !vs.maybeCheckJwtAuthorization(r, vid, fid, false) {
+ writeJsonError(w, r, http.StatusUnauthorized, errors.New("wrong jwt"))
+ return
+ }
+
+ volumeId, err := needle.NewVolumeId(vid)
if err != nil {
glog.V(2).Infoln("parsing error:", err, r.URL.Path)
w.WriteHeader(http.StatusBadRequest)
@@ -40,7 +54,9 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
}
glog.V(4).Infoln("volume", volumeId, "reading", n)
- if !vs.store.HasVolume(volumeId) {
+ hasVolume := vs.store.HasVolume(volumeId)
+ _, hasEcVolume := vs.store.FindEcVolume(volumeId)
+ if !hasVolume && !hasEcVolume {
if !vs.ReadRedirect {
glog.V(2).Infoln("volume is not local:", err, r.URL.Path)
w.WriteHeader(http.StatusNotFound)
@@ -65,10 +81,15 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
return
}
cookie := n.Cookie
- count, e := vs.store.ReadVolumeNeedle(volumeId, n)
- glog.V(4).Infoln("read bytes", count, "error", e)
- if e != nil || count < 0 {
- glog.V(0).Infof("read %s error: %v", r.URL.Path, e)
+ var count int
+ if hasVolume {
+ count, err = vs.store.ReadVolumeNeedle(volumeId, n)
+ } else if hasEcVolume {
+ count, err = vs.store.ReadEcShardNeedle(context.Background(), volumeId, n)
+ }
+ glog.V(4).Infoln("read bytes", count, "error", err)
+ if err != nil || count < 0 {
+ glog.V(0).Infof("read %s isNormalVolume %v error: %v", r.URL.Path, hasVolume, err)
w.WriteHeader(http.StatusNotFound)
return
}
@@ -132,7 +153,7 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
if strings.Contains(r.Header.Get("Accept-Encoding"), "gzip") {
w.Header().Set("Content-Encoding", "gzip")
} else {
- if n.Data, err = operation.UnGzipData(n.Data); err != nil {
+ if n.Data, err = util.UnGzipData(n.Data); err != nil {
glog.V(0).Infoln("ungzip error:", err, r.URL.Path)
}
}
@@ -146,7 +167,7 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
}
}
-func (vs *VolumeServer) tryHandleChunkedFile(n *storage.Needle, fileName string, w http.ResponseWriter, r *http.Request) (processed bool) {
+func (vs *VolumeServer) tryHandleChunkedFile(n *needle.Needle, fileName string, w http.ResponseWriter, r *http.Request) (processed bool) {
if !n.IsChunkedManifest() || r.URL.Query().Get("cm") == "false" {
return false
}
diff --git a/weed/server/volume_server_handlers_ui.go b/weed/server/volume_server_handlers_ui.go
index b3d9a21fd..852f0b751 100644
--- a/weed/server/volume_server_handlers_ui.go
+++ b/weed/server/volume_server_handlers_ui.go
@@ -24,13 +24,15 @@ func (vs *VolumeServer) uiStatusHandler(w http.ResponseWriter, r *http.Request)
Version string
Masters []string
Volumes interface{}
+ EcVolumes interface{}
DiskStatuses interface{}
Stats interface{}
Counters *stats.ServerStats
}{
util.VERSION,
- vs.MasterNodes,
+ vs.SeedMasterNodes,
vs.store.Status(),
+ vs.store.EcVolumes(),
ds,
infos,
serverStats,
diff --git a/weed/server/volume_server_handlers_write.go b/weed/server/volume_server_handlers_write.go
index fd93142e1..db8fcb555 100644
--- a/weed/server/volume_server_handlers_write.go
+++ b/weed/server/volume_server_handlers_write.go
@@ -1,6 +1,7 @@
package weed_server
import (
+ "context"
"errors"
"fmt"
"net/http"
@@ -10,36 +11,53 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/operation"
- "github.com/chrislusf/seaweedfs/weed/storage"
+ "github.com/chrislusf/seaweedfs/weed/stats"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/topology"
)
func (vs *VolumeServer) PostHandler(w http.ResponseWriter, r *http.Request) {
+
+ stats.VolumeServerRequestCounter.WithLabelValues("post").Inc()
+ start := time.Now()
+ defer func() {
+ stats.VolumeServerRequestHistogram.WithLabelValues("post").Observe(time.Since(start).Seconds())
+ }()
+
if e := r.ParseForm(); e != nil {
glog.V(0).Infoln("form parse error:", e)
writeJsonError(w, r, http.StatusBadRequest, e)
return
}
- vid, _, _, _, _ := parseURLPath(r.URL.Path)
- volumeId, ve := storage.NewVolumeId(vid)
+
+ vid, fid, _, _, _ := parseURLPath(r.URL.Path)
+ volumeId, ve := needle.NewVolumeId(vid)
if ve != nil {
glog.V(0).Infoln("NewVolumeId error:", ve)
writeJsonError(w, r, http.StatusBadRequest, ve)
return
}
- needle, originalSize, ne := storage.CreateNeedleFromRequest(r, vs.FixJpgOrientation)
+
+ if !vs.maybeCheckJwtAuthorization(r, vid, fid, true) {
+ writeJsonError(w, r, http.StatusUnauthorized, errors.New("wrong jwt"))
+ return
+ }
+
+ needle, originalSize, ne := needle.CreateNeedleFromRequest(r, vs.FixJpgOrientation)
if ne != nil {
writeJsonError(w, r, http.StatusBadRequest, ne)
return
}
ret := operation.UploadResult{}
- _, errorStatus := topology.ReplicatedWrite(vs.GetMaster(),
- vs.store, volumeId, needle, r)
+ _, isUnchanged, writeError := topology.ReplicatedWrite(vs.GetMaster(), vs.store, volumeId, needle, r)
httpStatus := http.StatusCreated
- if errorStatus != "" {
+ if isUnchanged {
+ httpStatus = http.StatusNotModified
+ }
+ if writeError != nil {
httpStatus = http.StatusInternalServerError
- ret.Error = errorStatus
+ ret.Error = writeError.Error()
}
if needle.HasName() {
ret.Name = string(needle.Name)
@@ -51,15 +69,35 @@ func (vs *VolumeServer) PostHandler(w http.ResponseWriter, r *http.Request) {
}
func (vs *VolumeServer) DeleteHandler(w http.ResponseWriter, r *http.Request) {
- n := new(storage.Needle)
+
+ stats.VolumeServerRequestCounter.WithLabelValues("delete").Inc()
+ start := time.Now()
+ defer func() {
+ stats.VolumeServerRequestHistogram.WithLabelValues("delete").Observe(time.Since(start).Seconds())
+ }()
+
+ n := new(needle.Needle)
vid, fid, _, _, _ := parseURLPath(r.URL.Path)
- volumeId, _ := storage.NewVolumeId(vid)
+ volumeId, _ := needle.NewVolumeId(vid)
n.ParsePath(fid)
+ if !vs.maybeCheckJwtAuthorization(r, vid, fid, true) {
+ writeJsonError(w, r, http.StatusUnauthorized, errors.New("wrong jwt"))
+ return
+ }
+
// glog.V(2).Infof("volume %s deleting %s", vid, n)
cookie := n.Cookie
+ ecVolume, hasEcVolume := vs.store.FindEcVolume(volumeId)
+
+ if hasEcVolume {
+ count, err := vs.store.DeleteEcShardNeedle(context.Background(), ecVolume, n, cookie)
+ writeDeleteResult(err, count, w, r)
+ return
+ }
+
_, ok := vs.store.ReadVolumeNeedle(volumeId, n)
if ok != nil {
m := make(map[string]uint32)
@@ -83,7 +121,7 @@ func (vs *VolumeServer) DeleteHandler(w http.ResponseWriter, r *http.Request) {
return
}
// make sure all chunks had deleted before delete manifest
- if e := chunkManifest.DeleteChunks(vs.GetMaster()); e != nil {
+ if e := chunkManifest.DeleteChunks(vs.GetMaster(), vs.grpcDialOption); e != nil {
writeJsonError(w, r, http.StatusInternalServerError, fmt.Errorf("Delete chunks error: %v", e))
return
}
@@ -100,6 +138,11 @@ func (vs *VolumeServer) DeleteHandler(w http.ResponseWriter, r *http.Request) {
_, err := topology.ReplicatedDelete(vs.GetMaster(), vs.store, volumeId, n, r)
+ writeDeleteResult(err, count, w, r)
+
+}
+
+func writeDeleteResult(err error, count int64, w http.ResponseWriter, r *http.Request) {
if err == nil {
m := make(map[string]int64)
m["size"] = count
@@ -107,7 +150,6 @@ func (vs *VolumeServer) DeleteHandler(w http.ResponseWriter, r *http.Request) {
} else {
writeJsonError(w, r, http.StatusInternalServerError, fmt.Errorf("Deletion Failed: %v", err))
}
-
}
func setEtag(w http.ResponseWriter, etag string) {
diff --git a/weed/server/volume_server_ui/templates.go b/weed/server/volume_server_ui/templates.go
index b9740510f..eafc0aaeb 100644
--- a/weed/server/volume_server_ui/templates.go
+++ b/weed/server/volume_server_ui/templates.go
@@ -128,6 +128,32 @@ var StatusTpl = template.Must(template.New("status").Funcs(funcMap).Parse(`<!DOC
</table>
</div>
+ <div class="row">
+ <h2>Erasure Coding Shards</h2>
+ <table class="table table-striped">
+ <thead>
+ <tr>
+ <th>Id</th>
+ <th>Collection</th>
+ <th>Shard Size</th>
+ <th>Shards</th>
+ <th>CreatedAt</th>
+ </tr>
+ </thead>
+ <tbody>
+ {{ range .EcVolumes }}
+ <tr>
+ <td><code>{{ .VolumeId }}</code></td>
+ <td>{{ .Collection }}</td>
+ <td>{{ .ShardSize }} Bytes</td>
+ <td>{{ .ShardIdList }}</td>
+ <td>{{ .CreatedAt.Format "02 Jan 06 15:04 -0700" }}</td>
+ </tr>
+ {{ end }}
+ </tbody>
+ </table>
+ </div>
+
</div>
</body>
</html>
diff --git a/weed/server/webdav_server.go b/weed/server/webdav_server.go
new file mode 100644
index 000000000..151b48a78
--- /dev/null
+++ b/weed/server/webdav_server.go
@@ -0,0 +1,578 @@
+package weed_server
+
+import (
+ "bytes"
+ "context"
+ "fmt"
+ "io"
+ "os"
+ "path"
+ "strings"
+ "time"
+
+ "github.com/chrislusf/seaweedfs/weed/operation"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "golang.org/x/net/webdav"
+ "google.golang.org/grpc"
+
+ "github.com/chrislusf/seaweedfs/weed/filer2"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/security"
+ "github.com/spf13/viper"
+)
+
+type WebDavOption struct {
+ Filer string
+ FilerGrpcAddress string
+ DomainName string
+ BucketsPath string
+ GrpcDialOption grpc.DialOption
+ Collection string
+ Uid uint32
+ Gid uint32
+}
+
+type WebDavServer struct {
+ option *WebDavOption
+ secret security.SigningKey
+ filer *filer2.Filer
+ grpcDialOption grpc.DialOption
+ Handler *webdav.Handler
+}
+
+func NewWebDavServer(option *WebDavOption) (ws *WebDavServer, err error) {
+
+ fs, err := NewWebDavFileSystem(option)
+ if err != nil {
+ return nil, err
+ }
+
+ ws = &WebDavServer{
+ option: option,
+ grpcDialOption: security.LoadClientTLS(viper.Sub("grpc"), "filer"),
+ Handler: &webdav.Handler{
+ FileSystem: fs,
+ LockSystem: webdav.NewMemLS(),
+ },
+ }
+
+ return ws, nil
+}
+
+// adapted from https://github.com/mattn/davfs/blob/master/plugin/mysql/mysql.go
+
+type WebDavFileSystem struct {
+ option *WebDavOption
+ secret security.SigningKey
+ filer *filer2.Filer
+ grpcDialOption grpc.DialOption
+}
+
+type FileInfo struct {
+ name string
+ size int64
+ mode os.FileMode
+ modifiedTime time.Time
+ isDirectory bool
+}
+
+func (fi *FileInfo) Name() string { return fi.name }
+func (fi *FileInfo) Size() int64 { return fi.size }
+func (fi *FileInfo) Mode() os.FileMode { return fi.mode }
+func (fi *FileInfo) ModTime() time.Time { return fi.modifiedTime }
+func (fi *FileInfo) IsDir() bool { return fi.isDirectory }
+func (fi *FileInfo) Sys() interface{} { return nil }
+
+type WebDavFile struct {
+ fs *WebDavFileSystem
+ name string
+ isDirectory bool
+ off int64
+ entry *filer_pb.Entry
+ entryViewCache []filer2.VisibleInterval
+}
+
+func NewWebDavFileSystem(option *WebDavOption) (webdav.FileSystem, error) {
+ return &WebDavFileSystem{
+ option: option,
+ }, nil
+}
+
+func (fs *WebDavFileSystem) WithFilerClient(ctx context.Context, fn func(filer_pb.SeaweedFilerClient) error) error {
+
+ return util.WithCachedGrpcClient(ctx, func(grpcConnection *grpc.ClientConn) error {
+ client := filer_pb.NewSeaweedFilerClient(grpcConnection)
+ return fn(client)
+ }, fs.option.FilerGrpcAddress, fs.option.GrpcDialOption)
+
+}
+
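+// clearName canonicalizes a WebDAV path: it applies path.Clean, restores a
+// trailing slash if one was present, and rejects non-absolute paths.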
+func clearName(name string) (string, error) {
+ slashed := strings.HasSuffix(name, "/")
+ name = path.Clean(name)
+ if !strings.HasSuffix(name, "/") && slashed {
+ name += "/"
+ }
+ if !strings.HasPrefix(name, "/") {
+ return "", os.ErrInvalid
+ }
+ return name, nil
+}
+
+func (fs *WebDavFileSystem) Mkdir(ctx context.Context, fullDirPath string, perm os.FileMode) error {
+
+ glog.V(2).Infof("WebDavFileSystem.Mkdir %v", fullDirPath)
+
+ if !strings.HasSuffix(fullDirPath, "/") {
+ fullDirPath += "/"
+ }
+
+ var err error
+ if fullDirPath, err = clearName(fullDirPath); err != nil {
+ return err
+ }
+
+ _, err = fs.stat(ctx, fullDirPath)
+ if err == nil {
+ return os.ErrExist
+ }
+
+ return fs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
+ dir, name := filer2.FullPath(fullDirPath).DirAndName()
+ request := &filer_pb.CreateEntryRequest{
+ Directory: dir,
+ Entry: &filer_pb.Entry{
+ Name: name,
+ IsDirectory: true,
+ Attributes: &filer_pb.FuseAttributes{
+ Mtime: time.Now().Unix(),
+ Crtime: time.Now().Unix(),
+ FileMode: uint32(perm | os.ModeDir),
+ Uid: fs.option.Uid,
+ Gid: fs.option.Gid,
+ },
+ },
+ }
+
+ glog.V(1).Infof("mkdir: %v", request)
+ if _, err := client.CreateEntry(ctx, request); err != nil {
+ return fmt.Errorf("mkdir %s/%s: %v", dir, name, err)
+ }
+
+ return nil
+ })
+}
+
+func (fs *WebDavFileSystem) OpenFile(ctx context.Context, fullFilePath string, flag int, perm os.FileMode) (webdav.File, error) {
+
+ glog.V(2).Infof("WebDavFileSystem.OpenFile %v", fullFilePath)
+
+ var err error
+ if fullFilePath, err = clearName(fullFilePath); err != nil {
+ return nil, err
+ }
+
+ if flag&os.O_CREATE != 0 {
+ // a file name must not have a trailing "/".
+ if strings.HasSuffix(fullFilePath, "/") {
+ return nil, os.ErrInvalid
+ }
+ // the base directory must already exist.
+ dir, _ := path.Split(fullFilePath)
+ _, err := fs.stat(ctx, dir)
+ if err != nil {
+ return nil, os.ErrInvalid
+ }
+ _, err = fs.stat(ctx, fullFilePath)
+ if err == nil {
+ if flag&os.O_EXCL != 0 {
+ return nil, os.ErrExist
+ }
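+ // opening an existing file with O_CREATE (without O_EXCL) overwrites it,
+ // so drop the old entry first (best effort)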
+ fs.removeAll(ctx, fullFilePath)
+ }
+
+ dir, name := filer2.FullPath(fullFilePath).DirAndName()
+ err = fs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
+ if _, err := client.CreateEntry(ctx, &filer_pb.CreateEntryRequest{
+ Directory: dir,
+ Entry: &filer_pb.Entry{
+ Name: name,
+ IsDirectory: perm&os.ModeDir > 0,
+ Attributes: &filer_pb.FuseAttributes{
+ Mtime: time.Now().Unix(),
+ Crtime: time.Now().Unix(),
+ FileMode: uint32(perm),
+ Uid: fs.option.Uid,
+ Gid: fs.option.Gid,
+ Collection: fs.option.Collection,
+ Replication: "000",
+ TtlSec: 0,
+ },
+ },
+ }); err != nil {
+ return fmt.Errorf("create %s: %v", fullFilePath, err)
+ }
+ return nil
+ })
+ if err != nil {
+ return nil, err
+ }
+ return &WebDavFile{
+ fs: fs,
+ name: fullFilePath,
+ isDirectory: false,
+ }, nil
+ }
+
+ fi, err := fs.stat(ctx, fullFilePath)
+ if err != nil {
+ return nil, os.ErrNotExist
+ }
+ if !strings.HasSuffix(fullFilePath, "/") && fi.IsDir() {
+ fullFilePath += "/"
+ }
+
+ return &WebDavFile{
+ fs: fs,
+ name: fullFilePath,
+ isDirectory: fi.IsDir(),
+ }, nil
+
+}
+
+func (fs *WebDavFileSystem) removeAll(ctx context.Context, fullFilePath string) error {
+ var err error
+ if fullFilePath, err = clearName(fullFilePath); err != nil {
+ return err
+ }
+
+ // ensure the entry exists before issuing the delete
+ if _, err = fs.stat(ctx, fullFilePath); err != nil {
+ return err
+ }
+ dir, name := filer2.FullPath(fullFilePath).DirAndName()
+ err = fs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
+
+ request := &filer_pb.DeleteEntryRequest{
+ Directory: dir,
+ Name: name,
+ IsDeleteData: true,
+ }
+
+ glog.V(3).Infof("removing entry: %v", request)
+ _, err := client.DeleteEntry(ctx, request)
+ if err != nil {
+ return fmt.Errorf("remove %s: %v", fullFilePath, err)
+ }
+
+ return nil
+ })
+ return err
+}
+
+func (fs *WebDavFileSystem) RemoveAll(ctx context.Context, name string) error {
+
+ glog.V(2).Infof("WebDavFileSystem.RemoveAll %v", name)
+
+ return fs.removeAll(ctx, name)
+}
+
+func (fs *WebDavFileSystem) Rename(ctx context.Context, oldName, newName string) error {
+
+ glog.V(2).Infof("WebDavFileSystem.Rename %v to %v", oldName, newName)
+
+ var err error
+ if oldName, err = clearName(oldName); err != nil {
+ return err
+ }
+ if newName, err = clearName(newName); err != nil {
+ return err
+ }
+
+ of, err := fs.stat(ctx, oldName)
+ if err != nil {
+ return os.ErrNotExist // the rename source must exist
+ }
+ if of.IsDir() {
+ if strings.HasSuffix(oldName, "/") {
+ oldName = strings.TrimRight(oldName, "/")
+ }
+ if strings.HasSuffix(newName, "/") {
+ newName = strings.TrimRight(newName, "/")
+ }
+ }
+
+ _, err = fs.stat(ctx, newName)
+ if err == nil {
+ return os.ErrExist
+ }
+
+ oldDir, oldBaseName := filer2.FullPath(oldName).DirAndName()
+ newDir, newBaseName := filer2.FullPath(newName).DirAndName()
+
+ return fs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
+
+ request := &filer_pb.AtomicRenameEntryRequest{
+ OldDirectory: oldDir,
+ OldName: oldBaseName,
+ NewDirectory: newDir,
+ NewName: newBaseName,
+ }
+
+ _, err := client.AtomicRenameEntry(ctx, request)
+ if err != nil {
+ return fmt.Errorf("renaming %s/%s => %s/%s: %v", oldDir, oldBaseName, newDir, newBaseName, err)
+ }
+
+ return nil
+
+ })
+}
+
+func (fs *WebDavFileSystem) stat(ctx context.Context, fullFilePath string) (os.FileInfo, error) {
+ var err error
+ if fullFilePath, err = clearName(fullFilePath); err != nil {
+ return nil, err
+ }
+
+ var fi FileInfo
+ entry, err := filer2.GetEntry(ctx, fs, fullFilePath)
+ if entry == nil {
+ return nil, os.ErrNotExist
+ }
+ if err != nil {
+ return nil, err
+ }
+ fi.size = int64(filer2.TotalSize(entry.GetChunks()))
+ fi.name = fullFilePath
+ fi.mode = os.FileMode(entry.Attributes.FileMode)
+ fi.modifiedTime = time.Unix(entry.Attributes.Mtime, 0)
+ fi.isDirectory = entry.IsDirectory
+
+ _, fi.name = path.Split(path.Clean(fi.name))
+ if fi.name == "" {
+ fi.name = "/"
+ fi.modifiedTime = time.Now()
+ fi.isDirectory = true
+ }
+ return &fi, nil
+}
+
+func (fs *WebDavFileSystem) Stat(ctx context.Context, name string) (os.FileInfo, error) {
+
+ glog.V(2).Infof("WebDavFileSystem.Stat %v", name)
+
+ return fs.stat(ctx, name)
+}
+
+func (f *WebDavFile) Write(buf []byte) (int, error) {
+
+ glog.V(2).Infof("WebDavFileSystem.Write %v", f.name)
+
+ var err error
+ ctx := context.Background()
+ if f.entry == nil {
+ f.entry, err = filer2.GetEntry(ctx, f.fs, f.name)
+ }
+
+ if f.entry == nil {
+ return 0, err
+ }
+ if err != nil {
+ return 0, err
+ }
+
+ var fileId, host string
+ var auth security.EncodedJwt
+
+ if err = f.fs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
+
+ request := &filer_pb.AssignVolumeRequest{
+ Count: 1,
+ Replication: "000",
+ Collection: f.fs.option.Collection,
+ }
+
+ resp, err := client.AssignVolume(ctx, request)
+ if err != nil {
+ glog.V(0).Infof("assign volume failure %v: %v", request, err)
+ return err
+ }
+
+ fileId, host, auth = resp.FileId, resp.Url, security.EncodedJwt(resp.Auth)
+
+ return nil
+ }); err != nil {
+ return 0, fmt.Errorf("filerGrpcAddress assign volume: %v", err)
+ }
+
+ fileUrl := fmt.Sprintf("http://%s/%s", host, fileId)
+ bufReader := bytes.NewReader(buf)
+ uploadResult, err := operation.Upload(fileUrl, f.name, bufReader, false, "application/octet-stream", nil, auth)
+ if err != nil {
+ glog.V(0).Infof("upload data %v to %s: %v", f.name, fileUrl, err)
+ return 0, fmt.Errorf("upload data: %v", err)
+ }
+ if uploadResult.Error != "" {
+ glog.V(0).Infof("upload failure %v to %s: %v", f.name, fileUrl, err)
+ return 0, fmt.Errorf("upload result: %v", uploadResult.Error)
+ }
+
+ chunk := &filer_pb.FileChunk{
+ FileId: fileId,
+ Offset: f.off,
+ Size: uint64(len(buf)),
+ Mtime: time.Now().UnixNano(),
+ ETag: uploadResult.ETag,
+ }
+
+ f.entry.Chunks = append(f.entry.Chunks, chunk)
+ dir, _ := filer2.FullPath(f.name).DirAndName()
+
+ err = f.fs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
+ f.entry.Attributes.Mtime = time.Now().Unix()
+
+ request := &filer_pb.UpdateEntryRequest{
+ Directory: dir,
+ Entry: f.entry,
+ }
+
+ if _, err := client.UpdateEntry(ctx, request); err != nil {
+ return fmt.Errorf("update %s: %v", f.name, err)
+ }
+
+ return nil
+ })
+
+ if err == nil { // advance the write offset only after a successful entry update
+ f.off += int64(len(buf))
+ }
+ return len(buf), err
+}
+
+func (f *WebDavFile) Close() error {
+
+ glog.V(2).Infof("WebDavFileSystem.Close %v", f.name)
+
+ if f.entry != nil {
+ f.entry = nil
+ f.entryViewCache = nil
+ }
+
+ return nil
+}
+
+func (f *WebDavFile) Read(p []byte) (readSize int, err error) {
+
+ glog.V(2).Infof("WebDavFileSystem.Read %v", f.name)
+ ctx := context.Background()
+
+ if f.entry == nil {
+ f.entry, err = filer2.GetEntry(ctx, f.fs, f.name)
+ }
+ if f.entry == nil {
+ return 0, err
+ }
+ if err != nil {
+ return 0, err
+ }
+ if len(f.entry.Chunks) == 0 {
+ return 0, io.EOF
+ }
+ if f.entryViewCache == nil {
+ f.entryViewCache = filer2.NonOverlappingVisibleIntervals(f.entry.Chunks)
+ }
+ chunkViews := filer2.ViewFromVisibleIntervals(f.entryViewCache, f.off, len(p))
+
+ totalRead, err := filer2.ReadIntoBuffer(ctx, f.fs, f.name, p, chunkViews, f.off)
+ if err != nil {
+ return 0, err
+ }
+ readSize = int(totalRead)
+
+ f.off += totalRead
+ if readSize == 0 {
+ return 0, io.EOF
+ }
+ return
+}
+
+func (f *WebDavFile) Readdir(count int) (ret []os.FileInfo, err error) {
+
+ glog.V(2).Infof("WebDavFileSystem.Readdir %v count %d", f.name, count)
+ ctx := context.Background()
+
+ dir := f.name
+ if dir != "/" && strings.HasSuffix(dir, "/") {
+ dir = dir[:len(dir)-1]
+ }
+
+ err = filer2.ReadDirAllEntries(ctx, f.fs, dir, func(entry *filer_pb.Entry) {
+ fi := FileInfo{
+ size: int64(filer2.TotalSize(entry.GetChunks())),
+ name: entry.Name,
+ mode: os.FileMode(entry.Attributes.FileMode),
+ modifiledTime: time.Unix(entry.Attributes.Mtime, 0),
+ isDirectory: entry.IsDirectory,
+ }
+
+ if !strings.HasSuffix(fi.name, "/") && fi.IsDir() {
+ fi.name += "/"
+ }
+ glog.V(4).Infof("entry: %v", fi.name)
+ ret = append(ret, &fi)
+ })
+
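+ // emulate directory paging: f.off tracks how many entries previous
+ // Readdir calls have already returned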
+ old := f.off
+ if old >= int64(len(ret)) {
+ if count > 0 {
+ return nil, io.EOF
+ }
+ return nil, nil
+ }
+ if count > 0 {
+ f.off += int64(count)
+ if f.off > int64(len(ret)) {
+ f.off = int64(len(ret))
+ }
+ } else {
+ f.off = int64(len(ret))
+ old = 0
+ }
+
+ return ret[old:f.off], nil
+}
+
+func (f *WebDavFile) Seek(offset int64, whence int) (int64, error) {
+
+ glog.V(2).Infof("WebDavFile.Seek %v %v %v", f.name, offset, whence)
+
+ ctx := context.Background()
+
+ var err error
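+ // whence 0 is io.SeekStart and 2 is io.SeekEnd; whence 1 (io.SeekCurrent)
+ // matches neither case, leaving f.off unchanged as the seek base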
+ switch whence {
+ case 0:
+ f.off = 0
+ case 2:
+ if fi, err := f.fs.stat(ctx, f.name); err != nil {
+ return 0, err
+ } else {
+ f.off = fi.Size()
+ }
+ }
+ f.off += offset
+ return f.off, err
+}
+
+func (f *WebDavFile) Stat() (os.FileInfo, error) {
+
+ glog.V(2).Infof("WebDavFile.Stat %v", f.name)
+
+ ctx := context.Background()
+
+ return f.fs.stat(ctx, f.name)
+}