diff options
Diffstat (limited to 'weed')
40 files changed, 716 insertions, 184 deletions
diff --git a/weed/command/filer_copy.go b/weed/command/filer_copy.go index 2295faa8a..9afa65d23 100644 --- a/weed/command/filer_copy.go +++ b/weed/command/filer_copy.go @@ -219,7 +219,7 @@ func genFileCopyTask(fileOrDir string, destPath string, fileCopyTaskChan chan Fi fileCopyTaskChan <- FileCopyTask{ sourceLocation: fileOrDir, - destinationUrlPath: destPath+fi.Name(), + destinationUrlPath: destPath, fileSize: fi.Size(), fileMode: fi.Mode(), uid: uid, @@ -405,7 +405,7 @@ func (worker *FileCopyWorker) uploadFileInChunks(task FileCopyTask, f *os.File, Replication: *worker.options.replication, Collection: *worker.options.collection, TtlSec: worker.options.ttlSec, - Path: task.destinationUrlPath+fileName, + Path: task.destinationUrlPath + fileName, } assignResult, assignError = client.AssignVolume(context.Background(), request) diff --git a/weed/command/mount.go b/weed/command/mount.go index 7fdb21254..f325cb0a5 100644 --- a/weed/command/mount.go +++ b/weed/command/mount.go @@ -14,6 +14,7 @@ type MountOptions struct { replication *string ttlSec *int chunkSizeLimitMB *int + concurrentWriters *int cacheDir *string cacheSizeMB *int64 dataCenter *string @@ -42,6 +43,7 @@ func init() { mountOptions.replication = cmdMount.Flag.String("replication", "", "replication(e.g. 000, 001) to create to files. If empty, let filer decide.") mountOptions.ttlSec = cmdMount.Flag.Int("ttl", 0, "file ttl in seconds") mountOptions.chunkSizeLimitMB = cmdMount.Flag.Int("chunkSizeLimitMB", 2, "local write buffer size, also chunk large files") + mountOptions.concurrentWriters = cmdMount.Flag.Int("concurrentWriters", 0, "limit concurrent goroutine writers if not 0") mountOptions.cacheDir = cmdMount.Flag.String("cacheDir", os.TempDir(), "local cache directory for file chunks and meta data") mountOptions.cacheSizeMB = cmdMount.Flag.Int64("cacheCapacityMB", 1000, "local file chunk cache capacity in MB (0 will disable cache)") mountOptions.dataCenter = cmdMount.Flag.String("dataCenter", "", "prefer to write to the data center") diff --git a/weed/command/mount_std.go b/weed/command/mount_std.go index 20d08314c..83cb352ff 100644 --- a/weed/command/mount_std.go +++ b/weed/command/mount_std.go @@ -5,8 +5,6 @@ package command import ( "context" "fmt" - "github.com/chrislusf/seaweedfs/weed/filer" - "github.com/chrislusf/seaweedfs/weed/filesys/meta_cache" "os" "os/user" "path" @@ -15,6 +13,8 @@ import ( "strings" "time" + "github.com/chrislusf/seaweedfs/weed/filesys/meta_cache" + "github.com/seaweedfs/fuse" "github.com/seaweedfs/fuse/fs" @@ -33,7 +33,7 @@ func runMount(cmd *Command, args []string) bool { if *mountReadRetryTime < time.Second { *mountReadRetryTime = time.Second } - filer.ReadWaitTime = *mountReadRetryTime + util.RetryWaitTime = *mountReadRetryTime umask, umaskErr := strconv.ParseUint(*mountOptions.umaskString, 8, 64) if umaskErr != nil { @@ -175,6 +175,7 @@ func RunMount(option *MountOptions, umask os.FileMode) bool { Replication: *option.replication, TtlSec: int32(*option.ttlSec), ChunkSizeLimit: int64(chunkSizeLimitMB) * 1024 * 1024, + ConcurrentWriters: *option.concurrentWriters, CacheDir: *option.cacheDir, CacheSizeMB: *option.cacheSizeMB, DataCenter: *option.dataCenter, diff --git a/weed/command/server.go b/weed/command/server.go index 6a78fb3f4..4a9c2411a 100644 --- a/weed/command/server.go +++ b/weed/command/server.go @@ -2,13 +2,14 @@ package command import ( "fmt" - stats_collect "github.com/chrislusf/seaweedfs/weed/stats" "os" "runtime" "runtime/pprof" "strings" "time" + stats_collect "github.com/chrislusf/seaweedfs/weed/stats" + "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/util" ) @@ -60,9 +61,10 @@ var ( serverMetricsHttpPort = cmdServer.Flag.Int("metricsPort", 0, "Prometheus metrics listen port") // pulseSeconds = cmdServer.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats") - isStartingFiler = cmdServer.Flag.Bool("filer", false, "whether to start filer") - isStartingS3 = cmdServer.Flag.Bool("s3", false, "whether to start S3 gateway") - isStartingMsgBroker = cmdServer.Flag.Bool("msgBroker", false, "whether to start message broker") + isStartingVolumeServer = cmdServer.Flag.Bool("volume", true, "whether to start volume server") + isStartingFiler = cmdServer.Flag.Bool("filer", false, "whether to start filer") + isStartingS3 = cmdServer.Flag.Bool("s3", false, "whether to start S3 gateway") + isStartingMsgBroker = cmdServer.Flag.Bool("msgBroker", false, "whether to start message broker") serverWhiteList []string @@ -99,7 +101,7 @@ func init() { serverOptions.v.fixJpgOrientation = cmdServer.Flag.Bool("volume.images.fix.orientation", false, "Adjust jpg orientation when uploading.") serverOptions.v.readRedirect = cmdServer.Flag.Bool("volume.read.redirect", true, "Redirect moved or non-local volumes.") serverOptions.v.compactionMBPerSecond = cmdServer.Flag.Int("volume.compactionMBps", 0, "limit compaction speed in mega bytes per second") - serverOptions.v.fileSizeLimitMB = cmdServer.Flag.Int("volume.fileSizeLimitMB", 1024, "limit file size to avoid out of memory") + serverOptions.v.fileSizeLimitMB = cmdServer.Flag.Int("volume.fileSizeLimitMB", 256, "limit file size to avoid out of memory") serverOptions.v.publicUrl = cmdServer.Flag.String("volume.publicUrl", "", "publicly accessible address") serverOptions.v.preStopSeconds = cmdServer.Flag.Int("volume.preStopSeconds", 10, "number of seconds between stop send heartbeats and stop volume server") serverOptions.v.pprof = cmdServer.Flag.Bool("volume.pprof", false, "enable pprof http handlers. precludes --memprofile and --cpuprofile") @@ -214,7 +216,7 @@ func runServer(cmd *Command, args []string) bool { } // start volume server - { + if *isStartingVolumeServer { go serverOptions.v.startVolumeServer(*volumeDataFolders, *volumeMaxDataVolumeCounts, *serverWhiteListOption, *volumeMinFreeSpacePercent) } diff --git a/weed/command/volume.go b/weed/command/volume.go index d73c24ed1..ce5992665 100644 --- a/weed/command/volume.go +++ b/weed/command/volume.go @@ -80,7 +80,7 @@ func init() { v.cpuProfile = cmdVolume.Flag.String("cpuprofile", "", "cpu profile output file") v.memProfile = cmdVolume.Flag.String("memprofile", "", "memory profile output file") v.compactionMBPerSecond = cmdVolume.Flag.Int("compactionMBps", 0, "limit background compaction or copying speed in mega bytes per second") - v.fileSizeLimitMB = cmdVolume.Flag.Int("fileSizeLimitMB", 1024, "limit file size to avoid out of memory") + v.fileSizeLimitMB = cmdVolume.Flag.Int("fileSizeLimitMB", 256, "limit file size to avoid out of memory") v.pprof = cmdVolume.Flag.Bool("pprof", false, "enable pprof http handlers. precludes --memprofile and --cpuprofile") v.metricsHttpPort = cmdVolume.Flag.Int("metricsPort", 0, "Prometheus metrics listen port") } diff --git a/weed/filer/filechunk_manifest.go b/weed/filer/filechunk_manifest.go index 0d01a4a36..f5ab36d37 100644 --- a/weed/filer/filechunk_manifest.go +++ b/weed/filer/filechunk_manifest.go @@ -99,7 +99,7 @@ func retriedFetchChunkData(urlStrings []string, cipherKey []byte, isGzipped bool var buffer bytes.Buffer var shouldRetry bool - for waitTime := time.Second; waitTime < ReadWaitTime; waitTime += waitTime / 2 { + for waitTime := time.Second; waitTime < util.RetryWaitTime; waitTime += waitTime / 2 { for _, urlString := range urlStrings { shouldRetry, err = util.ReadUrlAsStream(urlString+"?readDeleted=true", cipherKey, isGzipped, isFullChunk, offset, size, func(data []byte) { buffer.Write(data) diff --git a/weed/filer/reader_at.go b/weed/filer/reader_at.go index 04c64d449..ccc746b90 100644 --- a/weed/filer/reader_at.go +++ b/weed/filer/reader_at.go @@ -3,19 +3,16 @@ package filer import ( "context" "fmt" + "io" + "math/rand" + "sync" + "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/util" "github.com/chrislusf/seaweedfs/weed/util/chunk_cache" "github.com/chrislusf/seaweedfs/weed/wdclient" "github.com/golang/groupcache/singleflight" - "io" - "math/rand" - "sync" - "time" -) - -var ( - ReadWaitTime = 6 * time.Second ) type ChunkReadAt struct { @@ -45,34 +42,29 @@ func LookupFn(filerClient filer_pb.FilerClient) LookupFileIdFunctionType { locations, found := vidCache[vid] vicCacheLock.RUnlock() - waitTime := time.Second - for !found && waitTime < ReadWaitTime { - // println("looking up volume", vid) - err = filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { - resp, err := client.LookupVolume(context.Background(), &filer_pb.LookupVolumeRequest{ - VolumeIds: []string{vid}, + if !found { + util.Retry("lookup volume "+vid, func() error { + err = filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + resp, err := client.LookupVolume(context.Background(), &filer_pb.LookupVolumeRequest{ + VolumeIds: []string{vid}, + }) + if err != nil { + return err + } + + locations = resp.LocationsMap[vid] + if locations == nil || len(locations.Locations) == 0 { + glog.V(0).Infof("failed to locate %s", fileId) + return fmt.Errorf("failed to locate %s", fileId) + } + vicCacheLock.Lock() + vidCache[vid] = locations + vicCacheLock.Unlock() + + return nil }) - if err != nil { - return err - } - - locations = resp.LocationsMap[vid] - if locations == nil || len(locations.Locations) == 0 { - glog.V(0).Infof("failed to locate %s", fileId) - return fmt.Errorf("failed to locate %s", fileId) - } - vicCacheLock.Lock() - vidCache[vid] = locations - vicCacheLock.Unlock() - - return nil + return err }) - if err == nil { - break - } - glog.V(1).Infof("wait for volume %s", vid) - time.Sleep(waitTime) - waitTime += waitTime / 2 } if err != nil { diff --git a/weed/filesys/dir.go b/weed/filesys/dir.go index ae2ae3418..a8481a435 100644 --- a/weed/filesys/dir.go +++ b/weed/filesys/dir.go @@ -27,6 +27,7 @@ type Dir struct { var _ = fs.Node(&Dir{}) var _ = fs.NodeCreater(&Dir{}) +var _ = fs.NodeMknoder(&Dir{}) var _ = fs.NodeMkdirer(&Dir{}) var _ = fs.NodeFsyncer(&Dir{}) var _ = fs.NodeRequestLookuper(&Dir{}) @@ -179,6 +180,20 @@ func (dir *Dir) Create(ctx context.Context, req *fuse.CreateRequest, } +func (dir *Dir) Mknod(ctx context.Context, req *fuse.MknodRequest) (fs.Node, error) { + if req.Mode&os.ModeNamedPipe != 0 { + glog.V(1).Infof("mknod named pipe %s", req.String()) + return nil, fuse.ENOSYS + } + if req.Mode&req.Mode&os.ModeSocket != 0 { + glog.V(1).Infof("mknod socket %s", req.String()) + return nil, fuse.ENOSYS + } + // not going to support mknod for normal files either + glog.V(1).Infof("mknod %s", req.String()) + return nil, fuse.ENOSYS +} + func (dir *Dir) Mkdir(ctx context.Context, req *fuse.MkdirRequest) (fs.Node, error) { glog.V(4).Infof("mkdir %s: %s", dir.FullPath(), req.Name) @@ -347,8 +362,22 @@ func (dir *Dir) removeOneFile(req *fuse.RemoveRequest) error { // then, delete meta cache and fsNode cache dir.wfs.metaCache.DeleteEntry(context.Background(), filePath) + + // clear entry inside the file + fsNode := dir.wfs.fsNodeCache.GetFsNode(filePath) + if fsNode != nil { + if file, ok := fsNode.(*File); ok { + file.clearEntry() + } + } dir.wfs.fsNodeCache.DeleteFsNode(filePath) + // remove current file handle if any + dir.wfs.handlesLock.Lock() + defer dir.wfs.handlesLock.Unlock() + inodeId := util.NewFullPath(dir.FullPath(), req.Name).AsInode() + delete(dir.wfs.handles, inodeId) + // delete the chunks last if isDeleteData { dir.wfs.deleteFileChunks(entry.Chunks) diff --git a/weed/filesys/dir_link.go b/weed/filesys/dir_link.go index f6bc41b56..ba3280f03 100644 --- a/weed/filesys/dir_link.go +++ b/weed/filesys/dir_link.go @@ -31,7 +31,7 @@ func (dir *Dir) Link(ctx context.Context, req *fuse.LinkRequest, old fs.Node) (f glog.V(4).Infof("Link: %v/%v -> %v/%v", oldFile.dir.FullPath(), oldFile.Name, dir.FullPath(), req.NewName) - if err := oldFile.maybeLoadEntry(ctx); err != nil { + if _, err := oldFile.maybeLoadEntry(ctx); err != nil { return nil, err } @@ -86,7 +86,7 @@ func (dir *Dir) Link(ctx context.Context, req *fuse.LinkRequest, old fs.Node) (f // create new file node newNode := dir.newFile(req.NewName, request.Entry) newFile := newNode.(*File) - if err := newFile.maybeLoadEntry(ctx); err != nil { + if _, err := newFile.maybeLoadEntry(ctx); err != nil { return nil, err } @@ -138,16 +138,17 @@ func (dir *Dir) Symlink(ctx context.Context, req *fuse.SymlinkRequest) (fs.Node, func (file *File) Readlink(ctx context.Context, req *fuse.ReadlinkRequest) (string, error) { - if err := file.maybeLoadEntry(ctx); err != nil { + entry, err := file.maybeLoadEntry(ctx) + if err != nil { return "", err } - if os.FileMode(file.entry.Attributes.FileMode)&os.ModeSymlink == 0 { + if os.FileMode(entry.Attributes.FileMode)&os.ModeSymlink == 0 { return "", fuse.Errno(syscall.EINVAL) } - glog.V(4).Infof("Readlink: %v/%v => %v", file.dir.FullPath(), file.Name, file.entry.Attributes.SymlinkTarget) + glog.V(4).Infof("Readlink: %v/%v => %v", file.dir.FullPath(), file.Name, entry.Attributes.SymlinkTarget) - return file.entry.Attributes.SymlinkTarget, nil + return entry.Attributes.SymlinkTarget, nil } diff --git a/weed/filesys/dirty_page.go b/weed/filesys/dirty_page.go index dd0c48796..11089186f 100644 --- a/weed/filesys/dirty_page.go +++ b/weed/filesys/dirty_page.go @@ -9,22 +9,16 @@ import ( "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "github.com/chrislusf/seaweedfs/weed/util" -) - -var ( - concurrentWriterLimit = runtime.NumCPU() - concurrentWriters = util.NewLimitedConcurrentExecutor(4 * concurrentWriterLimit) ) type ContinuousDirtyPages struct { intervals *ContinuousIntervals f *File writeWaitGroup sync.WaitGroup + chunkAddLock sync.Mutex chunkSaveErrChan chan error chunkSaveErrChanClosed bool lastErr error - lock sync.Mutex collection string replication string } @@ -33,7 +27,7 @@ func newDirtyPages(file *File) *ContinuousDirtyPages { dirtyPages := &ContinuousDirtyPages{ intervals: &ContinuousIntervals{}, f: file, - chunkSaveErrChan: make(chan error, concurrentWriterLimit), + chunkSaveErrChan: make(chan error, runtime.NumCPU()), } go func() { for t := range dirtyPages.chunkSaveErrChan { @@ -100,14 +94,18 @@ func (pages *ContinuousDirtyPages) saveExistingLargestPageToStorage() (hasSavedD func (pages *ContinuousDirtyPages) saveToStorage(reader io.Reader, offset int64, size int64) { + errChanSize := pages.f.wfs.option.ConcurrentWriters + if errChanSize == 0 { + errChanSize = runtime.NumCPU() + } if pages.chunkSaveErrChanClosed { - pages.chunkSaveErrChan = make(chan error, concurrentWriterLimit) + pages.chunkSaveErrChan = make(chan error, errChanSize) pages.chunkSaveErrChanClosed = false } mtime := time.Now().UnixNano() pages.writeWaitGroup.Add(1) - go func() { + writer := func() { defer pages.writeWaitGroup.Done() reader = io.LimitReader(reader, size) @@ -119,9 +117,17 @@ func (pages *ContinuousDirtyPages) saveToStorage(reader io.Reader, offset int64, } chunk.Mtime = mtime pages.collection, pages.replication = collection, replication + pages.chunkAddLock.Lock() + defer pages.chunkAddLock.Unlock() pages.f.addChunks([]*filer_pb.FileChunk{chunk}) glog.V(3).Infof("%s saveToStorage [%d,%d)", pages.f.fullpath(), offset, offset+size) - }() + } + + if pages.f.wfs.concurrentWriters != nil { + pages.f.wfs.concurrentWriters.Execute(writer) + } else { + go writer() + } } func max(x, y int64) int64 { diff --git a/weed/filesys/file.go b/weed/filesys/file.go index 7aa1016d7..9e1342370 100644 --- a/weed/filesys/file.go +++ b/weed/filesys/file.go @@ -43,32 +43,33 @@ func (file *File) fullpath() util.FullPath { return util.NewFullPath(file.dir.FullPath(), file.Name) } -func (file *File) Attr(ctx context.Context, attr *fuse.Attr) error { +func (file *File) Attr(ctx context.Context, attr *fuse.Attr) (err error) { glog.V(4).Infof("file Attr %s, open:%v, existing attr: %+v", file.fullpath(), file.isOpen, attr) - if file.isOpen <= 0 { - if err := file.maybeLoadEntry(ctx); err != nil { + entry := file.entry + if file.isOpen <= 0 || entry == nil { + if entry, err = file.maybeLoadEntry(ctx); err != nil { return err } } attr.Inode = file.fullpath().AsInode() attr.Valid = time.Second - attr.Mode = os.FileMode(file.entry.Attributes.FileMode) - attr.Size = filer.FileSize(file.entry) + attr.Mode = os.FileMode(entry.Attributes.FileMode) + attr.Size = filer.FileSize(entry) if file.isOpen > 0 { - attr.Size = file.entry.Attributes.FileSize + attr.Size = entry.Attributes.FileSize glog.V(4).Infof("file Attr %s, open:%v, size: %d", file.fullpath(), file.isOpen, attr.Size) } - attr.Crtime = time.Unix(file.entry.Attributes.Crtime, 0) - attr.Mtime = time.Unix(file.entry.Attributes.Mtime, 0) - attr.Gid = file.entry.Attributes.Gid - attr.Uid = file.entry.Attributes.Uid + attr.Crtime = time.Unix(entry.Attributes.Crtime, 0) + attr.Mtime = time.Unix(entry.Attributes.Mtime, 0) + attr.Gid = entry.Attributes.Gid + attr.Uid = entry.Attributes.Uid attr.Blocks = attr.Size/blockSize + 1 attr.BlockSize = uint32(file.wfs.option.ChunkSizeLimit) - if file.entry.HardLinkCounter > 0 { - attr.Nlink = uint32(file.entry.HardLinkCounter) + if entry.HardLinkCounter > 0 { + attr.Nlink = uint32(entry.HardLinkCounter) } return nil @@ -79,11 +80,12 @@ func (file *File) Getxattr(ctx context.Context, req *fuse.GetxattrRequest, resp glog.V(4).Infof("file Getxattr %s", file.fullpath()) - if err := file.maybeLoadEntry(ctx); err != nil { + entry, err := file.maybeLoadEntry(ctx) + if err != nil { return err } - return getxattr(file.entry, req, resp) + return getxattr(entry, req, resp) } func (file *File) Open(ctx context.Context, req *fuse.OpenRequest, resp *fuse.OpenResponse) (fs.Handle, error) { @@ -104,7 +106,8 @@ func (file *File) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *f glog.V(4).Infof("%v file setattr %+v", file.fullpath(), req) - if err := file.maybeLoadEntry(ctx); err != nil { + _, err := file.maybeLoadEntry(ctx) + if err != nil { return err } if file.isOpen > 0 { @@ -141,7 +144,7 @@ func (file *File) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *f } } file.entry.Chunks = chunks - file.entryViewCache = nil + file.entryViewCache, _ = filer.NonOverlappingVisibleIntervals(filer.LookupFn(file.wfs), chunks) file.reader = nil file.wfs.deleteFileChunks(truncatedChunks) } @@ -186,7 +189,7 @@ func (file *File) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *f return nil } - return file.saveEntry() + return file.saveEntry(file.entry) } @@ -194,15 +197,16 @@ func (file *File) Setxattr(ctx context.Context, req *fuse.SetxattrRequest) error glog.V(4).Infof("file Setxattr %s: %s", file.fullpath(), req.Name) - if err := file.maybeLoadEntry(ctx); err != nil { + entry, err := file.maybeLoadEntry(ctx) + if err != nil { return err } - if err := setxattr(file.entry, req); err != nil { + if err := setxattr(entry, req); err != nil { return err } - return file.saveEntry() + return file.saveEntry(entry) } @@ -210,15 +214,16 @@ func (file *File) Removexattr(ctx context.Context, req *fuse.RemovexattrRequest) glog.V(4).Infof("file Removexattr %s: %s", file.fullpath(), req.Name) - if err := file.maybeLoadEntry(ctx); err != nil { + entry, err := file.maybeLoadEntry(ctx) + if err != nil { return err } - if err := removexattr(file.entry, req); err != nil { + if err := removexattr(entry, req); err != nil { return err } - return file.saveEntry() + return file.saveEntry(entry) } @@ -226,11 +231,12 @@ func (file *File) Listxattr(ctx context.Context, req *fuse.ListxattrRequest, res glog.V(4).Infof("file Listxattr %s", file.fullpath()) - if err := file.maybeLoadEntry(ctx); err != nil { + entry, err := file.maybeLoadEntry(ctx) + if err != nil { return err } - if err := listxattr(file.entry, req, resp); err != nil { + if err := listxattr(entry, req, resp); err != nil { return err } @@ -252,30 +258,61 @@ func (file *File) Forget() { file.wfs.fsNodeCache.DeleteFsNode(t) } -func (file *File) maybeLoadEntry(ctx context.Context) error { - if (file.entry != nil && len(file.entry.HardLinkId) != 0) || file.isOpen > 0 { - return nil +func (file *File) maybeLoadEntry(ctx context.Context) (entry *filer_pb.Entry, err error) { + entry = file.entry + if file.isOpen > 0 { + return entry, nil } - entry, err := file.wfs.maybeLoadEntry(file.dir.FullPath(), file.Name) + if entry != nil { + if len(entry.HardLinkId) == 0 { + // only always reload hard link + return entry, nil + } + } + entry, err = file.wfs.maybeLoadEntry(file.dir.FullPath(), file.Name) if err != nil { glog.V(3).Infof("maybeLoadEntry file %s/%s: %v", file.dir.FullPath(), file.Name, err) - return err + return entry, err } if entry != nil { file.setEntry(entry) + } else { + glog.Warningf("maybeLoadEntry not found entry %s/%s: %v", file.dir.FullPath(), file.Name, err) } - return nil + return entry, nil +} + +func lessThan(a, b *filer_pb.FileChunk) bool { + if a.Mtime == b.Mtime { + return a.Fid.FileKey < b.Fid.FileKey + } + return a.Mtime < b.Mtime } func (file *File) addChunks(chunks []*filer_pb.FileChunk) { - sort.Slice(chunks, func(i, j int) bool { - if chunks[i].Mtime == chunks[j].Mtime { - return chunks[i].Fid.FileKey < chunks[j].Fid.FileKey + // find the earliest incoming chunk + newChunks := chunks + earliestChunk := newChunks[0] + for i:=1;i<len(newChunks);i++{ + if lessThan(earliestChunk, newChunks[i]) { + earliestChunk = newChunks[i] + } + } + + // pick out-of-order chunks from existing chunks + for _, chunk := range file.entry.Chunks { + if lessThan(earliestChunk, chunk) { + chunks = append(chunks, chunk) } - return chunks[i].Mtime < chunks[j].Mtime + } + + // sort incoming chunks + sort.Slice(chunks, func(i, j int) bool { + return lessThan(chunks[i], chunks[j]) }) + // add to entry view cache for _, chunk := range chunks { file.entryViewCache = filer.MergeIntoVisibles(file.entryViewCache, chunk) } @@ -284,24 +321,30 @@ func (file *File) addChunks(chunks []*filer_pb.FileChunk) { glog.V(4).Infof("%s existing %d chunks adds %d more", file.fullpath(), len(file.entry.Chunks), len(chunks)) - file.entry.Chunks = append(file.entry.Chunks, chunks...) + file.entry.Chunks = append(file.entry.Chunks, newChunks...) } func (file *File) setEntry(entry *filer_pb.Entry) { file.entry = entry - file.entryViewCache, _ = filer.NonOverlappingVisibleIntervals(filer.LookupFn(file.wfs), file.entry.Chunks) + file.entryViewCache, _ = filer.NonOverlappingVisibleIntervals(filer.LookupFn(file.wfs), entry.Chunks) + file.reader = nil +} + +func (file *File) clearEntry() { + file.entry = nil + file.entryViewCache = nil file.reader = nil } -func (file *File) saveEntry() error { +func (file *File) saveEntry(entry *filer_pb.Entry) error { return file.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { - file.wfs.mapPbIdFromLocalToFiler(file.entry) - defer file.wfs.mapPbIdFromFilerToLocal(file.entry) + file.wfs.mapPbIdFromLocalToFiler(entry) + defer file.wfs.mapPbIdFromFilerToLocal(entry) request := &filer_pb.UpdateEntryRequest{ Directory: file.dir.FullPath(), - Entry: file.entry, + Entry: entry, Signatures: []int32{file.wfs.signature}, } diff --git a/weed/filesys/filehandle.go b/weed/filesys/filehandle.go index 54bde3494..54410a0ba 100644 --- a/weed/filesys/filehandle.go +++ b/weed/filesys/filehandle.go @@ -183,16 +183,18 @@ func (fh *FileHandle) Release(ctx context.Context, req *fuse.ReleaseRequest) err } if fh.f.isOpen == 0 { + if err := fh.doFlush(ctx, req.Header); err != nil { glog.Errorf("Release doFlush %s: %v", fh.f.Name, err) } - fh.f.wfs.ReleaseHandle(fh.f.fullpath(), fuse.HandleID(fh.handle)) - } - // stop the goroutine - if !fh.dirtyPages.chunkSaveErrChanClosed { - fh.dirtyPages.chunkSaveErrChanClosed = true - close(fh.dirtyPages.chunkSaveErrChan) + // stop the goroutine + if !fh.dirtyPages.chunkSaveErrChanClosed { + fh.dirtyPages.chunkSaveErrChanClosed = true + close(fh.dirtyPages.chunkSaveErrChan) + } + + fh.f.wfs.ReleaseHandle(fh.f.fullpath(), fuse.HandleID(fh.handle)) } return nil @@ -262,7 +264,6 @@ func (fh *FileHandle) doFlush(ctx context.Context, header fuse.Header) error { glog.V(0).Infof("MaybeManifestize: %v", manifestErr) } fh.f.entry.Chunks = append(chunks, manifestChunks...) - fh.f.entryViewCache = nil fh.f.wfs.mapPbIdFromLocalToFiler(request.Entry) defer fh.f.wfs.mapPbIdFromFilerToLocal(request.Entry) diff --git a/weed/filesys/meta_cache/meta_cache_init.go b/weed/filesys/meta_cache/meta_cache_init.go index f42d61230..4089cea28 100644 --- a/weed/filesys/meta_cache/meta_cache_init.go +++ b/weed/filesys/meta_cache/meta_cache_init.go @@ -3,8 +3,6 @@ package meta_cache import ( "context" "fmt" - "strings" - "time" "github.com/chrislusf/seaweedfs/weed/filer" "github.com/chrislusf/seaweedfs/weed/glog" @@ -18,7 +16,7 @@ func EnsureVisited(mc *MetaCache, client filer_pb.FilerClient, dirPath util.Full glog.V(4).Infof("ReadDirAllEntries %s ...", path) - for waitTime := time.Second; waitTime < filer.ReadWaitTime; waitTime += waitTime / 2 { + util.Retry("ReadDirAllEntries", func() error { err = filer_pb.ReadDirAllEntries(client, dirPath, "", func(pbEntry *filer_pb.Entry, isLast bool) error { entry := filer.FromPbEntry(string(dirPath), pbEntry) if err := mc.doInsertEntry(context.Background(), entry); err != nil { @@ -30,17 +28,13 @@ func EnsureVisited(mc *MetaCache, client filer_pb.FilerClient, dirPath util.Full } return nil }) - if err == nil { - break - } - if strings.Contains(err.Error(), "transport: ") { - glog.V(0).Infof("ReadDirAllEntries %s: %v. Retry in %v", path, err, waitTime) - time.Sleep(waitTime) - continue - } + return err + }) + + if err != nil { err = fmt.Errorf("list %s: %v", dirPath, err) - break } + return }) } diff --git a/weed/filesys/wfs.go b/weed/filesys/wfs.go index 759e21b15..cd14e8032 100644 --- a/weed/filesys/wfs.go +++ b/weed/filesys/wfs.go @@ -31,6 +31,7 @@ type Option struct { Replication string TtlSec int32 ChunkSizeLimit int64 + ConcurrentWriters int CacheDir string CacheSizeMB int64 DataCenter string @@ -68,6 +69,9 @@ type WFS struct { chunkCache *chunk_cache.TieredChunkCache metaCache *meta_cache.MetaCache signature int32 + + // throttle writers + concurrentWriters *util.LimitedConcurrentExecutor } type statsCache struct { filer_pb.StatisticsResponse @@ -96,7 +100,7 @@ func NewSeaweedFileSystem(option *Option) *WFS { fsNode := wfs.fsNodeCache.GetFsNode(filePath) if fsNode != nil { if file, ok := fsNode.(*File); ok { - file.entry = nil + file.clearEntry() } } }) @@ -110,6 +114,10 @@ func NewSeaweedFileSystem(option *Option) *WFS { wfs.root = &Dir{name: wfs.option.FilerMountRootPath, wfs: wfs, entry: entry} wfs.fsNodeCache = newFsCache(wfs.root) + if wfs.option.ConcurrentWriters > 0 { + wfs.concurrentWriters = util.NewLimitedConcurrentExecutor(wfs.option.ConcurrentWriters) + } + return wfs } diff --git a/weed/filesys/wfs_filer_client.go b/weed/filesys/wfs_filer_client.go index 096ee555f..dd76f5669 100644 --- a/weed/filesys/wfs_filer_client.go +++ b/weed/filesys/wfs_filer_client.go @@ -1,6 +1,7 @@ package filesys import ( + "github.com/chrislusf/seaweedfs/weed/util" "google.golang.org/grpc" "github.com/chrislusf/seaweedfs/weed/pb" @@ -11,10 +12,12 @@ var _ = filer_pb.FilerClient(&WFS{}) func (wfs *WFS) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error { - err := pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error { - client := filer_pb.NewSeaweedFilerClient(grpcConnection) - return fn(client) - }, wfs.option.FilerGrpcAddress, wfs.option.GrpcDialOption) + err := util.Retry("filer grpc", func() error { + return pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error { + client := filer_pb.NewSeaweedFilerClient(grpcConnection) + return fn(client) + }, wfs.option.FilerGrpcAddress, wfs.option.GrpcDialOption) + }) if err == nil { return nil diff --git a/weed/ftpd/ftp_server.go b/weed/ftpd/ftp_server.go new file mode 100644 index 000000000..4a0dca2c3 --- /dev/null +++ b/weed/ftpd/ftp_server.go @@ -0,0 +1,81 @@ +package ftpd + +import ( + "crypto/tls" + "errors" + "fmt" + "net" + + ftpserver "github.com/fclairamb/ftpserverlib" + "google.golang.org/grpc" +) + +type FtpServerOption struct { + Filer string + IP string + IpBind string + Port int + FilerGrpcAddress string + FtpRoot string + GrpcDialOption grpc.DialOption + PassivePortStart int + PassivePortStop int +} + +type SftpServer struct { + option *FtpServerOption + ftpListener net.Listener +} + +var _ = ftpserver.MainDriver(&SftpServer{}) + +// NewServer returns a new FTP server driver +func NewFtpServer(ftpListener net.Listener, option *FtpServerOption) (*SftpServer, error) { + var err error + server := &SftpServer{ + option: option, + ftpListener: ftpListener, + } + return server, err +} + +// GetSettings returns some general settings around the server setup +func (s *SftpServer) GetSettings() (*ftpserver.Settings, error) { + var portRange *ftpserver.PortRange + if s.option.PassivePortStart > 0 && s.option.PassivePortStop > s.option.PassivePortStart { + portRange = &ftpserver.PortRange{ + Start: s.option.PassivePortStart, + End: s.option.PassivePortStop, + } + } + + return &ftpserver.Settings{ + Listener: s.ftpListener, + ListenAddr: fmt.Sprintf("%s:%d", s.option.IpBind, s.option.Port), + PublicHost: s.option.IP, + PassiveTransferPortRange: portRange, + ActiveTransferPortNon20: true, + IdleTimeout: -1, + ConnectionTimeout: 20, + }, nil +} + +// ClientConnected is called to send the very first welcome message +func (s *SftpServer) ClientConnected(cc ftpserver.ClientContext) (string, error) { + return "Welcome to SeaweedFS FTP Server", nil +} + +// ClientDisconnected is called when the user disconnects, even if he never authenticated +func (s *SftpServer) ClientDisconnected(cc ftpserver.ClientContext) { +} + +// AuthUser authenticates the user and selects an handling driver +func (s *SftpServer) AuthUser(cc ftpserver.ClientContext, username, password string) (ftpserver.ClientDriver, error) { + return nil, nil +} + +// GetTLSConfig returns a TLS Certificate to use +// The certificate could frequently change if we use something like "let's encrypt" +func (s *SftpServer) GetTLSConfig() (*tls.Config, error) { + return nil, errors.New("no TLS certificate configured") +} diff --git a/weed/operation/upload_content.go b/weed/operation/upload_content.go index a4148cb22..ac0b477cb 100644 --- a/weed/operation/upload_content.go +++ b/weed/operation/upload_content.go @@ -81,21 +81,18 @@ func doUpload(uploadUrl string, filename string, cipher bool, reader io.Reader, if ok { data = bytesReader.Bytes } else { - buf := bytebufferpool.Get() - _, err = buf.ReadFrom(reader) - defer bytebufferpool.Put(buf) + data, err = ioutil.ReadAll(reader) if err != nil { err = fmt.Errorf("read input: %v", err) return } - data = buf.Bytes() } uploadResult, uploadErr := retriedUploadData(uploadUrl, filename, cipher, data, isInputCompressed, mtype, pairMap, jwt) return uploadResult, uploadErr, data } func retriedUploadData(uploadUrl string, filename string, cipher bool, data []byte, isInputCompressed bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (uploadResult *UploadResult, err error) { - for i := 0; i < 1; i++ { + for i := 0; i < 3; i++ { uploadResult, err = doUploadData(uploadUrl, filename, cipher, data, isInputCompressed, mtype, pairMap, jwt) if err == nil { return diff --git a/weed/pb/grpc_client_server.go b/weed/pb/grpc_client_server.go index ce706e282..f19af43b2 100644 --- a/weed/pb/grpc_client_server.go +++ b/weed/pb/grpc_client_server.go @@ -62,6 +62,7 @@ func GrpcDial(ctx context.Context, address string, opts ...grpc.DialOption) (*gr grpc.WithDefaultCallOptions( grpc.MaxCallSendMsgSize(Max_Message_Size), grpc.MaxCallRecvMsgSize(Max_Message_Size), + grpc.WaitForReady(true), ), grpc.WithKeepaliveParams(keepalive.ClientParameters{ Time: 30 * time.Second, // client ping server if no activity for this long diff --git a/weed/s3api/filer_util_tags.go b/weed/s3api/filer_util_tags.go index 3d4da7825..75d3b37d0 100644 --- a/weed/s3api/filer_util_tags.go +++ b/weed/s3api/filer_util_tags.go @@ -4,10 +4,11 @@ import ( "strings" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + xhttp "github.com/chrislusf/seaweedfs/weed/s3api/http" ) const ( - S3TAG_PREFIX = "s3-" + S3TAG_PREFIX = xhttp.AmzObjectTagging + "-" ) func (s3a *S3ApiServer) getTags(parentDirectoryPath string, entryName string) (tags map[string]string, err error) { diff --git a/weed/s3api/http/header.go b/weed/s3api/http/header.go new file mode 100644 index 000000000..2802b560f --- /dev/null +++ b/weed/s3api/http/header.go @@ -0,0 +1,30 @@ +/* + * MinIO Cloud Storage, (C) 2019 MinIO, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package http + +// Standard S3 HTTP request constants +const ( + // S3 storage class + AmzStorageClass = "x-amz-storage-class" + + // S3 user-defined metadata + AmzUserMetaPrefix = "X-Amz-Meta-" + + // S3 object tagging + AmzObjectTagging = "X-Amz-Tagging" + AmzTagCount = "x-amz-tagging-count" +) diff --git a/weed/s3api/s3api_object_handlers.go b/weed/s3api/s3api_object_handlers.go index fa628f44e..fe134c102 100644 --- a/weed/s3api/s3api_object_handlers.go +++ b/weed/s3api/s3api_object_handlers.go @@ -266,7 +266,7 @@ func (s3a *S3ApiServer) proxyToFiler(w http.ResponseWriter, r *http.Request, des resp, postErr := client.Do(proxyReq) - if resp.ContentLength == -1 && !strings.HasSuffix(destUrl, "/") { + if (resp.ContentLength == -1 || resp.StatusCode == 404) && !strings.HasSuffix(destUrl, "/") { writeErrorResponse(w, s3err.ErrNoSuchKey, r.URL) return } diff --git a/weed/s3api/s3api_objects_list_handlers.go b/weed/s3api/s3api_objects_list_handlers.go index 23406d6df..5d63f1039 100644 --- a/weed/s3api/s3api_objects_list_handlers.go +++ b/weed/s3api/s3api_objects_list_handlers.go @@ -4,7 +4,6 @@ import ( "context" "encoding/xml" "fmt" - "github.com/chrislusf/seaweedfs/weed/s3api/s3err" "io" "net/http" "net/url" @@ -15,6 +14,8 @@ import ( "github.com/chrislusf/seaweedfs/weed/filer" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + xhttp "github.com/chrislusf/seaweedfs/weed/s3api/http" + "github.com/chrislusf/seaweedfs/weed/s3api/s3err" ) type ListBucketResultV2 struct { @@ -137,6 +138,10 @@ func (s3a *S3ApiServer) listFilerEntries(bucket string, originalPrefix string, m }) } } else { + storageClass := "STANDARD" + if v, ok := entry.Extended[xhttp.AmzStorageClass]; ok { + storageClass = string(v) + } contents = append(contents, ListEntry{ Key: fmt.Sprintf("%s/%s", dir, entry.Name)[len(bucketPrefix):], LastModified: time.Unix(entry.Attributes.Mtime, 0).UTC(), @@ -146,7 +151,7 @@ func (s3a *S3ApiServer) listFilerEntries(bucket string, originalPrefix string, m ID: fmt.Sprintf("%x", entry.Attributes.Uid), DisplayName: entry.Attributes.UserName, }, - StorageClass: "STANDARD", + StorageClass: StorageClass(storageClass), }) } }) diff --git a/weed/server/filer_server_handlers.go b/weed/server/filer_server_handlers.go index 18f78881c..555036feb 100644 --- a/weed/server/filer_server_handlers.go +++ b/weed/server/filer_server_handlers.go @@ -10,6 +10,10 @@ import ( func (fs *FilerServer) filerHandler(w http.ResponseWriter, r *http.Request) { w.Header().Set("Server", "SeaweedFS Filer "+util.VERSION) + if r.Header.Get("Origin") != "" { + w.Header().Set("Access-Control-Allow-Origin", "*") + w.Header().Set("Access-Control-Allow-Credentials", "true") + } start := time.Now() switch r.Method { case "GET": @@ -32,11 +36,19 @@ func (fs *FilerServer) filerHandler(w http.ResponseWriter, r *http.Request) { stats.FilerRequestCounter.WithLabelValues("post").Inc() fs.PostHandler(w, r) stats.FilerRequestHistogram.WithLabelValues("post").Observe(time.Since(start).Seconds()) + case "OPTIONS": + stats.FilerRequestCounter.WithLabelValues("options").Inc() + OptionsHandler(w, r, false) + stats.FilerRequestHistogram.WithLabelValues("head").Observe(time.Since(start).Seconds()) } } func (fs *FilerServer) readonlyFilerHandler(w http.ResponseWriter, r *http.Request) { w.Header().Set("Server", "SeaweedFS Filer "+util.VERSION) + if r.Header.Get("Origin") != "" { + w.Header().Set("Access-Control-Allow-Origin", "*") + w.Header().Set("Access-Control-Allow-Credentials", "true") + } start := time.Now() switch r.Method { case "GET": @@ -47,5 +59,18 @@ func (fs *FilerServer) readonlyFilerHandler(w http.ResponseWriter, r *http.Reque stats.FilerRequestCounter.WithLabelValues("head").Inc() fs.GetOrHeadHandler(w, r, false) stats.FilerRequestHistogram.WithLabelValues("head").Observe(time.Since(start).Seconds()) + case "OPTIONS": + stats.FilerRequestCounter.WithLabelValues("options").Inc() + OptionsHandler(w, r, true) + stats.FilerRequestHistogram.WithLabelValues("head").Observe(time.Since(start).Seconds()) + } +} + +func OptionsHandler(w http.ResponseWriter, r *http.Request, isReadOnly bool) { + if isReadOnly { + w.Header().Add("Access-Control-Allow-Methods", "GET, OPTIONS") + } else { + w.Header().Add("Access-Control-Allow-Methods", "PUT, POST, GET, DELETE, OPTIONS") } + w.Header().Add("Access-Control-Allow-Headers", "*") } diff --git a/weed/server/filer_server_handlers_read.go b/weed/server/filer_server_handlers_read.go index fbd45d6b9..69d485e90 100644 --- a/weed/server/filer_server_handlers_read.go +++ b/weed/server/filer_server_handlers_read.go @@ -15,6 +15,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/images" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + xhttp "github.com/chrislusf/seaweedfs/weed/s3api/http" "github.com/chrislusf/seaweedfs/weed/stats" "github.com/chrislusf/seaweedfs/weed/util" ) @@ -93,6 +94,24 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request, } } + // print out the header from extended properties + for k, v := range entry.Extended { + w.Header().Set(k, string(v)) + } + + //set tag count + if r.Method == "GET" { + tagCount := 0 + for k := range entry.Extended { + if strings.HasPrefix(k, xhttp.AmzObjectTagging+"-") { + tagCount++ + } + } + if tagCount > 0 { + w.Header().Set(xhttp.AmzTagCount, strconv.Itoa(tagCount)) + } + } + // set etag etag := filer.ETagEntry(entry) if inm := r.Header.Get("If-None-Match"); inm == "\""+etag+"\"" { diff --git a/weed/server/filer_server_handlers_write_autochunk.go b/weed/server/filer_server_handlers_write_autochunk.go index d86d49b2a..ee0af9aab 100644 --- a/weed/server/filer_server_handlers_write_autochunk.go +++ b/weed/server/filer_server_handlers_write_autochunk.go @@ -18,8 +18,10 @@ import ( "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + xhttp "github.com/chrislusf/seaweedfs/weed/s3api/http" "github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/stats" + "github.com/chrislusf/seaweedfs/weed/storage/needle" "github.com/chrislusf/seaweedfs/weed/util" ) @@ -176,6 +178,18 @@ func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileNa Size: chunkOffset, } + if entry.Extended == nil { + entry.Extended = make(map[string][]byte) + } + + fs.saveAmzMetaData(r, entry) + + for k, v := range r.Header { + if len(v) > 0 && strings.HasPrefix(k, needle.PairNamePrefix) { + entry.Extended[k] = []byte(v[0]) + } + } + if dbErr := fs.filer.CreateEntry(ctx, entry, false, false, nil); dbErr != nil { fs.filer.DeleteChunks(entry.Chunks) replyerr = dbErr @@ -308,3 +322,27 @@ func (fs *FilerServer) mkdir(ctx context.Context, w http.ResponseWriter, r *http } return filerResult, replyerr } + +func (fs *FilerServer) saveAmzMetaData(r *http.Request, entry *filer.Entry) { + + if sc := r.Header.Get(xhttp.AmzStorageClass); sc != "" { + entry.Extended[xhttp.AmzStorageClass] = []byte(sc) + } + + if tags := r.Header.Get(xhttp.AmzObjectTagging); tags != "" { + for _, v := range strings.Split(tags, "&") { + tag := strings.Split(v, "=") + if len(tag) == 2 { + entry.Extended[xhttp.AmzObjectTagging+"-"+tag[0]] = []byte(tag[1]) + } + } + } + + for header, values := range r.Header { + if strings.HasPrefix(header, xhttp.AmzUserMetaPrefix) { + for _, value := range values { + entry.Extended[header] = []byte(value) + } + } + } +} diff --git a/weed/server/filer_ui/templates.go b/weed/server/filer_ui/templates.go index f86dde5b1..3f0647119 100644 --- a/weed/server/filer_ui/templates.go +++ b/weed/server/filer_ui/templates.go @@ -25,7 +25,7 @@ var StatusTpl = template.Must(template.New("status").Funcs(funcMap).Parse(`<!DOC <meta name="viewport" content="width=device-width, initial-scale=1"> <link rel="stylesheet" href="/seaweedfsstatic/bootstrap/3.3.1/css/bootstrap.min.css"> <style> -body { padding-bottom: 70px; } +body { padding-bottom: 128px; } #drop-area { border: 1px transparent; } diff --git a/weed/server/volume_grpc_client_to_master.go b/weed/server/volume_grpc_client_to_master.go index 199f8faba..2f594fa2b 100644 --- a/weed/server/volume_grpc_client_to_master.go +++ b/weed/server/volume_grpc_client_to_master.go @@ -203,6 +203,7 @@ func (vs *VolumeServer) doHeartbeat(masterNode, masterGrpcAddress string, grpcDi } case <-volumeTickChan: glog.V(4).Infof("volume server %s:%d heartbeat", vs.store.Ip, vs.store.Port) + vs.store.MaybeAdjustVolumeMax() if err = stream.Send(vs.store.CollectHeartbeat()); err != nil { glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", masterNode, err) return "", err @@ -216,6 +217,23 @@ func (vs *VolumeServer) doHeartbeat(masterNode, masterGrpcAddress string, grpcDi case err = <-doneChan: return case <-vs.stopChan: + var volumeMessages []*master_pb.VolumeInformationMessage + emptyBeat := &master_pb.Heartbeat{ + Ip: vs.store.Ip, + Port: uint32(vs.store.Port), + PublicUrl: vs.store.PublicUrl, + MaxVolumeCount: uint32(0), + MaxFileKey: uint64(0), + DataCenter: vs.store.GetDataCenter(), + Rack: vs.store.GetRack(), + Volumes: volumeMessages, + HasNoVolumes: len(volumeMessages) == 0, + } + glog.V(1).Infof("volume server %s:%d stops and deletes all volumes", vs.store.Ip, vs.store.Port) + if err = stream.Send(emptyBeat); err != nil { + glog.V(0).Infof("Volume Server Failed to update to master %s: %v", masterNode, err) + return "", err + } return } } diff --git a/weed/server/volume_grpc_copy.go b/weed/server/volume_grpc_copy.go index cd2b53c8a..2aecb140f 100644 --- a/weed/server/volume_grpc_copy.go +++ b/weed/server/volume_grpc_copy.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "io" + "io/ioutil" "math" "os" "time" @@ -60,13 +61,14 @@ func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.Vo volumeFileName = storage.VolumeFileName(location.Directory, volFileInfoResp.Collection, int(req.VolumeId)) + ioutil.WriteFile(volumeFileName+".note", []byte(fmt.Sprintf("copying from %s", req.SourceDataNode)), 0755) + // println("source:", volFileInfoResp.String()) - // copy ecx file - if err := vs.doCopyFile(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.IdxFileSize, volumeFileName, ".idx", false, false); err != nil { + if err := vs.doCopyFile(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.DatFileSize, volumeFileName, ".dat", false, true); err != nil { return err } - if err := vs.doCopyFile(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.DatFileSize, volumeFileName, ".dat", false, true); err != nil { + if err := vs.doCopyFile(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.IdxFileSize, volumeFileName, ".idx", false, false); err != nil { return err } @@ -74,6 +76,8 @@ func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.Vo return err } + os.Remove(volumeFileName + ".note") + return nil }) diff --git a/weed/server/volume_server_handlers.go b/weed/server/volume_server_handlers.go index ad13cdf3b..7852c950a 100644 --- a/weed/server/volume_server_handlers.go +++ b/weed/server/volume_server_handlers.go @@ -1,10 +1,11 @@ package weed_server import ( - "github.com/chrislusf/seaweedfs/weed/util" "net/http" "strings" + "github.com/chrislusf/seaweedfs/weed/util" + "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/stats" @@ -27,6 +28,10 @@ security settings: func (vs *VolumeServer) privateStoreHandler(w http.ResponseWriter, r *http.Request) { w.Header().Set("Server", "SeaweedFS Volume "+util.VERSION) + if r.Header.Get("Origin") != "" { + w.Header().Set("Access-Control-Allow-Origin", "*") + w.Header().Set("Access-Control-Allow-Credentials", "true") + } switch r.Method { case "GET", "HEAD": stats.ReadRequest() @@ -37,11 +42,19 @@ func (vs *VolumeServer) privateStoreHandler(w http.ResponseWriter, r *http.Reque case "PUT", "POST": stats.WriteRequest() vs.guard.WhiteList(vs.PostHandler)(w, r) + case "OPTIONS": + stats.ReadRequest() + w.Header().Add("Access-Control-Allow-Methods", "PUT, POST, GET, DELETE, OPTIONS") + w.Header().Add("Access-Control-Allow-Headers", "*") } } func (vs *VolumeServer) publicReadOnlyHandler(w http.ResponseWriter, r *http.Request) { w.Header().Set("Server", "SeaweedFS Volume "+util.VERSION) + if r.Header.Get("Origin") != "" { + w.Header().Set("Access-Control-Allow-Origin", "*") + w.Header().Set("Access-Control-Allow-Credentials", "true") + } switch r.Method { case "GET": stats.ReadRequest() @@ -49,6 +62,10 @@ func (vs *VolumeServer) publicReadOnlyHandler(w http.ResponseWriter, r *http.Req case "HEAD": stats.ReadRequest() vs.GetOrHeadHandler(w, r) + case "OPTIONS": + stats.ReadRequest() + w.Header().Add("Access-Control-Allow-Methods", "GET, OPTIONS") + w.Header().Add("Access-Control-Allow-Headers", "*") } } diff --git a/weed/shell/command_volume_mark.go b/weed/shell/command_volume_mark.go new file mode 100644 index 000000000..19b614310 --- /dev/null +++ b/weed/shell/command_volume_mark.go @@ -0,0 +1,55 @@ +package shell + +import ( + "flag" + "fmt" + "io" + + "github.com/chrislusf/seaweedfs/weed/storage/needle" +) + +func init() { + Commands = append(Commands, &commandVolumeMark{}) +} + +type commandVolumeMark struct { +} + +func (c *commandVolumeMark) Name() string { + return "volume.mark" +} + +func (c *commandVolumeMark) Help() string { + return `Mark volume writable or readonly from one volume server + + volume.mark -node <volume server host:port> -volumeId <volume id> -writable or -readonly +` +} + +func (c *commandVolumeMark) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { + + if err = commandEnv.confirmIsLocked(); err != nil { + return + } + + volMarkCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) + volumeIdInt := volMarkCommand.Int("volumeId", 0, "the volume id") + nodeStr := volMarkCommand.String("node", "", "the volume server <host>:<port>") + writable := volMarkCommand.Bool("writable", false, "volume mark writable") + readonly := volMarkCommand.Bool("readonly", false, "volume mark readonly") + if err = volMarkCommand.Parse(args); err != nil { + return nil + } + markWritable := false + if (*writable && *readonly) || (!*writable && !*readonly) { + return fmt.Errorf("use -readonly or -writable") + } else if *writable { + markWritable = true + } + + sourceVolumeServer := *nodeStr + + volumeId := needle.VolumeId(*volumeIdInt) + + return markVolumeWritable(commandEnv.option.GrpcDialOption, volumeId, sourceVolumeServer, markWritable) +} diff --git a/weed/shell/command_volume_move.go b/weed/shell/command_volume_move.go index b136604e5..2bc8dfad8 100644 --- a/weed/shell/command_volume_move.go +++ b/weed/shell/command_volume_move.go @@ -166,3 +166,18 @@ func deleteVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sour return deleteErr }) } + +func markVolumeWritable(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer string, writable bool) (err error) { + return operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + if writable { + _, err = volumeServerClient.VolumeMarkWritable(context.Background(), &volume_server_pb.VolumeMarkWritableRequest{ + VolumeId: uint32(volumeId), + }) + } else { + _, err = volumeServerClient.VolumeMarkReadonly(context.Background(), &volume_server_pb.VolumeMarkReadonlyRequest{ + VolumeId: uint32(volumeId), + }) + } + return err + }) +} diff --git a/weed/storage/disk_location.go b/weed/storage/disk_location.go index ed57aa54b..775ebf092 100644 --- a/weed/storage/disk_location.go +++ b/weed/storage/disk_location.go @@ -13,14 +13,16 @@ import ( "github.com/chrislusf/seaweedfs/weed/stats" "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding" "github.com/chrislusf/seaweedfs/weed/storage/needle" + "github.com/chrislusf/seaweedfs/weed/util" ) type DiskLocation struct { - Directory string - MaxVolumeCount int - MinFreeSpacePercent float32 - volumes map[needle.VolumeId]*Volume - volumesLock sync.RWMutex + Directory string + MaxVolumeCount int + OriginalMaxVolumeCount int + MinFreeSpacePercent float32 + volumes map[needle.VolumeId]*Volume + volumesLock sync.RWMutex // erasure coding ecVolumes map[needle.VolumeId]*erasure_coding.EcVolume @@ -30,7 +32,7 @@ type DiskLocation struct { } func NewDiskLocation(dir string, maxVolumeCount int, minFreeSpacePercent float32) *DiskLocation { - location := &DiskLocation{Directory: dir, MaxVolumeCount: maxVolumeCount, MinFreeSpacePercent: minFreeSpacePercent} + location := &DiskLocation{Directory: dir, MaxVolumeCount: maxVolumeCount, OriginalMaxVolumeCount: maxVolumeCount, MinFreeSpacePercent: minFreeSpacePercent} location.volumes = make(map[needle.VolumeId]*Volume) location.ecVolumes = make(map[needle.VolumeId]*erasure_coding.EcVolume) go location.CheckDiskSpace() @@ -60,6 +62,14 @@ func parseCollectionVolumeId(base string) (collection string, vid needle.VolumeI func (l *DiskLocation) loadExistingVolume(fileInfo os.FileInfo, needleMapKind NeedleMapType) bool { name := fileInfo.Name() if !fileInfo.IsDir() && strings.HasSuffix(name, ".idx") { + name := name[:len(name)-len(".idx")] + noteFile := l.Directory + "/" + name + ".note" + if util.FileExists(noteFile) { + note, _ := ioutil.ReadFile(noteFile) + glog.Warningf("volume %s was not completed: %s", name, string(note)) + removeVolumeFiles(l.Directory + "/" + name) + return false + } vid, collection, err := l.volumeIdFromPath(fileInfo) if err != nil { glog.Warningf("get volume id failed, %s, err : %s", name, err) @@ -85,7 +95,7 @@ func (l *DiskLocation) loadExistingVolume(fileInfo os.FileInfo, needleMapKind Ne size, _, _ := v.FileStat() glog.V(0).Infof("data file %s, replicaPlacement=%s v=%d size=%d ttl=%s", - l.Directory+"/"+name, v.ReplicaPlacement, v.Version(), size, v.Ttl.String()) + l.Directory+"/"+name+".dat", v.ReplicaPlacement, v.Version(), size, v.Ttl.String()) return true } return false diff --git a/weed/storage/needle/needle_read_write.go b/weed/storage/needle/needle_read_write.go index 89fc85b0d..e758a6fee 100644 --- a/weed/storage/needle/needle_read_write.go +++ b/weed/storage/needle/needle_read_write.go @@ -24,6 +24,8 @@ const ( TtlBytesLength = 2 ) +var ErrorSizeMismatch = errors.New("size mismatch") + func (n *Needle) DiskSize(version Version) int64 { return GetActualSize(n.Size, version) } @@ -168,6 +170,11 @@ func ReadNeedleBlob(r backend.BackendStorageFile, offset int64, size Size, versi func (n *Needle) ReadBytes(bytes []byte, offset int64, size Size, version Version) (err error) { n.ParseNeedleHeader(bytes) if n.Size != size { + // cookie is not always passed in for this API. Use size to do preliminary checking. + if OffsetSize == 4 && offset < int64(MaxPossibleVolumeSize) { + glog.Errorf("entry not found1: offset %d found id %x size %d, expected size %d", offset, n.Id, n.Size, size) + return ErrorSizeMismatch + } return fmt.Errorf("entry not found: offset %d found id %x size %d, expected size %d", offset, n.Id, n.Size, size) } switch version { diff --git a/weed/storage/store.go b/weed/storage/store.go index b9fcfcba9..38f167cef 100644 --- a/weed/storage/store.go +++ b/weed/storage/store.go @@ -194,13 +194,19 @@ func (s *Store) SetDataCenter(dataCenter string) { func (s *Store) SetRack(rack string) { s.rack = rack } +func (s *Store) GetDataCenter() string { + return s.dataCenter +} +func (s *Store) GetRack() string { + return s.rack +} func (s *Store) CollectHeartbeat() *master_pb.Heartbeat { var volumeMessages []*master_pb.VolumeInformationMessage maxVolumeCount := 0 var maxFileKey NeedleId collectionVolumeSize := make(map[string]uint64) - collectionVolumeReadOnlyCount := make(map[string]uint8) + collectionVolumeReadOnlyCount := make(map[string]map[string]uint8) for _, location := range s.Locations { var deleteVids []needle.VolumeId maxVolumeCount = maxVolumeCount + location.MaxVolumeCount @@ -220,11 +226,24 @@ func (s *Store) CollectHeartbeat() *master_pb.Heartbeat { } } collectionVolumeSize[v.Collection] += volumeMessage.Size + if _, exist := collectionVolumeReadOnlyCount[v.Collection]; !exist { + collectionVolumeReadOnlyCount[v.Collection] = map[string]uint8{ + "IsReadOnly": 0, + "noWriteOrDelete": 0, + "noWriteCanDelete": 0, + "isDiskSpaceLow": 0, + } + } if v.IsReadOnly() { - collectionVolumeReadOnlyCount[v.Collection] += 1 - } else { - if _, exist := collectionVolumeReadOnlyCount[v.Collection]; !exist { - collectionVolumeReadOnlyCount[v.Collection] = 0 + collectionVolumeReadOnlyCount[v.Collection]["IsReadOnly"] += 1 + if v.noWriteOrDelete { + collectionVolumeReadOnlyCount[v.Collection]["noWriteOrDelete"] += 1 + } + if v.noWriteCanDelete { + collectionVolumeReadOnlyCount[v.Collection]["noWriteCanDelete"] += 1 + } + if v.location.isDiskSpaceLow { + collectionVolumeReadOnlyCount[v.Collection]["isDiskSpaceLow"] += 1 } } } @@ -251,8 +270,10 @@ func (s *Store) CollectHeartbeat() *master_pb.Heartbeat { stats.VolumeServerDiskSizeGauge.WithLabelValues(col, "normal").Set(float64(size)) } - for col, count := range collectionVolumeReadOnlyCount { - stats.VolumeServerReadOnlyVolumeGauge.WithLabelValues(col, "normal").Set(float64(count)) + for col, types := range collectionVolumeReadOnlyCount { + for t, count := range types { + stats.VolumeServerReadOnlyVolumeGauge.WithLabelValues(col, t).Set(float64(count)) + } } return &master_pb.Heartbeat{ @@ -440,7 +461,8 @@ func (s *Store) GetVolumeSizeLimit() uint64 { func (s *Store) MaybeAdjustVolumeMax() (hasChanges bool) { volumeSizeLimit := s.GetVolumeSizeLimit() for _, diskLocation := range s.Locations { - if diskLocation.MaxVolumeCount == 0 { + if diskLocation.OriginalMaxVolumeCount == 0 { + currentMaxVolumeCount := diskLocation.MaxVolumeCount diskStatus := stats.NewDiskStatus(diskLocation.Directory) unusedSpace := diskLocation.UnUsedSpace(volumeSizeLimit) unclaimedSpaces := int64(diskStatus.Free) - int64(unusedSpace) @@ -452,7 +474,7 @@ func (s *Store) MaybeAdjustVolumeMax() (hasChanges bool) { diskLocation.MaxVolumeCount = maxVolumeCount glog.V(0).Infof("disk %s max %d unclaimedSpace:%dMB, unused:%dMB volumeSizeLimit:%dMB", diskLocation.Directory, maxVolumeCount, unclaimedSpaces/1024/1024, unusedSpace/1024/1024, volumeSizeLimit/1024/1024) - hasChanges = true + hasChanges = hasChanges || currentMaxVolumeCount != diskLocation.MaxVolumeCount } } return diff --git a/weed/storage/volume_checking.go b/weed/storage/volume_checking.go index e42fb238b..00e04047f 100644 --- a/weed/storage/volume_checking.go +++ b/weed/storage/volume_checking.go @@ -2,8 +2,10 @@ package storage import ( "fmt" + "io" "os" + "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/storage/backend" "github.com/chrislusf/seaweedfs/weed/storage/idx" "github.com/chrislusf/seaweedfs/weed/storage/needle" @@ -11,17 +13,40 @@ import ( "github.com/chrislusf/seaweedfs/weed/util" ) -func CheckVolumeDataIntegrity(v *Volume, indexFile *os.File) (lastAppendAtNs uint64, e error) { +func CheckAndFixVolumeDataIntegrity(v *Volume, indexFile *os.File) (lastAppendAtNs uint64, err error) { var indexSize int64 - if indexSize, e = verifyIndexFileIntegrity(indexFile); e != nil { - return 0, fmt.Errorf("verifyIndexFileIntegrity %s failed: %v", indexFile.Name(), e) + if indexSize, err = verifyIndexFileIntegrity(indexFile); err != nil { + return 0, fmt.Errorf("verifyIndexFileIntegrity %s failed: %v", indexFile.Name(), err) } if indexSize == 0 { return 0, nil } + healthyIndexSize := indexSize + for i := 1; i <= 10 && indexSize >= int64(i)*NeedleMapEntrySize; i++ { + // check and fix last 10 entries + lastAppendAtNs, err = doCheckAndFixVolumeData(v, indexFile, indexSize-int64(i)*NeedleMapEntrySize) + if err == io.EOF { + healthyIndexSize = indexSize - int64(i)*NeedleMapEntrySize + continue + } + if err != ErrorSizeMismatch { + break + } + } + if healthyIndexSize < indexSize { + glog.Warningf("CheckAndFixVolumeDataIntegrity truncate idx file %s from %d to %d", indexFile.Name(), indexSize, healthyIndexSize) + err = indexFile.Truncate(healthyIndexSize) + if err != nil { + glog.Warningf("CheckAndFixVolumeDataIntegrity truncate idx file %s from %d to %d: %v", indexFile.Name(), indexSize, healthyIndexSize, err) + } + } + return +} + +func doCheckAndFixVolumeData(v *Volume, indexFile *os.File, indexOffset int64) (lastAppendAtNs uint64, err error) { var lastIdxEntry []byte - if lastIdxEntry, e = readIndexEntryAtOffset(indexFile, indexSize-NeedleMapEntrySize); e != nil { - return 0, fmt.Errorf("readLastIndexEntry %s failed: %v", indexFile.Name(), e) + if lastIdxEntry, err = readIndexEntryAtOffset(indexFile, indexOffset); err != nil { + return 0, fmt.Errorf("readLastIndexEntry %s failed: %v", indexFile.Name(), err) } key, offset, size := idx.IdxFileEntry(lastIdxEntry) if offset.IsZero() { @@ -29,15 +54,15 @@ func CheckVolumeDataIntegrity(v *Volume, indexFile *os.File) (lastAppendAtNs uin } if size < 0 { // read the deletion entry - if lastAppendAtNs, e = verifyDeletedNeedleIntegrity(v.DataBackend, v.Version(), key); e != nil { - return lastAppendAtNs, fmt.Errorf("verifyNeedleIntegrity %s failed: %v", indexFile.Name(), e) + if lastAppendAtNs, err = verifyDeletedNeedleIntegrity(v.DataBackend, v.Version(), key); err != nil { + return lastAppendAtNs, fmt.Errorf("verifyNeedleIntegrity %s failed: %v", indexFile.Name(), err) } } else { - if lastAppendAtNs, e = verifyNeedleIntegrity(v.DataBackend, v.Version(), offset.ToAcutalOffset(), key, size); e != nil { - return lastAppendAtNs, fmt.Errorf("verifyNeedleIntegrity %s failed: %v", indexFile.Name(), e) + if lastAppendAtNs, err = verifyNeedleIntegrity(v.DataBackend, v.Version(), offset.ToAcutalOffset(), key, size); err != nil { + return lastAppendAtNs, err } } - return + return lastAppendAtNs, nil } func verifyIndexFileIntegrity(indexFile *os.File) (indexSize int64, err error) { @@ -60,7 +85,44 @@ func readIndexEntryAtOffset(indexFile *os.File, offset int64) (bytes []byte, err } func verifyNeedleIntegrity(datFile backend.BackendStorageFile, v needle.Version, offset int64, key NeedleId, size Size) (lastAppendAtNs uint64, err error) { - n := new(needle.Needle) + n, _, _, err := needle.ReadNeedleHeader(datFile, v, offset) + if err == io.EOF { + return 0, err + } + if err != nil { + return 0, fmt.Errorf("read %s at %d", datFile.Name(), offset) + } + if n.Size != size { + return 0, ErrorSizeMismatch + } + if v == needle.Version3 { + bytes := make([]byte, TimestampSize) + _, err = datFile.ReadAt(bytes, offset+NeedleHeaderSize+int64(size)+needle.NeedleChecksumSize) + if err == io.EOF { + return 0, err + } + if err != nil { + return 0, fmt.Errorf("verifyNeedleIntegrity check %s entry offset %d size %d: %v", datFile.Name(), offset, size, err) + } + n.AppendAtNs = util.BytesToUint64(bytes) + fileTailOffset := offset + needle.GetActualSize(size, v) + fileSize, _, err := datFile.GetStat() + if err != nil { + return 0, fmt.Errorf("stat file %s: %v", datFile.Name(), err) + } + if fileSize == fileTailOffset { + return n.AppendAtNs, nil + } + if fileSize > fileTailOffset { + glog.Warningf("Truncate %s from %d bytes to %d bytes!", datFile.Name(), fileSize, fileTailOffset) + err = datFile.Truncate(fileTailOffset) + if err == nil { + return n.AppendAtNs, nil + } + return n.AppendAtNs, fmt.Errorf("truncate file %s: %v", datFile.Name(), err) + } + glog.Warningf("data file %s has %d bytes, less than expected %d bytes!", datFile.Name(), fileSize, fileTailOffset) + } if err = n.ReadData(datFile, offset, size, v); err != nil { return n.AppendAtNs, fmt.Errorf("read data [%d,%d) : %v", offset, offset+int64(size), err) } diff --git a/weed/storage/volume_loading.go b/weed/storage/volume_loading.go index 73e2de02b..05684cbdb 100644 --- a/weed/storage/volume_loading.go +++ b/weed/storage/volume_loading.go @@ -89,7 +89,7 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind return fmt.Errorf("cannot write Volume Index %s.idx: %v", fileName, err) } } - if v.lastAppendAtNs, err = CheckVolumeDataIntegrity(v, indexFile); err != nil { + if v.lastAppendAtNs, err = CheckAndFixVolumeDataIntegrity(v, indexFile); err != nil { v.noWriteOrDelete = true glog.V(0).Infof("volumeDataIntegrityChecking failed %v", err) } diff --git a/weed/storage/volume_read_write.go b/weed/storage/volume_read_write.go index 94c1d0ea1..869796a3f 100644 --- a/weed/storage/volume_read_write.go +++ b/weed/storage/volume_read_write.go @@ -17,6 +17,7 @@ import ( var ErrorNotFound = errors.New("not found") var ErrorDeleted = errors.New("already deleted") +var ErrorSizeMismatch = errors.New("size mismatch") // isFileUnchanged checks whether this needle to write is same as last one. // It requires serialized access in the same volume. @@ -55,16 +56,21 @@ func (v *Volume) Destroy() (err error) { } } v.Close() - os.Remove(v.FileName() + ".dat") - os.Remove(v.FileName() + ".idx") - os.Remove(v.FileName() + ".vif") - os.Remove(v.FileName() + ".sdx") - os.Remove(v.FileName() + ".cpd") - os.Remove(v.FileName() + ".cpx") - os.RemoveAll(v.FileName() + ".ldb") + removeVolumeFiles(v.FileName()) return } +func removeVolumeFiles(filename string) { + os.Remove(filename + ".dat") + os.Remove(filename + ".idx") + os.Remove(filename + ".vif") + os.Remove(filename + ".sdx") + os.Remove(filename + ".cpd") + os.Remove(filename + ".cpx") + os.RemoveAll(filename + ".ldb") + os.Remove(filename + ".note") +} + func (v *Volume) asyncRequestAppend(request *needle.AsyncRequest) { v.asyncRequestsChan <- request } @@ -274,6 +280,9 @@ func (v *Volume) readNeedle(n *needle.Needle, readOption *ReadOption) (int, erro return 0, nil } err := n.ReadData(v.DataBackend, nv.Offset.ToAcutalOffset(), readSize, v.Version()) + if err == needle.ErrorSizeMismatch && OffsetSize == 4 { + err = n.ReadData(v.DataBackend, nv.Offset.ToAcutalOffset()+int64(MaxPossibleVolumeSize), readSize, v.Version()) + } if err != nil { return 0, err } diff --git a/weed/util/constants.go b/weed/util/constants.go index 177c20a60..498ef11a2 100644 --- a/weed/util/constants.go +++ b/weed/util/constants.go @@ -5,7 +5,7 @@ import ( ) var ( - VERSION = fmt.Sprintf("%s %d.%02d", sizeLimit, 2, 07) + VERSION = fmt.Sprintf("%s %d.%02d", sizeLimit, 2, 8) COMMIT = "" ) diff --git a/weed/util/retry.go b/weed/util/retry.go new file mode 100644 index 000000000..faaab0351 --- /dev/null +++ b/weed/util/retry.go @@ -0,0 +1,31 @@ +package util + +import ( + "strings" + "time" + + "github.com/chrislusf/seaweedfs/weed/glog" +) + +var RetryWaitTime = 6 * time.Second + +func Retry(name string, job func() error) (err error) { + waitTime := time.Second + hasErr := false + for waitTime < RetryWaitTime { + err = job() + if err == nil { + if hasErr { + glog.V(0).Infof("retry %s successfully", name) + } + break + } + if strings.Contains(err.Error(), "transport") { + hasErr = true + glog.V(0).Infof("retry %s", name) + time.Sleep(waitTime) + waitTime += waitTime / 2 + } + } + return err +} diff --git a/weed/wdclient/masterclient.go b/weed/wdclient/masterclient.go index 60753e582..df8c186f2 100644 --- a/weed/wdclient/masterclient.go +++ b/weed/wdclient/masterclient.go @@ -5,6 +5,7 @@ import ( "math/rand" "time" + "github.com/chrislusf/seaweedfs/weed/util" "google.golang.org/grpc" "github.com/chrislusf/seaweedfs/weed/glog" @@ -150,10 +151,12 @@ func (mc *MasterClient) tryConnectToMaster(master string) (nextHintedLeader stri } func (mc *MasterClient) WithClient(fn func(client master_pb.SeaweedClient) error) error { - for mc.currentMaster == "" { - time.Sleep(3 * time.Second) - } - return pb.WithMasterClient(mc.currentMaster, mc.grpcDialOption, func(client master_pb.SeaweedClient) error { - return fn(client) + return util.Retry("master grpc", func() error { + for mc.currentMaster == "" { + time.Sleep(3 * time.Second) + } + return pb.WithMasterClient(mc.currentMaster, mc.grpcDialOption, func(client master_pb.SeaweedClient) error { + return fn(client) + }) }) } |
