Diffstat (limited to 'weed')
 -rw-r--r--  weed/command/command.go                        |   4
 -rw-r--r--  weed/command/filer.go                          |   3
 -rw-r--r--  weed/command/filer_sync.go                     |  25
 -rw-r--r--  weed/command/server.go                         |   6
 -rw-r--r--  weed/command/update.go                         | 382
 -rw-r--r--  weed/command/update_full.go                    |   9
 -rw-r--r--  weed/command/volume.go                         |   5
 -rw-r--r--  weed/filer/filechunk_manifest.go               |   7
 -rw-r--r--  weed/filer/filechunks.go                       |   8
 -rw-r--r--  weed/s3api/stats.go                            |   6
 -rw-r--r--  weed/server/common.go                          |   4
 -rw-r--r--  weed/server/filer_grpc_server_sub_meta.go      |   4
 -rw-r--r--  weed/server/filer_server.go                    |   3
 -rw-r--r--  weed/server/filer_server_handlers_read_dir.go  |   7
 -rw-r--r--  weed/server/filer_ui/filer.html                |  40
 -rw-r--r--  weed/server/volume_server.go                   |   6
 -rw-r--r--  weed/server/volume_server_handlers.go          |  28
 -rw-r--r--  weed/stats/metrics.go                          |  23
 -rw-r--r--  weed/storage/needle/needle_read_page.go        |   7
 -rw-r--r--  weed/storage/needle_map/compact_map.go         |   2
 -rw-r--r--  weed/util/constants.go                         |   2
 -rw-r--r--  weed/wdclient/vid_map.go                       |   2

22 files changed, 531 insertions(+), 52 deletions(-)
diff --git a/weed/command/command.go b/weed/command/command.go
index dbc18a053..7635405dc 100644
--- a/weed/command/command.go
+++ b/weed/command/command.go
@@ -2,9 +2,10 @@ package command
 
 import (
 	"fmt"
-	flag "github.com/chrislusf/seaweedfs/weed/util/fla9"
 	"os"
 	"strings"
+
+	flag "github.com/chrislusf/seaweedfs/weed/util/fla9"
 )
 
 var Commands = []*Command{
@@ -36,6 +37,7 @@ var Commands = []*Command{
 	cmdScaffold,
 	cmdServer,
 	cmdShell,
+	cmdUpdate,
 	cmdUpload,
 	cmdVersion,
 	cmdVolume,
diff --git a/weed/command/filer.go b/weed/command/filer.go
index 2c91e6fec..c9f9a1956 100644
--- a/weed/command/filer.go
+++ b/weed/command/filer.go
@@ -55,6 +55,7 @@ type FilerOptions struct {
 	debug                 *bool
 	debugPort             *int
 	localSocket           *string
+	showUIDirectoryDelete *bool
 }
 
 func init() {
@@ -82,6 +83,7 @@ func init() {
 	f.debug = cmdFiler.Flag.Bool("debug", false, "serves runtime profiling data, e.g., http://localhost:<debug.port>/debug/pprof/goroutine?debug=2")
 	f.debugPort = cmdFiler.Flag.Int("debug.port", 6060, "http port for debugging")
 	f.localSocket = cmdFiler.Flag.String("localSocket", "", "default to /tmp/seaweedfs-filer-<port>.sock")
+	f.showUIDirectoryDelete = cmdFiler.Flag.Bool("ui.deleteDir", true, "enable filer UI show delete directory button")
 
 	// start s3 on filer
 	filerStartS3 = cmdFiler.Flag.Bool("s3", false, "whether to start S3 gateway")
@@ -216,6 +218,7 @@ func (fo *FilerOptions) startFiler() {
 		Cipher:                *fo.cipher,
 		SaveToFilerLimit:      int64(*fo.saveToFilerLimit),
 		ConcurrentUploadLimit: int64(*fo.concurrentUploadLimitMB) * 1024 * 1024,
+		ShowUIDirectoryDelete: *fo.showUIDirectoryDelete,
 	})
 	if nfs_err != nil {
 		glog.Fatalf("Filer startup error: %v", nfs_err)
diff --git a/weed/command/filer_sync.go b/weed/command/filer_sync.go
index 7aa9c1e8d..b7da1baf9 100644
--- a/weed/command/filer_sync.go
+++ b/weed/command/filer_sync.go
@@ -12,6 +12,7 @@ import (
 	"github.com/chrislusf/seaweedfs/weed/replication/sink/filersink"
 	"github.com/chrislusf/seaweedfs/weed/replication/source"
 	"github.com/chrislusf/seaweedfs/weed/security"
+	statsCollect "github.com/chrislusf/seaweedfs/weed/stats"
 	"github.com/chrislusf/seaweedfs/weed/util"
 	"github.com/chrislusf/seaweedfs/weed/util/grace"
 	"google.golang.org/grpc"
@@ -40,6 +41,7 @@ type SyncOptions struct {
 	bFromTsMs       *int64
 	aProxyByFiler   *bool
 	bProxyByFiler   *bool
+	metricsHttpPort *int
 	clientId        int32
 }
 
@@ -72,6 +74,7 @@ func init() {
 	syncOptions.bFromTsMs = cmdFilerSynchronize.Flag.Int64("b.fromTsMs", 0, "synchronization from timestamp on filer B. The unit is millisecond")
 	syncCpuProfile = cmdFilerSynchronize.Flag.String("cpuprofile", "", "cpu profile output file")
 	syncMemProfile = cmdFilerSynchronize.Flag.String("memprofile", "", "memory profile output file")
+	syncOptions.metricsHttpPort = cmdFilerSynchronize.Flag.Int("metricsPort", 0, "metrics listen port")
 	syncOptions.clientId = util.RandomInt32()
 }
 
@@ -103,6 +106,9 @@ func runFilerSynchronize(cmd *Command, args []string) bool {
 	filerA := pb.ServerAddress(*syncOptions.filerA)
 	filerB := pb.ServerAddress(*syncOptions.filerB)
 
+	// start filer.sync metrics server
+	go statsCollect.StartMetricsServer(*syncOptions.metricsHttpPort)
+
 	// read a filer signature
 	aFilerSignature, aFilerErr := replication.ReadFilerSignature(grpcDialOption, filerA)
 	if aFilerErr != nil {
@@ -182,7 +188,7 @@ func doSubscribeFilerMetaChanges(clientId int32, grpcDialOption grpc.DialOption,
 
 	// if first time, start from now
 	// if has previously synced, resume from that point of time
-	sourceFilerOffsetTsNs, err := getOffset(grpcDialOption, targetFiler, SyncKeyPrefix, sourceFilerSignature)
+	sourceFilerOffsetTsNs, err := getOffset(grpcDialOption, targetFiler, getSignaturePrefixByPath(sourcePath), sourceFilerSignature)
 	if err != nil {
 		return err
 	}
@@ -210,14 +216,17 @@ func doSubscribeFilerMetaChanges(clientId int32, grpcDialOption grpc.DialOption,
 	}
 
 	var lastLogTsNs = time.Now().Nanosecond()
+	var clientName = fmt.Sprintf("syncFrom_%s_To_%s", string(sourceFiler), string(targetFiler))
 	processEventFnWithOffset := pb.AddOffsetFunc(processEventFn, 3*time.Second, func(counter int64, lastTsNs int64) error {
 		now := time.Now().Nanosecond()
 		glog.V(0).Infof("sync %s to %s progressed to %v %0.2f/sec", sourceFiler, targetFiler, time.Unix(0, lastTsNs), float64(counter)/(float64(now-lastLogTsNs)/1e9))
 		lastLogTsNs = now
-		return setOffset(grpcDialOption, targetFiler, SyncKeyPrefix, sourceFilerSignature, lastTsNs)
+		// collect synchronous offset
+		statsCollect.FilerSyncOffsetGauge.WithLabelValues(sourceFiler.String(), targetFiler.String(), clientName, sourcePath).Set(float64(lastTsNs))
+		return setOffset(grpcDialOption, targetFiler, getSignaturePrefixByPath(sourcePath), sourceFilerSignature, lastTsNs)
 	})
 
-	return pb.FollowMetadata(sourceFiler, grpcDialOption, "syncTo_"+string(targetFiler), clientId,
+	return pb.FollowMetadata(sourceFiler, grpcDialOption, clientName, clientId,
 		sourcePath, nil, sourceFilerOffsetTsNs, 0, targetFilerSignature, processEventFnWithOffset, pb.RetryForeverOnError)
 }
 
@@ -226,6 +235,16 @@ const (
 	SyncKeyPrefix = "sync."
 )
 
+// When each business is distinguished according to path, and offsets need to be maintained separately.
+func getSignaturePrefixByPath(path string) string {
+	// compatible historical version
+	if path == "/" {
+		return SyncKeyPrefix
+	} else {
+		return SyncKeyPrefix + path
+	}
+}
+
 func getOffset(grpcDialOption grpc.DialOption, filer pb.ServerAddress, signaturePrefix string, signature int32) (lastOffsetTsNs int64, readErr error) {
 	readErr = pb.WithFilerClient(false, filer, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
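Two parts of the filer_sync.go change are worth spelling out. First, sync offsets are now keyed per source path, so two `weed filer.sync` jobs watching different paths keep independent progress markers on the target filer; the root path keeps the historical `sync.` key so existing offsets still resolve. Second, when `-metricsPort` is set, the committed offset is also exported through the new `FilerSyncOffsetGauge` (namespace `SeaweedFS`, subsystem `filerSync`, name `sync_offset`, which should surface as `SeaweedFS_filerSync_sync_offset`), labeled by source filer, target filer, client name, and path. The sketch below only replays the key computation copied from the patch; the `/buckets/b1` path is a hypothetical example.

```go
package main

import "fmt"

const SyncKeyPrefix = "sync."

// Mirrors getSignaturePrefixByPath from the patch: "/" keeps the legacy key,
// any other source path gets its own key space.
func getSignaturePrefixByPath(path string) string {
	if path == "/" {
		return SyncKeyPrefix
	}
	return SyncKeyPrefix + path
}

func main() {
	// Two sync jobs against the same target filer no longer share an offset key.
	fmt.Println(getSignaturePrefixByPath("/"))           // sync.
	fmt.Println(getSignaturePrefixByPath("/buckets/b1")) // sync./buckets/b1
}
```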
diff --git a/weed/command/server.go b/weed/command/server.go
index 4b6b6c642..b1812bb9b 100644
--- a/weed/command/server.go
+++ b/weed/command/server.go
@@ -2,8 +2,6 @@ package command
 
 import (
 	"fmt"
-	"github.com/chrislusf/seaweedfs/weed/pb"
-	"github.com/chrislusf/seaweedfs/weed/util/grace"
 	"net/http"
 	"os"
 	"strings"
@@ -12,7 +10,9 @@ import (
 	stats_collect "github.com/chrislusf/seaweedfs/weed/stats"
 
 	"github.com/chrislusf/seaweedfs/weed/glog"
+	"github.com/chrislusf/seaweedfs/weed/pb"
 	"github.com/chrislusf/seaweedfs/weed/util"
+	"github.com/chrislusf/seaweedfs/weed/util/grace"
 )
 
 type ServerOptions struct {
@@ -114,6 +114,7 @@ func init() {
 	filerOptions.saveToFilerLimit = cmdServer.Flag.Int("filer.saveToFilerLimit", 0, "Small files smaller than this limit can be cached in filer store.")
 	filerOptions.concurrentUploadLimitMB = cmdServer.Flag.Int("filer.concurrentUploadLimitMB", 64, "limit total concurrent upload size")
 	filerOptions.localSocket = cmdServer.Flag.String("filer.localSocket", "", "default to /tmp/seaweedfs-filer-<port>.sock")
+	filerOptions.showUIDirectoryDelete = cmdServer.Flag.Bool("filer.ui.deleteDir", true, "enable filer UI show delete directory button")
 
 	serverOptions.v.port = cmdServer.Flag.Int("volume.port", 8080, "volume server http listen port")
 	serverOptions.v.portGrpc = cmdServer.Flag.Int("volume.port.grpc", 0, "volume server grpc listen port")
@@ -131,6 +132,7 @@ func init() {
 	serverOptions.v.pprof = cmdServer.Flag.Bool("volume.pprof", false, "enable pprof http handlers. precludes --memprofile and --cpuprofile")
 	serverOptions.v.idxFolder = cmdServer.Flag.String("volume.dir.idx", "", "directory to store .idx files")
 	serverOptions.v.enableTcp = cmdServer.Flag.Bool("volume.tcp", false, "<exprimental> enable tcp port")
+	serverOptions.v.inflightUploadDataTimeout = cmdServer.Flag.Duration("volume.inflightUploadDataTimeout", 60*time.Second, "inflight upload data wait timeout of volume servers")
 
 	s3Options.port = cmdServer.Flag.Int("s3.port", 8333, "s3 server http listen port")
 	s3Options.portGrpc = cmdServer.Flag.Int("s3.port.grpc", 0, "s3 server grpc listen port")
diff --git a/weed/command/update.go b/weed/command/update.go
new file mode 100644
index 000000000..2d0dc42ad
--- /dev/null
+++ b/weed/command/update.go
@@ -0,0 +1,382 @@
+package command
+
+import (
+	"archive/tar"
+	"archive/zip"
+	"bytes"
+	"compress/gzip"
+	"context"
+	"crypto/md5"
+	"encoding/hex"
+	"encoding/json"
+	"fmt"
+	"io"
+	"io/ioutil"
+	"net/http"
+	"os"
+	"path/filepath"
+	"runtime"
+	"strings"
+	"time"
+
+	"github.com/chrislusf/seaweedfs/weed/glog"
+	"github.com/chrislusf/seaweedfs/weed/util"
+	"golang.org/x/net/context/ctxhttp"
+)
+
+//copied from https://github.com/restic/restic/tree/master/internal/selfupdate
+
+// Release collects data about a single release on GitHub.
+type Release struct {
+	Name        string    `json:"name"`
+	TagName     string    `json:"tag_name"`
+	Draft       bool      `json:"draft"`
+	PreRelease  bool      `json:"prerelease"`
+	PublishedAt time.Time `json:"published_at"`
+	Assets      []Asset   `json:"assets"`
+
+	Version string `json:"-"` // set manually in the code
+}
+
+// Asset is a file uploaded and attached to a release.
+type Asset struct {
+	ID   int    `json:"id"`
+	Name string `json:"name"`
+	URL  string `json:"url"`
+}
+
+const githubAPITimeout = 30 * time.Second
+
+// githubError is returned by the GitHub API, e.g. for rate-limiting.
+type githubError struct {
+	Message string
+}
+
+//default version is not full version
+var isFullVersion = false
+
+var (
+	updateOpt UpdateOptions
+)
+
+type UpdateOptions struct {
+	dir     *string
+	name    *string
+	Version *string
+}
+
+func init() {
+	path, _ := os.Executable()
+	_, name := filepath.Split(path)
+	updateOpt.dir = cmdUpdate.Flag.String("dir", filepath.Dir(path), "directory to save new weed.")
+	updateOpt.name = cmdUpdate.Flag.String("name", name, "name of new weed. On windows, name shouldn't be same to the orignial name.")
+	updateOpt.Version = cmdUpdate.Flag.String("version", "0", "specific version of weed you want to download. If not specified, get the latest version.")
+	cmdUpdate.Run = runUpdate
+}
+
+var cmdUpdate = &Command{
+	UsageLine: "update [-dir=/path/to/dir] [-name=name] [-version=x.xx]",
+	Short:     "get latest or specific version from https://github.com/chrislusf/seaweedfs",
+	Long:      `get latest or specific version from https://github.com/chrislusf/seaweedfs`,
+}
+
+func runUpdate(cmd *Command, args []string) bool {
+	path, _ := os.Executable()
+	_, name := filepath.Split(path)
+
+	if *updateOpt.dir != "" {
+		if err := util.TestFolderWritable(util.ResolvePath(*updateOpt.dir)); err != nil {
+			glog.Fatalf("Check Folder(-dir) Writable %s : %s", *updateOpt.dir, err)
+			return false
+		}
+	} else {
+		*updateOpt.dir = filepath.Dir(path)
+	}
+
+	if *updateOpt.name == "" {
+		*updateOpt.name = name
+	}
+
+	target := filepath.Join(*updateOpt.dir, *updateOpt.name)
+
+	if runtime.GOOS == "windows" {
+		if target == path {
+			glog.Fatalf("On windows, name of the new weed shouldn't be same to the orignial name.")
+			return false
+		}
+	}
+
+	glog.V(0).Infof("new weed will be saved to %s", target)
+
+	_, err := downloadRelease(context.Background(), target, *updateOpt.Version)
+	if err != nil {
+		glog.Errorf("unable to download weed: %v", err)
+		return false
+	}
+	return true
+}
+
+func downloadRelease(ctx context.Context, target string, ver string) (version string, err error) {
+	currentVersion := util.VERSION_NUMBER
+	rel, err := GitHubLatestRelease(ctx, ver, "chrislusf", "seaweedfs")
+	if err != nil {
+		return "", err
+	}
+
+	if rel.Version == currentVersion {
+		if ver == "0" {
+			glog.V(0).Infof("weed is up to date")
+		} else {
+			glog.V(0).Infof("no need to download the same version of weed ")
+		}
+		return currentVersion, nil
+	}
+
+	glog.V(0).Infof("download version: %s", rel.Version)
+
+	largeDiskSuffix := ""
+	if util.VolumeSizeLimitGB == 8000 {
+		largeDiskSuffix = "_large_disk"
+	}
+
+	fullSuffix := ""
+	if isFullVersion {
+		fullSuffix = "_full"
+	}
+
+	ext := "tar.gz"
+	if runtime.GOOS == "windows" {
+		ext = "zip"
+	}
+
+	suffix := fmt.Sprintf("%s_%s%s%s.%s", runtime.GOOS, runtime.GOARCH, fullSuffix, largeDiskSuffix, ext)
+	md5Filename := fmt.Sprintf("%s.md5", suffix)
+	_, md5Val, err := getGithubDataFile(ctx, rel.Assets, md5Filename)
+	if err != nil {
+		return "", err
+	}
+
+	downloadFilename, buf, err := getGithubDataFile(ctx, rel.Assets, suffix)
+	if err != nil {
+		return "", err
+	}
+
+	md5Ctx := md5.New()
+	md5Ctx.Write(buf)
+	binaryMd5 := md5Ctx.Sum(nil)
+	if hex.EncodeToString(binaryMd5) != string(md5Val[0:32]) {
+		glog.Errorf("md5:'%s' '%s'", hex.EncodeToString(binaryMd5), string(md5Val[0:32]))
+		err = fmt.Errorf("binary md5sum doesn't match")
+		return "", err
+	}
+
+	err = extractToFile(buf, downloadFilename, target)
+	if err != nil {
+		return "", err
+	} else {
+		glog.V(0).Infof("successfully updated weed to version %v\n", rel.Version)
+	}
+
+	return rel.Version, nil
+}
+
+// GitHubLatestRelease uses the GitHub API to get information about the specific
+// release of a repository.
+func GitHubLatestRelease(ctx context.Context, ver string, owner, repo string) (Release, error) {
+	ctx, cancel := context.WithTimeout(ctx, githubAPITimeout)
+	defer cancel()
+
+	url := fmt.Sprintf("https://api.github.com/repos/%s/%s/releases", owner, repo)
+	req, err := http.NewRequest(http.MethodGet, url, nil)
+	if err != nil {
+		return Release{}, err
+	}
+
+	// pin API version 3
+	req.Header.Set("Accept", "application/vnd.github.v3+json")
+
+	res, err := ctxhttp.Do(ctx, http.DefaultClient, req)
+	if err != nil {
+		return Release{}, err
+	}
+
+	if res.StatusCode != http.StatusOK {
+		content := res.Header.Get("Content-Type")
+		if strings.Contains(content, "application/json") {
+			// try to decode error message
+			var msg githubError
+			jerr := json.NewDecoder(res.Body).Decode(&msg)
+			if jerr == nil {
+				return Release{}, fmt.Errorf("unexpected status %v (%v) returned, message:\n %v", res.StatusCode, res.Status, msg.Message)
+			}
+		}
+
+		_ = res.Body.Close()
+		return Release{}, fmt.Errorf("unexpected status %v (%v) returned", res.StatusCode, res.Status)
+	}
+
+	buf, err := ioutil.ReadAll(res.Body)
+	if err != nil {
+		_ = res.Body.Close()
+		return Release{}, err
+	}
+
+	err = res.Body.Close()
+	if err != nil {
+		return Release{}, err
+	}
+
+	var release Release
+	var releaseList []Release
+	err = json.Unmarshal(buf, &releaseList)
+	if err != nil {
+		return Release{}, err
+	}
+	if ver == "0" {
+		release = releaseList[0]
+		glog.V(0).Infof("latest version is %v\n", release.TagName)
+	} else {
+		for _, r := range releaseList {
+			if r.TagName == ver {
+				release = r
+				break
+			}
+		}
+	}
+
+	if release.TagName == "" {
+		return Release{}, fmt.Errorf("can not find the specific version")
+	}
+
+	release.Version = release.TagName
+	return release, nil
+}
+
+func getGithubData(ctx context.Context, url string) ([]byte, error) {
+	req, err := http.NewRequest(http.MethodGet, url, nil)
+	if err != nil {
+		return nil, err
+	}
+
+	// request binary data
+	req.Header.Set("Accept", "application/octet-stream")
+
+	res, err := ctxhttp.Do(ctx, http.DefaultClient, req)
+	if err != nil {
+		return nil, err
+	}
+
+	if res.StatusCode != http.StatusOK {
+		return nil, fmt.Errorf("unexpected status %v (%v) returned", res.StatusCode, res.Status)
+	}
+
+	buf, err := ioutil.ReadAll(res.Body)
+	if err != nil {
+		_ = res.Body.Close()
+		return nil, err
+	}
+
+	err = res.Body.Close()
+	if err != nil {
+		return nil, err
+	}
+
+	return buf, nil
+}
+
+func getGithubDataFile(ctx context.Context, assets []Asset, suffix string) (filename string, data []byte, err error) {
+	var url string
+	for _, a := range assets {
+		if strings.HasSuffix(a.Name, suffix) {
+			url = a.URL
+			filename = a.Name
+			break
+		}
+	}
+
+	if url == "" {
+		return "", nil, fmt.Errorf("unable to find file with suffix %v", suffix)
+	}
+
+	glog.V(0).Infof("download %v\n", filename)
+	data, err = getGithubData(ctx, url)
+	if err != nil {
+		return "", nil, err
+	}
+
+	return filename, data, nil
+}
+
+func extractToFile(buf []byte, filename, target string) error {
+	var rd io.Reader = bytes.NewReader(buf)
+
+	switch filepath.Ext(filename) {
+	case ".gz":
+		gr, err := gzip.NewReader(rd)
+		if err != nil {
+			return err
+		}
+		defer gr.Close()
+		trd := tar.NewReader(gr)
+		hdr, terr := trd.Next()
+		if terr != nil {
+			glog.Errorf("uncompress file(%s) failed:%s", hdr.Name, terr)
+			return terr
+		}
+		rd = trd
+	case ".zip":
+		zrd, err := zip.NewReader(bytes.NewReader(buf), int64(len(buf)))
+		if err != nil {
+			return err
+		}
+
+		if len(zrd.File) != 1 {
+			return fmt.Errorf("ZIP archive contains more than one file")
+		}
+
+		file, err := zrd.File[0].Open()
+		if err != nil {
+			return err
+		}
+
+		defer func() {
+			_ = file.Close()
+		}()
+
+		rd = file
+	}
+
+	// Write everything to a temp file
+	dir := filepath.Dir(target)
+	new, err := ioutil.TempFile(dir, "weed")
+	if err != nil {
+		return err
+	}
+
+	n, err := io.Copy(new, rd)
+	if err != nil {
+		_ = new.Close()
+		_ = os.Remove(new.Name())
+		return err
+	}
+	if err = new.Sync(); err != nil {
+		return err
+	}
+	if err = new.Close(); err != nil {
+		return err
+	}
+
+	mode := os.FileMode(0755)
+	// attempt to find the original mode
+	if fi, err := os.Lstat(target); err == nil {
+		mode = fi.Mode()
+	}
+
+	// Rename the temp file to the final location atomically.
+	if err := os.Rename(new.Name(), target); err != nil {
+		return err
+	}
+
+	glog.V(0).Infof("saved %d bytes in %v\n", n, target)
+	return os.Chmod(target, mode)
+}
diff --git a/weed/command/update_full.go b/weed/command/update_full.go
new file mode 100644
index 000000000..529f38219
--- /dev/null
+++ b/weed/command/update_full.go
@@ -0,0 +1,9 @@
+//go:build elastic && ydb && gocdk && hdfs
+// +build elastic,ydb,gocdk,hdfs
+
+package command
+
+//set true if gtags are set
+func init() {
+	isFullVersion = true
+}
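The new `weed update` command picks a release asset whose name ends with a suffix built from `GOOS`, `GOARCH`, an optional `_full` marker (set by update_full.go only when the binary was compiled with all of the `elastic`, `ydb`, `gocdk` and `hdfs` build tags, roughly `go build -tags "elastic ydb gocdk hdfs"`), an optional `_large_disk` marker for the 8000 GB volume-size build, and `.tar.gz` (or `.zip` on Windows). It also fetches `<suffix>.md5` and refuses to install the download unless its md5 matches the first 32 hex characters of that file. The snippet below is a stand-alone sketch of the suffix logic only; the boolean inputs are assumptions for illustration, since in the real command they come from build tags and the build variant.

```go
package main

import (
	"fmt"
	"runtime"
)

// Sketch of the asset-suffix construction used by downloadRelease.
func assetSuffix(isFullVersion, largeDisk bool) string {
	fullSuffix := ""
	if isFullVersion {
		fullSuffix = "_full"
	}
	largeDiskSuffix := ""
	if largeDisk {
		largeDiskSuffix = "_large_disk"
	}
	ext := "tar.gz"
	if runtime.GOOS == "windows" {
		ext = "zip"
	}
	return fmt.Sprintf("%s_%s%s%s.%s", runtime.GOOS, runtime.GOARCH, fullSuffix, largeDiskSuffix, ext)
}

func main() {
	// e.g. "linux_amd64_full_large_disk.tar.gz" for a full, large-disk build
	fmt.Println(assetSuffix(true, true))
	// e.g. "linux_amd64.tar.gz" for the default build
	fmt.Println(assetSuffix(false, false))
}
```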
"github.com/chrislusf/seaweedfs/weed/glog" @@ -63,14 +64,14 @@ func ResolveChunkManifest(lookupFileIdFn wdclient.LookupFileIdFunctionType, chun resolvedChunks, err := ResolveOneChunkManifest(lookupFileIdFn, chunk) if err != nil { - return chunks, nil, err + return dataChunks, nil, err } manifestChunks = append(manifestChunks, chunk) // recursive subDataChunks, subManifestChunks, subErr := ResolveChunkManifest(lookupFileIdFn, resolvedChunks, startOffset, stopOffset) if subErr != nil { - return chunks, nil, subErr + return dataChunks, nil, subErr } dataChunks = append(dataChunks, subDataChunks...) manifestChunks = append(manifestChunks, subManifestChunks...) diff --git a/weed/filer/filechunks.go b/weed/filer/filechunks.go index 208ef8095..48b344bf8 100644 --- a/weed/filer/filechunks.go +++ b/weed/filer/filechunks.go @@ -3,11 +3,12 @@ package filer import ( "bytes" "fmt" - "github.com/chrislusf/seaweedfs/weed/wdclient" - "golang.org/x/exp/slices" "math" "sync" + "github.com/chrislusf/seaweedfs/weed/wdclient" + "golang.org/x/exp/slices" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/util" ) @@ -248,6 +249,9 @@ func MergeIntoVisibles(visibles []VisibleInterval, chunk *filer_pb.FileChunk) (n func NonOverlappingVisibleIntervals(lookupFileIdFn wdclient.LookupFileIdFunctionType, chunks []*filer_pb.FileChunk, startOffset int64, stopOffset int64) (visibles []VisibleInterval, err error) { chunks, _, err = ResolveChunkManifest(lookupFileIdFn, chunks, startOffset, stopOffset) + if err != nil { + return + } visibles2 := readResolvedChunks(chunks) diff --git a/weed/s3api/stats.go b/weed/s3api/stats.go index 973d8c0eb..003807a25 100644 --- a/weed/s3api/stats.go +++ b/weed/s3api/stats.go @@ -1,6 +1,7 @@ package s3api import ( + "github.com/chrislusf/seaweedfs/weed/s3api/s3_constants" stats_collect "github.com/chrislusf/seaweedfs/weed/stats" "net/http" "strconv" @@ -27,11 +28,12 @@ func (r *StatusRecorder) Flush() { func track(f http.HandlerFunc, action string) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { + bucket, _ := s3_constants.GetBucketAndObject(r) w.Header().Set("Server", "SeaweedFS S3") recorder := NewStatusResponseWriter(w) start := time.Now() f(recorder, r) - stats_collect.S3RequestHistogram.WithLabelValues(action).Observe(time.Since(start).Seconds()) - stats_collect.S3RequestCounter.WithLabelValues(action, strconv.Itoa(recorder.Status)).Inc() + stats_collect.S3RequestHistogram.WithLabelValues(action, bucket).Observe(time.Since(start).Seconds()) + stats_collect.S3RequestCounter.WithLabelValues(action, strconv.Itoa(recorder.Status), bucket).Inc() } } diff --git a/weed/server/common.go b/weed/server/common.go index 39a8637ac..f02ec67ac 100644 --- a/weed/server/common.go +++ b/weed/server/common.go @@ -284,6 +284,7 @@ func processRangeRequest(r *http.Request, w http.ResponseWriter, totalSize int64 if rangeReq == "" { w.Header().Set("Content-Length", strconv.FormatInt(totalSize, 10)) if err := writeFn(bufferedWriter, 0, totalSize); err != nil { + glog.Errorf("processRangeRequest headers: %+v err: %v", w.Header(), err) http.Error(w, err.Error(), http.StatusInternalServerError) return } @@ -294,6 +295,7 @@ func processRangeRequest(r *http.Request, w http.ResponseWriter, totalSize int64 //mostly copy from src/pkg/net/http/fs.go ranges, err := parseRange(rangeReq, totalSize) if err != nil { + glog.Errorf("processRangeRequest headers: %+v err: %v", w.Header(), err) http.Error(w, err.Error(), http.StatusRequestedRangeNotSatisfiable) return 
diff --git a/weed/server/common.go b/weed/server/common.go
index 39a8637ac..f02ec67ac 100644
--- a/weed/server/common.go
+++ b/weed/server/common.go
@@ -284,6 +284,7 @@ func processRangeRequest(r *http.Request, w http.ResponseWriter, totalSize int64
 	if rangeReq == "" {
 		w.Header().Set("Content-Length", strconv.FormatInt(totalSize, 10))
 		if err := writeFn(bufferedWriter, 0, totalSize); err != nil {
+			glog.Errorf("processRangeRequest headers: %+v err: %v", w.Header(), err)
 			http.Error(w, err.Error(), http.StatusInternalServerError)
 			return
 		}
@@ -294,6 +295,7 @@ func processRangeRequest(r *http.Request, w http.ResponseWriter, totalSize int64
 	//mostly copy from src/pkg/net/http/fs.go
 	ranges, err := parseRange(rangeReq, totalSize)
 	if err != nil {
+		glog.Errorf("processRangeRequest headers: %+v err: %v", w.Header(), err)
 		http.Error(w, err.Error(), http.StatusRequestedRangeNotSatisfiable)
 		return
 	}
@@ -326,6 +328,7 @@ func processRangeRequest(r *http.Request, w http.ResponseWriter, totalSize int64
 		w.WriteHeader(http.StatusPartialContent)
 		err = writeFn(bufferedWriter, ra.start, ra.length)
 		if err != nil {
+			glog.Errorf("processRangeRequest headers: %+v err: %v", w.Header(), err)
 			http.Error(w, err.Error(), http.StatusInternalServerError)
 			return
 		}
@@ -365,6 +368,7 @@ func processRangeRequest(r *http.Request, w http.ResponseWriter, totalSize int64
 	}
 	w.WriteHeader(http.StatusPartialContent)
 	if _, err := io.CopyN(bufferedWriter, sendContent, sendSize); err != nil {
+		glog.Errorf("processRangeRequest err: %v", err)
 		http.Error(w, "Internal Error", http.StatusInternalServerError)
 		return
 	}
diff --git a/weed/server/filer_grpc_server_sub_meta.go b/weed/server/filer_grpc_server_sub_meta.go
index 0540400a3..da710234b 100644
--- a/weed/server/filer_grpc_server_sub_meta.go
+++ b/weed/server/filer_grpc_server_sub_meta.go
@@ -2,6 +2,7 @@ package weed_server
 
 import (
 	"fmt"
+	"github.com/chrislusf/seaweedfs/weed/stats"
 	"strings"
 	"time"
 
@@ -229,6 +230,9 @@ func (fs *FilerServer) eachEventNotificationFn(req *filer_pb.SubscribeMetadataRe
 		}
 	}
 
+	// collect timestamps for path
+	stats.FilerServerLastSendTsOfSubscribeGauge.WithLabelValues(fs.option.Host.String(), req.ClientName, req.PathPrefix).Set(float64(tsNs))
+
 	message := &filer_pb.SubscribeMetadataResponse{
 		Directory:         dirPath,
 		EventNotification: eventNotification,
diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go
index 8908b5e5f..6bf0261ee 100644
--- a/weed/server/filer_server.go
+++ b/weed/server/filer_server.go
@@ -3,7 +3,6 @@ package weed_server
 import (
 	"context"
 	"fmt"
-	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
 	"net/http"
 	"os"
 	"sync"
@@ -17,6 +16,7 @@ import (
 	"github.com/chrislusf/seaweedfs/weed/operation"
 	"github.com/chrislusf/seaweedfs/weed/pb"
+	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
 	"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
 
 	"github.com/chrislusf/seaweedfs/weed/util"
@@ -67,6 +67,7 @@ type FilerOption struct {
 	Cipher                bool
 	SaveToFilerLimit      int64
 	ConcurrentUploadLimit int64
+	ShowUIDirectoryDelete bool
 }
 
 type FilerServer struct {
diff --git a/weed/server/filer_server_handlers_read_dir.go b/weed/server/filer_server_handlers_read_dir.go
index 8382cfc76..eaf17fa18 100644
--- a/weed/server/filer_server_handlers_read_dir.go
+++ b/weed/server/filer_server_handlers_read_dir.go
@@ -73,7 +73,7 @@ func (fs *FilerServer) listDirectoryHandler(w http.ResponseWriter, r *http.Reque
 		return
 	}
 
-	ui.StatusTpl.Execute(w, struct {
+	err = ui.StatusTpl.Execute(w, struct {
 		Path                  string
 		Breadcrumbs           []ui.Breadcrumb
 		Entries               interface{}
@@ -81,6 +81,7 @@ func (fs *FilerServer) listDirectoryHandler(w http.ResponseWriter, r *http.Reque
 		LastFileName          string
 		ShouldDisplayLoadMore bool
 		EmptyFolder           bool
+		ShowDirectoryDelete   bool
 	}{
 		path,
 		ui.ToBreadcrumb(path),
@@ -89,5 +90,9 @@ func (fs *FilerServer) listDirectoryHandler(w http.ResponseWriter, r *http.Reque
 		lastFileName,
 		shouldDisplayLoadMore,
 		emptyFolder,
+		fs.option.ShowUIDirectoryDelete,
 	})
+	if err != nil {
+		glog.V(0).Infof("Template Execute Error: %v", err)
+	}
 }
diff --git a/weed/server/filer_ui/filer.html b/weed/server/filer_ui/filer.html
index f9c35440e..c9d832e8f 100644
--- a/weed/server/filer_ui/filer.html
+++ b/weed/server/filer_ui/filer.html
@@ -109,38 +109,37 @@
 <form class="upload-form">
     <input type="file" id="fileElem" multiple onchange="handleFiles(this.files)">
-    {{if .EmptyFolder}}
+    {{ if .EmptyFolder }}
     <div class="row add-files">
+    </div>
-    {{else}}
+    {{ else }}
     <table width="100%" class="table table-hover">
-        {{$path := .Path }}
+        {{ $path := .Path }}
+        {{ $showDirDel := .ShowDirectoryDelete }}
         {{ range $entry_index, $entry := .Entries }}
         <tr>
             <td>
-                {{if $entry.IsDirectory}}
+                {{ if $entry.IsDirectory }}
                 <span class="glyphicon glyphicon-folder-open" aria-hidden="true"></span>
                 <a href="{{ printpath $path "/" $entry.Name "/"}}" >
                     {{ $entry.Name }}
                 </a>
-                {{else}}
+                {{ else }}
                 <a href="{{ printpath $path "/" $entry.Name }}" >
                     {{ $entry.Name }}
                 </a>
-                {{end}}
+                {{ end }}
             </td>
             <td align="right" nowrap>
-                {{if $entry.IsDirectory}}
-                {{else}}
+                {{ if not $entry.IsDirectory }}
                 {{ $entry.Mime }}
-                {{end}}
+                {{ end }}
            </td>
            <td align="right" nowrap>
-                {{if $entry.IsDirectory}}
-                {{else}}
+                {{ if not $entry.IsDirectory }}
                 {{ $entry.Size | humanizeBytes }}
-                {{end}}
+                {{ end }}
            </td>
            <td align="right" nowrap>
                 {{ $entry.Timestamp.Format "2006-01-02 15:04" }}
@@ -150,31 +149,32 @@
                     <label class="btn" onclick="handleRename('{{ $entry.Name }}', '{{ printpath $path "/" }}')">
                         <span class="glyphicon glyphicon-edit" aria-hidden="true"></span>
                     </label>
-                    {{if $entry.IsDirectory}}
-                    <label class="btn" onclick="handleDelete('{{ printpath $path "/" $entry.Name "/" }}')">
+                    {{ if and $entry.IsDirectory $showDirDel }}
+                    <label class="btn" onclick="handleDelete('{{ printpath $path "/" $entry.Name "/" }}')">
                         <span class="glyphicon glyphicon-trash" aria-hidden="true"></span>
                     </label>
-                    {{else}}
+                    {{ end }}
+                    {{ if not $entry.IsDirectory }}
                     <label class="btn" onclick="handleDelete('{{ printpath $path "/" $entry.Name }}')">
                         <span class="glyphicon glyphicon-trash" aria-hidden="true"></span>
                     </label>
-                    {{end}}
+                    {{ end }}
                 </div>
             </td>
         </tr>
         {{ end }}
     </table>
-    {{end}}
+    {{ end }}
 </form>
 </div>
-{{if .ShouldDisplayLoadMore}}
+{{ if .ShouldDisplayLoadMore }}
 <div class="row">
-    <a href={{ print .Path "?limit=" .Limit "&lastFileName=" .LastFileName}} >
+    <a href={{ print .Path "?limit=" .Limit "&lastFileName=" .LastFileName }} >
         Load more
     </a>
 </div>
-{{end}}
+{{ end }}
 <br/>
 <br/>
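The filer.html rewrite splits the delete button into two guards: a directory gets the trash icon only when the entry is a directory and the server passed `ShowDirectoryDelete` (driven by the new `-ui.deleteDir` / `-filer.ui.deleteDir` flags), while plain files keep it unconditionally. A stripped-down `html/template` sketch of the same guard, with invented entries and field names simplified from the real template data:

```go
package main

import (
	"html/template"
	"os"
)

// Only render a directory delete action when both conditions hold.
const row = `{{ $showDirDel := .ShowDirectoryDelete }}
{{ range .Entries }}
{{ if and .IsDirectory $showDirDel }}[delete dir {{ .Name }}]{{ end }}
{{ if not .IsDirectory }}[delete file {{ .Name }}]{{ end }}
{{ end }}`

type entry struct {
	Name        string
	IsDirectory bool
}

func main() {
	t := template.Must(template.New("row").Parse(row))
	data := struct {
		ShowDirectoryDelete bool
		Entries             []entry
	}{
		ShowDirectoryDelete: false, // e.g. the filer started with -ui.deleteDir=false
		Entries:             []entry{{"docs", true}, {"a.txt", false}},
	}
	_ = t.Execute(os.Stdout, data)
}
```

With `ShowDirectoryDelete: false` only the file delete action is rendered, which is what the filer UI now does when the flag is disabled.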
diff --git a/weed/server/volume_server.go b/weed/server/volume_server.go
index 477a3709c..abb30229a 100644
--- a/weed/server/volume_server.go
+++ b/weed/server/volume_server.go
@@ -3,6 +3,7 @@ package weed_server
 import (
 	"net/http"
 	"sync"
+	"time"
 
 	"github.com/chrislusf/seaweedfs/weed/pb"
 	"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
@@ -24,7 +25,9 @@ type VolumeServer struct {
 	inFlightDownloadDataSize      int64
 	concurrentUploadLimit         int64
 	concurrentDownloadLimit       int64
+	inFlightUploadDataLimitCond   *sync.Cond
 	inFlightDownloadDataLimitCond *sync.Cond
+	inflightUploadDataTimeout     time.Duration
 
 	SeedMasterNodes []pb.ServerAddress
 	currentMaster   pb.ServerAddress
@@ -60,6 +63,7 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
 	fileSizeLimitMB int,
 	concurrentUploadLimit int64,
 	concurrentDownloadLimit int64,
+	inflightUploadDataTimeout time.Duration,
 ) *VolumeServer {
 
 	v := util.GetViper()
@@ -84,9 +88,11 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
 		fileSizeLimitBytes:            int64(fileSizeLimitMB) * 1024 * 1024,
 		isHeartbeating:                true,
 		stopChan:                      make(chan bool),
+		inFlightUploadDataLimitCond:   sync.NewCond(new(sync.Mutex)),
 		inFlightDownloadDataLimitCond: sync.NewCond(new(sync.Mutex)),
 		concurrentUploadLimit:         concurrentUploadLimit,
 		concurrentDownloadLimit:       concurrentDownloadLimit,
+		inflightUploadDataTimeout:     inflightUploadDataTimeout,
 	}
 	vs.SeedMasterNodes = masterNodes
diff --git a/weed/server/volume_server_handlers.go b/weed/server/volume_server_handlers.go
index 49bc297fb..293f36f14 100644
--- a/weed/server/volume_server_handlers.go
+++ b/weed/server/volume_server_handlers.go
@@ -6,6 +6,7 @@ import (
 	"strconv"
 	"strings"
 	"sync/atomic"
+	"time"
 
 	"github.com/chrislusf/seaweedfs/weed/util"
 
@@ -56,20 +57,31 @@ func (vs *VolumeServer) privateStoreHandler(w http.ResponseWriter, r *http.Reque
 		vs.guard.WhiteList(vs.DeleteHandler)(w, r)
 	case "PUT", "POST":
 
-		// wait until in flight data is less than the limit
 		contentLength := getContentLength(r)
-		// exclude the replication from the concurrentUploadLimitMB
-		if vs.concurrentUploadLimit != 0 && r.URL.Query().Get("type") != "replicate" &&
-			atomic.LoadInt64(&vs.inFlightUploadDataSize) > vs.concurrentUploadLimit {
-			err := fmt.Errorf("reject because inflight upload data %d > %d", vs.inFlightUploadDataSize, vs.concurrentUploadLimit)
-			glog.V(1).Infof("too many requests: %v", err)
-			writeJsonError(w, r, http.StatusTooManyRequests, err)
-			return
+		if r.URL.Query().Get("type") != "replicate" && vs.concurrentUploadLimit != 0 {
+			startTime := time.Now()
+			vs.inFlightUploadDataLimitCond.L.Lock()
+			for vs.inFlightUploadDataSize > vs.concurrentUploadLimit {
+				//wait timeout check
+				if startTime.Add(vs.inflightUploadDataTimeout).Before(time.Now()) {
+					vs.inFlightUploadDataLimitCond.L.Unlock()
+					err := fmt.Errorf("reject because inflight upload data %d > %d, and wait timeout", vs.inFlightUploadDataSize, vs.concurrentUploadLimit)
+					glog.V(1).Infof("too many requests: %v", err)
+					writeJsonError(w, r, http.StatusTooManyRequests, err)
+					return
+				}
+				glog.V(4).Infof("wait because inflight upload data %d > %d", vs.inFlightUploadDataSize, vs.concurrentUploadLimit)
+				vs.inFlightUploadDataLimitCond.Wait()
+			}
+			vs.inFlightUploadDataLimitCond.L.Unlock()
 		}
 		atomic.AddInt64(&vs.inFlightUploadDataSize, contentLength)
 		defer func() {
 			atomic.AddInt64(&vs.inFlightUploadDataSize, -contentLength)
+			if vs.concurrentUploadLimit != 0 {
+				vs.inFlightUploadDataLimitCond.Signal()
+			}
 		}()
 
 		// processs uploads
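Taken together, the volume_server.go and volume_server_handlers.go changes alter upload admission: instead of rejecting as soon as in-flight upload bytes exceed the concurrent upload limit, the handler now waits on a `sync.Cond` and only returns 429 once the wait exceeds the new `-inflightUploadDataTimeout` (default 60s); every finished upload signals the condition. Below is a self-contained sketch of that admission pattern, not the server code itself, with made-up limits and sizes.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// limiter mimics the wait-with-timeout admission used for volume uploads.
type limiter struct {
	cond     *sync.Cond
	inFlight int64
	limit    int64
	timeout  time.Duration
}

func (l *limiter) acquire(n int64) error {
	start := time.Now()
	l.cond.L.Lock()
	for atomic.LoadInt64(&l.inFlight) > l.limit {
		if time.Since(start) > l.timeout {
			l.cond.L.Unlock()
			return errors.New("reject: inflight upload data over limit and wait timed out")
		}
		l.cond.Wait() // woken by Signal() when another upload finishes
	}
	l.cond.L.Unlock()
	atomic.AddInt64(&l.inFlight, n)
	return nil
}

func (l *limiter) release(n int64) {
	atomic.AddInt64(&l.inFlight, -n)
	l.cond.Signal()
}

func main() {
	l := &limiter{cond: sync.NewCond(new(sync.Mutex)), limit: 64 << 20, timeout: 60 * time.Second}
	if err := l.acquire(8 << 20); err != nil {
		fmt.Println(err)
		return
	}
	defer l.release(8 << 20)
	fmt.Println("upload admitted")
}
```

As in the patch, the timeout is re-checked only after a wakeup from Signal, so a waiter is never interrupted in the middle of a Wait.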
diff --git a/weed/stats/metrics.go b/weed/stats/metrics.go
index 943aafff9..f0b810608 100644
--- a/weed/stats/metrics.go
+++ b/weed/stats/metrics.go
@@ -77,6 +77,14 @@ var (
 			Buckets:   prometheus.ExponentialBuckets(0.0001, 2, 24),
 		}, []string{"type"})
 
+	FilerServerLastSendTsOfSubscribeGauge = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Namespace: "SeaweedFS",
+			Subsystem: "filer",
+			Name:      "last_send_timestamp_of_subscribe",
+			Help:      "The last send timestamp of the filer subscription.",
+		}, []string{"sourceFiler", "clientName", "path"})
+
 	FilerStoreCounter = prometheus.NewCounterVec(
 		prometheus.CounterOpts{
 			Namespace: "SeaweedFS",
@@ -94,6 +102,14 @@ var (
 			Buckets:   prometheus.ExponentialBuckets(0.0001, 2, 24),
 		}, []string{"store", "type"})
 
+	FilerSyncOffsetGauge = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Namespace: "SeaweedFS",
+			Subsystem: "filerSync",
+			Name:      "sync_offset",
+			Help:      "The offset of the filer synchronization service.",
+		}, []string{"sourceFiler", "targetFiler", "clientName", "path"})
+
 	VolumeServerRequestCounter = prometheus.NewCounterVec(
 		prometheus.CounterOpts{
 			Namespace: "SeaweedFS",
@@ -157,7 +173,8 @@ var (
 			Subsystem: "s3",
 			Name:      "request_total",
 			Help:      "Counter of s3 requests.",
-		}, []string{"type", "code"})
+		}, []string{"type", "code", "bucket"})
+
 	S3RequestHistogram = prometheus.NewHistogramVec(
 		prometheus.HistogramOpts{
 			Namespace: "SeaweedFS",
@@ -165,7 +182,7 @@ var (
 			Name:      "request_seconds",
 			Help:      "Bucketed histogram of s3 request processing time.",
 			Buckets:   prometheus.ExponentialBuckets(0.0001, 2, 24),
-		}, []string{"type"})
+		}, []string{"type", "bucket"})
 )
 
 func init() {
@@ -179,6 +196,8 @@ func init() {
 	Gather.MustRegister(FilerRequestHistogram)
 	Gather.MustRegister(FilerStoreCounter)
 	Gather.MustRegister(FilerStoreHistogram)
+	Gather.MustRegister(FilerSyncOffsetGauge)
+	Gather.MustRegister(FilerServerLastSendTsOfSubscribeGauge)
 	Gather.MustRegister(collectors.NewGoCollector())
 	Gather.MustRegister(collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}))
diff --git a/weed/storage/needle/needle_read_page.go b/weed/storage/needle/needle_read_page.go
index c00195e93..e909869f3 100644
--- a/weed/storage/needle/needle_read_page.go
+++ b/weed/storage/needle/needle_read_page.go
@@ -14,9 +14,10 @@ func (n *Needle) ReadNeedleDataInto(r backend.BackendStorageFile, volumeOffset i
 	crc := CRC(0)
 	for x := needleOffset; x < needleOffset+size; x += int64(len(buf)) {
 		count, err := n.ReadNeedleData(r, volumeOffset, buf, x)
-		if count > 0 {
-			crc = crc.Update(buf[0:count])
-			if _, err = writer.Write(buf[0:count]); err != nil {
+		toWrite := min(int64(count), needleOffset+size-x)
+		if toWrite > 0 {
+			crc = crc.Update(buf[0:toWrite])
+			if _, err = writer.Write(buf[0:toWrite]); err != nil {
 				return fmt.Errorf("ReadNeedleData write: %v", err)
 			}
 		}
diff --git a/weed/storage/needle_map/compact_map.go b/weed/storage/needle_map/compact_map.go
index 3d2047f99..ccce8f108 100644
--- a/weed/storage/needle_map/compact_map.go
+++ b/weed/storage/needle_map/compact_map.go
@@ -8,7 +8,7 @@ import (
 )
 
 const (
-	batch = 100000
+	batch = 10000
 )
 
 type SectionalNeedleId uint32
diff --git a/weed/util/constants.go b/weed/util/constants.go
index 213eafae0..c0fea8b17 100644
--- a/weed/util/constants.go
+++ b/weed/util/constants.go
@@ -5,7 +5,7 @@ import (
 )
 
 var (
-	VERSION_NUMBER = fmt.Sprintf("%.02f", 3.10)
+	VERSION_NUMBER = fmt.Sprintf("%.02f", 3.11)
 	VERSION        = sizeLimit + " " + VERSION_NUMBER
 	COMMIT         = ""
 )
diff --git a/weed/wdclient/vid_map.go b/weed/wdclient/vid_map.go
index cdd783d91..f7a9a0f1a 100644
--- a/weed/wdclient/vid_map.go
+++ b/weed/wdclient/vid_map.go
@@ -133,7 +133,7 @@ func (vc *vidMap) GetLocations(vid uint32) (locations []Location, found bool) {
 		return
 	}
 	locations, found = vc.ecVid2Locations[vid]
-	return
+	return locations, found && len(locations) > 0
 }
 
 func (vc *vidMap) addLocation(vid uint32, location Location) {
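One fix in this tail worth a closer look is the needle_read_page.go change: the bytes written (and fed into the CRC) per iteration are now clamped to what is left of the requested size, so a final `ReadNeedleData` that fills the whole buffer can no longer push extra bytes past the end of the range being streamed. A small worked example of the clamp arithmetic, with invented sizes:

```go
package main

import "fmt"

// With a 64 KiB read buffer and a 150 KiB payload, the third read may report a
// full buffer, but only the remaining 22 KiB belong to the requested range.
func main() {
	const bufSize = 64 * 1024
	size := int64(150 * 1024)
	for x := int64(0); x < size; x += bufSize {
		count := int64(bufSize) // what ReadNeedleData might report
		remaining := size - x
		toWrite := count
		if toWrite > remaining {
			toWrite = remaining // the min(count, needleOffset+size-x) clamp from the patch
		}
		fmt.Printf("offset=%d read=%d write=%d\n", x, count, toWrite)
	}
}
```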
