42 files changed, 416 insertions, 166 deletions
@@ -56,7 +56,7 @@ require (
 	github.com/pquerna/cachecontrol v0.1.0
 	github.com/prometheus/client_golang v1.11.0
 	github.com/rcrowley/go-metrics v0.0.0-20190826022208-cac0b30c2563 // indirect
-	github.com/seaweedfs/fuse v1.1.7
+	github.com/seaweedfs/fuse v1.1.8
 	github.com/seaweedfs/goexif v1.0.2
 	github.com/skip2/go-qrcode v0.0.0-20200617195104-da1b6568686e
 	github.com/spaolacci/murmur3 v1.1.0 // indirect
@@ -617,6 +617,8 @@ github.com/samuel/go-zookeeper v0.0.0-20190923202752-2cc03de413da/go.mod h1:gi+0
 github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc=
 github.com/seaweedfs/fuse v1.1.7 h1:T4L5c/Sn+q8lE+0zCmH2MWvIO+B5TttWOSqK5KQPRMQ=
 github.com/seaweedfs/fuse v1.1.7/go.mod h1:+PP6WlkrRUG6KPE+Th2EX5To/PjHaFsvqg/UgQ39aj8=
+github.com/seaweedfs/fuse v1.1.8 h1:YFSDPotG4uhQzV7ooDUvQ8BRVy5rM1XCFPJAmAsZz68=
+github.com/seaweedfs/fuse v1.1.8/go.mod h1:+PP6WlkrRUG6KPE+Th2EX5To/PjHaFsvqg/UgQ39aj8=
 github.com/seaweedfs/goexif v1.0.2 h1:p+rTXYdQ2mgxd+1JaTrQ9N8DvYuw9UH9xgYmJ+Bb29E=
 github.com/seaweedfs/goexif v1.0.2/go.mod h1:MrKs5LK0HXdffrdCZrW3OIMegL2xXpC6ThLyXMyjdrk=
 github.com/secsy/goftp v0.0.0-20190720192957-f31499d7c79a h1:C6IhVTxNkhlb0tlCB6JfHOUv1f0xHPK7V8X4HlJZEJw=
diff --git a/k8s/seaweedfs/Chart.yaml b/k8s/seaweedfs/Chart.yaml
index 0aebc1c84..7b779debe 100644
--- a/k8s/seaweedfs/Chart.yaml
+++ b/k8s/seaweedfs/Chart.yaml
@@ -1,5 +1,5 @@
 apiVersion: v1
 description: SeaweedFS
 name: seaweedfs
-appVersion: "2.55"
-version: "2.55"
+appVersion: "2.56"
+version: "2.56"
diff --git a/k8s/seaweedfs/values.yaml b/k8s/seaweedfs/values.yaml
index 072280f07..3b675eb41 100644
--- a/k8s/seaweedfs/values.yaml
+++ b/k8s/seaweedfs/values.yaml
@@ -4,7 +4,7 @@ global:
   registry: ""
   repository: ""
   imageName: chrislusf/seaweedfs
-  # imageTag: "2.55" - started using {.Chart.appVersion}
+  # imageTag: "2.56" - started using {.Chart.appVersion}
   imagePullPolicy: IfNotPresent
   imagePullSecrets: imagepullsecret
   restartPolicy: Always
diff --git a/other/java/client/src/main/proto/filer.proto b/other/java/client/src/main/proto/filer.proto
index cdbba7eb1..f49c1218f 100644
--- a/other/java/client/src/main/proto/filer.proto
+++ b/other/java/client/src/main/proto/filer.proto
@@ -208,6 +208,7 @@ message AtomicRenameEntryRequest {
     string old_name = 2;
     string new_directory = 3;
     string new_name = 4;
+    repeated int32 signatures = 5;
 }
 
 message AtomicRenameEntryResponse {
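The new repeated `signatures` field on `AtomicRenameEntryRequest` lets a mount client stamp its rename with its own signature, so the metadata events generated by the rename can be recognized (and skipped) by the client that issued them. A minimal sketch of a caller populating the field, assuming a generated `filer_pb` client; `clientSignature` and the paths are hypothetical examples:

```go
// Sketch: issue a rename stamped with this client's signature so the
// resulting metadata events can be attributed back to this client.
// clientSignature is a made-up per-mount identifier.
func renameWithSignature(ctx context.Context, client filer_pb.SeaweedFilerClient, clientSignature int32) error {
	_, err := client.AtomicRenameEntry(ctx, &filer_pb.AtomicRenameEntryRequest{
		OldDirectory: "/buckets/b1",
		OldName:      "old.txt",
		NewDirectory: "/buckets/b1",
		NewName:      "new.txt",
		Signatures:   []int32{clientSignature},
	})
	return err
}
```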
diff --git a/test/data/187.idx b/test/data/187.idx
new file mode 100644
index 000000000..d4cb42ef0
--- /dev/null
+++ b/test/data/187.idx
Binary files differ
diff --git a/weed/Makefile b/weed/Makefile
index edc0bf544..c82735a0e 100644
--- a/weed/Makefile
+++ b/weed/Makefile
@@ -16,7 +16,7 @@ debug_shell:
 
 debug_mount:
 	go build -gcflags="all=-N -l"
-	dlv --listen=:2345 --headless=true --api-version=2 --accept-multiclient exec weed -- -v=4 mount -dir=~/tmp/mm -cacheCapacityMB=0 -filer.path=/buckets
+	dlv --listen=:2345 --headless=true --api-version=2 --accept-multiclient exec weed -- -v=4 mount -dir=~/tmp/mm -cacheCapacityMB=0 -filer.path=/
 
 debug_server:
 	go build -gcflags="all=-N -l"
diff --git a/weed/command/fuse.go b/weed/command/fuse.go
index 3949d8f70..609892b5b 100644
--- a/weed/command/fuse.go
+++ b/weed/command/fuse.go
@@ -2,10 +2,10 @@ package command
 
 import (
 	"fmt"
-	"strings"
+	"os"
 	"strconv"
+	"strings"
 	"time"
-	"os"
 )
 
 func init() {
@@ -13,7 +13,7 @@ func init() {
 }
 
 type parameter struct {
-	name string
+	name  string
 	value string
 }
 
@@ -23,6 +23,7 @@ func runFuse(cmd *Command, args []string) bool {
 	option := strings.Builder{}
 	options := []parameter{}
 	masterProcess := true
+	fusermountPath := ""
 
 	// first parameter
 	i := 0
@@ -41,7 +42,7 @@ func runFuse(cmd *Command, args []string) bool {
 				option.Reset()
 			}
 
-		// dash separator read option until next space
+			// dash separator read option until next space
 		} else if rawArgs[i] == '-' {
 			for i++; i < rawArgsLen && rawArgs[i] != ' '; i++ {
 				option.WriteByte(rawArgs[i])
@@ -49,12 +50,12 @@ func runFuse(cmd *Command, args []string) bool {
 			options = append(options, parameter{option.String(), "true"})
 			option.Reset()
 
-		// equal separator start option with pending value
+			// equal separator start option with pending value
 		} else if rawArgs[i] == '=' {
 			name := option.String()
 			option.Reset()
 
-			for i++; i < rawArgsLen && rawArgs[i] != ','; i++ {
+			for i++; i < rawArgsLen && rawArgs[i] != ',' && rawArgs[i] != ' '; i++ {
 				// double quote separator read option until next double quote
 				if rawArgs[i] == '"' {
 					for i++; i < rawArgsLen && rawArgs[i] != '"'; i++ {
@@ -67,7 +68,7 @@ func runFuse(cmd *Command, args []string) bool {
 						option.WriteByte(rawArgs[i])
 					}
 
-				// add chars before comma
+					// add chars before comma
 				} else if rawArgs[i] != ' ' {
 					option.WriteByte(rawArgs[i])
 				}
@@ -76,12 +77,12 @@ func runFuse(cmd *Command, args []string) bool {
 			options = append(options, parameter{name, option.String()})
 			option.Reset()
 
-		// comma separator just read current option
+			// comma separator just read current option
 		} else if rawArgs[i] == ',' {
 			options = append(options, parameter{option.String(), "true"})
 			option.Reset()
 
-		// what is not a separator fill option buffer
+			// what is not a separator fill option buffer
 		} else {
 			option.WriteByte(rawArgs[i])
 		}
@@ -98,7 +99,7 @@ func runFuse(cmd *Command, args []string) bool {
 	for i := 0; i < len(options); i++ {
 		parameter := options[i]
 
-		switch parameter.name {
+		switch parameter.name {
 		case "child":
 			masterProcess = false
 		case "arg0":
@@ -187,17 +188,23 @@ func runFuse(cmd *Command, args []string) bool {
 			} else {
 				panic(fmt.Errorf("readRetryTime: %s", err))
 			}
+		case "fusermount.path":
+			fusermountPath = parameter.value
 		}
 	}
 
 	// the master start the child, release it then finish himself
 	if masterProcess {
-		arg0 := os.Args[0]
+		arg0, err := os.Executable()
+		if err != nil {
+			panic(err)
+		}
+
 		argv := append(os.Args, "-o", "child")
 
-		attr := os.ProcAttr{}
-		attr.Env = os.Environ()
-
+		attr := os.ProcAttr{}
+		attr.Env = os.Environ()
+
 		child, err := os.StartProcess(arg0, argv, &attr)
 
 		if err != nil {
@@ -213,13 +220,23 @@ func runFuse(cmd *Command, args []string) bool {
 		return true
 	}
 
+	if fusermountPath != "" {
+		if err := os.Setenv("PATH", fusermountPath); err != nil {
+			panic(fmt.Errorf("setenv: %s", err))
+		}
+	} else if os.Getenv("PATH") == "" {
+		if err := os.Setenv("PATH", "/bin:/sbin:/usr/bin:/usr/sbin"); err != nil {
+			panic(fmt.Errorf("setenv: %s", err))
+		}
+	}
+
 	// just call "weed mount" command
 	return runMount(cmdMount, []string{})
 }
 
 var cmdFuse = &Command{
 	UsageLine: "fuse /mnt/mount/point -o \"filer=localhost:8888,filer.path=/\"",
-	Short: "Allow use weed with linux's mount command",
+	Short:     "Allow use weed with linux's mount command",
 	Long: `Allow use weed with linux's mount command
 
 	You can use -t weed on mount command:
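Two fixes land together in fuse.go: the child process is re-executed via `os.Executable()` instead of `os.Args[0]` (which may be a bare command name that cannot be re-spawned), and before mounting the command now guarantees a usable `PATH` so the embedded fuse library can find `fusermount`, since mount(8) invokes helpers with a stripped environment. The new `-o fusermount.path=...` option overrides the search path explicitly. A standalone sketch of the fallback, assuming the option value has already been parsed (the environment variable in `main` is a made-up stand-in for it):

```go
package main

import (
	"fmt"
	"os"
)

// ensureFusermountPath mirrors the new PATH handling: an explicit
// "fusermount.path" option wins; otherwise an empty PATH (typical when
// invoked by mount(8)) gets a conservative default.
func ensureFusermountPath(fusermountPath string) error {
	if fusermountPath != "" {
		return os.Setenv("PATH", fusermountPath)
	}
	if os.Getenv("PATH") == "" {
		return os.Setenv("PATH", "/bin:/sbin:/usr/bin:/usr/sbin")
	}
	return nil
}

func main() {
	// WEED_FUSERMOUNT_PATH is hypothetical, standing in for the parsed -o value.
	if err := ensureFusermountPath(os.Getenv("WEED_FUSERMOUNT_PATH")); err != nil {
		fmt.Fprintln(os.Stderr, "setenv:", err)
		os.Exit(1)
	}
}
```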
diff --git a/weed/command/server.go b/weed/command/server.go
index d2bd6466e..f6c033c64 100644
--- a/weed/command/server.go
+++ b/weed/command/server.go
@@ -107,7 +107,7 @@ func init() {
 	serverOptions.v.indexType = cmdServer.Flag.String("volume.index", "memory", "Choose [memory|leveldb|leveldbMedium|leveldbLarge] mode for memory~performance balance.")
 	serverOptions.v.diskType = cmdServer.Flag.String("volume.disk", "", "[hdd|ssd|<tag>] hard drive or solid state drive or any tag")
 	serverOptions.v.fixJpgOrientation = cmdServer.Flag.Bool("volume.images.fix.orientation", false, "Adjust jpg orientation when uploading.")
-	serverOptions.v.readRedirect = cmdServer.Flag.Bool("volume.read.redirect", true, "Redirect moved or non-local volumes.")
+	serverOptions.v.readMode = cmdServer.Flag.String("volume.readMode", "redirect", "[local|proxy|redirect] how to deal with non-local volume: 'not found|read in remote node|redirect volume location'.")
 	serverOptions.v.compactionMBPerSecond = cmdServer.Flag.Int("volume.compactionMBps", 0, "limit compaction speed in mega bytes per second")
 	serverOptions.v.fileSizeLimitMB = cmdServer.Flag.Int("volume.fileSizeLimitMB", 256, "limit file size to avoid out of memory")
 	serverOptions.v.concurrentUploadLimitMB = cmdServer.Flag.Int("volume.concurrentUploadLimitMB", 64, "limit total concurrent upload size")
diff --git a/weed/command/upload.go b/weed/command/upload.go
index a102796b4..ccdec561f 100644
--- a/weed/command/upload.go
+++ b/weed/command/upload.go
@@ -110,7 +110,7 @@ func runUpload(cmd *Command, args []string) bool {
 		})
 		if err != nil {
 			fmt.Println(err.Error())
-			return false;
+			return false
 		}
 	} else {
 		parts, e := operation.NewFileParts(args)
diff --git a/weed/command/volume.go b/weed/command/volume.go
index 139a3791e..ced6ec414 100644
--- a/weed/command/volume.go
+++ b/weed/command/volume.go
@@ -51,7 +51,7 @@ type VolumeServerOptions struct {
 	indexType             *string
 	diskType              *string
 	fixJpgOrientation     *bool
-	readRedirect          *bool
+	readMode              *string
 	cpuProfile            *string
 	memProfile            *string
 	compactionMBPerSecond *int
@@ -80,7 +80,7 @@ func init() {
 	v.indexType = cmdVolume.Flag.String("index", "memory", "Choose [memory|leveldb|leveldbMedium|leveldbLarge] mode for memory~performance balance.")
 	v.diskType = cmdVolume.Flag.String("disk", "", "[hdd|ssd|<tag>] hard drive or solid state drive or any tag")
 	v.fixJpgOrientation = cmdVolume.Flag.Bool("images.fix.orientation", false, "Adjust jpg orientation when uploading.")
-	v.readRedirect = cmdVolume.Flag.Bool("read.redirect", true, "Redirect moved or non-local volumes.")
+	v.readMode = cmdVolume.Flag.String("readMode", "redirect", "[local|proxy|redirect] how to deal with non-local volume: 'not found|proxy to remote node|redirect volume location'.")
 	v.cpuProfile = cmdVolume.Flag.String("cpuprofile", "", "cpu profile output file")
 	v.memProfile = cmdVolume.Flag.String("memprofile", "", "memory profile output file")
 	v.compactionMBPerSecond = cmdVolume.Flag.Int("compactionMBps", 0, "limit background compaction or copying speed in mega bytes per second")
@@ -228,7 +228,7 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v
 		volumeNeedleMapKind,
 		strings.Split(masters, ","), 5, *v.dataCenter, *v.rack,
 		v.whiteList,
-		*v.fixJpgOrientation, *v.readRedirect,
+		*v.fixJpgOrientation, *v.readMode,
 		*v.compactionMBPerSecond,
 		*v.fileSizeLimitMB,
 		int64(*v.concurrentUploadLimitMB)*1024*1024,
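The boolean `-read.redirect` flag (and `-volume.read.redirect` on `weed server`) is replaced by a tri-state `-readMode` string: `local` answers 404 for volumes not on this server, `proxy` fetches the data from the owning volume server and relays it, and `redirect` (the default, matching the old behavior) points the client at the volume's public URL. A hedged sketch of a startup guard for the new value; the diff itself only declares the flag, so this validation helper is an assumption:

```go
// validateReadMode is illustrative; the volume server simply compares
// vs.ReadMode against these strings in its read handler.
func validateReadMode(mode string) error {
	switch mode {
	case "local", "proxy", "redirect":
		return nil
	default:
		return fmt.Errorf("invalid readMode %q: expect local|proxy|redirect", mode)
	}
}
```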
diff --git a/weed/filer/sqlite/sqlite_store_unsupported.go b/weed/filer/sqlite/sqlite_store_unsupported.go
index 9b7009df9..803c71afa 100644
--- a/weed/filer/sqlite/sqlite_store_unsupported.go
+++ b/weed/filer/sqlite/sqlite_store_unsupported.go
@@ -1,4 +1,4 @@
-// +build !linux,!darwin,!windows
+// +build !linux,!darwin,!windows,!s390,!ppc64le,!mips64
 
 // limited GOOS due to modernc.org/libc/unistd
diff --git a/weed/filesys/dir.go b/weed/filesys/dir.go
index 1af868d58..9a791e013 100644
--- a/weed/filesys/dir.go
+++ b/weed/filesys/dir.go
@@ -141,7 +141,7 @@ func (dir *Dir) Create(ctx context.Context, req *fuse.CreateRequest,
 	var node fs.Node
 	if isDirectory {
 		node = dir.newDirectory(util.NewFullPath(dir.FullPath(), req.Name))
-		return node, nil, nil
+		return node, node, nil
 	}
 
 	node = dir.newFile(req.Name)
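In the seaweedfs/fuse (bazil-style) API, `Create` returns both an `fs.Node` and an `fs.Handle`; returning `nil` for the handle of a newly created directory can force the kernel into a follow-up Open. Since the mount's node types also serve as handles, the fix returns the node twice. The shape of the contract, sketched with a hypothetical constructor:

```go
// Sketch of the fuse Create contract; newChild is hypothetical.
// Returning the node as its own fs.Handle (node, node, nil) avoids the
// nil-handle case the old `return node, nil, nil` produced for directories.
func (d *Dir) Create(ctx context.Context, req *fuse.CreateRequest, resp *fuse.CreateResponse) (fs.Node, fs.Handle, error) {
	node := d.newChild(req.Name)
	return node, node, nil
}
```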
diff --git a/weed/filesys/dir_rename.go b/weed/filesys/dir_rename.go
index d50c6dab0..27a14d5c6 100644
--- a/weed/filesys/dir_rename.go
+++ b/weed/filesys/dir_rename.go
@@ -2,6 +2,9 @@ package filesys
 
 import (
 	"context"
+	"fmt"
+	"github.com/chrislusf/seaweedfs/weed/filer"
+	"math"
 
 	"github.com/seaweedfs/fuse"
 	"github.com/seaweedfs/fuse/fs"
@@ -37,6 +40,7 @@ func (dir *Dir) Rename(ctx context.Context, req *fuse.RenameRequest, newDirector
 			OldName:      req.OldName,
 			NewDirectory: newDir.FullPath(),
 			NewName:      req.NewName,
+			Signatures:   []int32{dir.wfs.signature},
 		}
 
 		_, err := client.AtomicRenameEntry(ctx, request)
@@ -53,34 +57,12 @@ func (dir *Dir) Rename(ctx context.Context, req *fuse.RenameRequest, newDirector
 		return fuse.EIO
 	}
 
-	// TODO: replicate renaming logic on filer
-	if err := dir.wfs.metaCache.DeleteEntry(context.Background(), oldPath); err != nil {
-		glog.V(0).Infof("dir Rename delete local %s => %s : %v", oldPath, newPath, err)
-		return fuse.EIO
-	}
-	oldEntry.FullPath = newPath
-	if err := dir.wfs.metaCache.InsertEntry(context.Background(), oldEntry); err != nil {
-		glog.V(0).Infof("dir Rename insert local %s => %s : %v", oldPath, newPath, err)
+	err = dir.moveEntry(context.Background(), util.FullPath(dir.FullPath()), oldEntry, util.FullPath(newDir.FullPath()), req.NewName)
+	if err != nil {
+		glog.V(0).Infof("dir local Rename %s => %s : %v", oldPath, newPath, err)
 		return fuse.EIO
 	}
 
-	oldFsNode := NodeWithId(oldPath.AsInode())
-	newFsNode := NodeWithId(newPath.AsInode())
-	dir.wfs.Server.InvalidateInternalNode(oldFsNode, newFsNode, func(internalNode fs.Node) {
-		if file, ok := internalNode.(*File); ok {
-			glog.V(4).Infof("internal file node %s", file.Name)
-			file.Name = req.NewName
-			file.id = uint64(newFsNode)
-			file.dir = newDir
-		}
-		if dir, ok := internalNode.(*Dir); ok {
-			glog.V(4).Infof("internal dir node %s", dir.name)
-			dir.name = req.NewName
-			dir.id = uint64(newFsNode)
-			dir.parent = newDir
-		}
-	})
-
 	// change file handle
 	dir.wfs.handlesLock.Lock()
 	defer dir.wfs.handlesLock.Unlock()
@@ -96,3 +78,98 @@ func (dir *Dir) Rename(ctx context.Context, req *fuse.RenameRequest, newDirector
 
 	return nil
 }
+
+func (dir *Dir) moveEntry(ctx context.Context, oldParent util.FullPath, entry *filer.Entry, newParent util.FullPath, newName string) error {
+
+	oldName := entry.Name()
+
+	if err := dir.moveSelfEntry(ctx, oldParent, entry, newParent, newName, func() error {
+
+		oldFsNode := NodeWithId(oldParent.Child(oldName).AsInode())
+		newFsNode := NodeWithId(newParent.Child(newName).AsInode())
+		newDirNode, found := dir.wfs.Server.FindInternalNode(NodeWithId(newParent.AsInode()))
+		var newDir *Dir
+		if found {
+			newDir = newDirNode.(*Dir)
+		}
+		dir.wfs.Server.InvalidateInternalNode(oldFsNode, newFsNode, func(internalNode fs.Node) {
+			if file, ok := internalNode.(*File); ok {
+				glog.V(4).Infof("internal file node %s", oldParent.Child(oldName))
+				file.Name = newName
+				file.id = uint64(newFsNode)
+				if found {
+					file.dir = newDir
+				}
+			}
+			if dir, ok := internalNode.(*Dir); ok {
+				glog.V(4).Infof("internal dir node %s", oldParent.Child(oldName))
+				dir.name = newName
+				dir.id = uint64(newFsNode)
+				if found {
+					dir.parent = newDir
+				}
+			}
+		})
+
+		if entry.IsDirectory() {
+			if err := dir.moveFolderSubEntries(ctx, oldParent, oldName, newParent, newName); err != nil {
+				return err
+			}
+		}
+		return nil
+	}); err != nil {
+		return fmt.Errorf("fail to move %s => %s: %v", oldParent.Child(oldName), newParent.Child(newName), err)
+	}
+
+	return nil
+}
+
+func (dir *Dir) moveFolderSubEntries(ctx context.Context, oldParent util.FullPath, oldName string, newParent util.FullPath, newName string) error {
+
+	currentDirPath := oldParent.Child(oldName)
+	newDirPath := newParent.Child(newName)
+
+	glog.V(1).Infof("moving folder %s => %s", currentDirPath, newDirPath)
+
+	var moveErr error
+	listErr := dir.wfs.metaCache.ListDirectoryEntries(ctx, currentDirPath, "", false, int64(math.MaxInt32), func(item *filer.Entry) bool {
+		moveErr = dir.moveEntry(ctx, currentDirPath, item, newDirPath, item.Name())
+		if moveErr != nil {
+			return false
+		}
+		return true
+	})
+	if listErr != nil {
+		return listErr
+	}
+	if moveErr != nil {
+		return moveErr
+	}
+
+	return nil
+}
+
+func (dir *Dir) moveSelfEntry(ctx context.Context, oldParent util.FullPath, entry *filer.Entry, newParent util.FullPath, newName string, moveFolderSubEntries func() error) error {
+
+	newPath := newParent.Child(newName)
+	oldPath := oldParent.Child(entry.Name())
+
+	entry.FullPath = newPath
+	if err := dir.wfs.metaCache.InsertEntry(ctx, entry); err != nil {
+		glog.V(0).Infof("dir Rename insert local %s => %s : %v", oldPath, newPath, err)
+		return fuse.EIO
+	}
+
+	if moveFolderSubEntries != nil {
+		if moveChildrenErr := moveFolderSubEntries(); moveChildrenErr != nil {
+			return moveChildrenErr
+		}
+	}
+
+	if err := dir.wfs.metaCache.DeleteEntry(ctx, oldPath); err != nil {
+		glog.V(0).Infof("dir Rename delete local %s => %s : %v", oldPath, newPath, err)
+		return fuse.EIO
+	}
+
+	return nil
+}
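The mount-side rename now mirrors the filer's recursive move inside the local meta cache instead of patching a single entry: insert the entry under its new path, recurse into children (via a `math.MaxInt32`-bounded listing of the cached directory), and delete the old path only afterwards, so an interruption cannot drop entries. The ordering invariant, reduced to a sketch with a stand-in store:

```go
// Insert-before-delete move, the ordering moveSelfEntry enforces.
// Store is a stand-in for wfs.metaCache.
type Store interface {
	Insert(path string) error
	Delete(path string) error
}

func move(s Store, oldPath, newPath string, moveChildren func() error) error {
	if err := s.Insert(newPath); err != nil { // 1. new name first
		return err
	}
	if moveChildren != nil {
		if err := moveChildren(); err != nil { // 2. then the subtree
			return err
		}
	}
	return s.Delete(oldPath) // 3. old name last
}
```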
diff --git a/weed/filesys/dirty_pages_temp_file.go b/weed/filesys/dirty_pages_temp_file.go
index 3826008b7..9fa7c0c8e 100644
--- a/weed/filesys/dirty_pages_temp_file.go
+++ b/weed/filesys/dirty_pages_temp_file.go
@@ -97,7 +97,7 @@ func (pages *TempFileDirtyPages) saveExistingPagesToStorage() {
 	for _, list := range pages.writtenIntervals.lists {
 		listStopOffset := list.Offset() + list.Size()
-		for uploadedOffset:=int64(0); uploadedOffset < listStopOffset; uploadedOffset += pageSize {
+		for uploadedOffset := int64(0); uploadedOffset < listStopOffset; uploadedOffset += pageSize {
 			start, stop := max(list.Offset(), uploadedOffset), min(listStopOffset, uploadedOffset+pageSize)
 			if start >= stop {
 				continue
diff --git a/weed/filesys/dirty_pages_temp_interval.go b/weed/filesys/dirty_pages_temp_interval.go
index 2030178be..42c4b5a3b 100644
--- a/weed/filesys/dirty_pages_temp_interval.go
+++ b/weed/filesys/dirty_pages_temp_interval.go
@@ -54,7 +54,7 @@ func (list *WrittenIntervalLinkedList) ReadData(buf []byte, start, stop int64) {
 		nodeStart, nodeStop := max(start, t.DataOffset), min(stop, t.DataOffset+t.Size)
 		if nodeStart < nodeStop {
 			// glog.V(4).Infof("copying start=%d stop=%d t=[%d,%d) => bufSize=%d nodeStart=%d, nodeStop=%d", start, stop, t.DataOffset, t.DataOffset+t.Size, len(buf), nodeStart, nodeStop)
-			list.tempFile.ReadAt(buf[nodeStart-start:nodeStop-start], t.TempOffset + nodeStart - t.DataOffset)
+			list.tempFile.ReadAt(buf[nodeStart-start:nodeStop-start], t.TempOffset+nodeStart-t.DataOffset)
 		}
 
 		if t.Next == nil {
diff --git a/weed/filesys/file.go b/weed/filesys/file.go
index c50ac0549..7f021619c 100644
--- a/weed/filesys/file.go
+++ b/weed/filesys/file.go
@@ -144,17 +144,17 @@ func (file *File) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *f
 		file.dirtyMetadata = true
 	}
 
-	if req.Valid.Mode() {
+	if req.Valid.Mode() && entry.Attributes.FileMode != uint32(req.Mode) {
 		entry.Attributes.FileMode = uint32(req.Mode)
 		file.dirtyMetadata = true
 	}
 
-	if req.Valid.Uid() {
+	if req.Valid.Uid() && entry.Attributes.Uid != req.Uid {
 		entry.Attributes.Uid = req.Uid
 		file.dirtyMetadata = true
 	}
 
-	if req.Valid.Gid() {
+	if req.Valid.Gid() && entry.Attributes.Gid != req.Gid {
 		entry.Attributes.Gid = req.Gid
 		file.dirtyMetadata = true
 	}
@@ -164,7 +164,7 @@ func (file *File) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *f
 		file.dirtyMetadata = true
 	}
 
-	if req.Valid.Mtime() {
+	if req.Valid.Mtime() && entry.Attributes.Mtime != req.Mtime.Unix() {
 		entry.Attributes.Mtime = req.Mtime.Unix()
 		file.dirtyMetadata = true
 	}
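`Setattr` now marks metadata dirty only when an attribute actually changes, so no-op `chmod`/`chown`/`utimes` calls no longer schedule a flush to the filer. The guard, as a generic sketch of the pattern:

```go
// Generic form of the new Setattr guards: write and mark dirty only on change.
func setIfChanged(current *uint32, proposed uint32, dirty *bool) {
	if *current != proposed {
		*current = proposed
		*dirty = true
	}
}
```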
diff --git a/weed/filesys/meta_cache/meta_cache.go b/weed/filesys/meta_cache/meta_cache.go
index 3f6391c39..69d1655ee 100644
--- a/weed/filesys/meta_cache/meta_cache.go
+++ b/weed/filesys/meta_cache/meta_cache.go
@@ -3,14 +3,12 @@ package meta_cache
 
 import (
 	"context"
 	"fmt"
-	"os"
-	"sync"
-
 	"github.com/chrislusf/seaweedfs/weed/filer"
 	"github.com/chrislusf/seaweedfs/weed/filer/leveldb"
 	"github.com/chrislusf/seaweedfs/weed/glog"
 	"github.com/chrislusf/seaweedfs/weed/util"
 	"github.com/chrislusf/seaweedfs/weed/util/bounded_tree"
+	"os"
 )
 
 // need to have logic similar to FilerStoreWrapper
@@ -18,7 +16,7 @@ import (
 type MetaCache struct {
 	localStore filer.VirtualFilerStore
-	sync.RWMutex
+	// sync.RWMutex
 	visitedBoundary *bounded_tree.BoundedTree
 	uidGidMapper    *UidGidMapper
 	invalidateFunc  func(util.FullPath)
@@ -54,8 +52,8 @@ func openMetaStore(dbFolder string) filer.VirtualFilerStore {
 }
 
 func (mc *MetaCache) InsertEntry(ctx context.Context, entry *filer.Entry) error {
-	mc.Lock()
-	defer mc.Unlock()
+	//mc.Lock()
+	//defer mc.Unlock()
 	return mc.doInsertEntry(ctx, entry)
 }
 
@@ -64,8 +62,8 @@ func (mc *MetaCache) doInsertEntry(ctx context.Context, entry *filer.Entry) erro
 }
 
 func (mc *MetaCache) AtomicUpdateEntryFromFiler(ctx context.Context, oldPath util.FullPath, newEntry *filer.Entry) error {
-	mc.Lock()
-	defer mc.Unlock()
+	//mc.Lock()
+	//defer mc.Unlock()
 
 	oldDir, _ := oldPath.DirAndName()
 	if mc.visitedBoundary.HasVisited(util.FullPath(oldDir)) {
@@ -97,14 +95,14 @@ func (mc *MetaCache) AtomicUpdateEntryFromFiler(ctx context.Context, oldPath uti
 }
 
 func (mc *MetaCache) UpdateEntry(ctx context.Context, entry *filer.Entry) error {
-	mc.Lock()
-	defer mc.Unlock()
+	//mc.Lock()
+	//defer mc.Unlock()
 	return mc.localStore.UpdateEntry(ctx, entry)
 }
 
 func (mc *MetaCache) FindEntry(ctx context.Context, fp util.FullPath) (entry *filer.Entry, err error) {
-	mc.RLock()
-	defer mc.RUnlock()
+	//mc.RLock()
+	//defer mc.RUnlock()
 	entry, err = mc.localStore.FindEntry(ctx, fp)
 	if err != nil {
 		return nil, err
@@ -114,14 +112,14 @@ func (mc *MetaCache) FindEntry(ctx context.Context, fp util.FullPath) (entry *fi
 }
 
 func (mc *MetaCache) DeleteEntry(ctx context.Context, fp util.FullPath) (err error) {
-	mc.Lock()
-	defer mc.Unlock()
+	//mc.Lock()
+	//defer mc.Unlock()
 	return mc.localStore.DeleteEntry(ctx, fp)
 }
 
 func (mc *MetaCache) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) error {
-	mc.RLock()
-	defer mc.RUnlock()
+	//mc.RLock()
+	//defer mc.RUnlock()
 
 	if !mc.visitedBoundary.HasVisited(dirPath) {
 		return fmt.Errorf("unsynchronized dir: %v", dirPath)
@@ -138,8 +136,8 @@ func (mc *MetaCache) ListDirectoryEntries(ctx context.Context, dirPath util.Full
 }
 
 func (mc *MetaCache) Shutdown() {
-	mc.Lock()
-	defer mc.Unlock()
+	//mc.Lock()
+	//defer mc.Unlock()
 	mc.localStore.Shutdown()
 }
diff --git a/weed/pb/filer.proto b/weed/pb/filer.proto
index cdbba7eb1..f49c1218f 100644
--- a/weed/pb/filer.proto
+++ b/weed/pb/filer.proto
@@ -208,6 +208,7 @@ message AtomicRenameEntryRequest {
     string old_name = 2;
     string new_directory = 3;
     string new_name = 4;
+    repeated int32 signatures = 5;
 }
 
 message AtomicRenameEntryResponse {
diff --git a/weed/pb/filer_pb/filer.pb.go b/weed/pb/filer_pb/filer.pb.go
index 89fc448f4..2a7f3d041 100644
--- a/weed/pb/filer_pb/filer.pb.go
+++ b/weed/pb/filer_pb/filer.pb.go
@@ -1382,10 +1382,11 @@ type AtomicRenameEntryRequest struct {
 	sizeCache     protoimpl.SizeCache
 	unknownFields protoimpl.UnknownFields
 
-	OldDirectory string `protobuf:"bytes,1,opt,name=old_directory,json=oldDirectory,proto3" json:"old_directory,omitempty"`
-	OldName      string `protobuf:"bytes,2,opt,name=old_name,json=oldName,proto3" json:"old_name,omitempty"`
-	NewDirectory string `protobuf:"bytes,3,opt,name=new_directory,json=newDirectory,proto3" json:"new_directory,omitempty"`
-	NewName      string `protobuf:"bytes,4,opt,name=new_name,json=newName,proto3" json:"new_name,omitempty"`
+	OldDirectory string  `protobuf:"bytes,1,opt,name=old_directory,json=oldDirectory,proto3" json:"old_directory,omitempty"`
+	OldName      string  `protobuf:"bytes,2,opt,name=old_name,json=oldName,proto3" json:"old_name,omitempty"`
+	NewDirectory string  `protobuf:"bytes,3,opt,name=new_directory,json=newDirectory,proto3" json:"new_directory,omitempty"`
+	NewName      string  `protobuf:"bytes,4,opt,name=new_name,json=newName,proto3" json:"new_name,omitempty"`
+	Signatures   []int32 `protobuf:"varint,5,rep,packed,name=signatures,proto3" json:"signatures,omitempty"`
 }
 
 func (x *AtomicRenameEntryRequest) Reset() {
@@ -1448,6 +1449,13 @@ func (x *AtomicRenameEntryRequest) GetNewName() string {
 	return ""
 }
 
+func (x *AtomicRenameEntryRequest) GetSignatures() []int32 {
+	if x != nil {
+		return x.Signatures
+	}
+	return nil
+}
+
 type AtomicRenameEntryResponse struct {
 	state         protoimpl.MessageState
 	sizeCache     protoimpl.SizeCache
@@ -3418,7 +3426,7 @@ var file_filer_proto_rawDesc = []byte{
 	0x72, 0x65, 0x73, 0x22, 0x2b, 0x0a, 0x13, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x45, 0x6e, 0x74,
 	0x72, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x65, 0x72,
 	0x72, 0x6f, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72,
-	0x22, 0x9a, 0x01, 0x0a, 0x18, 0x41, 0x74, 0x6f, 0x6d, 0x69, 0x63, 0x52, 0x65, 0x6e, 0x61, 0x6d,
+	0x22, 0xba, 0x01, 0x0a, 0x18, 0x41, 0x74, 0x6f, 0x6d, 0x69, 0x63, 0x52, 0x65, 0x6e, 0x61, 0x6d,
 	0x65, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x23, 0x0a,
 	0x0d, 0x6f, 0x6c, 0x64, 0x5f, 0x64, 0x69, 0x72, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x79, 0x18, 0x01,
 	0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x6f, 0x6c, 0x64, 0x44, 0x69, 0x72, 0x65, 0x63, 0x74, 0x6f,
@@ -3427,7 +3435,9 @@ var file_filer_proto_rawDesc = []byte{
 	0x0d, 0x6e, 0x65, 0x77, 0x5f, 0x64, 0x69, 0x72, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x79, 0x18, 0x03,
 	0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x6e, 0x65, 0x77, 0x44, 0x69, 0x72, 0x65, 0x63, 0x74, 0x6f,
 	0x72, 0x79, 0x12, 0x19, 0x0a, 0x08, 0x6e, 0x65, 0x77, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x04,
-	0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x6e, 0x65, 0x77, 0x4e, 0x61, 0x6d, 0x65, 0x22, 0x1b, 0x0a,
+	0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x6e, 0x65, 0x77, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x1e, 0x0a,
+	0x0a, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x74, 0x75, 0x72, 0x65, 0x73, 0x18, 0x05, 0x20, 0x03, 0x28,
+	0x05, 0x52, 0x0a, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x74, 0x75, 0x72, 0x65, 0x73, 0x22, 0x1b, 0x0a,
 	0x19, 0x41, 0x74, 0x6f, 0x6d, 0x69, 0x63, 0x52, 0x65, 0x6e, 0x61, 0x6d, 0x65, 0x45, 0x6e, 0x74,
 	0x72, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0xec, 0x01, 0x0a, 0x13, 0x41,
 	0x73, 0x73, 0x69, 0x67, 0x6e, 0x56, 0x6f, 0x6c, 0x75, 0x6d, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65,
diff --git a/weed/pb/filer_pb/filer_client.go b/weed/pb/filer_pb/filer_client.go
index 65bd85c84..12f918137 100644
--- a/weed/pb/filer_pb/filer_client.go
+++ b/weed/pb/filer_pb/filer_client.go
@@ -236,7 +236,7 @@ func Mkdir(filerClient FilerClient, parentDirectoryPath string, dirName string,
 	})
 }
 
-func MkFile(filerClient FilerClient, parentDirectoryPath string, fileName string, chunks []*FileChunk) error {
+func MkFile(filerClient FilerClient, parentDirectoryPath string, fileName string, chunks []*FileChunk, fn func(entry *Entry)) error {
 	return filerClient.WithFilerClient(func(client SeaweedFilerClient) error {
 
 		entry := &Entry{
@@ -252,6 +252,10 @@ func MkFile(filerClient FilerClient, parentDirectoryPath string, fileName string
 			Chunks: chunks,
 		}
 
+		if fn != nil {
+			fn(entry)
+		}
+
 		request := &CreateEntryRequest{
 			Directory: parentDirectoryPath,
 			Entry:     entry,
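`filer_pb.MkFile` gains an optional `fn func(entry *Entry)` hook that runs on the entry after chunks and attributes are filled in but before the `CreateEntry` RPC, so callers can attach extended attributes without another signature change. A usage sketch; the metadata key/value are examples and `client` is any `filer_pb.FilerClient`:

```go
// Attach an extended attribute to the entry before it is created.
err := filer_pb.MkFile(client, "/buckets/b1", "object.txt", chunks, func(entry *filer_pb.Entry) {
	if entry.Extended == nil {
		entry.Extended = make(map[string][]byte)
	}
	entry.Extended["x-amz-meta-owner"] = []byte("alice")
})
```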
diff --git a/weed/s3api/filer_multipart.go b/weed/s3api/filer_multipart.go
index f882592c1..5d7bf2ac3 100644
--- a/weed/s3api/filer_multipart.go
+++ b/weed/s3api/filer_multipart.go
@@ -35,6 +35,9 @@ func (s3a *S3ApiServer) createMultipartUpload(input *s3.CreateMultipartUploadInp
 			entry.Extended = make(map[string][]byte)
 		}
 		entry.Extended["key"] = []byte(*input.Key)
+		for k, v := range input.Metadata {
+			entry.Extended[k] = []byte(*v)
+		}
 	}); err != nil {
 		glog.Errorf("NewMultipartUpload error: %v", err)
 		return nil, s3err.ErrInternalError
@@ -68,6 +71,12 @@ func (s3a *S3ApiServer) completeMultipartUpload(input *s3.CompleteMultipartUploa
 		return nil, s3err.ErrNoSuchUpload
 	}
 
+	pentry, err := s3a.getEntry(s3a.genUploadsFolder(*input.Bucket), *input.UploadId)
+	if err != nil {
+		glog.Errorf("completeMultipartUpload %s %s error: %v", *input.Bucket, *input.UploadId, err)
+		return nil, s3err.ErrNoSuchUpload
+	}
+
 	var finalParts []*filer_pb.FileChunk
 	var offset int64
 
@@ -103,7 +112,16 @@ func (s3a *S3ApiServer) completeMultipartUpload(input *s3.CompleteMultipartUploa
 		dirName = dirName[:len(dirName)-1]
 	}
 
-	err = s3a.mkFile(dirName, entryName, finalParts)
+	err = s3a.mkFile(dirName, entryName, finalParts, func(entry *filer_pb.Entry) {
+		if entry.Extended == nil {
+			entry.Extended = make(map[string][]byte)
+		}
+		for k, v := range pentry.Extended {
+			if k != "key" {
+				entry.Extended[k] = v
+			}
+		}
+	})
 
 	if err != nil {
 		glog.Errorf("completeMultipartUpload %s/%s error: %v", dirName, entryName, err)
diff --git a/weed/s3api/filer_util.go b/weed/s3api/filer_util.go
index 1803332a3..92267a154 100644
--- a/weed/s3api/filer_util.go
+++ b/weed/s3api/filer_util.go
@@ -15,9 +15,9 @@ func (s3a *S3ApiServer) mkdir(parentDirectoryPath string, dirName string, fn fun
 
 }
 
-func (s3a *S3ApiServer) mkFile(parentDirectoryPath string, fileName string, chunks []*filer_pb.FileChunk) error {
+func (s3a *S3ApiServer) mkFile(parentDirectoryPath string, fileName string, chunks []*filer_pb.FileChunk, fn func(entry *filer_pb.Entry)) error {
 
-	return filer_pb.MkFile(s3a, parentDirectoryPath, fileName, chunks)
+	return filer_pb.MkFile(s3a, parentDirectoryPath, fileName, chunks, fn)
 
 }
diff --git a/weed/s3api/s3api_object_multipart_handlers.go b/weed/s3api/s3api_object_multipart_handlers.go
index de3faaaaa..a4daea7a2 100644
--- a/weed/s3api/s3api_object_multipart_handlers.go
+++ b/weed/s3api/s3api_object_multipart_handlers.go
@@ -4,6 +4,7 @@ import (
 	"fmt"
 	"github.com/chrislusf/seaweedfs/weed/glog"
 	"github.com/chrislusf/seaweedfs/weed/s3api/s3err"
+	weed_server "github.com/chrislusf/seaweedfs/weed/server"
 	"net/http"
 	"net/url"
 	"strconv"
@@ -24,10 +25,18 @@ const (
 func (s3a *S3ApiServer) NewMultipartUploadHandler(w http.ResponseWriter, r *http.Request) {
 	bucket, object := getBucketAndObject(r)
 
-	response, errCode := s3a.createMultipartUpload(&s3.CreateMultipartUploadInput{
-		Bucket: aws.String(bucket),
-		Key:    objectKey(aws.String(object)),
-	})
+	createMultipartUploadInput := &s3.CreateMultipartUploadInput{
+		Bucket:   aws.String(bucket),
+		Key:      objectKey(aws.String(object)),
+		Metadata: make(map[string]*string),
+	}
+
+	metadata := weed_server.SaveAmzMetaData(r, nil, false)
+	for k, v := range metadata {
+		createMultipartUploadInput.Metadata[k] = aws.String(string(v))
+	}
+
+	response, errCode := s3a.createMultipartUpload(createMultipartUploadInput)
 
 	glog.V(2).Info("NewMultipartUploadHandler", s3err.EncodeXMLResponse(response), errCode)
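Taken together, the three S3 changes make user metadata survive multipart uploads: `NewMultipartUploadHandler` captures the `x-amz-meta-*` headers with `SaveAmzMetaData`, `createMultipartUpload` stores them on the uploads-folder entry, and `completeMultipartUpload` copies everything except the internal `key` attribute onto the final object. The copy rule, extracted into an illustrative helper (the diff inlines this loop):

```go
// copyUploadMetadata applies completeMultipartUpload's rule: carry every
// extended attribute over except the internal "key" bookkeeping entry.
func copyUploadMetadata(dst, src map[string][]byte) map[string][]byte {
	if dst == nil {
		dst = make(map[string][]byte)
	}
	for k, v := range src {
		if k != "key" {
			dst[k] = v
		}
	}
	return dst
}
```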
diff --git a/weed/server/common.go b/weed/server/common.go
index 2e0ae4058..2cd2276eb 100644
--- a/weed/server/common.go
+++ b/weed/server/common.go
@@ -280,6 +280,7 @@ func processRangeRequest(r *http.Request, w http.ResponseWriter, totalSize int64
 		w.Header().Set("Content-Length", strconv.FormatInt(ra.length, 10))
 		w.Header().Set("Content-Range", ra.contentRange(totalSize))
 
+		w.WriteHeader(http.StatusPartialContent)
 		err = writeFn(w, ra.start, ra.length)
 		if err != nil {
 			http.Error(w, err.Error(), http.StatusInternalServerError)
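Without an explicit `WriteHeader`, the first body write in `processRangeRequest` commits an implicit 200; writing `http.StatusPartialContent` right after the `Content-Length`/`Content-Range` headers makes single-range responses report 206 as HTTP requires. A minimal illustration of the pattern, assuming the usual `net/http`, `strconv`, and `fmt` imports:

```go
// Headers first, explicit 206, then the body bytes.
func serveRange(w http.ResponseWriter, data []byte, start, length int64) {
	w.Header().Set("Content-Length", strconv.FormatInt(length, 10))
	w.Header().Set("Content-Range",
		fmt.Sprintf("bytes %d-%d/%d", start, start+length-1, len(data)))
	w.WriteHeader(http.StatusPartialContent) // must precede any Write
	w.Write(data[start : start+length])
}
```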
diff --git a/weed/server/filer_grpc_server_rename.go b/weed/server/filer_grpc_server_rename.go
index ba9f15370..8a11c91e3 100644
--- a/weed/server/filer_grpc_server_rename.go
+++ b/weed/server/filer_grpc_server_rename.go
@@ -33,7 +33,7 @@ func (fs *FilerServer) AtomicRenameEntry(ctx context.Context, req *filer_pb.Atom
 		return nil, fmt.Errorf("%s/%s not found: %v", req.OldDirectory, req.OldName, err)
 	}
 
-	moveErr := fs.moveEntry(ctx, oldParent, oldEntry, newParent, req.NewName)
+	moveErr := fs.moveEntry(ctx, oldParent, oldEntry, newParent, req.NewName, req.Signatures)
 	if moveErr != nil {
 		fs.filer.RollbackTransaction(ctx)
 		return nil, fmt.Errorf("%s/%s move error: %v", req.OldDirectory, req.OldName, moveErr)
@@ -47,23 +47,23 @@ func (fs *FilerServer) AtomicRenameEntry(ctx context.Context, req *filer_pb.Atom
 	return &filer_pb.AtomicRenameEntryResponse{}, nil
 }
 
-func (fs *FilerServer) moveEntry(ctx context.Context, oldParent util.FullPath, entry *filer.Entry, newParent util.FullPath, newName string) error {
+func (fs *FilerServer) moveEntry(ctx context.Context, oldParent util.FullPath, entry *filer.Entry, newParent util.FullPath, newName string, signatures []int32) error {
 
 	if err := fs.moveSelfEntry(ctx, oldParent, entry, newParent, newName, func() error {
 		if entry.IsDirectory() {
-			if err := fs.moveFolderSubEntries(ctx, oldParent, entry, newParent, newName); err != nil {
+			if err := fs.moveFolderSubEntries(ctx, oldParent, entry, newParent, newName, signatures); err != nil {
 				return err
 			}
 		}
 		return nil
-	}); err != nil {
+	}, signatures); err != nil {
 		return fmt.Errorf("fail to move %s => %s: %v", oldParent.Child(entry.Name()), newParent.Child(newName), err)
 	}
 
 	return nil
 }
 
-func (fs *FilerServer) moveFolderSubEntries(ctx context.Context, oldParent util.FullPath, entry *filer.Entry, newParent util.FullPath, newName string) error {
+func (fs *FilerServer) moveFolderSubEntries(ctx context.Context, oldParent util.FullPath, entry *filer.Entry, newParent util.FullPath, newName string, signatures []int32) error {
 
 	currentDirPath := oldParent.Child(entry.Name())
 	newDirPath := newParent.Child(newName)
@@ -84,7 +84,7 @@ func (fs *FilerServer) moveFolderSubEntries(ctx context.Context, oldParent util.
 		for _, item := range entries {
 			lastFileName = item.Name()
 			// println("processing", lastFileName)
-			err := fs.moveEntry(ctx, currentDirPath, item, newDirPath, item.Name())
+			err := fs.moveEntry(ctx, currentDirPath, item, newDirPath, item.Name(), signatures)
 			if err != nil {
 				return err
 			}
@@ -96,7 +96,7 @@ func (fs *FilerServer) moveFolderSubEntries(ctx context.Context, oldParent util.
 	return nil
 }
 
-func (fs *FilerServer) moveSelfEntry(ctx context.Context, oldParent util.FullPath, entry *filer.Entry, newParent util.FullPath, newName string, moveFolderSubEntries func() error) error {
+func (fs *FilerServer) moveSelfEntry(ctx context.Context, oldParent util.FullPath, entry *filer.Entry, newParent util.FullPath, newName string, moveFolderSubEntries func() error, signatures []int32) error {
 
 	oldPath, newPath := oldParent.Child(entry.Name()), newParent.Child(newName)
 
@@ -115,7 +115,7 @@ func (fs *FilerServer) moveSelfEntry(ctx context.Context, oldParent util.FullPat
 		Extended:    entry.Extended,
 		Content:     entry.Content,
 	}
-	if createErr := fs.filer.CreateEntry(ctx, newEntry, false, false, nil); createErr != nil {
+	if createErr := fs.filer.CreateEntry(ctx, newEntry, false, false, signatures); createErr != nil {
 		return createErr
 	}
 
@@ -126,7 +126,7 @@ func (fs *FilerServer) moveSelfEntry(ctx context.Context, oldParent util.FullPat
 	}
 
 	// delete old entry
-	deleteErr := fs.filer.DeleteEntryMetaAndData(ctx, oldPath, false, false, false, false, nil)
+	deleteErr := fs.filer.DeleteEntryMetaAndData(ctx, oldPath, false, false, false, false, signatures)
 	if deleteErr != nil {
 		return deleteErr
 	}
diff --git a/weed/server/filer_grpc_server_sub_meta.go b/weed/server/filer_grpc_server_sub_meta.go
index 18505a95f..624069b7e 100644
--- a/weed/server/filer_grpc_server_sub_meta.go
+++ b/weed/server/filer_grpc_server_sub_meta.go
@@ -30,44 +30,49 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest,
 	eachLogEntryFn := eachLogEntryFn(eachEventNotificationFn)
 
 	var processedTsNs int64
-	var err error
+	var readPersistedLogErr error
+	var readInMemoryLogErr error
 
 	for {
 
 		glog.V(4).Infof("read on disk %v aggregated subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime)
 
-		processedTsNs, err = fs.filer.ReadPersistedLogBuffer(lastReadTime, eachLogEntryFn)
-		if err != nil {
-			return fmt.Errorf("reading from persisted logs: %v", err)
+		processedTsNs, readPersistedLogErr = fs.filer.ReadPersistedLogBuffer(lastReadTime, eachLogEntryFn)
+		if readPersistedLogErr != nil {
+			return fmt.Errorf("reading from persisted logs: %v", readPersistedLogErr)
 		}
 
 		if processedTsNs != 0 {
 			lastReadTime = time.Unix(0, processedTsNs)
+		} else {
+			if readInMemoryLogErr == log_buffer.ResumeFromDiskError {
+				time.Sleep(1127 * time.Millisecond)
+				continue
+			}
 		}
 
 		glog.V(4).Infof("read in memory %v aggregated subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime)
 
-		lastReadTime, err = fs.filer.MetaAggregator.MetaLogBuffer.LoopProcessLogData("aggMeta:"+clientName, lastReadTime, func() bool {
+		lastReadTime, readInMemoryLogErr = fs.filer.MetaAggregator.MetaLogBuffer.LoopProcessLogData("aggMeta:"+clientName, lastReadTime, func() bool {
 			fs.filer.MetaAggregator.ListenersLock.Lock()
 			fs.filer.MetaAggregator.ListenersCond.Wait()
 			fs.filer.MetaAggregator.ListenersLock.Unlock()
 			return true
 		}, eachLogEntryFn)
-		if err != nil {
-			if err == log_buffer.ResumeFromDiskError {
-				time.Sleep(5127 * time.Millisecond)
+		if readInMemoryLogErr != nil {
+			if readInMemoryLogErr == log_buffer.ResumeFromDiskError {
 				continue
 			}
-			glog.Errorf("processed to %v: %v", lastReadTime, err)
-			if err != log_buffer.ResumeError {
+			glog.Errorf("processed to %v: %v", lastReadTime, readInMemoryLogErr)
+			if readInMemoryLogErr != log_buffer.ResumeError {
 				break
 			}
 		}
 
-		time.Sleep(5127 * time.Millisecond)
+		time.Sleep(1127 * time.Millisecond)
 	}
 
-	return err
+	return readInMemoryLogErr
 
 }
@@ -87,41 +92,47 @@ func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataReq
 	eachLogEntryFn := eachLogEntryFn(eachEventNotificationFn)
 
 	var processedTsNs int64
-	var err error
+	var readPersistedLogErr error
+	var readInMemoryLogErr error
 
 	for {
 		// println("reading from persisted logs ...")
 		glog.V(0).Infof("read on disk %v local subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime)
 
-		processedTsNs, err = fs.filer.ReadPersistedLogBuffer(lastReadTime, eachLogEntryFn)
-		if err != nil {
-			return fmt.Errorf("reading from persisted logs: %v", err)
+		processedTsNs, readPersistedLogErr = fs.filer.ReadPersistedLogBuffer(lastReadTime, eachLogEntryFn)
+		if readPersistedLogErr != nil {
+			return fmt.Errorf("reading from persisted logs: %v", readPersistedLogErr)
 		}
 
 		if processedTsNs != 0 {
 			lastReadTime = time.Unix(0, processedTsNs)
+		} else {
+			if readInMemoryLogErr == log_buffer.ResumeFromDiskError {
+				time.Sleep(1127 * time.Millisecond)
+				continue
+			}
 		}
 
 		glog.V(0).Infof("read in memory %v local subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime)
 
-		lastReadTime, err = fs.filer.LocalMetaLogBuffer.LoopProcessLogData("localMeta:"+clientName, lastReadTime, func() bool {
+		lastReadTime, readInMemoryLogErr = fs.filer.LocalMetaLogBuffer.LoopProcessLogData("localMeta:"+clientName, lastReadTime, func() bool {
 			fs.listenersLock.Lock()
 			fs.listenersCond.Wait()
 			fs.listenersLock.Unlock()
 			return true
 		}, eachLogEntryFn)
-		if err != nil {
-			if err == log_buffer.ResumeFromDiskError {
+		if readInMemoryLogErr != nil {
+			if readInMemoryLogErr == log_buffer.ResumeFromDiskError {
 				continue
 			}
-			glog.Errorf("processed to %v: %v", lastReadTime, err)
-			time.Sleep(3127 * time.Millisecond)
-			if err != log_buffer.ResumeError {
+			glog.Errorf("processed to %v: %v", lastReadTime, readInMemoryLogErr)
+			time.Sleep(1127 * time.Millisecond)
+			if readInMemoryLogErr != log_buffer.ResumeError {
 				break
 			}
 		}
 	}
 
-	return err
+	return readInMemoryLogErr
 
 }
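Both subscribe loops now track the persisted-log error and the in-memory-log error separately: a `ResumeFromDiskError` from the in-memory buffer loops straight back to the persisted log, and the loop backs off (a deliberately odd 1127 ms, down from 5127/3127 ms) only when the disk pass made no progress, instead of after every pass. A control-flow sketch with stand-in functions for the two readers; it shows only the retry/backoff structure, not the real log buffer API:

```go
// readDisk/readMemory and errResumeFromDisk are stand-ins.
var errResumeFromDisk = errors.New("resume from disk")

func subscribeLoop(readDisk func() (progressed bool, err error), readMemory func() error) error {
	var memErr error
	for {
		progressed, diskErr := readDisk()
		if diskErr != nil {
			return diskErr // persisted-log failures abort the stream
		}
		if !progressed && memErr == errResumeFromDisk {
			time.Sleep(1127 * time.Millisecond) // nothing flushed yet; back off
			continue
		}
		if memErr = readMemory(); memErr == errResumeFromDisk {
			continue // data moved to disk; reread the persisted log
		}
		if memErr != nil {
			return memErr
		}
	}
}
```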
"github.com/chrislusf/seaweedfs/weed/notification" _ "github.com/chrislusf/seaweedfs/weed/notification/aws_sqs" diff --git a/weed/server/filer_server_handlers_write_autochunk.go b/weed/server/filer_server_handlers_write_autochunk.go index fcb92d8ec..a42e0fc97 100644 --- a/weed/server/filer_server_handlers_write_autochunk.go +++ b/weed/server/filer_server_handlers_write_autochunk.go @@ -214,10 +214,6 @@ func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileNa Size: int64(entry.FileSize), } - if entry.Extended == nil { - entry.Extended = make(map[string][]byte) - } - entry.Extended = SaveAmzMetaData(r, entry.Extended, false) for k, v := range r.Header { diff --git a/weed/server/master_grpc_server_volume.go b/weed/server/master_grpc_server_volume.go index 3a4951cc5..4132ce690 100644 --- a/weed/server/master_grpc_server_volume.go +++ b/weed/server/master_grpc_server_volume.go @@ -143,7 +143,7 @@ func (ms *MasterServer) Assign(ctx context.Context, req *master_pb.AssignRequest maxTimeout = time.Second * 10 startTime = time.Now() ) - + for time.Now().Sub(startTime) < maxTimeout { fid, count, dn, err := ms.Topo.PickForWrite(req.Count, option) if err == nil { diff --git a/weed/server/master_server.go b/weed/server/master_server.go index 838803908..11dc95ded 100644 --- a/weed/server/master_server.go +++ b/weed/server/master_server.go @@ -97,7 +97,7 @@ func NewMasterServer(r *mux.Router, option *MasterOption, peers []string) *Maste ms := &MasterServer{ option: option, preallocateSize: preallocateSize, - vgCh: make(chan *topology.VolumeGrowRequest, 1 << 6), + vgCh: make(chan *topology.VolumeGrowRequest, 1<<6), clientChans: make(map[string]chan *master_pb.VolumeLocation), grpcDialOption: grpcDialOption, MasterClient: wdclient.NewMasterClient(grpcDialOption, "master", option.Host, 0, "", peers), diff --git a/weed/server/master_server_handlers.go b/weed/server/master_server_handlers.go index 974b3308f..0609732c7 100644 --- a/weed/server/master_server_handlers.go +++ b/weed/server/master_server_handlers.go @@ -123,7 +123,7 @@ func (ms *MasterServer) dirAssignHandler(w http.ResponseWriter, r *http.Request) Count: writableVolumeCount, ErrCh: errCh, } - if err := <- errCh; err != nil { + if err := <-errCh; err != nil { writeJsonError(w, r, http.StatusInternalServerError, fmt.Errorf("cannot grow volume group! 
%v", err)) return } diff --git a/weed/server/volume_server.go b/weed/server/volume_server.go index f7359ea6b..0d765a253 100644 --- a/weed/server/volume_server.go +++ b/weed/server/volume_server.go @@ -28,7 +28,7 @@ type VolumeServer struct { needleMapKind storage.NeedleMapKind FixJpgOrientation bool - ReadRedirect bool + ReadMode string compactionBytePerSecond int64 metricsAddress string metricsIntervalSec int @@ -50,7 +50,7 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string, dataCenter string, rack string, whiteList []string, fixJpgOrientation bool, - readRedirect bool, + readMode string, compactionMBPerSecond int, fileSizeLimitMB int, concurrentUploadLimit int64, @@ -72,7 +72,7 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string, rack: rack, needleMapKind: needleMapKind, FixJpgOrientation: fixJpgOrientation, - ReadRedirect: readRedirect, + ReadMode: readMode, grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.volume"), compactionBytePerSecond: int64(compactionMBPerSecond) * 1024 * 1024, fileSizeLimitBytes: int64(fileSizeLimitMB) * 1024 * 1024, diff --git a/weed/server/volume_server_handlers_read.go b/weed/server/volume_server_handlers_read.go index 3e977cfd4..c5afd9545 100644 --- a/weed/server/volume_server_handlers_read.go +++ b/weed/server/volume_server_handlers_read.go @@ -58,14 +58,53 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) hasVolume := vs.store.HasVolume(volumeId) _, hasEcVolume := vs.store.FindEcVolume(volumeId) if !hasVolume && !hasEcVolume { - if !vs.ReadRedirect { - glog.V(2).Infoln("volume is not local:", err, r.URL.Path) + if vs.ReadMode == "local" { + glog.V(0).Infoln("volume is not local:", err, r.URL.Path) w.WriteHeader(http.StatusNotFound) return } lookupResult, err := operation.Lookup(vs.GetMaster, volumeId.String()) glog.V(2).Infoln("volume", volumeId, "found on", lookupResult, "error", err) - if err == nil && len(lookupResult.Locations) > 0 { + if err != nil || len(lookupResult.Locations) <= 0 { + glog.V(0).Infoln("lookup error:", err, r.URL.Path) + w.WriteHeader(http.StatusNotFound) + return + } + if vs.ReadMode == "proxy" { + // proxy client request to target server + u, _ := url.Parse(util.NormalizeUrl(lookupResult.Locations[0].Url)) + r.URL.Host = u.Host + r.URL.Scheme = u.Scheme + request, err := http.NewRequest("GET", r.URL.String(), nil) + if err != nil { + glog.V(0).Infof("failed to instance http request of url %s: %v", r.URL.String(), err) + w.WriteHeader(http.StatusInternalServerError) + return + } + for k, vv := range r.Header { + for _, v := range vv { + request.Header.Add(k, v) + } + } + + response, err := client.Do(request) + if err != nil { + glog.V(0).Infof("request remote url %s: %v", r.URL.String(), err) + w.WriteHeader(http.StatusInternalServerError) + return + } + defer util.CloseResponse(response) + // proxy target response to client + for k, vv := range response.Header { + for _, v := range vv { + w.Header().Add(k, v) + } + } + w.WriteHeader(response.StatusCode) + io.Copy(w, response.Body) + return + } else { + // redirect u, _ := url.Parse(util.NormalizeUrl(lookupResult.Locations[0].PublicUrl)) u.Path = fmt.Sprintf("%s/%s,%s", u.Path, vid, fid) arg := url.Values{} @@ -74,12 +113,8 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) } u.RawQuery = arg.Encode() http.Redirect(w, r, u.String(), http.StatusMovedPermanently) - - } else { - glog.V(2).Infoln("lookup error:", err, r.URL.Path) - w.WriteHeader(http.StatusNotFound) + 
diff --git a/weed/server/volume_server_handlers_read.go b/weed/server/volume_server_handlers_read.go
index 3e977cfd4..c5afd9545 100644
--- a/weed/server/volume_server_handlers_read.go
+++ b/weed/server/volume_server_handlers_read.go
@@ -58,14 +58,53 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
 	hasVolume := vs.store.HasVolume(volumeId)
 	_, hasEcVolume := vs.store.FindEcVolume(volumeId)
 	if !hasVolume && !hasEcVolume {
-		if !vs.ReadRedirect {
-			glog.V(2).Infoln("volume is not local:", err, r.URL.Path)
+		if vs.ReadMode == "local" {
+			glog.V(0).Infoln("volume is not local:", err, r.URL.Path)
 			w.WriteHeader(http.StatusNotFound)
 			return
 		}
 		lookupResult, err := operation.Lookup(vs.GetMaster, volumeId.String())
 		glog.V(2).Infoln("volume", volumeId, "found on", lookupResult, "error", err)
-		if err == nil && len(lookupResult.Locations) > 0 {
+		if err != nil || len(lookupResult.Locations) <= 0 {
+			glog.V(0).Infoln("lookup error:", err, r.URL.Path)
+			w.WriteHeader(http.StatusNotFound)
+			return
+		}
+		if vs.ReadMode == "proxy" {
+			// proxy client request to target server
+			u, _ := url.Parse(util.NormalizeUrl(lookupResult.Locations[0].Url))
+			r.URL.Host = u.Host
+			r.URL.Scheme = u.Scheme
+			request, err := http.NewRequest("GET", r.URL.String(), nil)
+			if err != nil {
+				glog.V(0).Infof("failed to instance http request of url %s: %v", r.URL.String(), err)
+				w.WriteHeader(http.StatusInternalServerError)
+				return
+			}
+			for k, vv := range r.Header {
+				for _, v := range vv {
+					request.Header.Add(k, v)
+				}
+			}
+
+			response, err := client.Do(request)
+			if err != nil {
+				glog.V(0).Infof("request remote url %s: %v", r.URL.String(), err)
+				w.WriteHeader(http.StatusInternalServerError)
+				return
+			}
+			defer util.CloseResponse(response)
+			// proxy target response to client
+			for k, vv := range response.Header {
+				for _, v := range vv {
+					w.Header().Add(k, v)
+				}
+			}
+			w.WriteHeader(response.StatusCode)
+			io.Copy(w, response.Body)
+			return
+		} else {
+			// redirect
 			u, _ := url.Parse(util.NormalizeUrl(lookupResult.Locations[0].PublicUrl))
 			u.Path = fmt.Sprintf("%s/%s,%s", u.Path, vid, fid)
 			arg := url.Values{}
@@ -74,12 +113,8 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
 			}
 			u.RawQuery = arg.Encode()
 			http.Redirect(w, r, u.String(), http.StatusMovedPermanently)
-
-		} else {
-			glog.V(2).Infoln("lookup error:", err, r.URL.Path)
-			w.WriteHeader(http.StatusNotFound)
+			return
 		}
-		return
 	}
 
 	cookie := n.Cookie
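In `proxy` mode the volume server performs the master lookup itself and relays the client's GET to the owning volume server, copying request headers out and the response status, headers, and body back; this helps when clients cannot reach other volume servers directly and a redirect would fail. A standalone sketch of the relay step using only the standard library, where `target` stands for the looked-up location:

```go
package readmode

import (
	"io"
	"net/http"
	"net/url"
)

// proxyTo relays a GET to target and streams the response back, in the
// same order the handler uses: headers, then status, then body.
func proxyTo(w http.ResponseWriter, r *http.Request, target *url.URL, client *http.Client) {
	r.URL.Host, r.URL.Scheme = target.Host, target.Scheme
	req, err := http.NewRequest("GET", r.URL.String(), nil)
	if err != nil {
		w.WriteHeader(http.StatusInternalServerError)
		return
	}
	req.Header = r.Header.Clone() // forward the original request headers
	resp, err := client.Do(req)
	if err != nil {
		w.WriteHeader(http.StatusInternalServerError)
		return
	}
	defer resp.Body.Close()
	for k, vv := range resp.Header {
		for _, v := range vv {
			w.Header().Add(k, v)
		}
	}
	w.WriteHeader(resp.StatusCode)
	io.Copy(w, resp.Body)
}
```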
diff --git a/weed/shell/command_volume_fsck.go b/weed/shell/command_volume_fsck.go
index 400e96fe7..64389fdb5 100644
--- a/weed/shell/command_volume_fsck.go
+++ b/weed/shell/command_volume_fsck.go
@@ -62,7 +62,8 @@ func (c *commandVolumeFsck) Do(args []string, commandEnv *CommandEnv, writer io.
 	fsckCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
 	verbose := fsckCommand.Bool("v", false, "verbose mode")
-	findMissingChunksInFiler := fsckCommand.Bool("findMissingChunksInFiler", false, "see help volume.fsck")
+	findMissingChunksInFiler := fsckCommand.Bool("findMissingChunksInFiler", false, "see \"help volume.fsck\"")
+	findMissingChunksInFilerPath := fsckCommand.String("findMissingChunksInFilerPath", "/", "used together with findMissingChunksInFiler")
 	applyPurging := fsckCommand.Bool("reallyDeleteFromVolume", false, "<expert only> delete data not referenced by the filer")
 	if err = fsckCommand.Parse(args); err != nil {
 		return nil
@@ -96,7 +97,7 @@ func (c *commandVolumeFsck) Do(args []string, commandEnv *CommandEnv, writer io.
 	if *findMissingChunksInFiler {
 		// collect all filer file ids and paths
-		if err = c.collectFilerFileIdAndPaths(volumeIdToVInfo, tempFolder, writer, *verbose, applyPurging); err != nil {
+		if err = c.collectFilerFileIdAndPaths(volumeIdToVInfo, tempFolder, writer, *findMissingChunksInFilerPath, *verbose, applyPurging); err != nil {
 			return fmt.Errorf("collectFilerFileIdAndPaths: %v", err)
 		}
 		// for each volume, check filer file ids
@@ -117,7 +118,7 @@ func (c *commandVolumeFsck) Do(args []string, commandEnv *CommandEnv, writer io.
 	return nil
 }
 
-func (c *commandVolumeFsck) collectFilerFileIdAndPaths(volumeIdToServer map[uint32]VInfo, tempFolder string, writer io.Writer, verbose bool, applyPurging *bool) error {
+func (c *commandVolumeFsck) collectFilerFileIdAndPaths(volumeIdToServer map[uint32]VInfo, tempFolder string, writer io.Writer, filerPath string, verbose bool, applyPurging *bool) error {
 
 	if verbose {
 		fmt.Fprintf(writer, "checking each file from filer ...\n")
@@ -143,22 +144,25 @@ func (c *commandVolumeFsck) collectFilerFileIdAndPaths(volumeIdToServer map[uint
 		cookie  uint32
 		path    util.FullPath
 	}
-	return doTraverseBfsAndSaving(c.env, nil, "/", false, func(outputChan chan interface{}) {
-		buffer := make([]byte, 8)
+	return doTraverseBfsAndSaving(c.env, nil, filerPath, false, func(outputChan chan interface{}) {
+		buffer := make([]byte, 16)
 		for item := range outputChan {
 			i := item.(*Item)
 			if f, ok := files[i.vid]; ok {
 				util.Uint64toBytes(buffer, i.fileKey)
-				f.Write(buffer)
-				util.Uint32toBytes(buffer, i.cookie)
-				util.Uint32toBytes(buffer[4:], uint32(len(i.path)))
+				util.Uint32toBytes(buffer[8:], i.cookie)
+				util.Uint32toBytes(buffer[12:], uint32(len(i.path)))
 				f.Write(buffer)
 				f.Write([]byte(i.path))
+				// fmt.Fprintf(writer, "%d,%x%08x %d %s\n", i.vid, i.fileKey, i.cookie, len(i.path), i.path)
 			} else {
 				fmt.Fprintf(writer, "%d,%x%08x %s volume not found\n", i.vid, i.fileKey, i.cookie, i.path)
 			}
 		}
 	}, func(entry *filer_pb.FullEntry, outputChan chan interface{}) (err error) {
+		if verbose && entry.Entry.IsDirectory {
+			fmt.Fprintf(writer, "checking directory %s\n", util.NewFullPath(entry.Dir, entry.Entry.Name))
+		}
 		dChunks, mChunks, resolveErr := filer.ResolveChunkManifest(filer.LookupFn(c.env), entry.Entry.Chunks)
 		if resolveErr != nil {
 			return nil
@@ -317,6 +321,10 @@ func (c *commandVolumeFsck) collectFilerFileIds(tempFolder string, volumeIdToSer
 
 func (c *commandVolumeFsck) oneVolumeFileIdsCheckOneVolume(tempFolder string, volumeId uint32, writer io.Writer, verbose bool) (err error) {
 
+	if verbose {
+		fmt.Fprintf(writer, "find missing file chunks in volume %d ...\n", volumeId)
+	}
+
 	db := needle_map.NewMemDb()
 	defer db.Close()
 
@@ -342,7 +350,7 @@ func (c *commandVolumeFsck) oneVolumeFileIdsCheckOneVolume(tempFolder string, vo
 	item := &Item{}
 	var readSize int
 	for {
-		readSize, err = br.Read(buffer)
+		readSize, err = io.ReadFull(br, buffer)
 		if err != nil || readSize != 16 {
 			if err == io.EOF {
 				return nil
@@ -355,11 +363,17 @@ func (c *commandVolumeFsck) oneVolumeFileIdsCheckOneVolume(tempFolder string, vo
 		item.cookie = util.BytesToUint32(buffer[8:12])
 		pathSize := util.BytesToUint32(buffer[12:16])
 		pathBytes := make([]byte, int(pathSize))
-		_, err = br.Read(pathBytes)
+		n, err := io.ReadFull(br, pathBytes)
+		if err != nil {
+			fmt.Fprintf(writer, "%d,%x%08x in unexpected error: %v\n", volumeId, item.fileKey, item.cookie, err)
+		}
+		if n != int(pathSize) {
+			fmt.Fprintf(writer, "%d,%x%08x %d unexpected file name size %d\n", volumeId, item.fileKey, item.cookie, pathSize, n)
+		}
 		item.path = util.FullPath(string(pathBytes))
 
 		if _, found := db.Get(types.NeedleId(item.fileKey)); !found {
-			fmt.Fprintf(writer, "%d,%x%08x in %s not found\n", volumeId, item.fileKey, item.cookie, item.path)
+			fmt.Fprintf(writer, "%d,%x%08x in %s %d not found\n", volumeId, item.fileKey, item.cookie, item.path, pathSize)
		}
 	}
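The filer-side dump written by `collectFilerFileIdAndPaths` is now a fixed 16-byte header per file (8-byte key, 4-byte cookie, 4-byte path length) followed by the path bytes, written in a single `Write`; the old code reused an 8-byte buffer and interleaved the fields across two writes. On the reading side, `bufio.Reader.Read` (which may return short reads) is replaced by `io.ReadFull`. A round-trip sketch of the record layout, using `encoding/binary` in place of the repo's `util` helpers (byte order assumed big-endian to match them):

```go
package fscksketch

import (
	"encoding/binary"
	"io"
)

// writeRecord emits the 16-byte header (8B key, 4B cookie, 4B path length)
// followed by the path bytes.
func writeRecord(w io.Writer, key uint64, cookie uint32, path string) error {
	var header [16]byte
	binary.BigEndian.PutUint64(header[0:8], key)
	binary.BigEndian.PutUint32(header[8:12], cookie)
	binary.BigEndian.PutUint32(header[12:16], uint32(len(path)))
	if _, err := w.Write(header[:]); err != nil {
		return err
	}
	_, err := io.WriteString(w, path)
	return err
}

// readRecord uses io.ReadFull so a short read can never split a record.
func readRecord(r io.Reader) (key uint64, cookie uint32, path string, err error) {
	var header [16]byte
	if _, err = io.ReadFull(r, header[:]); err != nil {
		return
	}
	key = binary.BigEndian.Uint64(header[0:8])
	cookie = binary.BigEndian.Uint32(header[8:12])
	pathBytes := make([]byte, binary.BigEndian.Uint32(header[12:16]))
	_, err = io.ReadFull(r, pathBytes)
	return key, cookie, string(pathBytes), err
}
```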
diff --git a/weed/storage/needle/needle_read_write.go b/weed/storage/needle/needle_read_write.go
index d208404a8..84e00374a 100644
--- a/weed/storage/needle/needle_read_write.go
+++ b/weed/storage/needle/needle_read_write.go
@@ -52,7 +52,7 @@ func (n *Needle) prepareWriteBuffer(version Version, writeBytes *bytes.Buffer) (
 		writeBytes.Write(n.Data)
 		padding := PaddingLength(n.Size, version)
 		util.Uint32toBytes(header[0:NeedleChecksumSize], n.Checksum.Value())
-		writeBytes.Write(header[0:NeedleChecksumSize+padding])
+		writeBytes.Write(header[0 : NeedleChecksumSize+padding])
 		return size, actualSize, nil
 	case Version2, Version3:
 		header := make([]byte, NeedleHeaderSize+TimestampSize) // adding timestamp to reuse it and avoid extra allocation
@@ -104,7 +104,7 @@ func (n *Needle) prepareWriteBuffer(version Version, writeBytes *bytes.Buffer) (
 		}
 		if n.HasLastModifiedDate() {
 			util.Uint64toBytes(header[0:8], n.LastModified)
-			writeBytes.Write(header[8-LastModifiedBytesLength:8])
+			writeBytes.Write(header[8-LastModifiedBytesLength : 8])
 		}
 		if n.HasTtl() && n.Ttl != nil {
 			n.Ttl.ToBytes(header[0:TtlBytesLength])
@@ -119,11 +119,11 @@ func (n *Needle) prepareWriteBuffer(version Version, writeBytes *bytes.Buffer) (
 		padding := PaddingLength(n.Size, version)
 		util.Uint32toBytes(header[0:NeedleChecksumSize], n.Checksum.Value())
 		if version == Version2 {
-			writeBytes.Write(header[0:NeedleChecksumSize+padding])
+			writeBytes.Write(header[0 : NeedleChecksumSize+padding])
 		} else {
 			// version3
 			util.Uint64toBytes(header[NeedleChecksumSize:NeedleChecksumSize+TimestampSize], n.AppendAtNs)
-			writeBytes.Write(header[0:NeedleChecksumSize+TimestampSize+padding])
+			writeBytes.Write(header[0 : NeedleChecksumSize+TimestampSize+padding])
 		}
 
 		return Size(n.DataSize), GetActualSize(n.Size, version), nil
diff --git a/weed/storage/needle_map/compact_map.go b/weed/storage/needle_map/compact_map.go
index 2b1a471bc..3d2047f99 100644
--- a/weed/storage/needle_map/compact_map.go
+++ b/weed/storage/needle_map/compact_map.go
@@ -64,11 +64,31 @@ func (cs *CompactSection) Set(key NeedleId, offset Offset, size Size) (oldOffset
 	needOverflow := cs.counter >= batch
 	needOverflow = needOverflow || cs.counter > 0 && cs.values[cs.counter-1].Key > skey
 	if needOverflow {
-		//println("start", cs.start, "counter", cs.counter, "key", key)
-		if oldValueExtra, oldValue, found := cs.findOverflowEntry(skey); found {
-			oldOffset.OffsetHigher, oldOffset.OffsetLower, oldSize = oldValueExtra.OffsetHigher, oldValue.OffsetLower, oldValue.Size
+		lookBackIndex := cs.counter - 128
+		if lookBackIndex < 0 {
+			lookBackIndex = 0
+		}
+		if cs.counter < batch && cs.values[lookBackIndex].Key < skey {
+			// still has capacity and only partially out of order
+			p := &cs.values[cs.counter]
+			p.Key, cs.valuesExtra[cs.counter].OffsetHigher, p.OffsetLower, p.Size = skey, offset.OffsetHigher, offset.OffsetLower, size
+			//println("added index", cs.counter, "key", key, cs.values[cs.counter].Key)
+			for x := cs.counter - 1; x >= lookBackIndex; x-- {
+				if cs.values[x].Key > cs.values[x+1].Key {
+					cs.values[x], cs.values[x+1] = cs.values[x+1], cs.values[x]
+					cs.valuesExtra[x], cs.valuesExtra[x+1] = cs.valuesExtra[x+1], cs.valuesExtra[x]
+				} else {
+					break
+				}
+			}
+			cs.counter++
+		} else {
+			//println("start", cs.start, "counter", cs.counter, "key", key)
+			if oldValueExtra, oldValue, found := cs.findOverflowEntry(skey); found {
+				oldOffset.OffsetHigher, oldOffset.OffsetLower, oldSize = oldValueExtra.OffsetHigher, oldValue.OffsetLower, oldValue.Size
+			}
+			cs.setOverflowEntry(skey, offset, size)
 		}
-		cs.setOverflowEntry(skey, offset, size)
 	} else {
 		p := &cs.values[cs.counter]
 		p.Key, cs.valuesExtra[cs.counter].OffsetHigher, p.OffsetLower, p.Size = skey, offset.OffsetHigher, offset.OffsetLower, size
@@ -96,6 +116,7 @@ func (cs *CompactSection) setOverflowEntry(skey SectionalNeedleId, offset Offset
 			cs.overflowExtra[i] = cs.overflowExtra[i-1]
 		}
 		cs.overflow[insertCandidate] = needleValue
+		cs.overflowExtra[insertCandidate] = needleValueExtra
 	}
 }
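`CompactSection.Set` used to push any key below the section's current maximum into the overflow map. Now, if the section still has spare capacity and the key is only mildly out of order (greater than the key 128 slots back), it is appended in place and bubbled left with at most 128 swaps, keeping the nearly-sorted entries of large 5-byte-offset indexes out of overflow. The companion one-liner makes `setOverflowEntry` also write `overflowExtra[insertCandidate]`, which previously fell out of sync with `overflow` on insertion; the new `test/data/187.idx` fixture and `Test5bytesIndexLoading` below exercise exactly that. The bounded insertion step, simplified to bare uint64 keys:

```go
// insertNearlySorted appends key and bubbles it left at most lookBack
// positions, the same bounded insertion-sort step as the 128-entry
// look-back in CompactSection.Set.
func insertNearlySorted(keys []uint64, key uint64, lookBack int) []uint64 {
	keys = append(keys, key)
	for x := len(keys) - 2; x >= 0 && x >= len(keys)-1-lookBack; x-- {
		if keys[x] > keys[x+1] {
			keys[x], keys[x+1] = keys[x+1], keys[x]
		} else {
			break
		}
	}
	return keys
}
```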
diff --git a/weed/storage/needle_map/compact_map_cases_test.go b/weed/storage/needle_map/compact_map_cases_test.go
new file mode 100644
index 000000000..305925699
--- /dev/null
+++ b/weed/storage/needle_map/compact_map_cases_test.go
@@ -0,0 +1,33 @@
+// +build 5BytesOffset
+
+package needle_map
+
+import (
+	"fmt"
+	"github.com/chrislusf/seaweedfs/weed/storage/types"
+	"github.com/stretchr/testify/assert"
+	"log"
+	"os"
+	"testing"
+)
+
+func Test5bytesIndexLoading(t *testing.T) {
+
+	indexFile, ie := os.OpenFile("../../../test/data/187.idx", os.O_RDWR|os.O_RDONLY, 0644)
+	if ie != nil {
+		log.Fatalln(ie)
+	}
+	defer indexFile.Close()
+	m, rowCount := loadNewNeedleMap(indexFile)
+
+	println("total entries:", rowCount)
+
+	key := types.NeedleId(0x671b905) // 108116229
+
+	needle, found := m.Get(types.NeedleId(0x671b905))
+
+	fmt.Printf("%v key:%v offset:%v size:%v\n", found, key, needle.Offset, needle.Size)
+
+	assert.Equal(t, int64(12884911892)*8, needle.Offset.ToActualOffset(), "offset")
+
+}
diff --git a/weed/topology/node.go b/weed/topology/node.go
index 4556b4165..4772cb411 100644
--- a/weed/topology/node.go
+++ b/weed/topology/node.go
@@ -243,7 +243,7 @@ func (n *NodeImpl) CollectDeadNodeAndFullVolumes(freshThreshHold int64, volumeSi
 			if v.Size >= volumeSizeLimit {
 				//fmt.Println("volume",v.Id,"size",v.Size,">",volumeSizeLimit)
 				n.GetTopology().chanFullVolumes <- v
-			}else if float64(v.Size) > float64(volumeSizeLimit) * growThreshold {
+			} else if float64(v.Size) > float64(volumeSizeLimit)*growThreshold {
 				n.GetTopology().chanCrowdedVolumes <- v
 			}
 		}
diff --git a/weed/util/constants.go b/weed/util/constants.go
index ac80b1737..4d7082af0 100644
--- a/weed/util/constants.go
+++ b/weed/util/constants.go
@@ -5,7 +5,7 @@ import (
 )
 
 var (
-	VERSION = fmt.Sprintf("%s %d.%02d", sizeLimit, 2, 55)
+	VERSION = fmt.Sprintf("%s %d.%02d", sizeLimit, 2, 56)
 	COMMIT  = ""
 )
diff --git a/weed/util/grace/pprof.go b/weed/util/grace/pprof.go
index 0406b762c..28bf6d553 100644
--- a/weed/util/grace/pprof.go
+++ b/weed/util/grace/pprof.go
@@ -21,21 +21,21 @@ func SetupProfiling(cpuProfile, memProfile string) {
 			pprof.StopCPUProfile()
 
 			// write block pprof
-			blockF, err := os.Create(cpuProfile+".block")
+			blockF, err := os.Create(cpuProfile + ".block")
 			if err != nil {
 				return
 			}
 			p := pprof.Lookup("block")
-			p.WriteTo(blockF,0)
+			p.WriteTo(blockF, 0)
 			blockF.Close()
 
 			// write mutex pprof
-			mutexF, err := os.Create(cpuProfile+".mutex")
+			mutexF, err := os.Create(cpuProfile + ".mutex")
 			if err != nil {
 				return
 			}
 			p = pprof.Lookup("mutex")
-			p.WriteTo(mutexF,0)
+			p.WriteTo(mutexF, 0)
 			mutexF.Close()
 		})
diff --git a/weed/util/log_buffer/log_buffer.go b/weed/util/log_buffer/log_buffer.go
index 12840a88a..4742c2b7c 100644
--- a/weed/util/log_buffer/log_buffer.go
+++ b/weed/util/log_buffer/log_buffer.go
@@ -170,6 +170,8 @@ func (m *LogBuffer) copyToFlush() *dataToFlush {
 			m.lastFlushTime = m.stopTime
 		}
 		m.buf = m.prevBuffers.SealBuffer(m.startTime, m.stopTime, m.buf, m.pos)
+		m.startTime = time.Unix(0, 0)
+		m.stopTime = time.Unix(0, 0)
 		m.pos = 0
 		m.idx = m.idx[:0]
 		return d
