Diffstat (limited to 'weed/server')
37 files changed, 2601 insertions, 504 deletions
diff --git a/weed/server/common.go b/weed/server/common.go index d88abfdc8..e02ab38a6 100644 --- a/weed/server/common.go +++ b/weed/server/common.go @@ -11,11 +11,12 @@ import ( "strings" "time" + "github.com/chrislusf/seaweedfs/weed/storage/needle" + "google.golang.org/grpc" + "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/operation" - "github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/stats" - "github.com/chrislusf/seaweedfs/weed/storage" "github.com/chrislusf/seaweedfs/weed/util" _ "github.com/chrislusf/seaweedfs/weed/statik" @@ -82,8 +83,7 @@ func debug(params ...interface{}) { glog.V(4).Infoln(params...) } -func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterUrl string) { - jwt := security.GetJwt(r) +func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterUrl string, grpcDialOption grpc.DialOption) { m := make(map[string]interface{}) if r.Method != "POST" { writeJsonError(w, r, http.StatusMethodNotAllowed, errors.New("Only submit via POST!")) @@ -91,7 +91,7 @@ func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterUrl st } debug("parsing upload file...") - fname, data, mimeType, pairMap, isGzipped, originalDataSize, lastModified, _, _, pe := storage.ParseUpload(r) + fname, data, mimeType, pairMap, isGzipped, originalDataSize, lastModified, _, _, pe := needle.ParseUpload(r) if pe != nil { writeJsonError(w, r, http.StatusBadRequest, pe) return @@ -113,7 +113,7 @@ func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterUrl st Collection: r.FormValue("collection"), Ttl: r.FormValue("ttl"), } - assignResult, ae := operation.Assign(masterUrl, ar) + assignResult, ae := operation.Assign(masterUrl, grpcDialOption, ar) if ae != nil { writeJsonError(w, r, http.StatusInternalServerError, ae) return @@ -125,7 +125,7 @@ func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterUrl st } debug("upload file to store", url) - uploadResult, err := operation.Upload(url, fname, bytes.NewReader(data), isGzipped, mimeType, pairMap, jwt) + uploadResult, err := operation.Upload(url, fname, bytes.NewReader(data), isGzipped, mimeType, pairMap, assignResult.Auth) if err != nil { writeJsonError(w, r, http.StatusInternalServerError, err) return diff --git a/weed/server/common_test.go b/weed/server/common_test.go new file mode 100644 index 000000000..2e6c70bfe --- /dev/null +++ b/weed/server/common_test.go @@ -0,0 +1,31 @@ +package weed_server + +import ( + "strings" + "testing" +) + +func TestParseURL(t *testing.T) { + if vid, fid, _, _, _ := parseURLPath("/1,06dfa8a684"); true { + if vid != "1" { + t.Errorf("fail to parse vid: %s", vid) + } + if fid != "06dfa8a684" { + t.Errorf("fail to parse fid: %s", fid) + } + } + if vid, fid, _, _, _ := parseURLPath("/1,06dfa8a684_1"); true { + if vid != "1" { + t.Errorf("fail to parse vid: %s", vid) + } + if fid != "06dfa8a684_1" { + t.Errorf("fail to parse fid: %s", fid) + } + if sepIndex := strings.LastIndex(fid, "_"); sepIndex > 0 { + fid = fid[:sepIndex] + } + if fid != "06dfa8a684" { + t.Errorf("fail to parse fid: %s", fid) + } + } +} diff --git a/weed/server/filer_grpc_server.go b/weed/server/filer_grpc_server.go index 06589e3c6..8eea2441e 100644 --- a/weed/server/filer_grpc_server.go +++ b/weed/server/filer_grpc_server.go @@ -14,12 +14,11 @@ import ( "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/pb/master_pb" - 
"github.com/chrislusf/seaweedfs/weed/util" ) func (fs *FilerServer) LookupDirectoryEntry(ctx context.Context, req *filer_pb.LookupDirectoryEntryRequest) (*filer_pb.LookupDirectoryEntryResponse, error) { - entry, err := fs.filer.FindEntry(filer2.FullPath(filepath.Join(req.Directory, req.Name))) + entry, err := fs.filer.FindEntry(ctx, filer2.FullPath(filepath.ToSlash(filepath.Join(req.Directory, req.Name)))) if err != nil { return nil, fmt.Errorf("%s not found under %s: %v", req.Name, req.Directory, err) } @@ -45,7 +44,7 @@ func (fs *FilerServer) ListEntries(ctx context.Context, req *filer_pb.ListEntrie lastFileName := req.StartFromFileName includeLastFile := req.InclusiveStartFrom for limit > 0 { - entries, err := fs.filer.ListDirectoryEntries(filer2.FullPath(req.Directory), lastFileName, includeLastFile, 1024) + entries, err := fs.filer.ListDirectoryEntries(ctx, filer2.FullPath(req.Directory), lastFileName, includeLastFile, 1024) if err != nil { return nil, err } @@ -112,22 +111,21 @@ func (fs *FilerServer) LookupVolume(ctx context.Context, req *filer_pb.LookupVol func (fs *FilerServer) CreateEntry(ctx context.Context, req *filer_pb.CreateEntryRequest) (resp *filer_pb.CreateEntryResponse, err error) { - fullpath := filer2.FullPath(filepath.Join(req.Directory, req.Entry.Name)) + fullpath := filer2.FullPath(filepath.ToSlash(filepath.Join(req.Directory, req.Entry.Name))) chunks, garbages := filer2.CompactFileChunks(req.Entry.Chunks) - fs.filer.DeleteChunks(garbages) - if req.Entry.Attributes == nil { return nil, fmt.Errorf("can not create entry with empty attributes") } - err = fs.filer.CreateEntry(&filer2.Entry{ + err = fs.filer.CreateEntry(ctx, &filer2.Entry{ FullPath: fullpath, Attr: filer2.PbToEntryAttribute(req.Entry.Attributes), Chunks: chunks, }) if err == nil { + fs.filer.DeleteChunks(fullpath, garbages) } return &filer_pb.CreateEntryResponse{}, err @@ -135,19 +133,19 @@ func (fs *FilerServer) CreateEntry(ctx context.Context, req *filer_pb.CreateEntr func (fs *FilerServer) UpdateEntry(ctx context.Context, req *filer_pb.UpdateEntryRequest) (*filer_pb.UpdateEntryResponse, error) { - fullpath := filepath.Join(req.Directory, req.Entry.Name) - entry, err := fs.filer.FindEntry(filer2.FullPath(fullpath)) + fullpath := filepath.ToSlash(filepath.Join(req.Directory, req.Entry.Name)) + entry, err := fs.filer.FindEntry(ctx, filer2.FullPath(fullpath)) if err != nil { return &filer_pb.UpdateEntryResponse{}, fmt.Errorf("not found %s: %v", fullpath, err) } // remove old chunks if not included in the new ones - unusedChunks := filer2.FindUnusedFileChunks(entry.Chunks, req.Entry.Chunks) + unusedChunks := filer2.MinusChunks(entry.Chunks, req.Entry.Chunks) chunks, garbages := filer2.CompactFileChunks(req.Entry.Chunks) newEntry := &filer2.Entry{ - FullPath: filer2.FullPath(filepath.Join(req.Directory, req.Entry.Name)), + FullPath: filer2.FullPath(filepath.ToSlash(filepath.Join(req.Directory, req.Entry.Name))), Attr: entry.Attr, Chunks: chunks, } @@ -175,9 +173,9 @@ func (fs *FilerServer) UpdateEntry(ctx context.Context, req *filer_pb.UpdateEntr return &filer_pb.UpdateEntryResponse{}, err } - if err = fs.filer.UpdateEntry(entry, newEntry); err == nil { - fs.filer.DeleteChunks(unusedChunks) - fs.filer.DeleteChunks(garbages) + if err = fs.filer.UpdateEntry(ctx, entry, newEntry); err == nil { + fs.filer.DeleteChunks(entry.FullPath, unusedChunks) + fs.filer.DeleteChunks(entry.FullPath, garbages) } fs.filer.NotifyUpdateEvent(entry, newEntry, true) @@ -186,7 +184,7 @@ func (fs *FilerServer) UpdateEntry(ctx 
context.Context, req *filer_pb.UpdateEntr } func (fs *FilerServer) DeleteEntry(ctx context.Context, req *filer_pb.DeleteEntryRequest) (resp *filer_pb.DeleteEntryResponse, err error) { - err = fs.filer.DeleteEntryMetaAndData(filer2.FullPath(filepath.Join(req.Directory, req.Name)), req.IsRecursive, req.IsDeleteData) + err = fs.filer.DeleteEntryMetaAndData(ctx, filer2.FullPath(filepath.ToSlash(filepath.Join(req.Directory, req.Name))), req.IsRecursive, req.IsDeleteData) return &filer_pb.DeleteEntryResponse{}, err } @@ -220,7 +218,7 @@ func (fs *FilerServer) AssignVolume(ctx context.Context, req *filer_pb.AssignVol DataCenter: "", } } - assignResult, err := operation.Assign(fs.filer.GetMaster(), assignRequest, altRequest) + assignResult, err := operation.Assign(fs.filer.GetMaster(), fs.grpcDialOption, assignRequest, altRequest) if err != nil { return nil, fmt.Errorf("assign volume: %v", err) } @@ -233,14 +231,18 @@ func (fs *FilerServer) AssignVolume(ctx context.Context, req *filer_pb.AssignVol Count: int32(assignResult.Count), Url: assignResult.Url, PublicUrl: assignResult.PublicUrl, + Auth: string(assignResult.Auth), }, err } func (fs *FilerServer) DeleteCollection(ctx context.Context, req *filer_pb.DeleteCollectionRequest) (resp *filer_pb.DeleteCollectionResponse, err error) { - for _, master := range fs.option.Masters { - _, err = util.Get(fmt.Sprintf("http://%s/col/delete?collection=%s", master, req.Collection)) - } + err = fs.filer.MasterClient.WithClient(ctx, func(client master_pb.SeaweedClient) error { + _, err := client.CollectionDelete(ctx, &master_pb.CollectionDeleteRequest{ + Name: req.GetCollection(), + }) + return err + }) return &filer_pb.DeleteCollectionResponse{}, err } @@ -253,7 +255,7 @@ func (fs *FilerServer) Statistics(ctx context.Context, req *filer_pb.StatisticsR Ttl: req.Ttl, } - output, err := operation.Statistics(fs.filer.GetMaster(), input) + output, err := operation.Statistics(fs.filer.GetMaster(), fs.grpcDialOption, input) if err != nil { return nil, err } @@ -264,3 +266,13 @@ func (fs *FilerServer) Statistics(ctx context.Context, req *filer_pb.StatisticsR FileCount: output.FileCount, }, nil } + +func (fs *FilerServer) GetFilerConfiguration(ctx context.Context, req *filer_pb.GetFilerConfigurationRequest) (resp *filer_pb.GetFilerConfigurationResponse, err error) { + + return &filer_pb.GetFilerConfigurationResponse{ + Masters: fs.option.Masters, + Collection: fs.option.Collection, + Replication: fs.option.DefaultReplication, + MaxMb: uint32(fs.option.MaxMB), + }, nil +} diff --git a/weed/server/filer_grpc_server_rename.go b/weed/server/filer_grpc_server_rename.go new file mode 100644 index 000000000..7142f7606 --- /dev/null +++ b/weed/server/filer_grpc_server_rename.go @@ -0,0 +1,130 @@ +package weed_server + +import ( + "context" + "fmt" + "github.com/chrislusf/seaweedfs/weed/filer2" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "path/filepath" +) + +func (fs *FilerServer) AtomicRenameEntry(ctx context.Context, req *filer_pb.AtomicRenameEntryRequest) (*filer_pb.AtomicRenameEntryResponse, error) { + + glog.V(1).Infof("AtomicRenameEntry %v", req) + + ctx, err := fs.filer.BeginTransaction(ctx) + if err != nil { + return nil, err + } + + oldParent := filer2.FullPath(filepath.ToSlash(req.OldDirectory)) + + oldEntry, err := fs.filer.FindEntry(ctx, oldParent.Child(req.OldName)) + if err != nil { + fs.filer.RollbackTransaction(ctx) + return nil, fmt.Errorf("%s/%s not found: %v", req.OldDirectory, req.OldName, err) + } + + 
var events MoveEvents + moveErr := fs.moveEntry(ctx, oldParent, oldEntry, filer2.FullPath(filepath.ToSlash(req.NewDirectory)), req.NewName, &events) + if moveErr != nil { + fs.filer.RollbackTransaction(ctx) + return nil, fmt.Errorf("%s/%s move error: %v", req.OldDirectory, req.OldName, err) + } else { + if commitError := fs.filer.CommitTransaction(ctx); commitError != nil { + fs.filer.RollbackTransaction(ctx) + return nil, fmt.Errorf("%s/%s move commit error: %v", req.OldDirectory, req.OldName, err) + } + } + + for _, entry := range events.newEntries { + fs.filer.NotifyUpdateEvent(nil, entry, false) + } + for _, entry := range events.oldEntries { + fs.filer.NotifyUpdateEvent(entry, nil, false) + } + + return &filer_pb.AtomicRenameEntryResponse{}, nil +} + +func (fs *FilerServer) moveEntry(ctx context.Context, oldParent filer2.FullPath, entry *filer2.Entry, newParent filer2.FullPath, newName string, events *MoveEvents) error { + if entry.IsDirectory() { + if err := fs.moveFolderSubEntries(ctx, oldParent, entry, newParent, newName, events); err != nil { + return err + } + } + return fs.moveSelfEntry(ctx, oldParent, entry, newParent, newName, events) +} + +func (fs *FilerServer) moveFolderSubEntries(ctx context.Context, oldParent filer2.FullPath, entry *filer2.Entry, newParent filer2.FullPath, newName string, events *MoveEvents) error { + + currentDirPath := oldParent.Child(entry.Name()) + newDirPath := newParent.Child(newName) + + glog.V(1).Infof("moving folder %s => %s", currentDirPath, newDirPath) + + lastFileName := "" + includeLastFile := false + for { + + entries, err := fs.filer.ListDirectoryEntries(ctx, currentDirPath, lastFileName, includeLastFile, 1024) + if err != nil { + return err + } + + println("found", len(entries), "entries under", currentDirPath) + + for _, item := range entries { + lastFileName = item.Name() + println("processing", lastFileName) + err := fs.moveEntry(ctx, currentDirPath, item, newDirPath, item.Name(), events) + if err != nil { + return err + } + } + if len(entries) < 1024 { + break + } + } + return nil +} + +func (fs *FilerServer) moveSelfEntry(ctx context.Context, oldParent filer2.FullPath, entry *filer2.Entry, newParent filer2.FullPath, newName string, events *MoveEvents) error { + + oldPath, newPath := oldParent.Child(entry.Name()), newParent.Child(newName) + + glog.V(1).Infof("moving entry %s => %s", oldPath, newPath) + + if oldPath == newPath { + glog.V(1).Infof("skip moving entry %s => %s", oldPath, newPath) + return nil + } + + // add to new directory + newEntry := &filer2.Entry{ + FullPath: newPath, + Attr: entry.Attr, + Chunks: entry.Chunks, + } + createErr := fs.filer.CreateEntry(ctx, newEntry) + if createErr != nil { + return createErr + } + + // delete old entry + deleteErr := fs.filer.DeleteEntryMetaAndData(ctx, oldPath, false, false) + if deleteErr != nil { + return deleteErr + } + + events.oldEntries = append(events.oldEntries, entry) + events.newEntries = append(events.newEntries, newEntry) + return nil + +} + +type MoveEvents struct { + oldEntries []*filer2.Entry + newEntries []*filer2.Entry +} diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go index 9d70e4dac..b9e6aa23d 100644 --- a/weed/server/filer_server.go +++ b/weed/server/filer_server.go @@ -1,12 +1,22 @@ package weed_server import ( + "context" + "fmt" "net/http" "os" + "time" + + "github.com/chrislusf/seaweedfs/weed/operation" + "github.com/chrislusf/seaweedfs/weed/pb/master_pb" + "github.com/chrislusf/seaweedfs/weed/stats" + 
"github.com/chrislusf/seaweedfs/weed/util" + "google.golang.org/grpc" "github.com/chrislusf/seaweedfs/weed/filer2" _ "github.com/chrislusf/seaweedfs/weed/filer2/cassandra" _ "github.com/chrislusf/seaweedfs/weed/filer2/leveldb" + _ "github.com/chrislusf/seaweedfs/weed/filer2/leveldb2" _ "github.com/chrislusf/seaweedfs/weed/filer2/memdb" _ "github.com/chrislusf/seaweedfs/weed/filer2/mysql" _ "github.com/chrislusf/seaweedfs/weed/filer2/postgres" @@ -14,6 +24,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/notification" _ "github.com/chrislusf/seaweedfs/weed/notification/aws_sqs" + _ "github.com/chrislusf/seaweedfs/weed/notification/gocdk_pub_sub" _ "github.com/chrislusf/seaweedfs/weed/notification/google_pub_sub" _ "github.com/chrislusf/seaweedfs/weed/notification/kafka" _ "github.com/chrislusf/seaweedfs/weed/notification/log" @@ -28,85 +39,93 @@ type FilerOption struct { RedirectOnRead bool DisableDirListing bool MaxMB int - SecretKey string DirListingLimit int DataCenter string DefaultLevelDbDir string + DisableHttp bool + Port int } type FilerServer struct { - option *FilerOption - secret security.Secret - filer *filer2.Filer + option *FilerOption + secret security.SigningKey + filer *filer2.Filer + grpcDialOption grpc.DialOption } func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) (fs *FilerServer, err error) { fs = &FilerServer{ - option: option, + option: option, + grpcDialOption: security.LoadClientTLS(viper.Sub("grpc"), "filer"), } if len(option.Masters) == 0 { glog.Fatal("master list is required!") } - fs.filer = filer2.NewFiler(option.Masters) + fs.filer = filer2.NewFiler(option.Masters, fs.grpcDialOption) go fs.filer.KeepConnectedToMaster() v := viper.GetViper() - if !LoadConfiguration("filer", false) { - v.Set("leveldb.enabled", true) - v.Set("leveldb.dir", option.DefaultLevelDbDir) + if !util.LoadConfiguration("filer", false) { + v.Set("leveldb2.enabled", true) + v.Set("leveldb2.dir", option.DefaultLevelDbDir) _, err := os.Stat(option.DefaultLevelDbDir) if os.IsNotExist(err) { os.MkdirAll(option.DefaultLevelDbDir, 0755) } } - LoadConfiguration("notification", false) + util.LoadConfiguration("notification", false) fs.filer.LoadConfiguration(v) notification.LoadConfiguration(v.Sub("notification")) handleStaticResources(defaultMux) - defaultMux.HandleFunc("/", fs.filerHandler) + if !option.DisableHttp { + defaultMux.HandleFunc("/", fs.filerHandler) + } if defaultMux != readonlyMux { readonlyMux.HandleFunc("/", fs.readonlyFilerHandler) } - return fs, nil -} + maybeStartMetrics(fs, option) -func (fs *FilerServer) jwt(fileId string) security.EncodedJwt { - return security.GenJwt(fs.secret, fileId) + return fs, nil } -func LoadConfiguration(configFileName string, required bool) (loaded bool) { - - // find a filer store - viper.SetConfigName(configFileName) // name of config file (without extension) - viper.AddConfigPath(".") // optionally look for config in the working directory - viper.AddConfigPath("$HOME/.seaweedfs") // call multiple times to add many search paths - viper.AddConfigPath("/etc/seaweedfs/") // path to look for the config file in - - glog.V(0).Infof("Reading %s.toml from %s", configFileName, viper.ConfigFileUsed()) - - if err := viper.MergeInConfig(); err != nil { // Handle errors reading the config file - glog.V(0).Infof("Reading %s: %v", viper.ConfigFileUsed(), err) - if required { - glog.Fatalf("Failed to load %s.toml file from current directory, or $HOME/.seaweedfs/, or /etc/seaweedfs/"+ - 
"\n\nPlease follow this example and add a filer.toml file to "+ - "current directory, or $HOME/.seaweedfs/, or /etc/seaweedfs/:\n"+ - " https://github.com/chrislusf/seaweedfs/blob/master/weed/%s.toml\n"+ - "\nOr use this command to generate the default toml file\n"+ - " weed scaffold -config=%s -output=.\n\n\n", - configFileName, configFileName, configFileName) +func maybeStartMetrics(fs *FilerServer, option *FilerOption) { + isConnected := false + var metricsAddress string + var metricsIntervalSec int + var readErr error + for !isConnected { + metricsAddress, metricsIntervalSec, readErr = readFilerConfiguration(fs.grpcDialOption, option.Masters[0]) + if readErr == nil { + isConnected = true } else { - return false + time.Sleep(7 * time.Second) } } + if metricsAddress == "" && metricsIntervalSec <= 0 { + return + } + go stats.LoopPushingMetric("filer", stats.SourceName(option.Port), stats.FilerGather, + func() (addr string, intervalSeconds int) { + return metricsAddress, metricsIntervalSec + }) +} - return true - +func readFilerConfiguration(grpcDialOption grpc.DialOption, masterGrpcAddress string) (metricsAddress string, metricsIntervalSec int, err error) { + err = operation.WithMasterServerClient(masterGrpcAddress, grpcDialOption, func(masterClient master_pb.SeaweedClient) error { + resp, err := masterClient.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{}) + if err != nil { + return fmt.Errorf("get master %s configuration: %v", masterGrpcAddress, err) + } + metricsAddress, metricsIntervalSec = resp.MetricsAddress, int(resp.MetricsIntervalSeconds) + return nil + }) + return } diff --git a/weed/server/filer_server_handlers.go b/weed/server/filer_server_handlers.go index d76d7df8c..b6bfc3b04 100644 --- a/weed/server/filer_server_handlers.go +++ b/weed/server/filer_server_handlers.go @@ -2,28 +2,47 @@ package weed_server import ( "net/http" + "time" + + "github.com/chrislusf/seaweedfs/weed/stats" ) func (fs *FilerServer) filerHandler(w http.ResponseWriter, r *http.Request) { + start := time.Now() switch r.Method { case "GET": + stats.FilerRequestCounter.WithLabelValues("get").Inc() fs.GetOrHeadHandler(w, r, true) + stats.FilerRequestHistogram.WithLabelValues("get").Observe(time.Since(start).Seconds()) case "HEAD": + stats.FilerRequestCounter.WithLabelValues("head").Inc() fs.GetOrHeadHandler(w, r, false) + stats.FilerRequestHistogram.WithLabelValues("head").Observe(time.Since(start).Seconds()) case "DELETE": + stats.FilerRequestCounter.WithLabelValues("delete").Inc() fs.DeleteHandler(w, r) + stats.FilerRequestHistogram.WithLabelValues("delete").Observe(time.Since(start).Seconds()) case "PUT": + stats.FilerRequestCounter.WithLabelValues("put").Inc() fs.PostHandler(w, r) + stats.FilerRequestHistogram.WithLabelValues("put").Observe(time.Since(start).Seconds()) case "POST": + stats.FilerRequestCounter.WithLabelValues("post").Inc() fs.PostHandler(w, r) + stats.FilerRequestHistogram.WithLabelValues("post").Observe(time.Since(start).Seconds()) } } func (fs *FilerServer) readonlyFilerHandler(w http.ResponseWriter, r *http.Request) { + start := time.Now() switch r.Method { case "GET": + stats.FilerRequestCounter.WithLabelValues("get").Inc() fs.GetOrHeadHandler(w, r, true) + stats.FilerRequestHistogram.WithLabelValues("get").Observe(time.Since(start).Seconds()) case "HEAD": + stats.FilerRequestCounter.WithLabelValues("head").Inc() fs.GetOrHeadHandler(w, r, false) + stats.FilerRequestHistogram.WithLabelValues("head").Observe(time.Since(start).Seconds()) } } diff 
--git a/weed/server/filer_server_handlers_read.go b/weed/server/filer_server_handlers_read.go index 226de640c..0edf501a8 100644 --- a/weed/server/filer_server_handlers_read.go +++ b/weed/server/filer_server_handlers_read.go @@ -1,7 +1,9 @@ package weed_server import ( + "context" "io" + "io/ioutil" "mime" "mime/multipart" "net/http" @@ -12,22 +14,27 @@ import ( "github.com/chrislusf/seaweedfs/weed/filer2" "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/stats" "github.com/chrislusf/seaweedfs/weed/util" ) func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request, isGetMethod bool) { + path := r.URL.Path - if strings.HasSuffix(path, "/") && len(path) > 1 { + isForDirectory := strings.HasSuffix(path, "/") + if isForDirectory && len(path) > 1 { path = path[:len(path)-1] } - entry, err := fs.filer.FindEntry(filer2.FullPath(path)) + entry, err := fs.filer.FindEntry(context.Background(), filer2.FullPath(path)) if err != nil { if path == "/" { fs.listDirectoryHandler(w, r) return } glog.V(1).Infof("Not found %s: %v", path, err) + + stats.FilerRequestCounter.WithLabelValues("read.notfound").Inc() w.WriteHeader(http.StatusNotFound) return } @@ -41,8 +48,14 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request, return } + if isForDirectory { + w.WriteHeader(http.StatusNotFound) + return + } + if len(entry.Chunks) == 0 { glog.V(1).Infof("no file chunks for %s, attr=%+v", path, entry.Attr) + stats.FilerRequestCounter.WithLabelValues("read.nocontent").Inc() w.WriteHeader(http.StatusNoContent) return } @@ -51,6 +64,7 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request, if r.Method == "HEAD" { w.Header().Set("Content-Length", strconv.FormatInt(int64(filer2.TotalSize(entry.Chunks)), 10)) w.Header().Set("Last-Modified", entry.Attr.Mtime.Format(http.TimeFormat)) + setEtag(w, filer2.ETag(entry.Chunks)) return } @@ -65,7 +79,7 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request, func (fs *FilerServer) handleSingleChunk(w http.ResponseWriter, r *http.Request, entry *filer2.Entry) { - fileId := entry.Chunks[0].FileId + fileId := entry.Chunks[0].GetFileIdString() urlString, err := fs.filer.MasterClient.LookupFileId(fileId) if err != nil { @@ -75,6 +89,7 @@ func (fs *FilerServer) handleSingleChunk(w http.ResponseWriter, r *http.Request, } if fs.option.RedirectOnRead { + stats.FilerRequestCounter.WithLabelValues("redirect").Inc() http.Redirect(w, r, urlString, http.StatusFound) return } @@ -105,17 +120,23 @@ func (fs *FilerServer) handleSingleChunk(w http.ResponseWriter, r *http.Request, writeJsonError(w, r, http.StatusInternalServerError, do_err) return } - defer resp.Body.Close() + defer func() { + io.Copy(ioutil.Discard, resp.Body) + resp.Body.Close() + }() for k, v := range resp.Header { w.Header()[k] = v } + if entry.Attr.Mime != "" { + w.Header().Set("Content-Type", entry.Attr.Mime) + } w.WriteHeader(resp.StatusCode) io.Copy(w, resp.Body) } func (fs *FilerServer) handleMultipleChunks(w http.ResponseWriter, r *http.Request, entry *filer2.Entry) { - mimeType := entry.Mime + mimeType := entry.Attr.Mime if mimeType == "" { if ext := path.Ext(entry.Name()); ext != "" { mimeType = mime.TypeByExtension(ext) @@ -222,31 +243,6 @@ func (fs *FilerServer) handleMultipleChunks(w http.ResponseWriter, r *http.Reque func (fs *FilerServer) writeContent(w io.Writer, entry *filer2.Entry, offset int64, size int) error { - chunkViews := filer2.ViewFromChunks(entry.Chunks, offset, size) - - 
fileId2Url := make(map[string]string) - - for _, chunkView := range chunkViews { - - urlString, err := fs.filer.MasterClient.LookupFileId(chunkView.FileId) - if err != nil { - glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err) - return err - } - fileId2Url[chunkView.FileId] = urlString - } - - for _, chunkView := range chunkViews { - urlString := fileId2Url[chunkView.FileId] - _, err := util.ReadUrlAsStream(urlString, chunkView.Offset, int(chunkView.Size), func(data []byte) { - w.Write(data) - }) - if err != nil { - glog.V(1).Infof("read %s failed, err: %v", chunkView.FileId, err) - return err - } - } - - return nil + return filer2.StreamContent(fs.filer.MasterClient, w, entry.Chunks, offset, size) } diff --git a/weed/server/filer_server_handlers_read_dir.go b/weed/server/filer_server_handlers_read_dir.go index bcf7f0eb5..87e864559 100644 --- a/weed/server/filer_server_handlers_read_dir.go +++ b/weed/server/filer_server_handlers_read_dir.go @@ -1,6 +1,7 @@ package weed_server import ( + "context" "net/http" "strconv" "strings" @@ -8,6 +9,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/filer2" "github.com/chrislusf/seaweedfs/weed/glog" ui "github.com/chrislusf/seaweedfs/weed/server/filer_ui" + "github.com/chrislusf/seaweedfs/weed/stats" ) // listDirectoryHandler lists directories and folers under a directory @@ -15,6 +17,9 @@ import ( // sub directories are listed on the first page, when "lastFileName" // is empty. func (fs *FilerServer) listDirectoryHandler(w http.ResponseWriter, r *http.Request) { + + stats.FilerRequestCounter.WithLabelValues("list").Inc() + path := r.URL.Path if strings.HasSuffix(path, "/") && len(path) > 1 { path = path[:len(path)-1] @@ -27,7 +32,7 @@ func (fs *FilerServer) listDirectoryHandler(w http.ResponseWriter, r *http.Reque lastFileName := r.FormValue("lastFileName") - entries, err := fs.filer.ListDirectoryEntries(filer2.FullPath(path), lastFileName, false, limit) + entries, err := fs.filer.ListDirectoryEntries(context.Background(), filer2.FullPath(path), lastFileName, false, limit) if err != nil { glog.V(0).Infof("listDirectory %s %s %d: %s", path, lastFileName, limit, err) diff --git a/weed/server/filer_server_handlers_write.go b/weed/server/filer_server_handlers_write.go index 32f481e74..0bf1218ce 100644 --- a/weed/server/filer_server_handlers_write.go +++ b/weed/server/filer_server_handlers_write.go @@ -1,11 +1,17 @@ package weed_server import ( + "context" "encoding/json" "errors" + "fmt" + "io" "io/ioutil" + "mime" "net/http" "net/url" + "os" + filenamePath "path" "strconv" "strings" "time" @@ -14,8 +20,9 @@ import ( "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/security" + "github.com/chrislusf/seaweedfs/weed/stats" "github.com/chrislusf/seaweedfs/weed/util" - "os" ) var ( @@ -31,7 +38,12 @@ type FilerPostResult struct { Url string `json:"url,omitempty"` } -func (fs *FilerServer) assignNewFileInfo(w http.ResponseWriter, r *http.Request, replication, collection string, dataCenter string) (fileId, urlLocation string, err error) { +func (fs *FilerServer) assignNewFileInfo(w http.ResponseWriter, r *http.Request, replication, collection string, dataCenter string) (fileId, urlLocation string, auth security.EncodedJwt, err error) { + + stats.FilerRequestCounter.WithLabelValues("assign").Inc() + start := time.Now() + defer func() { 
stats.FilerRequestHistogram.WithLabelValues("assign").Observe(time.Since(start).Seconds()) }() + ar := &operation.VolumeAssignRequest{ Count: 1, Replication: replication, @@ -50,7 +62,7 @@ func (fs *FilerServer) assignNewFileInfo(w http.ResponseWriter, r *http.Request, } } - assignResult, ae := operation.Assign(fs.filer.GetMaster(), ar, altRequest) + assignResult, ae := operation.Assign(fs.filer.GetMaster(), fs.grpcDialOption, ar, altRequest) if ae != nil { glog.Errorf("failing to assign a file id: %v", ae) writeJsonError(w, r, http.StatusInternalServerError, ae) @@ -59,11 +71,14 @@ func (fs *FilerServer) assignNewFileInfo(w http.ResponseWriter, r *http.Request, } fileId = assignResult.Fid urlLocation = "http://" + assignResult.Url + "/" + assignResult.Fid + auth = assignResult.Auth return } func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) { + ctx := context.Background() + query := r.URL.Query() replication := query.Get("replication") if replication == "" { @@ -78,11 +93,11 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) { dataCenter = fs.option.DataCenter } - if autoChunked := fs.autoChunk(w, r, replication, collection, dataCenter); autoChunked { + if autoChunked := fs.autoChunk(ctx, w, r, replication, collection, dataCenter); autoChunked { return } - fileId, urlLocation, err := fs.assignNewFileInfo(w, r, replication, collection, dataCenter) + fileId, urlLocation, auth, err := fs.assignNewFileInfo(w, r, replication, collection, dataCenter) if err != nil || fileId == "" || urlLocation == "" { glog.V(0).Infof("fail to allocate volume for %s, collection:%s, datacenter:%s", r.URL.Path, collection, dataCenter) @@ -103,70 +118,47 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) { } glog.V(4).Infoln("post to", u) - // send request to volume server - request := &http.Request{ - Method: r.Method, - URL: u, - Proto: r.Proto, - ProtoMajor: r.ProtoMajor, - ProtoMinor: r.ProtoMinor, - Header: r.Header, - Body: r.Body, - Host: r.Host, - ContentLength: r.ContentLength, - } - resp, do_err := util.Do(request) - if do_err != nil { - glog.Errorf("failing to connect to volume server %s: %v, %+v", r.RequestURI, do_err, r.Method) - writeJsonError(w, r, http.StatusInternalServerError, do_err) - return - } - defer resp.Body.Close() - etag := resp.Header.Get("ETag") - resp_body, ra_err := ioutil.ReadAll(resp.Body) - if ra_err != nil { - glog.V(0).Infoln("failing to upload to volume server", r.RequestURI, ra_err.Error()) - writeJsonError(w, r, http.StatusInternalServerError, ra_err) + ret, err := fs.uploadToVolumeServer(r, u, auth, w, fileId) + if err != nil { return } - glog.V(4).Infoln("post result", string(resp_body)) - var ret operation.UploadResult - unmarshal_err := json.Unmarshal(resp_body, &ret) - if unmarshal_err != nil { - glog.V(0).Infoln("failing to read upload resonse", r.RequestURI, string(resp_body)) - writeJsonError(w, r, http.StatusInternalServerError, unmarshal_err) + + if err = fs.updateFilerStore(ctx, r, w, replication, collection, ret, fileId); err != nil { return } - if ret.Error != "" { - glog.V(0).Infoln("failing to post to volume server", r.RequestURI, ret.Error) - writeJsonError(w, r, http.StatusInternalServerError, errors.New(ret.Error)) - return + + // send back post result + reply := FilerPostResult{ + Name: ret.Name, + Size: ret.Size, + Error: ret.Error, + Fid: fileId, + Url: urlLocation, } + setEtag(w, ret.ETag) + writeJsonQuiet(w, r, http.StatusCreated, reply) +} + +// update metadata in filer store 
+func (fs *FilerServer) updateFilerStore(ctx context.Context, r *http.Request, w http.ResponseWriter, + replication string, collection string, ret operation.UploadResult, fileId string) (err error) { + + stats.FilerRequestCounter.WithLabelValues("postStoreWrite").Inc() + start := time.Now() + defer func() { + stats.FilerRequestHistogram.WithLabelValues("postStoreWrite").Observe(time.Since(start).Seconds()) + }() - // find correct final path path := r.URL.Path if strings.HasSuffix(path, "/") { if ret.Name != "" { path += ret.Name - } else { - fs.filer.DeleteFileByFileId(fileId) - glog.V(0).Infoln("Can not to write to folder", path, "without a file name!") - writeJsonError(w, r, http.StatusInternalServerError, - errors.New("Can not to write to folder "+path+" without a file name")) - return } } - - // update metadata in filer store - existingEntry, err := fs.filer.FindEntry(filer2.FullPath(path)) + existingEntry, err := fs.filer.FindEntry(ctx, filer2.FullPath(path)) crTime := time.Now() if err == nil && existingEntry != nil { - // glog.V(4).Infof("existing %s => %+v", path, existingEntry) - if existingEntry.IsDirectory() { - path += "/" + ret.Name - } else { - crTime = existingEntry.Crtime - } + crTime = existingEntry.Crtime } entry := &filer2.Entry{ FullPath: filer2.FullPath(path), @@ -184,27 +176,95 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) { FileId: fileId, Size: uint64(ret.Size), Mtime: time.Now().UnixNano(), - ETag: etag, + ETag: ret.ETag, }}, } + if ext := filenamePath.Ext(path); ext != "" { + entry.Attr.Mime = mime.TypeByExtension(ext) + } // glog.V(4).Infof("saving %s => %+v", path, entry) - if db_err := fs.filer.CreateEntry(entry); db_err != nil { - fs.filer.DeleteFileByFileId(fileId) - glog.V(0).Infof("failing to write %s to filer server : %v", path, db_err) - writeJsonError(w, r, http.StatusInternalServerError, db_err) + if dbErr := fs.filer.CreateEntry(ctx, entry); dbErr != nil { + fs.filer.DeleteChunks(entry.FullPath, entry.Chunks) + glog.V(0).Infof("failing to write %s to filer server : %v", path, dbErr) + writeJsonError(w, r, http.StatusInternalServerError, dbErr) + err = dbErr return } - // send back post result - reply := FilerPostResult{ - Name: ret.Name, - Size: ret.Size, - Error: ret.Error, - Fid: fileId, - Url: urlLocation, + return nil +} + +// send request to volume server +func (fs *FilerServer) uploadToVolumeServer(r *http.Request, u *url.URL, auth security.EncodedJwt, w http.ResponseWriter, fileId string) (ret operation.UploadResult, err error) { + + stats.FilerRequestCounter.WithLabelValues("postUpload").Inc() + start := time.Now() + defer func() { stats.FilerRequestHistogram.WithLabelValues("postUpload").Observe(time.Since(start).Seconds()) }() + + request := &http.Request{ + Method: r.Method, + URL: u, + Proto: r.Proto, + ProtoMajor: r.ProtoMajor, + ProtoMinor: r.ProtoMinor, + Header: r.Header, + Body: r.Body, + Host: r.Host, + ContentLength: r.ContentLength, } - setEtag(w, etag) - writeJsonQuiet(w, r, http.StatusCreated, reply) + if auth != "" { + request.Header.Set("Authorization", "BEARER "+string(auth)) + } + resp, doErr := util.Do(request) + if doErr != nil { + glog.Errorf("failing to connect to volume server %s: %v, %+v", r.RequestURI, doErr, r.Method) + writeJsonError(w, r, http.StatusInternalServerError, doErr) + err = doErr + return + } + defer func() { + io.Copy(ioutil.Discard, resp.Body) + resp.Body.Close() + }() + etag := resp.Header.Get("ETag") + respBody, raErr := ioutil.ReadAll(resp.Body) + if raErr != nil { + 
glog.V(0).Infoln("failing to upload to volume server", r.RequestURI, raErr.Error()) + writeJsonError(w, r, http.StatusInternalServerError, raErr) + err = raErr + return + } + glog.V(4).Infoln("post result", string(respBody)) + unmarshalErr := json.Unmarshal(respBody, &ret) + if unmarshalErr != nil { + glog.V(0).Infoln("failing to read upload resonse", r.RequestURI, string(respBody)) + writeJsonError(w, r, http.StatusInternalServerError, unmarshalErr) + err = unmarshalErr + return + } + if ret.Error != "" { + err = errors.New(ret.Error) + glog.V(0).Infoln("failing to post to volume server", r.RequestURI, ret.Error) + writeJsonError(w, r, http.StatusInternalServerError, err) + return + } + // find correct final path + path := r.URL.Path + if strings.HasSuffix(path, "/") { + if ret.Name != "" { + path += ret.Name + } else { + err = fmt.Errorf("can not to write to folder %s without a file name", path) + fs.filer.DeleteFileByFileId(fileId) + glog.V(0).Infoln("Can not to write to folder", path, "without a file name!") + writeJsonError(w, r, http.StatusInternalServerError, err) + return + } + } + if etag != "" { + ret.ETag = etag + } + return } // curl -X DELETE http://localhost:8888/path/to @@ -213,7 +273,7 @@ func (fs *FilerServer) DeleteHandler(w http.ResponseWriter, r *http.Request) { isRecursive := r.FormValue("recursive") == "true" - err := fs.filer.DeleteEntryMetaAndData(filer2.FullPath(r.URL.Path), isRecursive, true) + err := fs.filer.DeleteEntryMetaAndData(context.Background(), filer2.FullPath(r.URL.Path), isRecursive, true) if err != nil { glog.V(1).Infoln("deleting", r.URL.Path, ":", err.Error()) writeJsonError(w, r, http.StatusInternalServerError, err) diff --git a/weed/server/filer_server_handlers_write_autochunk.go b/weed/server/filer_server_handlers_write_autochunk.go index 4b1745aaa..492b55943 100644 --- a/weed/server/filer_server_handlers_write_autochunk.go +++ b/weed/server/filer_server_handlers_write_autochunk.go @@ -2,6 +2,7 @@ package weed_server import ( "bytes" + "context" "io" "io/ioutil" "net/http" @@ -14,10 +15,13 @@ import ( "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/security" + "github.com/chrislusf/seaweedfs/weed/stats" "github.com/chrislusf/seaweedfs/weed/util" ) -func (fs *FilerServer) autoChunk(w http.ResponseWriter, r *http.Request, replication string, collection string, dataCenter string) bool { +func (fs *FilerServer) autoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request, + replication string, collection string, dataCenter string) bool { if r.Method != "POST" { glog.V(4).Infoln("AutoChunking not supported for method", r.Method) return false @@ -53,7 +57,7 @@ func (fs *FilerServer) autoChunk(w http.ResponseWriter, r *http.Request, replica return false } - reply, err := fs.doAutoChunk(w, r, contentLength, chunkSize, replication, collection, dataCenter) + reply, err := fs.doAutoChunk(ctx, w, r, contentLength, chunkSize, replication, collection, dataCenter) if err != nil { writeJsonError(w, r, http.StatusInternalServerError, err) } else if reply != nil { @@ -62,7 +66,14 @@ func (fs *FilerServer) autoChunk(w http.ResponseWriter, r *http.Request, replica return true } -func (fs *FilerServer) doAutoChunk(w http.ResponseWriter, r *http.Request, contentLength int64, chunkSize int32, replication string, collection string, dataCenter string) (filerResult *FilerPostResult, replyerr error) { +func (fs *FilerServer) 
doAutoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request, + contentLength int64, chunkSize int32, replication string, collection string, dataCenter string) (filerResult *FilerPostResult, replyerr error) { + + stats.FilerRequestCounter.WithLabelValues("postAutoChunk").Inc() + start := time.Now() + defer func() { + stats.FilerRequestHistogram.WithLabelValues("postAutoChunk").Observe(time.Since(start).Seconds()) + }() multipartReader, multipartReaderErr := r.MultipartReader() if multipartReaderErr != nil { @@ -105,14 +116,14 @@ func (fs *FilerServer) doAutoChunk(w http.ResponseWriter, r *http.Request, conte if chunkBufOffset >= chunkSize || readFully || (chunkBufOffset > 0 && bytesRead == 0) { writtenChunks = writtenChunks + 1 - fileId, urlLocation, assignErr := fs.assignNewFileInfo(w, r, replication, collection, dataCenter) + fileId, urlLocation, auth, assignErr := fs.assignNewFileInfo(w, r, replication, collection, dataCenter) if assignErr != nil { return nil, assignErr } // upload the chunk to the volume server chunkName := fileName + "_chunk_" + strconv.FormatInt(int64(len(fileChunks)+1), 10) - uploadErr := fs.doUpload(urlLocation, w, r, chunkBuf[0:chunkBufOffset], chunkName, "application/octet-stream", fileId) + uploadErr := fs.doUpload(urlLocation, w, r, chunkBuf[0:chunkBufOffset], chunkName, "application/octet-stream", fileId, auth) if uploadErr != nil { return nil, uploadErr } @@ -165,21 +176,28 @@ func (fs *FilerServer) doAutoChunk(w http.ResponseWriter, r *http.Request, conte }, Chunks: fileChunks, } - if db_err := fs.filer.CreateEntry(entry); db_err != nil { - replyerr = db_err - filerResult.Error = db_err.Error() - glog.V(0).Infof("failing to write %s to filer server : %v", path, db_err) + if dbErr := fs.filer.CreateEntry(ctx, entry); dbErr != nil { + fs.filer.DeleteChunks(entry.FullPath, entry.Chunks) + replyerr = dbErr + filerResult.Error = dbErr.Error() + glog.V(0).Infof("failing to write %s to filer server : %v", path, dbErr) return } return } -func (fs *FilerServer) doUpload(urlLocation string, w http.ResponseWriter, r *http.Request, chunkBuf []byte, fileName string, contentType string, fileId string) (err error) { - err = nil +func (fs *FilerServer) doUpload(urlLocation string, w http.ResponseWriter, r *http.Request, + chunkBuf []byte, fileName string, contentType string, fileId string, auth security.EncodedJwt) (err error) { + + stats.FilerRequestCounter.WithLabelValues("postAutoChunkUpload").Inc() + start := time.Now() + defer func() { + stats.FilerRequestHistogram.WithLabelValues("postAutoChunkUpload").Observe(time.Since(start).Seconds()) + }() ioReader := ioutil.NopCloser(bytes.NewBuffer(chunkBuf)) - uploadResult, uploadError := operation.Upload(urlLocation, fileName, ioReader, false, contentType, nil, fs.jwt(fileId)) + uploadResult, uploadError := operation.Upload(urlLocation, fileName, ioReader, false, contentType, nil, auth) if uploadResult != nil { glog.V(0).Infoln("Chunk upload result. 
Name:", uploadResult.Name, "Fid:", fileId, "Size:", uploadResult.Size) } diff --git a/weed/server/filer_ui/breadcrumb.go b/weed/server/filer_ui/breadcrumb.go index d056a4b25..55a1909a8 100644 --- a/weed/server/filer_ui/breadcrumb.go +++ b/weed/server/filer_ui/breadcrumb.go @@ -16,7 +16,7 @@ func ToBreadcrumb(fullpath string) (crumbs []Breadcrumb) { for i := 0; i < len(parts); i++ { crumbs = append(crumbs, Breadcrumb{ Name: parts[i] + "/", - Link: "/" + filepath.Join(parts[0:i+1]...), + Link: "/" + filepath.ToSlash(filepath.Join(parts[0:i+1]...)), }) } diff --git a/weed/server/filer_ui/templates.go b/weed/server/filer_ui/templates.go index e31685ea0..884798936 100644 --- a/weed/server/filer_ui/templates.go +++ b/weed/server/filer_ui/templates.go @@ -162,7 +162,7 @@ function uploadFile(file, i) { var url = window.location.href var xhr = new XMLHttpRequest() var formData = new FormData() - xhr.open('POST', url, true) + xhr.open('POST', url, false) formData.append('file', file) xhr.send(formData) diff --git a/weed/server/master_grpc_server.go b/weed/server/master_grpc_server.go index 93dce59d8..1a17327a0 100644 --- a/weed/server/master_grpc_server.go +++ b/weed/server/master_grpc_server.go @@ -9,6 +9,7 @@ import ( "github.com/chrislusf/raft" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/master_pb" + "github.com/chrislusf/seaweedfs/weed/storage/needle" "github.com/chrislusf/seaweedfs/weed/topology" "google.golang.org/grpc/peer" ) @@ -30,6 +31,9 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ for _, v := range dn.GetVolumes() { message.DeletedVids = append(message.DeletedVids, uint32(v.Id)) } + for _, s := range dn.GetEcShards() { + message.DeletedVids = append(message.DeletedVids, uint32(s.VolumeId)) + } if len(message.DeletedVids) > 0 { ms.clientChansLock.RLock() @@ -63,39 +67,84 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ rack := dc.GetOrCreateRack(rackName) dn = rack.GetOrCreateDataNode(heartbeat.Ip, int(heartbeat.Port), heartbeat.PublicUrl, - int(heartbeat.MaxVolumeCount)) + int64(heartbeat.MaxVolumeCount)) glog.V(0).Infof("added volume server %v:%d", heartbeat.GetIp(), heartbeat.GetPort()) if err := stream.Send(&master_pb.HeartbeatResponse{ - VolumeSizeLimit: uint64(ms.volumeSizeLimitMB) * 1024 * 1024, - SecretKey: string(ms.guard.SecretKey), + VolumeSizeLimit: uint64(ms.option.VolumeSizeLimitMB) * 1024 * 1024, }); err != nil { return err } } + glog.V(4).Infof("master received heartbeat %s", heartbeat.String()) message := &master_pb.VolumeLocation{ Url: dn.Url(), PublicUrl: dn.PublicUrl, } - if len(heartbeat.NewVids) > 0 || len(heartbeat.DeletedVids) > 0 { + if len(heartbeat.NewVolumes) > 0 || len(heartbeat.DeletedVolumes) > 0 { // process delta volume ids if exists for fast volume id updates - message.NewVids = append(message.NewVids, heartbeat.NewVids...) - message.DeletedVids = append(message.DeletedVids, heartbeat.DeletedVids...) 
- } else { + for _, volInfo := range heartbeat.NewVolumes { + message.NewVids = append(message.NewVids, volInfo.Id) + } + for _, volInfo := range heartbeat.DeletedVolumes { + message.DeletedVids = append(message.DeletedVids, volInfo.Id) + } + // update master internal volume layouts + t.IncrementalSyncDataNodeRegistration(heartbeat.NewVolumes, heartbeat.DeletedVolumes, dn) + } + + if len(heartbeat.Volumes) > 0 || heartbeat.HasNoVolumes { // process heartbeat.Volumes newVolumes, deletedVolumes := t.SyncDataNodeRegistration(heartbeat.Volumes, dn) for _, v := range newVolumes { + glog.V(0).Infof("master see new volume %d from %s", uint32(v.Id), dn.Url()) message.NewVids = append(message.NewVids, uint32(v.Id)) } for _, v := range deletedVolumes { + glog.V(0).Infof("master see deleted volume %d from %s", uint32(v.Id), dn.Url()) message.DeletedVids = append(message.DeletedVids, uint32(v.Id)) } } + if len(heartbeat.NewEcShards) > 0 || len(heartbeat.DeletedEcShards) > 0 { + + // update master internal volume layouts + t.IncrementalSyncDataNodeEcShards(heartbeat.NewEcShards, heartbeat.DeletedEcShards, dn) + + for _, s := range heartbeat.NewEcShards { + message.NewVids = append(message.NewVids, s.Id) + } + for _, s := range heartbeat.DeletedEcShards { + if dn.HasVolumesById(needle.VolumeId(s.Id)) { + continue + } + message.DeletedVids = append(message.DeletedVids, s.Id) + } + + } + + if len(heartbeat.EcShards) > 0 || heartbeat.HasNoEcShards { + glog.V(1).Infof("master recieved ec shards from %s: %+v", dn.Url(), heartbeat.EcShards) + newShards, deletedShards := t.SyncDataNodeEcShards(heartbeat.EcShards, dn) + + // broadcast the ec vid changes to master clients + for _, s := range newShards { + message.NewVids = append(message.NewVids, uint32(s.VolumeId)) + } + for _, s := range deletedShards { + if dn.HasVolumesById(s.VolumeId) { + continue + } + message.DeletedVids = append(message.DeletedVids, uint32(s.VolumeId)) + } + + } + if len(message.NewVids) > 0 || len(message.DeletedVids) > 0 { ms.clientChansLock.RLock() - for _, ch := range ms.clientChans { + for host, ch := range ms.clientChans { + glog.V(0).Infof("master send to %s: %s", host, message.String()) ch <- message } ms.clientChansLock.RUnlock() @@ -103,12 +152,15 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ // tell the volume servers about the leader newLeader, err := t.Leader() - if err == nil { - if err := stream.Send(&master_pb.HeartbeatResponse{ - Leader: newLeader, - }); err != nil { - return err - } + if err != nil { + return err + } + if err := stream.Send(&master_pb.HeartbeatResponse{ + Leader: newLeader, + MetricsAddress: ms.option.MetricsAddress, + MetricsIntervalSeconds: uint32(ms.option.MetricsIntervalSec), + }); err != nil { + return err } } } diff --git a/weed/server/master_grpc_server_collection.go b/weed/server/master_grpc_server_collection.go new file mode 100644 index 000000000..a50cfa192 --- /dev/null +++ b/weed/server/master_grpc_server_collection.go @@ -0,0 +1,94 @@ +package weed_server + +import ( + "context" + + "github.com/chrislusf/raft" + "github.com/chrislusf/seaweedfs/weed/operation" + "github.com/chrislusf/seaweedfs/weed/pb/master_pb" + "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" +) + +func (ms *MasterServer) CollectionList(ctx context.Context, req *master_pb.CollectionListRequest) (*master_pb.CollectionListResponse, error) { + + if !ms.Topo.IsLeader() { + return nil, raft.NotLeaderError + } + + resp := &master_pb.CollectionListResponse{} + collections := 
ms.Topo.ListCollections(req.IncludeNormalVolumes, req.IncludeEcVolumes) + for _, c := range collections { + resp.Collections = append(resp.Collections, &master_pb.Collection{ + Name: c, + }) + } + + return resp, nil +} + +func (ms *MasterServer) CollectionDelete(ctx context.Context, req *master_pb.CollectionDeleteRequest) (*master_pb.CollectionDeleteResponse, error) { + + if !ms.Topo.IsLeader() { + return nil, raft.NotLeaderError + } + + resp := &master_pb.CollectionDeleteResponse{} + + err := ms.doDeleteNormalCollection(req.Name) + + if err != nil { + return nil, err + } + + err = ms.doDeleteEcCollection(req.Name) + + if err != nil { + return nil, err + } + + return resp, nil +} + +func (ms *MasterServer) doDeleteNormalCollection(collectionName string) error { + + collection, ok := ms.Topo.FindCollection(collectionName) + if !ok { + return nil + } + + for _, server := range collection.ListVolumeServers() { + err := operation.WithVolumeServerClient(server.Url(), ms.grpcDialOpiton, func(client volume_server_pb.VolumeServerClient) error { + _, deleteErr := client.DeleteCollection(context.Background(), &volume_server_pb.DeleteCollectionRequest{ + Collection: collectionName, + }) + return deleteErr + }) + if err != nil { + return err + } + } + ms.Topo.DeleteCollection(collectionName) + + return nil +} + +func (ms *MasterServer) doDeleteEcCollection(collectionName string) error { + + listOfEcServers := ms.Topo.ListEcServersByCollection(collectionName) + + for _, server := range listOfEcServers { + err := operation.WithVolumeServerClient(server, ms.grpcDialOpiton, func(client volume_server_pb.VolumeServerClient) error { + _, deleteErr := client.DeleteCollection(context.Background(), &volume_server_pb.DeleteCollectionRequest{ + Collection: collectionName, + }) + return deleteErr + }) + if err != nil { + return err + } + } + + ms.Topo.DeleteEcCollection(collectionName) + + return nil +} diff --git a/weed/server/master_grpc_server_volume.go b/weed/server/master_grpc_server_volume.go index ae0819d2d..19064bcde 100644 --- a/weed/server/master_grpc_server_volume.go +++ b/weed/server/master_grpc_server_volume.go @@ -6,7 +6,9 @@ import ( "github.com/chrislusf/raft" "github.com/chrislusf/seaweedfs/weed/pb/master_pb" + "github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/storage" + "github.com/chrislusf/seaweedfs/weed/storage/needle" "github.com/chrislusf/seaweedfs/weed/topology" ) @@ -48,13 +50,13 @@ func (ms *MasterServer) Assign(ctx context.Context, req *master_pb.AssignRequest } if req.Replication == "" { - req.Replication = ms.defaultReplicaPlacement + req.Replication = ms.option.DefaultReplicaPlacement } replicaPlacement, err := storage.NewReplicaPlacementFromString(req.Replication) if err != nil { return nil, err } - ttl, err := storage.ReadTTL(req.Ttl) + ttl, err := needle.ReadTTL(req.Ttl) if err != nil { return nil, err } @@ -63,7 +65,7 @@ func (ms *MasterServer) Assign(ctx context.Context, req *master_pb.AssignRequest Collection: req.Collection, ReplicaPlacement: replicaPlacement, Ttl: ttl, - Prealloacte: ms.preallocate, + Prealloacte: ms.preallocateSize, DataCenter: req.DataCenter, Rack: req.Rack, DataNode: req.DataNode, @@ -75,7 +77,7 @@ func (ms *MasterServer) Assign(ctx context.Context, req *master_pb.AssignRequest } ms.vgLock.Lock() if !ms.Topo.HasWritableVolume(option) { - if _, err = ms.vg.AutomaticGrowByType(option, ms.Topo); err != nil { + if _, err = ms.vg.AutomaticGrowByType(option, ms.grpcDialOpiton, ms.Topo); err != nil { ms.vgLock.Unlock() return nil, 
fmt.Errorf("Cannot grow volume group! %v", err) } @@ -92,6 +94,7 @@ func (ms *MasterServer) Assign(ctx context.Context, req *master_pb.AssignRequest Url: dn.Url(), PublicUrl: dn.PublicUrl, Count: count, + Auth: string(security.GenJwt(ms.guard.SigningKey, ms.guard.ExpiresAfterSec, fid)), }, nil } @@ -102,13 +105,13 @@ func (ms *MasterServer) Statistics(ctx context.Context, req *master_pb.Statistic } if req.Replication == "" { - req.Replication = ms.defaultReplicaPlacement + req.Replication = ms.option.DefaultReplicaPlacement } replicaPlacement, err := storage.NewReplicaPlacementFromString(req.Replication) if err != nil { return nil, err } - ttl, err := storage.ReadTTL(req.Ttl) + ttl, err := needle.ReadTTL(req.Ttl) if err != nil { return nil, err } @@ -124,3 +127,60 @@ func (ms *MasterServer) Statistics(ctx context.Context, req *master_pb.Statistic return resp, nil } + +func (ms *MasterServer) VolumeList(ctx context.Context, req *master_pb.VolumeListRequest) (*master_pb.VolumeListResponse, error) { + + if !ms.Topo.IsLeader() { + return nil, raft.NotLeaderError + } + + resp := &master_pb.VolumeListResponse{ + TopologyInfo: ms.Topo.ToTopologyInfo(), + VolumeSizeLimitMb: uint64(ms.option.VolumeSizeLimitMB), + } + + return resp, nil +} + +func (ms *MasterServer) LookupEcVolume(ctx context.Context, req *master_pb.LookupEcVolumeRequest) (*master_pb.LookupEcVolumeResponse, error) { + + if !ms.Topo.IsLeader() { + return nil, raft.NotLeaderError + } + + resp := &master_pb.LookupEcVolumeResponse{} + + ecLocations, found := ms.Topo.LookupEcShards(needle.VolumeId(req.VolumeId)) + + if !found { + return resp, fmt.Errorf("ec volume %d not found", req.VolumeId) + } + + resp.VolumeId = req.VolumeId + + for shardId, shardLocations := range ecLocations.Locations { + var locations []*master_pb.Location + for _, dn := range shardLocations { + locations = append(locations, &master_pb.Location{ + Url: string(dn.Id()), + PublicUrl: dn.PublicUrl, + }) + } + resp.ShardIdLocations = append(resp.ShardIdLocations, &master_pb.LookupEcVolumeResponse_EcShardIdLocation{ + ShardId: uint32(shardId), + Locations: locations, + }) + } + + return resp, nil +} + +func (ms *MasterServer) GetMasterConfiguration(ctx context.Context, req *master_pb.GetMasterConfigurationRequest) (*master_pb.GetMasterConfigurationResponse, error) { + + resp := &master_pb.GetMasterConfigurationResponse{ + MetricsAddress: ms.option.MetricsAddress, + MetricsIntervalSeconds: uint32(ms.option.MetricsIntervalSec), + } + + return resp, nil +} diff --git a/weed/server/master_server.go b/weed/server/master_server.go index f22925e56..3689b5495 100644 --- a/weed/server/master_server.go +++ b/weed/server/master_server.go @@ -2,10 +2,17 @@ package weed_server import ( "fmt" + "github.com/chrislusf/seaweedfs/weed/shell" + "google.golang.org/grpc" "net/http" "net/http/httputil" "net/url" + "os" + "regexp" + "strconv" + "strings" "sync" + "time" "github.com/chrislusf/raft" "github.com/chrislusf/seaweedfs/weed/glog" @@ -15,17 +22,28 @@ import ( "github.com/chrislusf/seaweedfs/weed/topology" "github.com/chrislusf/seaweedfs/weed/util" "github.com/gorilla/mux" + "github.com/spf13/viper" ) +type MasterOption struct { + Port int + MetaFolder string + VolumeSizeLimitMB uint + VolumePreallocate bool + PulseSeconds int + DefaultReplicaPlacement string + GarbageThreshold float64 + WhiteList []string + DisableHttp bool + MetricsAddress string + MetricsIntervalSec int +} + type MasterServer struct { - port int - metaFolder string - volumeSizeLimitMB uint - preallocate int64 - 
pulseSeconds int - defaultReplicaPlacement string - garbageThreshold float64 - guard *security.Guard + option *MasterOption + guard *security.Guard + + preallocateSize int64 Topo *topology.Topology vg *topology.VolumeGrowth @@ -36,56 +54,60 @@ type MasterServer struct { // notifying clients clientChansLock sync.RWMutex clientChans map[string]chan *master_pb.VolumeLocation + + grpcDialOpiton grpc.DialOption } -func NewMasterServer(r *mux.Router, port int, metaFolder string, - volumeSizeLimitMB uint, - preallocate bool, - pulseSeconds int, - defaultReplicaPlacement string, - garbageThreshold float64, - whiteList []string, - secureKey string, -) *MasterServer { +func NewMasterServer(r *mux.Router, option *MasterOption) *MasterServer { + + v := viper.GetViper() + signingKey := v.GetString("jwt.signing.key") + v.SetDefault("jwt.signing.expires_after_seconds", 10) + expiresAfterSec := v.GetInt("jwt.signing.expires_after_seconds") + + readSigningKey := v.GetString("jwt.signing.read.key") + v.SetDefault("jwt.signing.read.expires_after_seconds", 60) + readExpiresAfterSec := v.GetInt("jwt.signing.read.expires_after_seconds") var preallocateSize int64 - if preallocate { - preallocateSize = int64(volumeSizeLimitMB) * (1 << 20) + if option.VolumePreallocate { + preallocateSize = int64(option.VolumeSizeLimitMB) * (1 << 20) } ms := &MasterServer{ - port: port, - volumeSizeLimitMB: volumeSizeLimitMB, - preallocate: preallocateSize, - pulseSeconds: pulseSeconds, - defaultReplicaPlacement: defaultReplicaPlacement, - garbageThreshold: garbageThreshold, - clientChans: make(map[string]chan *master_pb.VolumeLocation), + option: option, + preallocateSize: preallocateSize, + clientChans: make(map[string]chan *master_pb.VolumeLocation), + grpcDialOpiton: security.LoadClientTLS(v.Sub("grpc"), "master"), } ms.bounedLeaderChan = make(chan int, 16) seq := sequence.NewMemorySequencer() - ms.Topo = topology.NewTopology("topo", seq, uint64(volumeSizeLimitMB)*1024*1024, pulseSeconds) + ms.Topo = topology.NewTopology("topo", seq, uint64(ms.option.VolumeSizeLimitMB)*1024*1024, ms.option.PulseSeconds) ms.vg = topology.NewDefaultVolumeGrowth() - glog.V(0).Infoln("Volume Size Limit is", volumeSizeLimitMB, "MB") - - ms.guard = security.NewGuard(whiteList, secureKey) - - handleStaticResources2(r) - r.HandleFunc("/", ms.uiStatusHandler) - r.HandleFunc("/ui/index.html", ms.uiStatusHandler) - r.HandleFunc("/dir/assign", ms.proxyToLeader(ms.guard.WhiteList(ms.dirAssignHandler))) - r.HandleFunc("/dir/lookup", ms.proxyToLeader(ms.guard.WhiteList(ms.dirLookupHandler))) - r.HandleFunc("/dir/status", ms.proxyToLeader(ms.guard.WhiteList(ms.dirStatusHandler))) - r.HandleFunc("/col/delete", ms.proxyToLeader(ms.guard.WhiteList(ms.collectionDeleteHandler))) - r.HandleFunc("/vol/grow", ms.proxyToLeader(ms.guard.WhiteList(ms.volumeGrowHandler))) - r.HandleFunc("/vol/status", ms.proxyToLeader(ms.guard.WhiteList(ms.volumeStatusHandler))) - r.HandleFunc("/vol/vacuum", ms.proxyToLeader(ms.guard.WhiteList(ms.volumeVacuumHandler))) - r.HandleFunc("/submit", ms.guard.WhiteList(ms.submitFromMasterServerHandler)) - r.HandleFunc("/stats/health", ms.guard.WhiteList(statsHealthHandler)) - r.HandleFunc("/stats/counter", ms.guard.WhiteList(statsCounterHandler)) - r.HandleFunc("/stats/memory", ms.guard.WhiteList(statsMemoryHandler)) - r.HandleFunc("/{fileId}", ms.proxyToLeader(ms.redirectHandler)) - - ms.Topo.StartRefreshWritableVolumes(garbageThreshold, ms.preallocate) + glog.V(0).Infoln("Volume Size Limit is", ms.option.VolumeSizeLimitMB, "MB") + + ms.guard 
= security.NewGuard(ms.option.WhiteList, signingKey, expiresAfterSec, readSigningKey, readExpiresAfterSec) + + if !ms.option.DisableHttp { + handleStaticResources2(r) + r.HandleFunc("/", ms.proxyToLeader(ms.uiStatusHandler)) + r.HandleFunc("/ui/index.html", ms.uiStatusHandler) + r.HandleFunc("/dir/assign", ms.proxyToLeader(ms.guard.WhiteList(ms.dirAssignHandler))) + r.HandleFunc("/dir/lookup", ms.proxyToLeader(ms.guard.WhiteList(ms.dirLookupHandler))) + r.HandleFunc("/dir/status", ms.proxyToLeader(ms.guard.WhiteList(ms.dirStatusHandler))) + r.HandleFunc("/col/delete", ms.proxyToLeader(ms.guard.WhiteList(ms.collectionDeleteHandler))) + r.HandleFunc("/vol/grow", ms.proxyToLeader(ms.guard.WhiteList(ms.volumeGrowHandler))) + r.HandleFunc("/vol/status", ms.proxyToLeader(ms.guard.WhiteList(ms.volumeStatusHandler))) + r.HandleFunc("/vol/vacuum", ms.proxyToLeader(ms.guard.WhiteList(ms.volumeVacuumHandler))) + r.HandleFunc("/submit", ms.guard.WhiteList(ms.submitFromMasterServerHandler)) + r.HandleFunc("/stats/health", ms.guard.WhiteList(statsHealthHandler)) + r.HandleFunc("/stats/counter", ms.guard.WhiteList(statsCounterHandler)) + r.HandleFunc("/stats/memory", ms.guard.WhiteList(statsMemoryHandler)) + r.HandleFunc("/{fileId}", ms.proxyToLeader(ms.redirectHandler)) + } + + ms.Topo.StartRefreshWritableVolumes(ms.grpcDialOpiton, ms.option.GarbageThreshold, ms.preallocateSize) + + ms.startAdminScripts() return ms } @@ -98,6 +120,9 @@ func (ms *MasterServer) SetRaftServer(raftServer *RaftServer) { glog.V(0).Infoln("[", ms.Topo.RaftServer.Name(), "]", ms.Topo.RaftServer.Leader(), "becomes leader.") } }) + ms.Topo.RaftServer.AddEventListener(raft.StateChangeEventType, func(e raft.Event) { + glog.V(0).Infof("state change: %+v", e) + }) if ms.Topo.IsLeader() { glog.V(0).Infoln("[", ms.Topo.RaftServer.Name(), "]", "I am the leader!") } else { @@ -138,3 +163,63 @@ func (ms *MasterServer) proxyToLeader(f func(w http.ResponseWriter, r *http.Requ } } } + +func (ms *MasterServer) startAdminScripts() { + v := viper.GetViper() + adminScripts := v.GetString("master.maintenance.scripts") + v.SetDefault("master.maintenance.sleep_minutes", 17) + sleepMinutes := v.GetInt("master.maintenance.sleep_minutes") + + glog.V(0).Infof("adminScripts:\n%v", adminScripts) + if adminScripts == "" { + return + } + + scriptLines := strings.Split(adminScripts, "\n") + + masterAddress := "localhost:" + strconv.Itoa(ms.option.Port) + + var shellOptions shell.ShellOptions + shellOptions.GrpcDialOption = security.LoadClientTLS(viper.Sub("grpc"), "master") + shellOptions.Masters = &masterAddress + shellOptions.FilerHost = "localhost" + shellOptions.FilerPort = 8888 + shellOptions.Directory = "/" + + commandEnv := shell.NewCommandEnv(shellOptions) + + reg, _ := regexp.Compile(`'.*?'|".*?"|\S+`) + + go commandEnv.MasterClient.KeepConnectedToMaster() + + go func() { + commandEnv.MasterClient.WaitUntilConnected() + + c := time.Tick(time.Duration(sleepMinutes) * time.Minute) + for _ = range c { + if ms.Topo.IsLeader() { + for _, line := range scriptLines { + + cmds := reg.FindAllString(line, -1) + if len(cmds) == 0 { + continue + } + args := make([]string, len(cmds[1:])) + for i := range args { + args[i] = strings.Trim(string(cmds[1+i]), "\"'") + } + cmd := strings.ToLower(cmds[0]) + + for _, c := range shell.Commands { + if c.Name() == cmd { + glog.V(0).Infof("executing: %s %v", cmd, args) + if err := c.Do(args, commandEnv, os.Stdout); err != nil { + glog.V(0).Infof("error: %v", err) + } + } + } + } + } + } + }() +}
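The startAdminScripts loop above tokenizes each maintenance script line with a small regular expression rather than a shell parser, so quoted arguments survive as single tokens and the quotes are trimmed afterwards. A minimal, self-contained sketch of that tokenization (the script line itself is made up):

package main

import (
	"fmt"
	"regexp"
	"strings"
)

func main() {
	// the same pattern the script runner compiles: quoted strings, or runs of non-space
	reg := regexp.MustCompile(`'.*?'|".*?"|\S+`)

	line := `volume.fix.replication -collection "my collection"` // hypothetical script line
	cmds := reg.FindAllString(line, -1)

	args := make([]string, len(cmds[1:]))
	for i := range args {
		args[i] = strings.Trim(cmds[1+i], `"'`)
	}
	fmt.Println(strings.ToLower(cmds[0]), args) // volume.fix.replication [-collection my collection]
}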
diff --git a/weed/server/master_server_handlers.go b/weed/server/master_server_handlers.go index a797dddfc..5c7ff41cf 100644 --- a/weed/server/master_server_handlers.go +++ b/weed/server/master_server_handlers.go @@ -7,8 +7,9 @@ import ( "strings" "github.com/chrislusf/seaweedfs/weed/operation" + "github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/stats" - "github.com/chrislusf/seaweedfs/weed/storage" + "github.com/chrislusf/seaweedfs/weed/storage/needle" ) func (ms *MasterServer) lookupVolumeId(vids []string, collection string) (volumeLocations map[string]operation.LookupResult) { @@ -21,7 +22,7 @@ func (ms *MasterServer) lookupVolumeId(vids []string, collection string) (volume if _, ok := volumeLocations[vid]; ok { continue } - volumeId, err := storage.NewVolumeId(vid) + volumeId, err := needle.NewVolumeId(vid) if err == nil { machines := ms.Topo.Lookup(collection, volumeId) if machines != nil { @@ -40,12 +41,23 @@ func (ms *MasterServer) lookupVolumeId(vids []string, collection string) (volume return } -// Takes one volumeId only, can not do batch lookup +// If "fileId" is provided, this returns the fileId location and a JWT to update or delete the file. +// If "volumeId" is provided, this only returns the volumeId location func (ms *MasterServer) dirLookupHandler(w http.ResponseWriter, r *http.Request) { vid := r.FormValue("volumeId") - commaSep := strings.Index(vid, ",") - if commaSep > 0 { - vid = vid[0:commaSep] + if vid != "" { + // backward compatible + commaSep := strings.Index(vid, ",") + if commaSep > 0 { + vid = vid[0:commaSep] + } + } + fileId := r.FormValue("fileId") + if fileId != "" { + commaSep := strings.Index(fileId, ",") + if commaSep > 0 { + vid = fileId[0:commaSep] + } } vids := []string{vid} collection := r.FormValue("collection") //optional, but can be faster if too many collections @@ -54,6 +66,10 @@ func (ms *MasterServer) dirLookupHandler(w http.ResponseWriter, r *http.Request) httpStatus := http.StatusOK if location.Error != "" { httpStatus = http.StatusNotFound + } else { + forRead := r.FormValue("read") + isRead := forRead == "yes" + ms.maybeAddJwtAuthorization(w, fileId, !isRead) } writeJsonQuiet(w, r, httpStatus, location) } @@ -79,7 +95,7 @@ func (ms *MasterServer) dirAssignHandler(w http.ResponseWriter, r *http.Request) ms.vgLock.Lock() defer ms.vgLock.Unlock() if !ms.Topo.HasWritableVolume(option) { - if _, err = ms.vg.AutomaticGrowByType(option, ms.Topo); err != nil { + if _, err = ms.vg.AutomaticGrowByType(option, ms.grpcDialOpiton, ms.Topo); err != nil { writeJsonError(w, r, http.StatusInternalServerError, fmt.Errorf("Cannot grow volume group!
%v", err)) return @@ -88,8 +104,23 @@ func (ms *MasterServer) dirAssignHandler(w http.ResponseWriter, r *http.Request) } fid, count, dn, err := ms.Topo.PickForWrite(requestedCount, option) if err == nil { + ms.maybeAddJwtAuthorization(w, fid, true) writeJsonQuiet(w, r, http.StatusOK, operation.AssignResult{Fid: fid, Url: dn.Url(), PublicUrl: dn.PublicUrl, Count: count}) } else { writeJsonQuiet(w, r, http.StatusNotAcceptable, operation.AssignResult{Error: err.Error()}) } } + +func (ms *MasterServer) maybeAddJwtAuthorization(w http.ResponseWriter, fileId string, isWrite bool) { + var encodedJwt security.EncodedJwt + if isWrite { + encodedJwt = security.GenJwt(ms.guard.SigningKey, ms.guard.ExpiresAfterSec, fileId) + } else { + encodedJwt = security.GenJwt(ms.guard.ReadSigningKey, ms.guard.ReadExpiresAfterSec, fileId) + } + if encodedJwt == "" { + return + } + + w.Header().Set("Authorization", "BEARER "+string(encodedJwt)) +} diff --git a/weed/server/master_server_handlers_admin.go b/weed/server/master_server_handlers_admin.go index 3a2662908..343bcb8da 100644 --- a/weed/server/master_server_handlers_admin.go +++ b/weed/server/master_server_handlers_admin.go @@ -7,12 +7,12 @@ import ( "math/rand" "net/http" "strconv" - "time" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" "github.com/chrislusf/seaweedfs/weed/storage" + "github.com/chrislusf/seaweedfs/weed/storage/needle" "github.com/chrislusf/seaweedfs/weed/topology" "github.com/chrislusf/seaweedfs/weed/util" ) @@ -24,11 +24,8 @@ func (ms *MasterServer) collectionDeleteHandler(w http.ResponseWriter, r *http.R return } for _, server := range collection.ListVolumeServers() { - err := operation.WithVolumeServerClient(server.Url(), func(client volume_server_pb.VolumeServerClient) error { - ctx, cancel := context.WithTimeout(context.Background(), time.Duration(5*time.Second)) - defer cancel() - - _, deleteErr := client.DeleteCollection(ctx, &volume_server_pb.DeleteCollectionRequest{ + err := operation.WithVolumeServerClient(server.Url(), ms.grpcDialOpiton, func(client volume_server_pb.VolumeServerClient) error { + _, deleteErr := client.DeleteCollection(context.Background(), &volume_server_pb.DeleteCollectionRequest{ Collection: collection.Name, }) return deleteErr @@ -50,7 +47,7 @@ func (ms *MasterServer) dirStatusHandler(w http.ResponseWriter, r *http.Request) func (ms *MasterServer) volumeVacuumHandler(w http.ResponseWriter, r *http.Request) { gcString := r.FormValue("garbageThreshold") - gcThreshold := ms.garbageThreshold + gcThreshold := ms.option.GarbageThreshold if gcString != "" { var err error gcThreshold, err = strconv.ParseFloat(gcString, 32) @@ -60,7 +57,7 @@ func (ms *MasterServer) volumeVacuumHandler(w http.ResponseWriter, r *http.Reque } } glog.Infoln("garbageThreshold =", gcThreshold) - ms.Topo.Vacuum(gcThreshold, ms.preallocate) + ms.Topo.Vacuum(ms.grpcDialOpiton, gcThreshold, ms.preallocateSize) ms.dirStatusHandler(w, r) } @@ -73,10 +70,10 @@ func (ms *MasterServer) volumeGrowHandler(w http.ResponseWriter, r *http.Request } if err == nil { if count, err = strconv.Atoi(r.FormValue("count")); err == nil { - if ms.Topo.FreeSpace() < count*option.ReplicaPlacement.GetCopyCount() { - err = errors.New("Only " + strconv.Itoa(ms.Topo.FreeSpace()) + " volumes left! 
Not enough for " + strconv.Itoa(count*option.ReplicaPlacement.GetCopyCount())) + if ms.Topo.FreeSpace() < int64(count*option.ReplicaPlacement.GetCopyCount()) { + err = fmt.Errorf("only %d volumes left, not enough for %d", ms.Topo.FreeSpace(), count*option.ReplicaPlacement.GetCopyCount()) } else { - count, err = ms.vg.GrowByCountAndType(count, option, ms.Topo) + count, err = ms.vg.GrowByCountAndType(ms.grpcDialOpiton, count, option, ms.Topo) } } else { err = errors.New("parameter count is not found") @@ -98,7 +95,7 @@ func (ms *MasterServer) volumeStatusHandler(w http.ResponseWriter, r *http.Reque func (ms *MasterServer) redirectHandler(w http.ResponseWriter, r *http.Request) { vid, _, _, _, _ := parseURLPath(r.URL.Path) - volumeId, err := storage.NewVolumeId(vid) + volumeId, err := needle.NewVolumeId(vid) if err != nil { debug("parsing error:", err, r.URL.Path) return @@ -122,17 +119,17 @@ func (ms *MasterServer) selfUrl(r *http.Request) string { if r.Host != "" { return r.Host } - return "localhost:" + strconv.Itoa(ms.port) + return "localhost:" + strconv.Itoa(ms.option.Port) } func (ms *MasterServer) submitFromMasterServerHandler(w http.ResponseWriter, r *http.Request) { if ms.Topo.IsLeader() { - submitForClientHandler(w, r, ms.selfUrl(r)) + submitForClientHandler(w, r, ms.selfUrl(r), ms.grpcDialOpiton) } else { masterUrl, err := ms.Topo.Leader() if err != nil { writeJsonError(w, r, http.StatusInternalServerError, err) } else { - submitForClientHandler(w, r, masterUrl) + submitForClientHandler(w, r, masterUrl, ms.grpcDialOpiton) } } } @@ -145,17 +142,17 @@ func (ms *MasterServer) HasWritableVolume(option *topology.VolumeGrowOption) boo func (ms *MasterServer) getVolumeGrowOption(r *http.Request) (*topology.VolumeGrowOption, error) { replicationString := r.FormValue("replication") if replicationString == "" { - replicationString = ms.defaultReplicaPlacement + replicationString = ms.option.DefaultReplicaPlacement } replicaPlacement, err := storage.NewReplicaPlacementFromString(replicationString) if err != nil { return nil, err } - ttl, err := storage.ReadTTL(r.FormValue("ttl")) + ttl, err := needle.ReadTTL(r.FormValue("ttl")) if err != nil { return nil, err } - preallocate := ms.preallocate + preallocate := ms.preallocateSize if r.FormValue("preallocate") != "" { preallocate, err = strconv.ParseInt(r.FormValue("preallocate"), 10, 64) if err != nil { diff --git a/weed/server/master_ui/templates.go b/weed/server/master_ui/templates.go index f32e8e61b..b674e3f82 100644 --- a/weed/server/master_ui/templates.go +++ b/weed/server/master_ui/templates.go @@ -41,7 +41,7 @@ var StatusTpl = template.Must(template.New("status").Parse(`<!DOCTYPE html> <td class="col-sm-2 field-label"><label>Other Masters:</label></td> <td class="col-sm-10"><ul class="list-unstyled"> {{ range $k, $p := .Peers }} - <li><a href="{{ $p.ConnectionString }}">{{ $p.Name }}</a></li> + <li><a href="http://{{ $p.Name }}/ui/index.html">{{ $p.Name }}</a></li> {{ end }} </ul></td> </tr> @@ -76,6 +76,7 @@ var StatusTpl = template.Must(template.New("status").Parse(`<!DOCTYPE html> <th>Rack</th> <th>RemoteAddr</th> <th>#Volumes</th> + <th>#ErasureCodingShards</th> <th>Max</th> </tr> </thead> @@ -88,6 +89,7 @@ var StatusTpl = template.Must(template.New("status").Parse(`<!DOCTYPE html> <td>{{ $rack.Id }}</td> <td><a href="http://{{ $dn.Url }}/ui/index.html">{{ $dn.Url }}</a></td> <td>{{ $dn.Volumes }}</td> + <td>{{ $dn.EcShards }}</td> <td>{{ $dn.Max }}</td> </tr> {{ end }}
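With a signing key configured, maybeAddJwtAuthorization above hands out a per-fileId token in the Authorization response header of /dir/assign (and of /dir/lookup, where read=yes selects the read key instead). A hedged sketch of the client flow this implies, assuming a local master on the default port and the usual assign JSON fields:

package main

import (
	"bytes"
	"encoding/json"
	"net/http"
)

// a minimal sketch, assuming a master on localhost:9333 with jwt.signing.key set
func main() {
	resp, err := http.Get("http://localhost:9333/dir/assign")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	// the write JWT for the newly assigned fid rides on the response header
	token := resp.Header.Get("Authorization") // "BEARER <jwt>", empty when signing is off

	var assign struct{ Fid, Url string }
	json.NewDecoder(resp.Body).Decode(&assign)

	// the upload must present the token back to the volume server
	req, _ := http.NewRequest("POST", "http://"+assign.Url+"/"+assign.Fid, bytes.NewReader([]byte("hello")))
	req.Header.Set("Authorization", token)
	http.DefaultClient.Do(req)
}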
diff --git a/weed/server/raft_server.go b/weed/server/raft_server.go index 68042da54..88320ed98 100644 --- a/weed/server/raft_server.go +++ b/weed/server/raft_server.go @@ -2,37 +2,35 @@ package weed_server import ( "encoding/json" + "github.com/chrislusf/seaweedfs/weed/util" + "google.golang.org/grpc" "io/ioutil" - "math/rand" "os" "path" "reflect" "sort" - "strings" "time" "github.com/chrislusf/raft" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/topology" - "github.com/gorilla/mux" ) type RaftServer struct { peers []string // initial peers to join with raftServer raft.Server dataDir string - httpAddr string - router *mux.Router + serverAddr string topo *topology.Topology + *raft.GrpcServer } -func NewRaftServer(r *mux.Router, peers []string, httpAddr string, dataDir string, topo *topology.Topology, pulseSeconds int) *RaftServer { +func NewRaftServer(grpcDialOption grpc.DialOption, peers []string, serverAddr string, dataDir string, topo *topology.Topology, pulseSeconds int) *RaftServer { s := &RaftServer{ - peers: peers, - httpAddr: httpAddr, - dataDir: dataDir, - router: r, - topo: topo, + peers: peers, + serverAddr: serverAddr, + dataDir: dataDir, + topo: topo, } if glog.V(4) { @@ -42,42 +40,39 @@ func NewRaftServer(r *mux.Router, peers []string, httpAddr string, dataDir strin raft.RegisterCommand(&topology.MaxVolumeIdCommand{}) var err error - transporter := raft.NewHTTPTransporter("/cluster", time.Second) - transporter.Transport.MaxIdleConnsPerHost = 1024 - transporter.Transport.IdleConnTimeout = time.Second - glog.V(0).Infof("Starting RaftServer with %v", httpAddr) + transporter := raft.NewGrpcTransporter(grpcDialOption) + glog.V(0).Infof("Starting RaftServer with %v", serverAddr) // Clear old cluster configurations if peers are changed - if oldPeers, changed := isPeersChanged(s.dataDir, httpAddr, s.peers); changed { + if oldPeers, changed := isPeersChanged(s.dataDir, serverAddr, s.peers); changed { glog.V(0).Infof("Peers Change: %v => %v", oldPeers, s.peers) os.RemoveAll(path.Join(s.dataDir, "conf")) os.RemoveAll(path.Join(s.dataDir, "log")) os.RemoveAll(path.Join(s.dataDir, "snapshot")) } - s.raftServer, err = raft.NewServer(s.httpAddr, s.dataDir, transporter, nil, topo, "") + s.raftServer, err = raft.NewServer(s.serverAddr, s.dataDir, transporter, nil, topo, "") if err != nil { glog.V(0).Infoln(err) return nil } - transporter.Install(s.raftServer, s) s.raftServer.SetHeartbeatInterval(500 * time.Millisecond) s.raftServer.SetElectionTimeout(time.Duration(pulseSeconds) * 500 * time.Millisecond) s.raftServer.Start() - s.router.HandleFunc("/cluster/status", s.statusHandler).Methods("GET") - for _, peer := range s.peers { - s.raftServer.AddPeer(peer, "http://"+peer) + s.raftServer.AddPeer(peer, util.ServerToGrpcAddress(peer)) } - time.Sleep(time.Duration(1000+rand.Int31n(3000)) * time.Millisecond) - if s.raftServer.IsLogEmpty() { + + s.GrpcServer = raft.NewGrpcServer(s.raftServer) + + if s.raftServer.IsLogEmpty() && isTheFirstOne(serverAddr, s.peers) { // Initialize the server by joining itself.
glog.V(0).Infoln("Initializing new cluster") _, err := s.raftServer.Do(&raft.DefaultJoinCommand{ Name: s.raftServer.Name(), - ConnectionString: "http://" + s.httpAddr, + ConnectionString: util.ServerToGrpcAddress(s.serverAddr), }) if err != nil { @@ -95,7 +90,7 @@ func (s *RaftServer) Peers() (members []string) { peers := s.raftServer.Peers() for _, p := range peers { - members = append(members, strings.TrimPrefix(p.ConnectionString, "http://")) + members = append(members, p.Name) } return @@ -114,7 +109,7 @@ func isPeersChanged(dir string, self string, peers []string) (oldPeers []string, } for _, p := range conf.Peers { - oldPeers = append(oldPeers, strings.TrimPrefix(p.ConnectionString, "http://")) + oldPeers = append(oldPeers, p.Name) } oldPeers = append(oldPeers, self) @@ -128,3 +123,11 @@ func isPeersChanged(dir string, self string, peers []string) (oldPeers []string, return oldPeers, !reflect.DeepEqual(peers, oldPeers) } + +func isTheFirstOne(self string, peers []string) bool { + sort.Strings(peers) + if len(peers) <= 0 { + return true + } + return self == peers[0] +} diff --git a/weed/server/raft_server_handlers.go b/weed/server/raft_server_handlers.go index 627fe354e..fd38cb977 100644 --- a/weed/server/raft_server_handlers.go +++ b/weed/server/raft_server_handlers.go @@ -1,16 +1,17 @@ package weed_server import ( - "github.com/chrislusf/seaweedfs/weed/operation" "net/http" ) -func (s *RaftServer) HandleFunc(pattern string, handler func(http.ResponseWriter, *http.Request)) { - s.router.HandleFunc(pattern, handler) +type ClusterStatusResult struct { + IsLeader bool `json:"IsLeader,omitempty"` + Leader string `json:"Leader,omitempty"` + Peers []string `json:"Peers,omitempty"` } -func (s *RaftServer) statusHandler(w http.ResponseWriter, r *http.Request) { - ret := operation.ClusterStatusResult{ +func (s *RaftServer) StatusHandler(w http.ResponseWriter, r *http.Request) { + ret := ClusterStatusResult{ IsLeader: s.topo.IsLeader(), Peers: s.Peers(), } diff --git a/weed/server/volume_grpc_admin.go b/weed/server/volume_grpc_admin.go index 429ca9b68..35c2508a6 100644 --- a/weed/server/volume_grpc_admin.go +++ b/weed/server/volume_grpc_admin.go @@ -5,7 +5,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" - "github.com/chrislusf/seaweedfs/weed/storage" + "github.com/chrislusf/seaweedfs/weed/storage/needle" ) func (vs *VolumeServer) DeleteCollection(ctx context.Context, req *volume_server_pb.DeleteCollectionRequest) (*volume_server_pb.DeleteCollectionResponse, error) { @@ -24,12 +24,12 @@ func (vs *VolumeServer) DeleteCollection(ctx context.Context, req *volume_server } -func (vs *VolumeServer) AssignVolume(ctx context.Context, req *volume_server_pb.AssignVolumeRequest) (*volume_server_pb.AssignVolumeResponse, error) { +func (vs *VolumeServer) AllocateVolume(ctx context.Context, req *volume_server_pb.AllocateVolumeRequest) (*volume_server_pb.AllocateVolumeResponse, error) { - resp := &volume_server_pb.AssignVolumeResponse{} + resp := &volume_server_pb.AllocateVolumeResponse{} err := vs.store.AddVolume( - storage.VolumeId(req.VolumdId), + needle.VolumeId(req.VolumeId), req.Collection, vs.needleMapKind, req.Replication, @@ -51,7 +51,7 @@ func (vs *VolumeServer) VolumeMount(ctx context.Context, req *volume_server_pb.V resp := &volume_server_pb.VolumeMountResponse{} - err := vs.store.MountVolume(storage.VolumeId(req.VolumdId)) + err := vs.store.MountVolume(needle.VolumeId(req.VolumeId)) if err != nil { glog.Errorf("volume mount %v: 
%v", req, err) @@ -67,7 +67,7 @@ func (vs *VolumeServer) VolumeUnmount(ctx context.Context, req *volume_server_pb resp := &volume_server_pb.VolumeUnmountResponse{} - err := vs.store.UnmountVolume(storage.VolumeId(req.VolumdId)) + err := vs.store.UnmountVolume(needle.VolumeId(req.VolumeId)) if err != nil { glog.Errorf("volume unmount %v: %v", req, err) @@ -83,7 +83,7 @@ func (vs *VolumeServer) VolumeDelete(ctx context.Context, req *volume_server_pb. resp := &volume_server_pb.VolumeDeleteResponse{} - err := vs.store.DeleteVolume(storage.VolumeId(req.VolumdId)) + err := vs.store.DeleteVolume(needle.VolumeId(req.VolumeId)) if err != nil { glog.Errorf("volume delete %v: %v", req, err) @@ -94,3 +94,19 @@ func (vs *VolumeServer) VolumeDelete(ctx context.Context, req *volume_server_pb. return resp, err } + +func (vs *VolumeServer) VolumeMarkReadonly(ctx context.Context, req *volume_server_pb.VolumeMarkReadonlyRequest) (*volume_server_pb.VolumeMarkReadonlyResponse, error) { + + resp := &volume_server_pb.VolumeMarkReadonlyResponse{} + + err := vs.store.MarkVolumeReadonly(needle.VolumeId(req.VolumeId)) + + if err != nil { + glog.Errorf("volume mark readonly %v: %v", req, err) + } else { + glog.V(2).Infof("volume mark readonly %v", req) + } + + return resp, err + +} diff --git a/weed/server/volume_grpc_batch_delete.go b/weed/server/volume_grpc_batch_delete.go index 3554d97ae..d7fbb6edf 100644 --- a/weed/server/volume_grpc_batch_delete.go +++ b/weed/server/volume_grpc_batch_delete.go @@ -7,7 +7,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" - "github.com/chrislusf/seaweedfs/weed/storage" + "github.com/chrislusf/seaweedfs/weed/storage/needle" ) func (vs *VolumeServer) BatchDelete(ctx context.Context, req *volume_server_pb.BatchDeleteRequest) (*volume_server_pb.BatchDeleteResponse, error) { @@ -26,8 +26,8 @@ func (vs *VolumeServer) BatchDelete(ctx context.Context, req *volume_server_pb.B continue } - n := new(storage.Needle) - volumeId, _ := storage.NewVolumeId(vid) + n := new(needle.Needle) + volumeId, _ := needle.NewVolumeId(vid) n.ParsePath(id_cookie) cookie := n.Cookie diff --git a/weed/server/volume_grpc_client_to_master.go b/weed/server/volume_grpc_client_to_master.go index bd3ffd7b3..731675b48 100644 --- a/weed/server/volume_grpc_client_to_master.go +++ b/weed/server/volume_grpc_client_to_master.go @@ -2,11 +2,16 @@ package weed_server import ( "fmt" + "net" "time" + "github.com/chrislusf/seaweedfs/weed/security" + "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding" + "github.com/spf13/viper" + "google.golang.org/grpc" + "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/master_pb" - "github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/util" "golang.org/x/net/context" ) @@ -16,41 +21,46 @@ func (vs *VolumeServer) GetMaster() string { } func (vs *VolumeServer) heartbeat() { - glog.V(0).Infof("Volume server start with masters: %v", vs.MasterNodes) + glog.V(0).Infof("Volume server start with seed master nodes: %v", vs.SeedMasterNodes) vs.store.SetDataCenter(vs.dataCenter) vs.store.SetRack(vs.rack) + grpcDialOption := security.LoadClientTLS(viper.Sub("grpc"), "volume") + var err error var newLeader string for { - for _, master := range vs.MasterNodes { + for _, master := range vs.SeedMasterNodes { if newLeader != "" { master = newLeader } - masterGrpcAddress, parseErr := util.ParseServerToGrpcAddress(master, 0) + masterGrpcAddress, parseErr := 
util.ParseServerToGrpcAddress(master) if parseErr != nil { - glog.V(0).Infof("failed to parse master grpc %v", masterGrpcAddress) + glog.V(0).Infof("failed to parse master grpc %v: %v", masterGrpcAddress, parseErr) continue } - newLeader, err = vs.doHeartbeat(master, masterGrpcAddress, time.Duration(vs.pulseSeconds)*time.Second) + vs.store.MasterAddress = master + newLeader, err = vs.doHeartbeat(context.Background(), master, masterGrpcAddress, grpcDialOption, time.Duration(vs.pulseSeconds)*time.Second) if err != nil { glog.V(0).Infof("heartbeat error: %v", err) time.Sleep(time.Duration(vs.pulseSeconds) * time.Second) + newLeader = "" + vs.store.MasterAddress = "" } } } } -func (vs *VolumeServer) doHeartbeat(masterNode, masterGrpcAddress string, sleepInterval time.Duration) (newLeader string, err error) { +func (vs *VolumeServer) doHeartbeat(ctx context.Context, masterNode, masterGrpcAddress string, grpcDialOption grpc.DialOption, sleepInterval time.Duration) (newLeader string, err error) { - grpcConection, err := util.GrpcDial(masterGrpcAddress) + grpcConection, err := util.GrpcDial(ctx, masterGrpcAddress, grpcDialOption) if err != nil { return "", fmt.Errorf("fail to dial %s : %v", masterNode, err) } defer grpcConection.Close() client := master_pb.NewSeaweedClient(grpcConection) - stream, err := client.SendHeartbeat(context.Background()) + stream, err := client.SendHeartbeat(ctx) if err != nil { glog.V(0).Infof("SendHeartbeat to %s: %v", masterNode, err) return "", err @@ -58,9 +68,6 @@ func (vs *VolumeServer) doHeartbeat(masterNode, masterGrpcAddress string, sleepI glog.V(0).Infof("Heartbeat to: %v", masterNode) vs.currentMaster = masterNode - vs.store.Client = stream - defer func() { vs.store.Client = nil }() - doneChan := make(chan error, 1) go func() { @@ -71,17 +78,18 @@ func (vs *VolumeServer) doHeartbeat(masterNode, masterGrpcAddress string, sleepI return } if in.GetVolumeSizeLimit() != 0 { - vs.store.VolumeSizeLimit = in.GetVolumeSizeLimit() - } - if in.GetSecretKey() != "" { - vs.guard.SecretKey = security.Secret(in.GetSecretKey()) + vs.store.SetVolumeSizeLimit(in.GetVolumeSizeLimit()) } - if in.GetLeader() != "" && masterNode != in.GetLeader() { + if in.GetLeader() != "" && masterNode != in.GetLeader() && !isSameIP(in.GetLeader(), masterNode) { glog.V(0).Infof("Volume Server found a new master newLeader: %v instead of %v", in.GetLeader(), masterNode) newLeader = in.GetLeader() doneChan <- nil return } + if in.GetMetricsAddress() != "" && vs.MetricsAddress != in.GetMetricsAddress() { + vs.MetricsAddress = in.GetMetricsAddress() + vs.MetricsIntervalSec = int(in.GetMetricsIntervalSeconds()) + } } }() @@ -90,33 +98,89 @@ func (vs *VolumeServer) doHeartbeat(masterNode, masterGrpcAddress string, sleepI return "", err } - tickChan := time.Tick(sleepInterval) + if err = stream.Send(vs.store.CollectErasureCodingHeartbeat()); err != nil { + glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", masterNode, err) + return "", err + } + + volumeTickChan := time.Tick(sleepInterval) + ecShardTickChan := time.Tick(17 * sleepInterval) for { select { - case vid := <-vs.store.NewVolumeIdChan: + case volumeMessage := <-vs.store.NewVolumesChan: + deltaBeat := &master_pb.Heartbeat{ + NewVolumes: []*master_pb.VolumeShortInformationMessage{ + &volumeMessage, + }, + } + glog.V(1).Infof("volume server %s:%d adds volume %d", vs.store.Ip, vs.store.Port, volumeMessage.Id) + if err = stream.Send(deltaBeat); err != nil { + glog.V(0).Infof("Volume Server Failed to update to master %s: %v", 
masterNode, err) + return "", err + } + case ecShardMessage := <-vs.store.NewEcShardsChan: + deltaBeat := &master_pb.Heartbeat{ + NewEcShards: []*master_pb.VolumeEcShardInformationMessage{ + &ecShardMessage, + }, + } + glog.V(1).Infof("volume server %s:%d adds ec shard %d:%d", vs.store.Ip, vs.store.Port, ecShardMessage.Id, + erasure_coding.ShardBits(ecShardMessage.EcIndexBits).ShardIds()) + if err = stream.Send(deltaBeat); err != nil { + glog.V(0).Infof("Volume Server Failed to update to master %s: %v", masterNode, err) + return "", err + } + case volumeMessage := <-vs.store.DeletedVolumesChan: deltaBeat := &master_pb.Heartbeat{ - NewVids: []uint32{uint32(vid)}, + DeletedVolumes: []*master_pb.VolumeShortInformationMessage{ + &volumeMessage, + }, } + glog.V(1).Infof("volume server %s:%d deletes volume %d", vs.store.Ip, vs.store.Port, volumeMessage.Id) if err = stream.Send(deltaBeat); err != nil { glog.V(0).Infof("Volume Server Failed to update to master %s: %v", masterNode, err) return "", err } - case vid := <-vs.store.DeletedVolumeIdChan: + case ecShardMessage := <-vs.store.DeletedEcShardsChan: deltaBeat := &master_pb.Heartbeat{ - DeletedVids: []uint32{uint32(vid)}, + DeletedEcShards: []*master_pb.VolumeEcShardInformationMessage{ + &ecShardMessage, + }, } + glog.V(1).Infof("volume server %s:%d deletes ec shard %d:%d", vs.store.Ip, vs.store.Port, ecShardMessage.Id, + erasure_coding.ShardBits(ecShardMessage.EcIndexBits).ShardIds()) if err = stream.Send(deltaBeat); err != nil { glog.V(0).Infof("Volume Server Failed to update to master %s: %v", masterNode, err) return "", err } - case <-tickChan: + case <-volumeTickChan: + glog.V(4).Infof("volume server %s:%d heartbeat", vs.store.Ip, vs.store.Port) if err = stream.Send(vs.store.CollectHeartbeat()); err != nil { glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", masterNode, err) return "", err } + case <-ecShardTickChan: + glog.V(4).Infof("volume server %s:%d ec heartbeat", vs.store.Ip, vs.store.Port) + if err = stream.Send(vs.store.CollectErasureCodingHeartbeat()); err != nil { + glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", masterNode, err) + return "", err + } case err = <-doneChan: return } } } + +func isSameIP(ip string, host string) bool { + ips, err := net.LookupIP(host) + if err != nil { + return false + } + for _, t := range ips { + if ip == t.String() { + return true + } + } + return false +} diff --git a/weed/server/volume_grpc_copy.go b/weed/server/volume_grpc_copy.go new file mode 100644 index 000000000..8b39146ee --- /dev/null +++ b/weed/server/volume_grpc_copy.go @@ -0,0 +1,263 @@ +package weed_server + +import ( + "context" + "fmt" + "io" + "math" + "os" + "path" + "time" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/operation" + "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" + "github.com/chrislusf/seaweedfs/weed/storage" + "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding" + "github.com/chrislusf/seaweedfs/weed/storage/needle" + "github.com/chrislusf/seaweedfs/weed/util" +) + +const BufferSizeLimit = 1024 * 1024 * 2 + +// VolumeCopy copies the .idx and .dat files, and mounts the volume +func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.VolumeCopyRequest) (*volume_server_pb.VolumeCopyResponse, error) { + + v := vs.store.GetVolume(needle.VolumeId(req.VolumeId)) + if v != nil { + return nil, fmt.Errorf("volume %d already exists", req.VolumeId) + } + + location := vs.store.FindFreeLocation() + if location == nil
{ + return nil, fmt.Errorf("no space left") + } + + // the master will not start compaction for read-only volumes, so it is safe to just copy files directly + // copy .dat and .idx files + // read .idx .dat file size and timestamp + // send .idx file + // send .dat file + // confirm size and timestamp + var volFileInfoResp *volume_server_pb.ReadVolumeFileStatusResponse + var volumeFileName, idxFileName, datFileName string + err := operation.WithVolumeServerClient(req.SourceDataNode, vs.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { + var err error + volFileInfoResp, err = client.ReadVolumeFileStatus(ctx, + &volume_server_pb.ReadVolumeFileStatusRequest{ + VolumeId: req.VolumeId, + }) + if nil != err { + return fmt.Errorf("read volume file status failed, %v", err) + } + + volumeFileName = storage.VolumeFileName(location.Directory, volFileInfoResp.Collection, int(req.VolumeId)) + + // println("source:", volFileInfoResp.String()) + // copy .idx file + if err := vs.doCopyFile(ctx, client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.IdxFileSize, volumeFileName, ".idx", false); err != nil { + return err + } + + // copy .dat file + if err := vs.doCopyFile(ctx, client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.DatFileSize, volumeFileName, ".dat", false); err != nil { + return err + } + + return nil + }) + + idxFileName = volumeFileName + ".idx" + datFileName = volumeFileName + ".dat" + + if err != nil && volumeFileName != "" { + if idxFileName != "" { + os.Remove(idxFileName) + } + if datFileName != "" { + os.Remove(datFileName) + } + return nil, err + } + + if err = checkCopyFiles(volFileInfoResp, idxFileName, datFileName); err != nil { // added by panyc16 + return nil, err + } + + // mount the volume + err = vs.store.MountVolume(needle.VolumeId(req.VolumeId)) + if err != nil { + return nil, fmt.Errorf("failed to mount volume %d: %v", req.VolumeId, err) + } + + return &volume_server_pb.VolumeCopyResponse{ + LastAppendAtNs: volFileInfoResp.DatFileTimestampSeconds * uint64(time.Second), + }, err +} + +func (vs *VolumeServer) doCopyFile(ctx context.Context, client volume_server_pb.VolumeServerClient, isEcVolume bool, collection string, vid uint32, + compactRevision uint32, stopOffset uint64, baseFileName, ext string, isAppend bool) error { + + copyFileClient, err := client.CopyFile(ctx, &volume_server_pb.CopyFileRequest{ + VolumeId: vid, + Ext: ext, + CompactionRevision: compactRevision, + StopOffset: stopOffset, + Collection: collection, + IsEcVolume: isEcVolume, + }) + if err != nil { + return fmt.Errorf("failed to start copying volume %d %s file: %v", vid, ext, err) + } + + err = writeToFile(copyFileClient, baseFileName+ext, util.NewWriteThrottler(vs.compactionBytePerSecond), isAppend) + if err != nil { + return fmt.Errorf("failed to copy %s file: %v", baseFileName+ext, err) + } + + return nil + +}
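Seen from the caller, the whole transfer is one gRPC call: the destination server runs the VolumeCopy handler above and pulls each file through doCopyFile. A sketch under those assumptions (volume id and server addresses are illustrative, error handling trimmed):

package main

import (
	"context"

	"github.com/chrislusf/seaweedfs/weed/operation"
	"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
	"google.golang.org/grpc"
)

// ask dst to pull volume 7, with its .idx and .dat files, from src
func replicateVolume(dst, src string, grpcDialOption grpc.DialOption) error {
	return operation.WithVolumeServerClient(dst, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
		_, err := client.VolumeCopy(context.Background(), &volume_server_pb.VolumeCopyRequest{
			VolumeId:       7,   // hypothetical volume id
			SourceDataNode: src, // server that already has the volume
		})
		return err
	})
}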
/** +only check the difference of the file size +todo: maybe should check the received count and deleted count of the volume +*/ +func checkCopyFiles(originFileInf *volume_server_pb.ReadVolumeFileStatusResponse, idxFileName, datFileName string) error { + stat, err := os.Stat(idxFileName) + if err != nil { + return fmt.Errorf("stat idx file %s failed, %v", idxFileName, err) + } + if originFileInf.IdxFileSize != uint64(stat.Size()) { + return fmt.Errorf("idx file %s size [%v] is not same as origin file size [%v]", + idxFileName, stat.Size(), originFileInf.IdxFileSize) + } + + stat, err = os.Stat(datFileName) + if err != nil { + return fmt.Errorf("get dat file info failed, %v", err) + } + if originFileInf.DatFileSize != uint64(stat.Size()) { + return fmt.Errorf("the dat file size [%v] is not same as origin file size [%v]", + stat.Size(), originFileInf.DatFileSize) + } + return nil +} + +func writeToFile(client volume_server_pb.VolumeServer_CopyFileClient, fileName string, wt *util.WriteThrottler, isAppend bool) error { + glog.V(4).Infof("writing to %s", fileName) + flags := os.O_WRONLY | os.O_CREATE | os.O_TRUNC + if isAppend { + flags = os.O_WRONLY | os.O_CREATE + } + dst, err := os.OpenFile(fileName, flags, 0644) + if err != nil { + return err + } + defer dst.Close() + + for { + resp, receiveErr := client.Recv() + if receiveErr == io.EOF { + break + } + if receiveErr != nil { + return fmt.Errorf("receiving %s: %v", fileName, receiveErr) + } + dst.Write(resp.FileContent) + wt.MaybeSlowdown(int64(len(resp.FileContent))) + } + return nil +} + +func (vs *VolumeServer) ReadVolumeFileStatus(ctx context.Context, req *volume_server_pb.ReadVolumeFileStatusRequest) (*volume_server_pb.ReadVolumeFileStatusResponse, error) { + resp := &volume_server_pb.ReadVolumeFileStatusResponse{} + v := vs.store.GetVolume(needle.VolumeId(req.VolumeId)) + if v == nil { + return nil, fmt.Errorf("not found volume id %d", req.VolumeId) + } + + resp.VolumeId = req.VolumeId + datSize, idxSize, modTime := v.FileStat() + resp.DatFileSize = datSize + resp.IdxFileSize = idxSize + resp.DatFileTimestampSeconds = uint64(modTime.Unix()) + resp.IdxFileTimestampSeconds = uint64(modTime.Unix()) + resp.FileCount = v.FileCount() + resp.CompactionRevision = uint32(v.CompactionRevision) + resp.Collection = v.Collection + return resp, nil +} + +// CopyFile client pulls the volume related file from the source server. +// if req.CompactionRevision != math.MaxUint32, it ensures the compact revision is as expected +// The copying still stops at req.StopOffset, but you can set it to math.MaxUint64 in order to read all data.
+func (vs *VolumeServer) CopyFile(req *volume_server_pb.CopyFileRequest, stream volume_server_pb.VolumeServer_CopyFileServer) error { + + var fileName string + if !req.IsEcVolume { + v := vs.store.GetVolume(needle.VolumeId(req.VolumeId)) + if v == nil { + return fmt.Errorf("not found volume id %d", req.VolumeId) + } + + if uint32(v.CompactionRevision) != req.CompactionRevision && req.CompactionRevision != math.MaxUint32 { + return fmt.Errorf("volume %d is compacted", req.VolumeId) + } + fileName = v.FileName() + req.Ext + } else { + baseFileName := erasure_coding.EcShardBaseFileName(req.Collection, int(req.VolumeId)) + req.Ext + for _, location := range vs.store.Locations { + tName := path.Join(location.Directory, baseFileName) + if util.FileExists(tName) { + fileName = tName + } + } + if fileName == "" { + return fmt.Errorf("CopyFile not found ec volume id %d", req.VolumeId) + } + } + + bytesToRead := int64(req.StopOffset) + + file, err := os.Open(fileName) + if err != nil { + return err + } + defer file.Close() + + buffer := make([]byte, BufferSizeLimit) + + for bytesToRead > 0 { + bytesread, err := file.Read(buffer) + + // println(fileName, "read", bytesread, "bytes, with target", bytesToRead) + + if err != nil { + if err != io.EOF { + return err + } + // println(fileName, "read", bytesread, "bytes, with target", bytesToRead, "err", err.Error()) + break + } + + if int64(bytesread) > bytesToRead { + bytesread = int(bytesToRead) + } + err = stream.Send(&volume_server_pb.CopyFileResponse{ + FileContent: buffer[:bytesread], + }) + if err != nil { + // println("sending", bytesread, "bytes err", err.Error()) + return err + } + + bytesToRead -= int64(bytesread) + + } + + return nil +} + +func (vs *VolumeServer) findVolumeOrEcVolumeLocation(volumeId needle.VolumeId) { + +} diff --git a/weed/server/volume_grpc_copy_incremental.go b/weed/server/volume_grpc_copy_incremental.go new file mode 100644 index 000000000..f56fbeef4 --- /dev/null +++ b/weed/server/volume_grpc_copy_incremental.go @@ -0,0 +1,66 @@ +package weed_server + +import ( + "context" + "fmt" + "io" + "os" + + "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" + "github.com/chrislusf/seaweedfs/weed/storage/needle" +) + +func (vs *VolumeServer) VolumeIncrementalCopy(req *volume_server_pb.VolumeIncrementalCopyRequest, stream volume_server_pb.VolumeServer_VolumeIncrementalCopyServer) error { + + v := vs.store.GetVolume(needle.VolumeId(req.VolumeId)) + if v == nil { + return fmt.Errorf("not found volume id %d", req.VolumeId) + } + + stopOffset, _, _ := v.FileStat() + foundOffset, isLastOne, err := v.BinarySearchByAppendAtNs(req.SinceNs) + if err != nil { + return fmt.Errorf("fail to locate by appendAtNs %d: %s", req.SinceNs, err) + } + + if isLastOne { + return nil + } + + startOffset := foundOffset.ToAcutalOffset() + + buf := make([]byte, 1024*1024*2) + return sendFileContent(v.DataFile(), buf, startOffset, int64(stopOffset), stream) + +} + +func (vs *VolumeServer) VolumeSyncStatus(ctx context.Context, req *volume_server_pb.VolumeSyncStatusRequest) (*volume_server_pb.VolumeSyncStatusResponse, error) { + + v := vs.store.GetVolume(needle.VolumeId(req.VolumeId)) + if v == nil { + return nil, fmt.Errorf("not found volume id %d", req.VolumeId) + } + + resp := v.GetVolumeSyncStatus() + + return resp, nil + +} + +func sendFileContent(datFile *os.File, buf []byte, startOffset, stopOffset int64, stream volume_server_pb.VolumeServer_VolumeIncrementalCopyServer) error { + var blockSizeLimit = int64(len(buf)) + for i := int64(0); i < 
stopOffset-startOffset; i += blockSizeLimit { + n, readErr := datFile.ReadAt(buf, startOffset+i) + if readErr == nil || readErr == io.EOF { + resp := &volume_server_pb.VolumeIncrementalCopyResponse{} + resp.FileContent = buf[:int64(n)] + sendErr := stream.Send(resp) + if sendErr != nil { + return sendErr + } + } else { + return readErr + } + } + return nil +} diff --git a/weed/server/volume_grpc_erasure_coding.go b/weed/server/volume_grpc_erasure_coding.go new file mode 100644 index 000000000..8140a06f6 --- /dev/null +++ b/weed/server/volume_grpc_erasure_coding.go @@ -0,0 +1,313 @@ +package weed_server + +import ( + "context" + "fmt" + "io" + "io/ioutil" + "math" + "os" + "path" + "strings" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/operation" + "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" + "github.com/chrislusf/seaweedfs/weed/storage" + "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding" + "github.com/chrislusf/seaweedfs/weed/storage/needle" + "github.com/chrislusf/seaweedfs/weed/storage/types" + "github.com/chrislusf/seaweedfs/weed/util" +) + +/* + +Steps to apply erasure coding to .dat .idx files +0. ensure the volume is readonly +1. client call VolumeEcShardsGenerate to generate the .ecx and .ec01~.ec14 files +2. client ask master for possible servers to hold the ec files, at least 4 servers +3. client call VolumeEcShardsCopy on above target servers to copy ec files from the source server +4. target servers report the new ec files to the master +5. master stores vid -> [14]*DataNode +6. client checks master. If all 14 slices are ready, delete the original .dat and .idx files + +*/ + +// VolumeEcShardsGenerate generates the .ecx and .ec01 ~ .ec14 files +func (vs *VolumeServer) VolumeEcShardsGenerate(ctx context.Context, req *volume_server_pb.VolumeEcShardsGenerateRequest) (*volume_server_pb.VolumeEcShardsGenerateResponse, error) { + + v := vs.store.GetVolume(needle.VolumeId(req.VolumeId)) + if v == nil { + return nil, fmt.Errorf("volume %d not found", req.VolumeId) + } + baseFileName := v.FileName() + + if v.Collection != req.Collection { + return nil, fmt.Errorf("existing collection:%v unexpected input: %v", v.Collection, req.Collection) + } + + // write .ecx file + if err := erasure_coding.WriteSortedEcxFile(baseFileName); err != nil { + return nil, fmt.Errorf("WriteSortedEcxFile %s: %v", baseFileName, err) + } + + // write .ec01 ~ .ec14 files + if err := erasure_coding.WriteEcFiles(baseFileName); err != nil { + return nil, fmt.Errorf("WriteEcFiles %s: %v", baseFileName, err) + } + + return &volume_server_pb.VolumeEcShardsGenerateResponse{}, nil +}
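Following the numbered steps above, a client drives steps 1 and 3 with two calls: generate the shards on the source, then have each target pull a subset. A sketch of that flow (shard placement, mounting, and error handling are elided; ids and addresses are illustrative):

package main

import (
	"context"

	"github.com/chrislusf/seaweedfs/weed/operation"
	"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
	"google.golang.org/grpc"
)

// a sketch of steps 1 and 3; vid, collection, source and target are made up
func encodeAndSpread(vid uint32, collection, source, target string, dialOpt grpc.DialOption) error {
	// step 1: generate .ecx and .ec01 ~ .ec14 next to the .dat/.idx on the source
	err := operation.WithVolumeServerClient(source, dialOpt, func(c volume_server_pb.VolumeServerClient) error {
		_, genErr := c.VolumeEcShardsGenerate(context.Background(), &volume_server_pb.VolumeEcShardsGenerateRequest{
			VolumeId:   vid,
			Collection: collection,
		})
		return genErr
	})
	if err != nil {
		return err
	}
	// step 3: one target pulls a few shards, plus the .ecx file
	return operation.WithVolumeServerClient(target, dialOpt, func(c volume_server_pb.VolumeServerClient) error {
		_, copyErr := c.VolumeEcShardsCopy(context.Background(), &volume_server_pb.VolumeEcShardsCopyRequest{
			VolumeId:       vid,
			Collection:     collection,
			ShardIds:       []uint32{0, 1, 2},
			CopyEcxFile:    true,
			SourceDataNode: source,
		})
		return copyErr
	})
}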
nil, fmt.Errorf("RebuildEcxFile %s: %v", baseFileName, err) + } + + break + } + } + + return &volume_server_pb.VolumeEcShardsRebuildResponse{ + RebuiltShardIds: rebuiltShardIds, + }, nil +} + +// VolumeEcShardsCopy copy the .ecx and some ec data slices +func (vs *VolumeServer) VolumeEcShardsCopy(ctx context.Context, req *volume_server_pb.VolumeEcShardsCopyRequest) (*volume_server_pb.VolumeEcShardsCopyResponse, error) { + + location := vs.store.FindFreeLocation() + if location == nil { + return nil, fmt.Errorf("no space left") + } + + baseFileName := storage.VolumeFileName(location.Directory, req.Collection, int(req.VolumeId)) + + err := operation.WithVolumeServerClient(req.SourceDataNode, vs.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { + + // copy ec data slices + for _, shardId := range req.ShardIds { + if err := vs.doCopyFile(ctx, client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, erasure_coding.ToExt(int(shardId)), false); err != nil { + return err + } + } + + if !req.CopyEcxFile { + return nil + } + + // copy ecx file + if err := vs.doCopyFile(ctx, client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, ".ecx", false); err != nil { + return err + } + + // copy ecj file + if err := vs.doCopyFile(ctx, client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, ".ecj", true); err != nil { + return err + } + + return nil + }) + if err != nil { + return nil, fmt.Errorf("VolumeEcShardsCopy volume %d: %v", req.VolumeId, err) + } + + return &volume_server_pb.VolumeEcShardsCopyResponse{}, nil +} + +// VolumeEcShardsDelete local delete the .ecx and some ec data slices if not needed +// the shard should not be mounted before calling this. 
+func (vs *VolumeServer) VolumeEcShardsDelete(ctx context.Context, req *volume_server_pb.VolumeEcShardsDeleteRequest) (*volume_server_pb.VolumeEcShardsDeleteResponse, error) { + + baseFilename := erasure_coding.EcShardBaseFileName(req.Collection, int(req.VolumeId)) + + found := false + for _, location := range vs.store.Locations { + if util.FileExists(path.Join(location.Directory, baseFilename+".ecx")) { + found = true + baseFilename = path.Join(location.Directory, baseFilename) + for _, shardId := range req.ShardIds { + os.Remove(baseFilename + erasure_coding.ToExt(int(shardId))) + } + break + } + } + + if !found { + return nil, nil + } + + // check whether to delete the ecx file also + hasEcxFile := false + existingShardCount := 0 + + for _, location := range vs.store.Locations { + fileInfos, err := ioutil.ReadDir(location.Directory) + if err != nil { + continue + } + for _, fileInfo := range fileInfos { + if fileInfo.Name() == baseFilename+".ecx" { + hasEcxFile = true + continue + } + if strings.HasPrefix(fileInfo.Name(), baseFilename+".ec") { + existingShardCount++ + } + } + } + + if hasEcxFile && existingShardCount == 0 { + if err := os.Remove(baseFilename + ".ecx"); err != nil { + return nil, err + } + if err := os.Remove(baseFilename + ".ecj"); err != nil { + return nil, err + } + } + + return &volume_server_pb.VolumeEcShardsDeleteResponse{}, nil +} + +func (vs *VolumeServer) VolumeEcShardsMount(ctx context.Context, req *volume_server_pb.VolumeEcShardsMountRequest) (*volume_server_pb.VolumeEcShardsMountResponse, error) { + + for _, shardId := range req.ShardIds { + err := vs.store.MountEcShards(req.Collection, needle.VolumeId(req.VolumeId), erasure_coding.ShardId(shardId)) + + if err != nil { + glog.Errorf("ec shard mount %v: %v", req, err) + } else { + glog.V(2).Infof("ec shard mount %v", req) + } + + if err != nil { + return nil, fmt.Errorf("mount %d.%d: %v", req.VolumeId, shardId, err) + } + } + + return &volume_server_pb.VolumeEcShardsMountResponse{}, nil +} + +func (vs *VolumeServer) VolumeEcShardsUnmount(ctx context.Context, req *volume_server_pb.VolumeEcShardsUnmountRequest) (*volume_server_pb.VolumeEcShardsUnmountResponse, error) { + + for _, shardId := range req.ShardIds { + err := vs.store.UnmountEcShards(needle.VolumeId(req.VolumeId), erasure_coding.ShardId(shardId)) + + if err != nil { + glog.Errorf("ec shard unmount %v: %v", req, err) + } else { + glog.V(2).Infof("ec shard unmount %v", req) + } + + if err != nil { + return nil, fmt.Errorf("unmount %d.%d: %v", req.VolumeId, shardId, err) + } + } + + return &volume_server_pb.VolumeEcShardsUnmountResponse{}, nil +} + +func (vs *VolumeServer) VolumeEcShardRead(req *volume_server_pb.VolumeEcShardReadRequest, stream volume_server_pb.VolumeServer_VolumeEcShardReadServer) error { + + ecVolume, found := vs.store.FindEcVolume(needle.VolumeId(req.VolumeId)) + if !found { + return fmt.Errorf("VolumeEcShardRead not found ec volume id %d", req.VolumeId) + } + ecShard, found := ecVolume.FindEcVolumeShard(erasure_coding.ShardId(req.ShardId)) + if !found { + return fmt.Errorf("not found ec shard %d.%d", req.VolumeId, req.ShardId) + } + + if req.FileKey != 0 { + _, size, _ := ecVolume.FindNeedleFromEcx(types.Uint64ToNeedleId(req.FileKey)) + if size == types.TombstoneFileSize { + return stream.Send(&volume_server_pb.VolumeEcShardReadResponse{ + IsDeleted: true, + }) + } + } + + bufSize := req.Size + if bufSize > BufferSizeLimit { + bufSize = BufferSizeLimit + } + buffer := make([]byte, bufSize) + + startOffset, bytesToRead := req.Offset, 
req.Size + + for bytesToRead > 0 { + bytesread, err := ecShard.ReadAt(buffer, startOffset) + + // println(fileName, "read", bytesread, "bytes, with target", bytesToRead) + if bytesread > 0 { + + if int64(bytesread) > bytesToRead { + bytesread = int(bytesToRead) + } + err = stream.Send(&volume_server_pb.VolumeEcShardReadResponse{ + Data: buffer[:bytesread], + }) + if err != nil { + // println("sending", bytesread, "bytes err", err.Error()) + return err + } + + bytesToRead -= int64(bytesread) + + } + + if err != nil { + if err != io.EOF { + return err + } + return nil + } + + } + + return nil + +} + +func (vs *VolumeServer) VolumeEcBlobDelete(ctx context.Context, req *volume_server_pb.VolumeEcBlobDeleteRequest) (*volume_server_pb.VolumeEcBlobDeleteResponse, error) { + + resp := &volume_server_pb.VolumeEcBlobDeleteResponse{} + + for _, location := range vs.store.Locations { + if localEcVolume, found := location.FindEcVolume(needle.VolumeId(req.VolumeId)); found { + + _, size, _, err := localEcVolume.LocateEcShardNeedle(types.NeedleId(req.FileKey), needle.Version(req.Version)) + if err != nil { + return nil, fmt.Errorf("locate in local ec volume: %v", err) + } + if size == types.TombstoneFileSize { + return resp, nil + } + + err = localEcVolume.DeleteNeedleFromEcx(types.NeedleId(req.FileKey)) + if err != nil { + return nil, err + } + + break + } + } + + return resp, nil +} diff --git a/weed/server/volume_grpc_sync.go b/weed/server/volume_grpc_sync.go deleted file mode 100644 index 5f56ec17d..000000000 --- a/weed/server/volume_grpc_sync.go +++ /dev/null @@ -1,101 +0,0 @@ -package weed_server - -import ( - "context" - "fmt" - - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" - "github.com/chrislusf/seaweedfs/weed/storage" - "github.com/chrislusf/seaweedfs/weed/storage/types" -) - -func (vs *VolumeServer) VolumeSyncStatus(ctx context.Context, req *volume_server_pb.VolumeSyncStatusRequest) (*volume_server_pb.VolumeSyncStatusResponse, error) { - - v := vs.store.GetVolume(storage.VolumeId(req.VolumdId)) - if v == nil { - return nil, fmt.Errorf("Not Found Volume Id %d", req.VolumdId) - } - - resp := v.GetVolumeSyncStatus() - - glog.V(2).Infof("volume sync status %d", req.VolumdId) - - return resp, nil - -} - -func (vs *VolumeServer) VolumeSyncIndex(req *volume_server_pb.VolumeSyncIndexRequest, stream volume_server_pb.VolumeServer_VolumeSyncIndexServer) error { - - v := vs.store.GetVolume(storage.VolumeId(req.VolumdId)) - if v == nil { - return fmt.Errorf("Not Found Volume Id %d", req.VolumdId) - } - - content, err := v.IndexFileContent() - - if err != nil { - glog.Errorf("sync volume %d index: %v", req.VolumdId, err) - } else { - glog.V(2).Infof("sync volume %d index", req.VolumdId) - } - - const blockSizeLimit = 1024 * 1024 * 2 - for i := 0; i < len(content); i += blockSizeLimit { - blockSize := len(content) - i - if blockSize > blockSizeLimit { - blockSize = blockSizeLimit - } - resp := &volume_server_pb.VolumeSyncIndexResponse{} - resp.IndexFileContent = content[i : i+blockSize] - stream.Send(resp) - } - - return nil - -} - -func (vs *VolumeServer) VolumeSyncData(req *volume_server_pb.VolumeSyncDataRequest, stream volume_server_pb.VolumeServer_VolumeSyncDataServer) error { - - v := vs.store.GetVolume(storage.VolumeId(req.VolumdId)) - if v == nil { - return fmt.Errorf("Not Found Volume Id %d", req.VolumdId) - } - - if uint32(v.SuperBlock.CompactRevision) != req.Revision { - return fmt.Errorf("Requested Volume Revision is %d, but current revision is %d", req.Revision, v.SuperBlock.CompactRevision) - } - - content, err := storage.ReadNeedleBlob(v.DataFile(), int64(req.Offset)*types.NeedlePaddingSize, req.Size, v.Version()) - if err != nil { - return fmt.Errorf("read offset:%d size:%d", req.Offset, req.Size) - } - - id, err := types.ParseNeedleId(req.NeedleId) - if err != nil { - return fmt.Errorf("parsing needle id %s: %v", req.NeedleId, err) - } - n := new(storage.Needle) - n.ParseNeedleHeader(content) - if id != n.Id { - return fmt.Errorf("Expected file entry id %d, but found %d", id, n.Id) - } - - if err != nil { - glog.Errorf("sync volume %d data: %v", req.VolumdId, err) - } - - const blockSizeLimit = 1024 * 1024 * 2 - for i := 0; i < len(content); i += blockSizeLimit { - blockSize := len(content) - i - if blockSize > blockSizeLimit { - blockSize = blockSizeLimit - } - resp := &volume_server_pb.VolumeSyncDataResponse{} - resp.FileContent = content[i : i+blockSize] - stream.Send(resp) - } - - return nil - -}
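The deleted push-style sync RPCs are superseded by the tailing pair in volume_grpc_tail.go below: a receiver asks the source to replay needles appended since a timestamp, and the idle timeout doubles as a drain window. A sketch of the consuming side, built on the same operation.TailVolumeFromSource helper that VolumeTailReceiver uses (server address and timeout are illustrative):

package main

import (
	"fmt"

	"github.com/chrislusf/seaweedfs/weed/operation"
	"github.com/chrislusf/seaweedfs/weed/storage/needle"
	"google.golang.org/grpc"
)

// follow volume vid on sourceServer from the start of its log,
// giving up after 10 idle seconds; the callback sees each replayed needle
func follow(sourceServer string, vid needle.VolumeId, dialOption grpc.DialOption) error {
	return operation.TailVolumeFromSource(sourceServer, dialOption, vid, 0, 10, func(n *needle.Needle) error {
		fmt.Printf("needle %d, %d bytes\n", n.Id, n.Size)
		return nil
	})
}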
revision is %d", req.Revision, v.SuperBlock.CompactRevision) - } - - content, err := storage.ReadNeedleBlob(v.DataFile(), int64(req.Offset)*types.NeedlePaddingSize, req.Size, v.Version()) - if err != nil { - return fmt.Errorf("read offset:%d size:%d", req.Offset, req.Size) - } - - id, err := types.ParseNeedleId(req.NeedleId) - if err != nil { - return fmt.Errorf("parsing needle id %s: %v", req.NeedleId, err) - } - n := new(storage.Needle) - n.ParseNeedleHeader(content) - if id != n.Id { - return fmt.Errorf("Expected file entry id %d, but found %d", id, n.Id) - } - - if err != nil { - glog.Errorf("sync volume %d data: %v", req.VolumdId, err) - } - - const blockSizeLimit = 1024 * 1024 * 2 - for i := 0; i < len(content); i += blockSizeLimit { - blockSize := len(content) - i - if blockSize > blockSizeLimit { - blockSize = blockSizeLimit - } - resp := &volume_server_pb.VolumeSyncDataResponse{} - resp.FileContent = content[i : i+blockSize] - stream.Send(resp) - } - - return nil - -} diff --git a/weed/server/volume_grpc_tail.go b/weed/server/volume_grpc_tail.go new file mode 100644 index 000000000..698bad5b8 --- /dev/null +++ b/weed/server/volume_grpc_tail.go @@ -0,0 +1,117 @@ +package weed_server + +import ( + "context" + "fmt" + "time" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/operation" + "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" + "github.com/chrislusf/seaweedfs/weed/storage" + "github.com/chrislusf/seaweedfs/weed/storage/needle" +) + +func (vs *VolumeServer) VolumeTailSender(req *volume_server_pb.VolumeTailSenderRequest, stream volume_server_pb.VolumeServer_VolumeTailSenderServer) error { + + v := vs.store.GetVolume(needle.VolumeId(req.VolumeId)) + if v == nil { + return fmt.Errorf("not found volume id %d", req.VolumeId) + } + + defer glog.V(1).Infof("tailing volume %d finished", v.Id) + + lastTimestampNs := req.SinceNs + drainingSeconds := req.IdleTimeoutSeconds + + for { + lastProcessedTimestampNs, err := sendNeedlesSince(stream, v, lastTimestampNs) + if err != nil { + glog.Infof("sendNeedlesSince: %v", err) + return fmt.Errorf("streamFollow: %v", err) + } + time.Sleep(2 * time.Second) + + if req.IdleTimeoutSeconds == 0 { + lastTimestampNs = lastProcessedTimestampNs + continue + } + if lastProcessedTimestampNs == lastTimestampNs { + drainingSeconds-- + if drainingSeconds <= 0 { + return nil + } + glog.V(1).Infof("tailing volume %d drains requests with %d seconds remaining", v.Id, drainingSeconds) + } else { + lastTimestampNs = lastProcessedTimestampNs + drainingSeconds = req.IdleTimeoutSeconds + glog.V(1).Infof("tailing volume %d resets draining wait time to %d seconds", v.Id, drainingSeconds) + } + + } + +} + +func sendNeedlesSince(stream volume_server_pb.VolumeServer_VolumeTailSenderServer, v *storage.Volume, lastTimestampNs uint64) (lastProcessedTimestampNs uint64, err error) { + + foundOffset, isLastOne, err := v.BinarySearchByAppendAtNs(lastTimestampNs) + if err != nil { + return 0, fmt.Errorf("fail to locate by appendAtNs %d: %s", lastTimestampNs, err) + } + + // log.Printf("reading ts %d offset %d isLast %v", lastTimestampNs, foundOffset, isLastOne) + + if isLastOne { + // need to heart beat to the client to ensure the connection health + sendErr := stream.Send(&volume_server_pb.VolumeTailSenderResponse{IsLastChunk: true}) + return lastTimestampNs, sendErr + } + + err = storage.ScanVolumeFileNeedleFrom(v.Version(), v.DataFile(), foundOffset.ToAcutalOffset(), func(needleHeader, needleBody []byte, needleAppendAtNs uint64) 
error { + + isLastChunk := false + + // need to send body by chunks + for i := 0; i < len(needleBody); i += BufferSizeLimit { + stopOffset := i + BufferSizeLimit + if stopOffset >= len(needleBody) { + isLastChunk = true + stopOffset = len(needleBody) + } + + sendErr := stream.Send(&volume_server_pb.VolumeTailSenderResponse{ + NeedleHeader: needleHeader, + NeedleBody: needleBody[i:stopOffset], + IsLastChunk: isLastChunk, + }) + if sendErr != nil { + return sendErr + } + } + + lastProcessedTimestampNs = needleAppendAtNs + return nil + + }) + + return + +} + +func (vs *VolumeServer) VolumeTailReceiver(ctx context.Context, req *volume_server_pb.VolumeTailReceiverRequest) (*volume_server_pb.VolumeTailReceiverResponse, error) { + + resp := &volume_server_pb.VolumeTailReceiverResponse{} + + v := vs.store.GetVolume(needle.VolumeId(req.VolumeId)) + if v == nil { + return resp, fmt.Errorf("receiver not found volume id %d", req.VolumeId) + } + + defer glog.V(1).Infof("receive tailing volume %d finished", v.Id) + + return resp, operation.TailVolumeFromSource(req.SourceVolumeServer, vs.grpcDialOption, v.Id, req.SinceNs, int(req.IdleTimeoutSeconds), func(n *needle.Needle) error { + _, _, err := vs.store.Write(v.Id, n) + return err + }) + +} diff --git a/weed/server/volume_grpc_vacuum.go b/weed/server/volume_grpc_vacuum.go index f0c87b582..24f982241 100644 --- a/weed/server/volume_grpc_vacuum.go +++ b/weed/server/volume_grpc_vacuum.go @@ -5,19 +5,19 @@ import ( "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" - "github.com/chrislusf/seaweedfs/weed/storage" + "github.com/chrislusf/seaweedfs/weed/storage/needle" ) func (vs *VolumeServer) VacuumVolumeCheck(ctx context.Context, req *volume_server_pb.VacuumVolumeCheckRequest) (*volume_server_pb.VacuumVolumeCheckResponse, error) { resp := &volume_server_pb.VacuumVolumeCheckResponse{} - garbageRatio, err := vs.store.CheckCompactVolume(storage.VolumeId(req.VolumdId)) + garbageRatio, err := vs.store.CheckCompactVolume(needle.VolumeId(req.VolumeId)) resp.GarbageRatio = garbageRatio if err != nil { - glog.V(3).Infof("check volume %d: %v", req.VolumdId, err) + glog.V(3).Infof("check volume %d: %v", req.VolumeId, err) } return resp, err @@ -28,12 +28,12 @@ func (vs *VolumeServer) VacuumVolumeCompact(ctx context.Context, req *volume_ser resp := &volume_server_pb.VacuumVolumeCompactResponse{} - err := vs.store.CompactVolume(storage.VolumeId(req.VolumdId), req.Preallocate) + err := vs.store.CompactVolume(needle.VolumeId(req.VolumeId), req.Preallocate, vs.compactionBytePerSecond) if err != nil { - glog.Errorf("compact volume %d: %v", req.VolumdId, err) + glog.Errorf("compact volume %d: %v", req.VolumeId, err) } else { - glog.V(1).Infof("compact volume %d", req.VolumdId) + glog.V(1).Infof("compact volume %d", req.VolumeId) } return resp, err @@ -44,12 +44,12 @@ func (vs *VolumeServer) VacuumVolumeCommit(ctx context.Context, req *volume_serv resp := &volume_server_pb.VacuumVolumeCommitResponse{} - err := vs.store.CommitCompactVolume(storage.VolumeId(req.VolumdId)) + err := vs.store.CommitCompactVolume(needle.VolumeId(req.VolumeId)) if err != nil { - glog.Errorf("commit volume %d: %v", req.VolumdId, err) + glog.Errorf("commit volume %d: %v", req.VolumeId, err) } else { - glog.V(1).Infof("commit volume %d", req.VolumdId) + glog.V(1).Infof("commit volume %d", req.VolumeId) } return resp, err @@ -60,12 +60,12 @@ func (vs *VolumeServer) VacuumVolumeCleanup(ctx context.Context, req *volume_ser resp := 
&volume_server_pb.VacuumVolumeCleanupResponse{} - err := vs.store.CommitCleanupVolume(storage.VolumeId(req.VolumdId)) + err := vs.store.CommitCleanupVolume(needle.VolumeId(req.VolumeId)) if err != nil { - glog.Errorf("cleanup volume %d: %v", req.VolumdId, err) + glog.Errorf("cleanup volume %d: %v", req.VolumeId, err) } else { - glog.V(1).Infof("cleanup volume %d", req.VolumdId) + glog.V(1).Infof("cleanup volume %d", req.VolumeId) } return resp, err diff --git a/weed/server/volume_server.go b/weed/server/volume_server.go index 0914e81b0..6cf654738 100644 --- a/weed/server/volume_server.go +++ b/weed/server/volume_server.go @@ -1,25 +1,34 @@ package weed_server import ( + "fmt" "net/http" + "github.com/chrislusf/seaweedfs/weed/stats" + "google.golang.org/grpc" + "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/storage" + "github.com/spf13/viper" ) type VolumeServer struct { - MasterNodes []string - currentMaster string - pulseSeconds int - dataCenter string - rack string - store *storage.Store - guard *security.Guard + SeedMasterNodes []string + currentMaster string + pulseSeconds int + dataCenter string + rack string + store *storage.Store + guard *security.Guard + grpcDialOption grpc.DialOption - needleMapKind storage.NeedleMapType - FixJpgOrientation bool - ReadRedirect bool + needleMapKind storage.NeedleMapType + FixJpgOrientation bool + ReadRedirect bool + compactionBytePerSecond int64 + MetricsAddress string + MetricsIntervalSec int } func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string, @@ -30,26 +39,44 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string, dataCenter string, rack string, whiteList []string, fixJpgOrientation bool, - readRedirect bool) *VolumeServer { + readRedirect bool, + compactionMBPerSecond int, +) *VolumeServer { + + v := viper.GetViper() + signingKey := v.GetString("jwt.signing.key") + v.SetDefault("jwt.signing.expires_after_seconds", 10) + expiresAfterSec := v.GetInt("jwt.signing.expires_after_seconds") + enableUiAccess := v.GetBool("access.ui") + + readSigningKey := v.GetString("jwt.signing.read.key") + v.SetDefault("jwt.signing.read.expires_after_seconds", 60) + readExpiresAfterSec := v.GetInt("jwt.signing.read.expires_after_seconds") + vs := &VolumeServer{ - pulseSeconds: pulseSeconds, - dataCenter: dataCenter, - rack: rack, - needleMapKind: needleMapKind, - FixJpgOrientation: fixJpgOrientation, - ReadRedirect: readRedirect, + pulseSeconds: pulseSeconds, + dataCenter: dataCenter, + rack: rack, + needleMapKind: needleMapKind, + FixJpgOrientation: fixJpgOrientation, + ReadRedirect: readRedirect, + grpcDialOption: security.LoadClientTLS(viper.Sub("grpc"), "volume"), + compactionBytePerSecond: int64(compactionMBPerSecond) * 1024 * 1024, } - vs.MasterNodes = masterNodes - vs.store = storage.NewStore(port, ip, publicUrl, folders, maxCounts, vs.needleMapKind) + vs.SeedMasterNodes = masterNodes + vs.store = storage.NewStore(vs.grpcDialOption, port, ip, publicUrl, folders, maxCounts, vs.needleMapKind) - vs.guard = security.NewGuard(whiteList, "") + vs.guard = security.NewGuard(whiteList, signingKey, expiresAfterSec, readSigningKey, readExpiresAfterSec) handleStaticResources(adminMux) - adminMux.HandleFunc("/ui/index.html", vs.uiStatusHandler) - adminMux.HandleFunc("/status", vs.guard.WhiteList(vs.statusHandler)) - adminMux.HandleFunc("/stats/counter", vs.guard.WhiteList(statsCounterHandler)) - adminMux.HandleFunc("/stats/memory", 
vs.guard.WhiteList(statsMemoryHandler)) - adminMux.HandleFunc("/stats/disk", vs.guard.WhiteList(vs.statsDiskHandler)) + if signingKey == "" || enableUiAccess { + // only expose the volume server details for safe environments + adminMux.HandleFunc("/ui/index.html", vs.uiStatusHandler) + adminMux.HandleFunc("/status", vs.guard.WhiteList(vs.statusHandler)) + adminMux.HandleFunc("/stats/counter", vs.guard.WhiteList(statsCounterHandler)) + adminMux.HandleFunc("/stats/memory", vs.guard.WhiteList(statsMemoryHandler)) + adminMux.HandleFunc("/stats/disk", vs.guard.WhiteList(vs.statsDiskHandler)) + } adminMux.HandleFunc("/", vs.privateStoreHandler) if publicMux != adminMux { // separated admin and public port @@ -58,6 +85,11 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string, } go vs.heartbeat() + hostAddress := fmt.Sprintf("%s:%d", ip, port) + go stats.LoopPushingMetric("volumeServer", hostAddress, stats.VolumeServerGather, + func() (addr string, intervalSeconds int) { + return vs.MetricsAddress, vs.MetricsIntervalSec + }) return vs } @@ -67,7 +99,3 @@ func (vs *VolumeServer) Shutdown() { vs.store.Close() glog.V(0).Infoln("Shut down successfully!") } - -func (vs *VolumeServer) jwt(fileId string) security.EncodedJwt { - return security.GenJwt(vs.guard.SecretKey, fileId) -} diff --git a/weed/server/volume_server_handlers.go b/weed/server/volume_server_handlers.go index 77b1274fd..14ad27d42 100644 --- a/weed/server/volume_server_handlers.go +++ b/weed/server/volume_server_handlers.go @@ -2,7 +2,10 @@ package weed_server import ( "net/http" + "strings" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/stats" ) @@ -45,3 +48,47 @@ func (vs *VolumeServer) publicReadOnlyHandler(w http.ResponseWriter, r *http.Req vs.GetOrHeadHandler(w, r) } } + +func (vs *VolumeServer) maybeCheckJwtAuthorization(r *http.Request, vid, fid string, isWrite bool) bool { + + var signingKey security.SigningKey + + if isWrite { + if len(vs.guard.SigningKey) == 0 { + return true + } else { + signingKey = vs.guard.SigningKey + } + } else { + if len(vs.guard.ReadSigningKey) == 0 { + return true + } else { + signingKey = vs.guard.ReadSigningKey + } + } + + tokenStr := security.GetJwt(r) + if tokenStr == "" { + glog.V(1).Infof("missing jwt from %s", r.RemoteAddr) + return false + } + + token, err := security.DecodeJwt(signingKey, tokenStr) + if err != nil { + glog.V(1).Infof("jwt verification error from %s: %v", r.RemoteAddr, err) + return false + } + if !token.Valid { + glog.V(1).Infof("jwt invalid from %s: %v", r.RemoteAddr, tokenStr) + return false + } + + if sc, ok := token.Claims.(*security.SeaweedFileIdClaims); ok { + if sepIndex := strings.LastIndex(fid, "_"); sepIndex > 0 { + fid = fid[:sepIndex] + } + return sc.Fid == vid+","+fid + } + glog.V(1).Infof("unexpected jwt from %s: %v", r.RemoteAddr, tokenStr) + return false +} diff --git a/weed/server/volume_server_handlers_read.go b/weed/server/volume_server_handlers_read.go index 92c728141..f30ffefaf 100644 --- a/weed/server/volume_server_handlers_read.go +++ b/weed/server/volume_server_handlers_read.go @@ -2,6 +2,8 @@ package weed_server import ( "bytes" + "context" + "errors" "io" "mime" "mime/multipart" @@ -17,16 +19,28 @@ import ( "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/images" "github.com/chrislusf/seaweedfs/weed/operation" - "github.com/chrislusf/seaweedfs/weed/storage" + "github.com/chrislusf/seaweedfs/weed/stats" + 
"github.com/chrislusf/seaweedfs/weed/storage/needle" "github.com/chrislusf/seaweedfs/weed/util" ) var fileNameEscaper = strings.NewReplacer("\\", "\\\\", "\"", "\\\"") func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) { - n := new(storage.Needle) + + stats.VolumeServerRequestCounter.WithLabelValues("get").Inc() + start := time.Now() + defer func() { stats.VolumeServerRequestHistogram.WithLabelValues("get").Observe(time.Since(start).Seconds()) }() + + n := new(needle.Needle) vid, fid, filename, ext, _ := parseURLPath(r.URL.Path) - volumeId, err := storage.NewVolumeId(vid) + + if !vs.maybeCheckJwtAuthorization(r, vid, fid, false) { + writeJsonError(w, r, http.StatusUnauthorized, errors.New("wrong jwt")) + return + } + + volumeId, err := needle.NewVolumeId(vid) if err != nil { glog.V(2).Infoln("parsing error:", err, r.URL.Path) w.WriteHeader(http.StatusBadRequest) @@ -40,7 +54,9 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) } glog.V(4).Infoln("volume", volumeId, "reading", n) - if !vs.store.HasVolume(volumeId) { + hasVolume := vs.store.HasVolume(volumeId) + _, hasEcVolume := vs.store.FindEcVolume(volumeId) + if !hasVolume && !hasEcVolume { if !vs.ReadRedirect { glog.V(2).Infoln("volume is not local:", err, r.URL.Path) w.WriteHeader(http.StatusNotFound) @@ -65,10 +81,15 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) return } cookie := n.Cookie - count, e := vs.store.ReadVolumeNeedle(volumeId, n) - glog.V(4).Infoln("read bytes", count, "error", e) - if e != nil || count < 0 { - glog.V(0).Infof("read %s error: %v", r.URL.Path, e) + var count int + if hasVolume { + count, err = vs.store.ReadVolumeNeedle(volumeId, n) + } else if hasEcVolume { + count, err = vs.store.ReadEcShardNeedle(context.Background(), volumeId, n) + } + glog.V(4).Infoln("read bytes", count, "error", err) + if err != nil || count < 0 { + glog.V(0).Infof("read %s isNormalVolume %v error: %v", r.URL.Path, hasVolume, err) w.WriteHeader(http.StatusNotFound) return } @@ -132,7 +153,7 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) if strings.Contains(r.Header.Get("Accept-Encoding"), "gzip") { w.Header().Set("Content-Encoding", "gzip") } else { - if n.Data, err = operation.UnGzipData(n.Data); err != nil { + if n.Data, err = util.UnGzipData(n.Data); err != nil { glog.V(0).Infoln("ungzip error:", err, r.URL.Path) } } @@ -146,7 +167,7 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) } } -func (vs *VolumeServer) tryHandleChunkedFile(n *storage.Needle, fileName string, w http.ResponseWriter, r *http.Request) (processed bool) { +func (vs *VolumeServer) tryHandleChunkedFile(n *needle.Needle, fileName string, w http.ResponseWriter, r *http.Request) (processed bool) { if !n.IsChunkedManifest() || r.URL.Query().Get("cm") == "false" { return false } diff --git a/weed/server/volume_server_handlers_ui.go b/weed/server/volume_server_handlers_ui.go index b3d9a21fd..852f0b751 100644 --- a/weed/server/volume_server_handlers_ui.go +++ b/weed/server/volume_server_handlers_ui.go @@ -24,13 +24,15 @@ func (vs *VolumeServer) uiStatusHandler(w http.ResponseWriter, r *http.Request) Version string Masters []string Volumes interface{} + EcVolumes interface{} DiskStatuses interface{} Stats interface{} Counters *stats.ServerStats }{ util.VERSION, - vs.MasterNodes, + vs.SeedMasterNodes, vs.store.Status(), + vs.store.EcVolumes(), ds, infos, serverStats, diff --git 
a/weed/server/volume_server_handlers_write.go b/weed/server/volume_server_handlers_write.go index fd93142e1..db8fcb555 100644 --- a/weed/server/volume_server_handlers_write.go +++ b/weed/server/volume_server_handlers_write.go @@ -1,6 +1,7 @@ package weed_server import ( + "context" "errors" "fmt" "net/http" @@ -10,36 +11,53 @@ import ( "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/operation" - "github.com/chrislusf/seaweedfs/weed/storage" + "github.com/chrislusf/seaweedfs/weed/stats" + "github.com/chrislusf/seaweedfs/weed/storage/needle" "github.com/chrislusf/seaweedfs/weed/topology" ) func (vs *VolumeServer) PostHandler(w http.ResponseWriter, r *http.Request) { + + stats.VolumeServerRequestCounter.WithLabelValues("post").Inc() + start := time.Now() + defer func() { + stats.VolumeServerRequestHistogram.WithLabelValues("post").Observe(time.Since(start).Seconds()) + }() + if e := r.ParseForm(); e != nil { glog.V(0).Infoln("form parse error:", e) writeJsonError(w, r, http.StatusBadRequest, e) return } - vid, _, _, _, _ := parseURLPath(r.URL.Path) - volumeId, ve := storage.NewVolumeId(vid) + + vid, fid, _, _, _ := parseURLPath(r.URL.Path) + volumeId, ve := needle.NewVolumeId(vid) if ve != nil { glog.V(0).Infoln("NewVolumeId error:", ve) writeJsonError(w, r, http.StatusBadRequest, ve) return } - needle, originalSize, ne := storage.CreateNeedleFromRequest(r, vs.FixJpgOrientation) + + if !vs.maybeCheckJwtAuthorization(r, vid, fid, true) { + writeJsonError(w, r, http.StatusUnauthorized, errors.New("wrong jwt")) + return + } + + needle, originalSize, ne := needle.CreateNeedleFromRequest(r, vs.FixJpgOrientation) if ne != nil { writeJsonError(w, r, http.StatusBadRequest, ne) return } ret := operation.UploadResult{} - _, errorStatus := topology.ReplicatedWrite(vs.GetMaster(), - vs.store, volumeId, needle, r) + _, isUnchanged, writeError := topology.ReplicatedWrite(vs.GetMaster(), vs.store, volumeId, needle, r) httpStatus := http.StatusCreated - if errorStatus != "" { + if isUnchanged { + httpStatus = http.StatusNotModified + } + if writeError != nil { httpStatus = http.StatusInternalServerError - ret.Error = errorStatus + ret.Error = writeError.Error() } if needle.HasName() { ret.Name = string(needle.Name) @@ -51,15 +69,35 @@ func (vs *VolumeServer) PostHandler(w http.ResponseWriter, r *http.Request) { } func (vs *VolumeServer) DeleteHandler(w http.ResponseWriter, r *http.Request) { - n := new(storage.Needle) + + stats.VolumeServerRequestCounter.WithLabelValues("delete").Inc() + start := time.Now() + defer func() { + stats.VolumeServerRequestHistogram.WithLabelValues("delete").Observe(time.Since(start).Seconds()) + }() + + n := new(needle.Needle) vid, fid, _, _, _ := parseURLPath(r.URL.Path) - volumeId, _ := storage.NewVolumeId(vid) + volumeId, _ := needle.NewVolumeId(vid) n.ParsePath(fid) + if !vs.maybeCheckJwtAuthorization(r, vid, fid, true) { + writeJsonError(w, r, http.StatusUnauthorized, errors.New("wrong jwt")) + return + } + // glog.V(2).Infof("volume %s deleting %s", vid, n) cookie := n.Cookie + ecVolume, hasEcVolume := vs.store.FindEcVolume(volumeId) + + if hasEcVolume { + count, err := vs.store.DeleteEcShardNeedle(context.Background(), ecVolume, n, cookie) + writeDeleteResult(err, count, w, r) + return + } + _, ok := vs.store.ReadVolumeNeedle(volumeId, n) if ok != nil { m := make(map[string]uint32) @@ -83,7 +121,7 @@ func (vs *VolumeServer) DeleteHandler(w http.ResponseWriter, r *http.Request) { return } // make sure all chunks had deleted before delete 
manifest - if e := chunkManifest.DeleteChunks(vs.GetMaster()); e != nil { + if e := chunkManifest.DeleteChunks(vs.GetMaster(), vs.grpcDialOption); e != nil { writeJsonError(w, r, http.StatusInternalServerError, fmt.Errorf("Delete chunks error: %v", e)) return } @@ -100,6 +138,11 @@ func (vs *VolumeServer) DeleteHandler(w http.ResponseWriter, r *http.Request) { _, err := topology.ReplicatedDelete(vs.GetMaster(), vs.store, volumeId, n, r) + writeDeleteResult(err, count, w, r) + +} + +func writeDeleteResult(err error, count int64, w http.ResponseWriter, r *http.Request) { if err == nil { m := make(map[string]int64) m["size"] = count @@ -107,7 +150,6 @@ func (vs *VolumeServer) DeleteHandler(w http.ResponseWriter, r *http.Request) { } else { writeJsonError(w, r, http.StatusInternalServerError, fmt.Errorf("Deletion Failed: %v", err)) } - } func setEtag(w http.ResponseWriter, etag string) { diff --git a/weed/server/volume_server_ui/templates.go b/weed/server/volume_server_ui/templates.go index b9740510f..eafc0aaeb 100644 --- a/weed/server/volume_server_ui/templates.go +++ b/weed/server/volume_server_ui/templates.go @@ -128,6 +128,32 @@ var StatusTpl = template.Must(template.New("status").Funcs(funcMap).Parse(`<!DOC </table> </div> + <div class="row"> + <h2>Erasure Coding Shards</h2> + <table class="table table-striped"> + <thead> + <tr> + <th>Id</th> + <th>Collection</th> + <th>Shard Size</th> + <th>Shards</th> + <th>CreatedAt</th> + </tr> + </thead> + <tbody> + {{ range .EcVolumes }} + <tr> + <td><code>{{ .VolumeId }}</code></td> + <td>{{ .Collection }}</td> + <td>{{ .ShardSize }} Bytes</td> + <td>{{ .ShardIdList }}</td> + <td>{{ .CreatedAt.Format "02 Jan 06 15:04 -0700" }}</td> + </tr> + {{ end }} + </tbody> + </table> + </div> + </div> </body> </html> diff --git a/weed/server/webdav_server.go b/weed/server/webdav_server.go new file mode 100644 index 000000000..151b48a78 --- /dev/null +++ b/weed/server/webdav_server.go @@ -0,0 +1,578 @@ +package weed_server + +import ( + "bytes" + "context" + "fmt" + "io" + "os" + "path" + "strings" + "time" + + "github.com/chrislusf/seaweedfs/weed/operation" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/util" + "golang.org/x/net/webdav" + "google.golang.org/grpc" + + "github.com/chrislusf/seaweedfs/weed/filer2" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/security" + "github.com/spf13/viper" +) + +type WebDavOption struct { + Filer string + FilerGrpcAddress string + DomainName string + BucketsPath string + GrpcDialOption grpc.DialOption + Collection string + Uid uint32 + Gid uint32 +} + +type WebDavServer struct { + option *WebDavOption + secret security.SigningKey + filer *filer2.Filer + grpcDialOption grpc.DialOption + Handler *webdav.Handler +} + +func NewWebDavServer(option *WebDavOption) (ws *WebDavServer, err error) { + + fs, _ := NewWebDavFileSystem(option) + + ws = &WebDavServer{ + option: option, + grpcDialOption: security.LoadClientTLS(viper.Sub("grpc"), "filer"), + Handler: &webdav.Handler{ + FileSystem: fs, + LockSystem: webdav.NewMemLS(), + }, + } + + return ws, nil +} + +// adapted from https://github.com/mattn/davfs/blob/master/plugin/mysql/mysql.go + +type WebDavFileSystem struct { + option *WebDavOption + secret security.SigningKey + filer *filer2.Filer + grpcDialOption grpc.DialOption +} + +type FileInfo struct { + name string + size int64 + mode os.FileMode + modifiledTime time.Time + isDirectory bool +} + +func (fi *FileInfo) Name() string { return fi.name } 
+func (fi *FileInfo) Size() int64 { return fi.size } +func (fi *FileInfo) Mode() os.FileMode { return fi.mode } +func (fi *FileInfo) ModTime() time.Time { return fi.modifiledTime } +func (fi *FileInfo) IsDir() bool { return fi.isDirectory } +func (fi *FileInfo) Sys() interface{} { return nil } + +type WebDavFile struct { + fs *WebDavFileSystem + name string + isDirectory bool + off int64 + entry *filer_pb.Entry + entryViewCache []filer2.VisibleInterval +} + +func NewWebDavFileSystem(option *WebDavOption) (webdav.FileSystem, error) { + return &WebDavFileSystem{ + option: option, + }, nil +} + +func (fs *WebDavFileSystem) WithFilerClient(ctx context.Context, fn func(filer_pb.SeaweedFilerClient) error) error { + + return util.WithCachedGrpcClient(ctx, func(grpcConnection *grpc.ClientConn) error { + client := filer_pb.NewSeaweedFilerClient(grpcConnection) + return fn(client) + }, fs.option.FilerGrpcAddress, fs.option.GrpcDialOption) + +} + +func clearName(name string) (string, error) { + slashed := strings.HasSuffix(name, "/") + name = path.Clean(name) + if !strings.HasSuffix(name, "/") && slashed { + name += "/" + } + if !strings.HasPrefix(name, "/") { + return "", os.ErrInvalid + } + return name, nil +} + +func (fs *WebDavFileSystem) Mkdir(ctx context.Context, fullDirPath string, perm os.FileMode) error { + + glog.V(2).Infof("WebDavFileSystem.Mkdir %v", fullDirPath) + + if !strings.HasSuffix(fullDirPath, "/") { + fullDirPath += "/" + } + + var err error + if fullDirPath, err = clearName(fullDirPath); err != nil { + return err + } + + _, err = fs.stat(ctx, fullDirPath) + if err == nil { + return os.ErrExist + } + + return fs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { + dir, name := filer2.FullPath(fullDirPath).DirAndName() + request := &filer_pb.CreateEntryRequest{ + Directory: dir, + Entry: &filer_pb.Entry{ + Name: name, + IsDirectory: true, + Attributes: &filer_pb.FuseAttributes{ + Mtime: time.Now().Unix(), + Crtime: time.Now().Unix(), + FileMode: uint32(perm | os.ModeDir), + Uid: fs.option.Uid, + Gid: fs.option.Gid, + }, + }, + } + + glog.V(1).Infof("mkdir: %v", request) + if _, err := client.CreateEntry(ctx, request); err != nil { + return fmt.Errorf("mkdir %s/%s: %v", dir, name, err) + } + + return nil + }) +} + +func (fs *WebDavFileSystem) OpenFile(ctx context.Context, fullFilePath string, flag int, perm os.FileMode) (webdav.File, error) { + + glog.V(2).Infof("WebDavFileSystem.OpenFile %v", fullFilePath) + + var err error + if fullFilePath, err = clearName(fullFilePath); err != nil { + return nil, err + } + + if flag&os.O_CREATE != 0 { + // a file name must not end with a / suffix. + if strings.HasSuffix(fullFilePath, "/") { + return nil, os.ErrInvalid + } + // the base directory must already exist. 
+ dir, _ := path.Split(fullFilePath) + _, err := fs.stat(ctx, dir) + if err != nil { + return nil, os.ErrInvalid + } + _, err = fs.stat(ctx, fullFilePath) + if err == nil { + if flag&os.O_EXCL != 0 { + return nil, os.ErrExist + } + fs.removeAll(ctx, fullFilePath) + } + + dir, name := filer2.FullPath(fullFilePath).DirAndName() + err = fs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { + if _, err := client.CreateEntry(ctx, &filer_pb.CreateEntryRequest{ + Directory: dir, + Entry: &filer_pb.Entry{ + Name: name, + IsDirectory: perm&os.ModeDir > 0, + Attributes: &filer_pb.FuseAttributes{ + Mtime: time.Now().Unix(), + Crtime: time.Now().Unix(), + FileMode: uint32(perm), + Uid: fs.option.Uid, + Gid: fs.option.Gid, + Collection: fs.option.Collection, + Replication: "000", + TtlSec: 0, + }, + }, + }); err != nil { + return fmt.Errorf("create %s: %v", fullFilePath, err) + } + return nil + }) + if err != nil { + return nil, err + } + return &WebDavFile{ + fs: fs, + name: fullFilePath, + isDirectory: false, + }, nil + } + + fi, err := fs.stat(ctx, fullFilePath) + if err != nil { + return nil, os.ErrNotExist + } + if !strings.HasSuffix(fullFilePath, "/") && fi.IsDir() { + fullFilePath += "/" + } + + return &WebDavFile{ + fs: fs, + name: fullFilePath, + isDirectory: false, + }, nil + +} + +func (fs *WebDavFileSystem) removeAll(ctx context.Context, fullFilePath string) error { + var err error + if fullFilePath, err = clearName(fullFilePath); err != nil { + return err + } + + fi, err := fs.stat(ctx, fullFilePath) + if err != nil { + return err + } + + if fi.IsDir() { + //_, err = fs.db.Exec(`delete from filesystem where fullFilePath like $1 escape '\'`, strings.Replace(fullFilePath, `%`, `\%`, -1)+`%`) + } else { + //_, err = fs.db.Exec(`delete from filesystem where fullFilePath = ?`, fullFilePath) + } + dir, name := filer2.FullPath(fullFilePath).DirAndName() + err = fs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { + + request := &filer_pb.DeleteEntryRequest{ + Directory: dir, + Name: name, + IsDeleteData: true, + } + + glog.V(3).Infof("removing entry: %v", request) + _, err := client.DeleteEntry(ctx, request) + if err != nil { + return fmt.Errorf("remove %s: %v", fullFilePath, err) + } + + return nil + }) + return err +} + +func (fs *WebDavFileSystem) RemoveAll(ctx context.Context, name string) error { + + glog.V(2).Infof("WebDavFileSystem.RemoveAll %v", name) + + return fs.removeAll(ctx, name) +} + +func (fs *WebDavFileSystem) Rename(ctx context.Context, oldName, newName string) error { + + glog.V(2).Infof("WebDavFileSystem.Rename %v to %v", oldName, newName) + + var err error + if oldName, err = clearName(oldName); err != nil { + return err + } + if newName, err = clearName(newName); err != nil { + return err + } + + of, err := fs.stat(ctx, oldName) + if err != nil { + return os.ErrNotExist + } + if of.IsDir() { + if strings.HasSuffix(oldName, "/") { + oldName = strings.TrimRight(oldName, "/") + } + if strings.HasSuffix(newName, "/") { + newName = strings.TrimRight(newName, "/") + } + } + + _, err = fs.stat(ctx, newName) + if err == nil { + return os.ErrExist + } + + oldDir, oldBaseName := filer2.FullPath(oldName).DirAndName() + newDir, newBaseName := filer2.FullPath(newName).DirAndName() + + return fs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { + + request := &filer_pb.AtomicRenameEntryRequest{ + OldDirectory: oldDir, + OldName: oldBaseName, + NewDirectory: newDir, + NewName: newBaseName, + } + + _, err := 
client.AtomicRenameEntry(ctx, request) + if err != nil { + return fmt.Errorf("renaming %s/%s => %s/%s: %v", oldDir, oldBaseName, newDir, newBaseName, err) + } + + return nil + + }) +} + +func (fs *WebDavFileSystem) stat(ctx context.Context, fullFilePath string) (os.FileInfo, error) { + var err error + if fullFilePath, err = clearName(fullFilePath); err != nil { + return nil, err + } + + var fi FileInfo + entry, err := filer2.GetEntry(ctx, fs, fullFilePath) + if entry == nil { + return nil, os.ErrNotExist + } + if err != nil { + return nil, err + } + fi.size = int64(filer2.TotalSize(entry.GetChunks())) + fi.name = fullFilePath + fi.mode = os.FileMode(entry.Attributes.FileMode) + fi.modifiledTime = time.Unix(entry.Attributes.Mtime, 0) + fi.isDirectory = entry.IsDirectory + + _, fi.name = path.Split(path.Clean(fi.name)) + if fi.name == "" { + fi.name = "/" + fi.modifiledTime = time.Now() + fi.isDirectory = true + } + return &fi, nil +} + +func (fs *WebDavFileSystem) Stat(ctx context.Context, name string) (os.FileInfo, error) { + + glog.V(2).Infof("WebDavFileSystem.Stat %v", name) + + return fs.stat(ctx, name) +} + +func (f *WebDavFile) Write(buf []byte) (int, error) { + + glog.V(2).Infof("WebDavFileSystem.Write %v", f.name) + + var err error + ctx := context.Background() + if f.entry == nil { + f.entry, err = filer2.GetEntry(ctx, f.fs, f.name) + } + + if f.entry == nil { + return 0, err + } + if err != nil { + return 0, err + } + + var fileId, host string + var auth security.EncodedJwt + + if err = f.fs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { + + request := &filer_pb.AssignVolumeRequest{ + Count: 1, + Replication: "000", + Collection: f.fs.option.Collection, + } + + resp, err := client.AssignVolume(ctx, request) + if err != nil { + glog.V(0).Infof("assign volume failure %v: %v", request, err) + return err + } + + fileId, host, auth = resp.FileId, resp.Url, security.EncodedJwt(resp.Auth) + + return nil + }); err != nil { + return 0, fmt.Errorf("filerGrpcAddress assign volume: %v", err) + } + + fileUrl := fmt.Sprintf("http://%s/%s", host, fileId) + bufReader := bytes.NewReader(buf) + uploadResult, err := operation.Upload(fileUrl, f.name, bufReader, false, "application/octet-stream", nil, auth) + if err != nil { + glog.V(0).Infof("upload data %v to %s: %v", f.name, fileUrl, err) + return 0, fmt.Errorf("upload data: %v", err) + } + if uploadResult.Error != "" { + glog.V(0).Infof("upload failure %v to %s: %v", f.name, fileUrl, uploadResult.Error) + return 0, fmt.Errorf("upload result: %v", uploadResult.Error) + } + + chunk := &filer_pb.FileChunk{ + FileId: fileId, + Offset: f.off, + Size: uint64(len(buf)), + Mtime: time.Now().UnixNano(), + ETag: uploadResult.ETag, + } + + f.entry.Chunks = append(f.entry.Chunks, chunk) + dir, _ := filer2.FullPath(f.name).DirAndName() + + err = f.fs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { + f.entry.Attributes.Mtime = time.Now().Unix() + + request := &filer_pb.UpdateEntryRequest{ + Directory: dir, + Entry: f.entry, + } + + if _, err := client.UpdateEntry(ctx, request); err != nil { + return fmt.Errorf("update %s: %v", f.name, err) + } + + return nil + }) + + if err == nil { + f.off += int64(len(buf)) + } + return len(buf), err +} + +func (f *WebDavFile) Close() error { + + glog.V(2).Infof("WebDavFileSystem.Close %v", f.name) + + if f.entry != nil { + f.entry = nil + f.entryViewCache = nil + } + + return nil +} + +func (f *WebDavFile) Read(p []byte) (readSize int, err error) { + + glog.V(2).Infof("WebDavFileSystem.Read %v", 
f.name) + ctx := context.Background() + + if f.entry == nil { + f.entry, err = filer2.GetEntry(ctx, f.fs, f.name) + } + if f.entry == nil { + return 0, err + } + if err != nil { + return 0, err + } + if len(f.entry.Chunks) == 0 { + return 0, io.EOF + } + if f.entryViewCache == nil { + f.entryViewCache = filer2.NonOverlappingVisibleIntervals(f.entry.Chunks) + } + chunkViews := filer2.ViewFromVisibleIntervals(f.entryViewCache, f.off, len(p)) + + totalRead, err := filer2.ReadIntoBuffer(ctx, f.fs, f.name, p, chunkViews, f.off) + if err != nil { + return 0, err + } + readSize = int(totalRead) + + f.off += totalRead + if readSize == 0 { + return 0, io.EOF + } + return +} + +func (f *WebDavFile) Readdir(count int) (ret []os.FileInfo, err error) { + + glog.V(2).Infof("WebDavFileSystem.Readdir %v count %d", f.name, count) + ctx := context.Background() + + dir := f.name + if dir != "/" && strings.HasSuffix(dir, "/") { + dir = dir[:len(dir)-1] + } + + err = filer2.ReadDirAllEntries(ctx, f.fs, dir, func(entry *filer_pb.Entry) { + fi := FileInfo{ + size: int64(filer2.TotalSize(entry.GetChunks())), + name: entry.Name, + mode: os.FileMode(entry.Attributes.FileMode), + modifiledTime: time.Unix(entry.Attributes.Mtime, 0), + isDirectory: entry.IsDirectory, + } + + if !strings.HasSuffix(fi.name, "/") && fi.IsDir() { + fi.name += "/" + } + glog.V(4).Infof("entry: %v", fi.name) + ret = append(ret, &fi) + }) + + old := f.off + if old >= int64(len(ret)) { + if count > 0 { + return nil, io.EOF + } + return nil, nil + } + if count > 0 { + f.off += int64(count) + if f.off > int64(len(ret)) { + f.off = int64(len(ret)) + } + } else { + f.off = int64(len(ret)) + old = 0 + } + + return ret[old:f.off], nil +} + +func (f *WebDavFile) Seek(offset int64, whence int) (int64, error) { + + glog.V(2).Infof("WebDavFile.Seek %v %v %v", f.name, offset, whence) + + ctx := context.Background() + + var err error + switch whence { + case 0: + f.off = 0 + case 2: + if fi, err := f.fs.stat(ctx, f.name); err != nil { + return 0, err + } else { + f.off = fi.Size() + } + } + f.off += offset + return f.off, err +} + +func (f *WebDavFile) Stat() (os.FileInfo, error) { + + glog.V(2).Infof("WebDavFile.Stat %v", f.name) + + ctx := context.Background() + + return f.fs.stat(ctx, f.name) +}
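
For context on how this new WebDAV layer is meant to be consumed: NewWebDavServer hands back a webdav.Handler, which already implements http.Handler, so mounting it takes one line. The sketch below is illustrative only; the addresses, port, and grpc.WithInsecure() dial option are assumptions rather than defaults from this commit, and a real deployment would also load the security.toml configuration so LoadClientTLS can pick up gRPC credentials.

package main

import (
	"log"
	"net/http"

	weed_server "github.com/chrislusf/seaweedfs/weed/server"
	"google.golang.org/grpc"
)

func main() {
	// All values below are placeholders for illustration.
	ws, err := weed_server.NewWebDavServer(&weed_server.WebDavOption{
		Filer:            "localhost:8888",    // assumed filer HTTP address
		FilerGrpcAddress: "localhost:18888",   // assumed filer gRPC address
		GrpcDialOption:   grpc.WithInsecure(), // assumed plaintext gRPC
		Collection:       "",                  // default collection
		Uid:              0,
		Gid:              0,
	})
	if err != nil {
		log.Fatalf("webdav server: %v", err)
	}
	// ws.Handler is a *webdav.Handler; its FileSystem and LockSystem
	// were wired up inside NewWebDavServer, so it can be served directly.
	log.Fatal(http.ListenAndServe(":7333", ws.Handler))
}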
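
Back on the volume server side, maybeCheckJwtAuthorization (added in volume_server_handlers.go above) accepts a request only when the token's Fid claim equals "<vid>,<fid>", after stripping any "_N" suffix from the fid. As a rough sketch of what a trusted caller would have to mint, assuming the dgrijalva/jwt-go library underneath the security package and using a local stand-in for security.SeaweedFileIdClaims:

package main

import (
	"fmt"
	"time"

	jwt "github.com/dgrijalva/jwt-go"
)

// fileIdClaims is a hypothetical stand-in for security.SeaweedFileIdClaims;
// the volume server only compares the Fid claim against "<vid>,<fid>".
type fileIdClaims struct {
	Fid string `json:"fid"`
	jwt.StandardClaims
}

// mintReadJwt signs a token the read gate would accept, assuming the caller
// shares the jwt.signing.read.key configured on the volume server.
func mintReadJwt(readSigningKey []byte, vid, fid string) (string, error) {
	claims := fileIdClaims{
		Fid: vid + "," + fid,
		StandardClaims: jwt.StandardClaims{
			// mirrors the 60-second jwt.signing.read default set above
			ExpiresAt: time.Now().Add(60 * time.Second).Unix(),
		},
	}
	token := jwt.NewWithClaims(jwt.SigningMethodHS256, claims)
	return token.SignedString(readSigningKey)
}

func main() {
	tok, err := mintReadJwt([]byte("hypothetical-read-key"), "1", "06dfa8a684")
	if err != nil {
		panic(err)
	}
	// security.GetJwt extracts the token from the request, typically via
	// the Authorization header.
	fmt.Println("Authorization: BEARER " + tok)
}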
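
The vacuum handlers, with their VolumdId to VolumeId field rename, chain into a check, compact, commit lifecycle, with cleanup as the failure path. A client-side sketch against the generated volume_server_pb client; the address, volume id, and the 0.3 garbage threshold are assumptions for illustration:

package main

import (
	"context"
	"log"

	"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
	"google.golang.org/grpc"
)

func main() {
	// Placeholder address; real callers obtain it from the master's topology.
	conn, err := grpc.Dial("localhost:18080", grpc.WithInsecure())
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	client := volume_server_pb.NewVolumeServerClient(conn)
	ctx := context.Background()
	volumeId := uint32(1) // hypothetical volume

	check, err := client.VacuumVolumeCheck(ctx, &volume_server_pb.VacuumVolumeCheckRequest{VolumeId: volumeId})
	if err != nil {
		log.Fatal(err)
	}
	if check.GarbageRatio < 0.3 { // assumed threshold
		return // not enough garbage to be worth compacting
	}

	if _, err = client.VacuumVolumeCompact(ctx, &volume_server_pb.VacuumVolumeCompactRequest{VolumeId: volumeId}); err != nil {
		// compaction failed: discard the half-built compact files
		client.VacuumVolumeCleanup(ctx, &volume_server_pb.VacuumVolumeCleanupRequest{VolumeId: volumeId})
		log.Fatal(err)
	}

	// swap the compacted files in
	if _, err = client.VacuumVolumeCommit(ctx, &volume_server_pb.VacuumVolumeCommitRequest{VolumeId: volumeId}); err != nil {
		log.Fatal(err)
	}
}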
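
Finally, VolumeTailSender (volume_grpc_tail.go above) streams each needle as a header plus a body split into BufferSizeLimit-sized chunks, flags the last chunk, and sends empty IsLastChunk messages as heartbeats once it has caught up. In-tree consumers should go through operation.TailVolumeFromSource, as VolumeTailReceiver does; the raw-stream sketch below just makes that framing concrete, with the address and timeout as placeholder assumptions.

package main

import (
	"context"
	"fmt"
	"io"
	"log"

	"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
	"google.golang.org/grpc"
)

func main() {
	// Placeholder volume server address.
	conn, err := grpc.Dial("localhost:18080", grpc.WithInsecure())
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	client := volume_server_pb.NewVolumeServerClient(conn)
	stream, err := client.VolumeTailSender(context.Background(), &volume_server_pb.VolumeTailSenderRequest{
		VolumeId:           1,  // hypothetical volume
		SinceNs:            0,  // start from the beginning
		IdleTimeoutSeconds: 10, // let the sender drain after ~10s of no writes
	})
	if err != nil {
		log.Fatal(err)
	}

	var body []byte
	for {
		resp, err := stream.Recv()
		if err == io.EOF {
			return // sender exhausted its idle timeout and returned
		}
		if err != nil {
			log.Fatal(err)
		}
		if len(resp.NeedleHeader) == 0 && resp.IsLastChunk {
			continue // heartbeat while the volume is fully caught up
		}
		body = append(body, resp.NeedleBody...)
		if resp.IsLastChunk {
			fmt.Printf("needle: header %d bytes, body %d bytes\n", len(resp.NeedleHeader), len(body))
			body = nil
		}
	}
}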
