aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--go.mod2
-rw-r--r--go.sum2
-rw-r--r--k8s/seaweedfs/Chart.yaml4
-rw-r--r--k8s/seaweedfs/values.yaml2
-rw-r--r--other/java/client/src/main/proto/filer.proto1
-rw-r--r--test/data/187.idxbin0 -> 1028959 bytes
-rw-r--r--weed/Makefile2
-rw-r--r--weed/command/fuse.go47
-rw-r--r--weed/command/server.go2
-rw-r--r--weed/command/upload.go2
-rw-r--r--weed/command/volume.go6
-rw-r--r--weed/filer/sqlite/sqlite_store_unsupported.go2
-rw-r--r--weed/filesys/dir.go2
-rw-r--r--weed/filesys/dir_rename.go127
-rw-r--r--weed/filesys/dirty_pages_temp_file.go2
-rw-r--r--weed/filesys/dirty_pages_temp_interval.go2
-rw-r--r--weed/filesys/file.go8
-rw-r--r--weed/filesys/meta_cache/meta_cache.go34
-rw-r--r--weed/pb/filer.proto1
-rw-r--r--weed/pb/filer_pb/filer.pb.go22
-rw-r--r--weed/pb/filer_pb/filer_client.go6
-rw-r--r--weed/s3api/filer_multipart.go20
-rw-r--r--weed/s3api/filer_util.go4
-rw-r--r--weed/s3api/s3api_object_multipart_handlers.go17
-rw-r--r--weed/server/common.go1
-rw-r--r--weed/server/filer_grpc_server_rename.go18
-rw-r--r--weed/server/filer_grpc_server_sub_meta.go57
-rw-r--r--weed/server/filer_server.go2
-rw-r--r--weed/server/filer_server_handlers_write_autochunk.go4
-rw-r--r--weed/server/master_grpc_server_volume.go2
-rw-r--r--weed/server/master_server.go2
-rw-r--r--weed/server/master_server_handlers.go2
-rw-r--r--weed/server/volume_server.go6
-rw-r--r--weed/server/volume_server_handlers_read.go51
-rw-r--r--weed/shell/command_volume_fsck.go36
-rw-r--r--weed/storage/needle/needle_read_write.go8
-rw-r--r--weed/storage/needle_map/compact_map.go29
-rw-r--r--weed/storage/needle_map/compact_map_cases_test.go33
-rw-r--r--weed/topology/node.go2
-rw-r--r--weed/util/constants.go2
-rw-r--r--weed/util/grace/pprof.go8
-rw-r--r--weed/util/log_buffer/log_buffer.go2
42 files changed, 416 insertions, 166 deletions
diff --git a/go.mod b/go.mod
index 5510eee8e..84c03ebdc 100644
--- a/go.mod
+++ b/go.mod
@@ -56,7 +56,7 @@ require (
github.com/pquerna/cachecontrol v0.1.0
github.com/prometheus/client_golang v1.11.0
github.com/rcrowley/go-metrics v0.0.0-20190826022208-cac0b30c2563 // indirect
- github.com/seaweedfs/fuse v1.1.7
+ github.com/seaweedfs/fuse v1.1.8
github.com/seaweedfs/goexif v1.0.2
github.com/skip2/go-qrcode v0.0.0-20200617195104-da1b6568686e
github.com/spaolacci/murmur3 v1.1.0 // indirect
diff --git a/go.sum b/go.sum
index e0587fd9c..1911be8dd 100644
--- a/go.sum
+++ b/go.sum
@@ -617,6 +617,8 @@ github.com/samuel/go-zookeeper v0.0.0-20190923202752-2cc03de413da/go.mod h1:gi+0
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc=
github.com/seaweedfs/fuse v1.1.7 h1:T4L5c/Sn+q8lE+0zCmH2MWvIO+B5TttWOSqK5KQPRMQ=
github.com/seaweedfs/fuse v1.1.7/go.mod h1:+PP6WlkrRUG6KPE+Th2EX5To/PjHaFsvqg/UgQ39aj8=
+github.com/seaweedfs/fuse v1.1.8 h1:YFSDPotG4uhQzV7ooDUvQ8BRVy5rM1XCFPJAmAsZz68=
+github.com/seaweedfs/fuse v1.1.8/go.mod h1:+PP6WlkrRUG6KPE+Th2EX5To/PjHaFsvqg/UgQ39aj8=
github.com/seaweedfs/goexif v1.0.2 h1:p+rTXYdQ2mgxd+1JaTrQ9N8DvYuw9UH9xgYmJ+Bb29E=
github.com/seaweedfs/goexif v1.0.2/go.mod h1:MrKs5LK0HXdffrdCZrW3OIMegL2xXpC6ThLyXMyjdrk=
github.com/secsy/goftp v0.0.0-20190720192957-f31499d7c79a h1:C6IhVTxNkhlb0tlCB6JfHOUv1f0xHPK7V8X4HlJZEJw=
diff --git a/k8s/seaweedfs/Chart.yaml b/k8s/seaweedfs/Chart.yaml
index 0aebc1c84..7b779debe 100644
--- a/k8s/seaweedfs/Chart.yaml
+++ b/k8s/seaweedfs/Chart.yaml
@@ -1,5 +1,5 @@
apiVersion: v1
description: SeaweedFS
name: seaweedfs
-appVersion: "2.55"
-version: "2.55"
+appVersion: "2.56"
+version: "2.56"
diff --git a/k8s/seaweedfs/values.yaml b/k8s/seaweedfs/values.yaml
index 072280f07..3b675eb41 100644
--- a/k8s/seaweedfs/values.yaml
+++ b/k8s/seaweedfs/values.yaml
@@ -4,7 +4,7 @@ global:
registry: ""
repository: ""
imageName: chrislusf/seaweedfs
- # imageTag: "2.55" - started using {.Chart.appVersion}
+ # imageTag: "2.56" - started using {.Chart.appVersion}
imagePullPolicy: IfNotPresent
imagePullSecrets: imagepullsecret
restartPolicy: Always
diff --git a/other/java/client/src/main/proto/filer.proto b/other/java/client/src/main/proto/filer.proto
index cdbba7eb1..f49c1218f 100644
--- a/other/java/client/src/main/proto/filer.proto
+++ b/other/java/client/src/main/proto/filer.proto
@@ -208,6 +208,7 @@ message AtomicRenameEntryRequest {
string old_name = 2;
string new_directory = 3;
string new_name = 4;
+ repeated int32 signatures = 5;
}
message AtomicRenameEntryResponse {
diff --git a/test/data/187.idx b/test/data/187.idx
new file mode 100644
index 000000000..d4cb42ef0
--- /dev/null
+++ b/test/data/187.idx
Binary files differ
diff --git a/weed/Makefile b/weed/Makefile
index edc0bf544..c82735a0e 100644
--- a/weed/Makefile
+++ b/weed/Makefile
@@ -16,7 +16,7 @@ debug_shell:
debug_mount:
go build -gcflags="all=-N -l"
- dlv --listen=:2345 --headless=true --api-version=2 --accept-multiclient exec weed -- -v=4 mount -dir=~/tmp/mm -cacheCapacityMB=0 -filer.path=/buckets
+ dlv --listen=:2345 --headless=true --api-version=2 --accept-multiclient exec weed -- -v=4 mount -dir=~/tmp/mm -cacheCapacityMB=0 -filer.path=/
debug_server:
go build -gcflags="all=-N -l"
diff --git a/weed/command/fuse.go b/weed/command/fuse.go
index 3949d8f70..609892b5b 100644
--- a/weed/command/fuse.go
+++ b/weed/command/fuse.go
@@ -2,10 +2,10 @@ package command
import (
"fmt"
- "strings"
+ "os"
"strconv"
+ "strings"
"time"
- "os"
)
func init() {
@@ -13,7 +13,7 @@ func init() {
}
type parameter struct {
- name string
+ name string
value string
}
@@ -23,6 +23,7 @@ func runFuse(cmd *Command, args []string) bool {
option := strings.Builder{}
options := []parameter{}
masterProcess := true
+ fusermountPath := ""
// first parameter
i := 0
@@ -41,7 +42,7 @@ func runFuse(cmd *Command, args []string) bool {
option.Reset()
}
- // dash separator read option until next space
+ // dash separator read option until next space
} else if rawArgs[i] == '-' {
for i++; i < rawArgsLen && rawArgs[i] != ' '; i++ {
option.WriteByte(rawArgs[i])
@@ -49,12 +50,12 @@ func runFuse(cmd *Command, args []string) bool {
options = append(options, parameter{option.String(), "true"})
option.Reset()
- // equal separator start option with pending value
+ // equal separator start option with pending value
} else if rawArgs[i] == '=' {
name := option.String()
option.Reset()
- for i++; i < rawArgsLen && rawArgs[i] != ','; i++ {
+ for i++; i < rawArgsLen && rawArgs[i] != ',' && rawArgs[i] != ' '; i++ {
// double quote separator read option until next double quote
if rawArgs[i] == '"' {
for i++; i < rawArgsLen && rawArgs[i] != '"'; i++ {
@@ -67,7 +68,7 @@ func runFuse(cmd *Command, args []string) bool {
option.WriteByte(rawArgs[i])
}
- // add chars before comma
+ // add chars before comma
} else if rawArgs[i] != ' ' {
option.WriteByte(rawArgs[i])
}
@@ -76,12 +77,12 @@ func runFuse(cmd *Command, args []string) bool {
options = append(options, parameter{name, option.String()})
option.Reset()
- // comma separator just read current option
+ // comma separator just read current option
} else if rawArgs[i] == ',' {
options = append(options, parameter{option.String(), "true"})
option.Reset()
- // what is not a separator fill option buffer
+ // what is not a separator fill option buffer
} else {
option.WriteByte(rawArgs[i])
}
@@ -98,7 +99,7 @@ func runFuse(cmd *Command, args []string) bool {
for i := 0; i < len(options); i++ {
parameter := options[i]
- switch parameter.name {
+ switch parameter.name {
case "child":
masterProcess = false
case "arg0":
@@ -187,17 +188,23 @@ func runFuse(cmd *Command, args []string) bool {
} else {
panic(fmt.Errorf("readRetryTime: %s", err))
}
+ case "fusermount.path":
+ fusermountPath = parameter.value
}
}
// the master start the child, release it then finish himself
if masterProcess {
- arg0 := os.Args[0]
+ arg0, err := os.Executable()
+ if err != nil {
+ panic(err)
+ }
+
argv := append(os.Args, "-o", "child")
- attr := os.ProcAttr{}
- attr.Env = os.Environ()
-
+ attr := os.ProcAttr{}
+ attr.Env = os.Environ()
+
child, err := os.StartProcess(arg0, argv, &attr)
if err != nil {
@@ -213,13 +220,23 @@ func runFuse(cmd *Command, args []string) bool {
return true
}
+ if fusermountPath != "" {
+ if err := os.Setenv("PATH", fusermountPath); err != nil {
+ panic(fmt.Errorf("setenv: %s", err))
+ }
+ } else if os.Getenv("PATH") == "" {
+ if err := os.Setenv("PATH", "/bin:/sbin:/usr/bin:/usr/sbin"); err != nil {
+ panic(fmt.Errorf("setenv: %s", err))
+ }
+ }
+
// just call "weed mount" command
return runMount(cmdMount, []string{})
}
var cmdFuse = &Command{
UsageLine: "fuse /mnt/mount/point -o \"filer=localhost:8888,filer.path=/\"",
- Short: "Allow use weed with linux's mount command",
+ Short: "Allow use weed with linux's mount command",
Long: `Allow use weed with linux's mount command
You can use -t weed on mount command:
diff --git a/weed/command/server.go b/weed/command/server.go
index d2bd6466e..f6c033c64 100644
--- a/weed/command/server.go
+++ b/weed/command/server.go
@@ -107,7 +107,7 @@ func init() {
serverOptions.v.indexType = cmdServer.Flag.String("volume.index", "memory", "Choose [memory|leveldb|leveldbMedium|leveldbLarge] mode for memory~performance balance.")
serverOptions.v.diskType = cmdServer.Flag.String("volume.disk", "", "[hdd|ssd|<tag>] hard drive or solid state drive or any tag")
serverOptions.v.fixJpgOrientation = cmdServer.Flag.Bool("volume.images.fix.orientation", false, "Adjust jpg orientation when uploading.")
- serverOptions.v.readRedirect = cmdServer.Flag.Bool("volume.read.redirect", true, "Redirect moved or non-local volumes.")
+ serverOptions.v.readMode = cmdServer.Flag.String("volume.readMode", "redirect", "[local|proxy|redirect] how to deal with non-local volume: 'not found|read in remote node|redirect volume location'.")
serverOptions.v.compactionMBPerSecond = cmdServer.Flag.Int("volume.compactionMBps", 0, "limit compaction speed in mega bytes per second")
serverOptions.v.fileSizeLimitMB = cmdServer.Flag.Int("volume.fileSizeLimitMB", 256, "limit file size to avoid out of memory")
serverOptions.v.concurrentUploadLimitMB = cmdServer.Flag.Int("volume.concurrentUploadLimitMB", 64, "limit total concurrent upload size")
diff --git a/weed/command/upload.go b/weed/command/upload.go
index a102796b4..ccdec561f 100644
--- a/weed/command/upload.go
+++ b/weed/command/upload.go
@@ -110,7 +110,7 @@ func runUpload(cmd *Command, args []string) bool {
})
if err != nil {
fmt.Println(err.Error())
- return false;
+ return false
}
} else {
parts, e := operation.NewFileParts(args)
diff --git a/weed/command/volume.go b/weed/command/volume.go
index 139a3791e..ced6ec414 100644
--- a/weed/command/volume.go
+++ b/weed/command/volume.go
@@ -51,7 +51,7 @@ type VolumeServerOptions struct {
indexType *string
diskType *string
fixJpgOrientation *bool
- readRedirect *bool
+ readMode *string
cpuProfile *string
memProfile *string
compactionMBPerSecond *int
@@ -80,7 +80,7 @@ func init() {
v.indexType = cmdVolume.Flag.String("index", "memory", "Choose [memory|leveldb|leveldbMedium|leveldbLarge] mode for memory~performance balance.")
v.diskType = cmdVolume.Flag.String("disk", "", "[hdd|ssd|<tag>] hard drive or solid state drive or any tag")
v.fixJpgOrientation = cmdVolume.Flag.Bool("images.fix.orientation", false, "Adjust jpg orientation when uploading.")
- v.readRedirect = cmdVolume.Flag.Bool("read.redirect", true, "Redirect moved or non-local volumes.")
+ v.readMode = cmdVolume.Flag.String("readMode", "redirect", "[local|proxy|redirect] how to deal with non-local volume: 'not found|proxy to remote node|redirect volume location'.")
v.cpuProfile = cmdVolume.Flag.String("cpuprofile", "", "cpu profile output file")
v.memProfile = cmdVolume.Flag.String("memprofile", "", "memory profile output file")
v.compactionMBPerSecond = cmdVolume.Flag.Int("compactionMBps", 0, "limit background compaction or copying speed in mega bytes per second")
@@ -228,7 +228,7 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v
volumeNeedleMapKind,
strings.Split(masters, ","), 5, *v.dataCenter, *v.rack,
v.whiteList,
- *v.fixJpgOrientation, *v.readRedirect,
+ *v.fixJpgOrientation, *v.readMode,
*v.compactionMBPerSecond,
*v.fileSizeLimitMB,
int64(*v.concurrentUploadLimitMB)*1024*1024,
diff --git a/weed/filer/sqlite/sqlite_store_unsupported.go b/weed/filer/sqlite/sqlite_store_unsupported.go
index 9b7009df9..803c71afa 100644
--- a/weed/filer/sqlite/sqlite_store_unsupported.go
+++ b/weed/filer/sqlite/sqlite_store_unsupported.go
@@ -1,4 +1,4 @@
-// +build !linux,!darwin,!windows
+// +build !linux,!darwin,!windows,!s390,!ppc64le,!mips64
// limited GOOS due to modernc.org/libc/unistd
diff --git a/weed/filesys/dir.go b/weed/filesys/dir.go
index 1af868d58..9a791e013 100644
--- a/weed/filesys/dir.go
+++ b/weed/filesys/dir.go
@@ -141,7 +141,7 @@ func (dir *Dir) Create(ctx context.Context, req *fuse.CreateRequest,
var node fs.Node
if isDirectory {
node = dir.newDirectory(util.NewFullPath(dir.FullPath(), req.Name))
- return node, nil, nil
+ return node, node, nil
}
node = dir.newFile(req.Name)
diff --git a/weed/filesys/dir_rename.go b/weed/filesys/dir_rename.go
index d50c6dab0..27a14d5c6 100644
--- a/weed/filesys/dir_rename.go
+++ b/weed/filesys/dir_rename.go
@@ -2,6 +2,9 @@ package filesys
import (
"context"
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "math"
"github.com/seaweedfs/fuse"
"github.com/seaweedfs/fuse/fs"
@@ -37,6 +40,7 @@ func (dir *Dir) Rename(ctx context.Context, req *fuse.RenameRequest, newDirector
OldName: req.OldName,
NewDirectory: newDir.FullPath(),
NewName: req.NewName,
+ Signatures: []int32{dir.wfs.signature},
}
_, err := client.AtomicRenameEntry(ctx, request)
@@ -53,34 +57,12 @@ func (dir *Dir) Rename(ctx context.Context, req *fuse.RenameRequest, newDirector
return fuse.EIO
}
- // TODO: replicate renaming logic on filer
- if err := dir.wfs.metaCache.DeleteEntry(context.Background(), oldPath); err != nil {
- glog.V(0).Infof("dir Rename delete local %s => %s : %v", oldPath, newPath, err)
- return fuse.EIO
- }
- oldEntry.FullPath = newPath
- if err := dir.wfs.metaCache.InsertEntry(context.Background(), oldEntry); err != nil {
- glog.V(0).Infof("dir Rename insert local %s => %s : %v", oldPath, newPath, err)
+ err = dir.moveEntry(context.Background(), util.FullPath(dir.FullPath()), oldEntry, util.FullPath(newDir.FullPath()), req.NewName)
+ if err != nil {
+ glog.V(0).Infof("dir local Rename %s => %s : %v", oldPath, newPath, err)
return fuse.EIO
}
- oldFsNode := NodeWithId(oldPath.AsInode())
- newFsNode := NodeWithId(newPath.AsInode())
- dir.wfs.Server.InvalidateInternalNode(oldFsNode, newFsNode, func(internalNode fs.Node) {
- if file, ok := internalNode.(*File); ok {
- glog.V(4).Infof("internal file node %s", file.Name)
- file.Name = req.NewName
- file.id = uint64(newFsNode)
- file.dir = newDir
- }
- if dir, ok := internalNode.(*Dir); ok {
- glog.V(4).Infof("internal dir node %s", dir.name)
- dir.name = req.NewName
- dir.id = uint64(newFsNode)
- dir.parent = newDir
- }
- })
-
// change file handle
dir.wfs.handlesLock.Lock()
defer dir.wfs.handlesLock.Unlock()
@@ -96,3 +78,98 @@ func (dir *Dir) Rename(ctx context.Context, req *fuse.RenameRequest, newDirector
return nil
}
+
+func (dir *Dir) moveEntry(ctx context.Context, oldParent util.FullPath, entry *filer.Entry, newParent util.FullPath, newName string) error {
+
+ oldName := entry.Name()
+
+ if err := dir.moveSelfEntry(ctx, oldParent, entry, newParent, newName, func() error {
+
+ oldFsNode := NodeWithId(oldParent.Child(oldName).AsInode())
+ newFsNode := NodeWithId(newParent.Child(newName).AsInode())
+ newDirNode, found := dir.wfs.Server.FindInternalNode(NodeWithId(newParent.AsInode()))
+ var newDir *Dir
+ if found {
+ newDir = newDirNode.(*Dir)
+ }
+ dir.wfs.Server.InvalidateInternalNode(oldFsNode, newFsNode, func(internalNode fs.Node) {
+ if file, ok := internalNode.(*File); ok {
+ glog.V(4).Infof("internal file node %s", oldParent.Child(oldName))
+ file.Name = newName
+ file.id = uint64(newFsNode)
+ if found {
+ file.dir = newDir
+ }
+ }
+ if dir, ok := internalNode.(*Dir); ok {
+ glog.V(4).Infof("internal dir node %s", oldParent.Child(oldName))
+ dir.name = newName
+ dir.id = uint64(newFsNode)
+ if found {
+ dir.parent = newDir
+ }
+ }
+ })
+
+ if entry.IsDirectory() {
+ if err := dir.moveFolderSubEntries(ctx, oldParent, oldName, newParent, newName); err != nil {
+ return err
+ }
+ }
+ return nil
+ }); err != nil {
+ return fmt.Errorf("fail to move %s => %s: %v", oldParent.Child(oldName), newParent.Child(newName), err)
+ }
+
+ return nil
+}
+
+func (dir *Dir) moveFolderSubEntries(ctx context.Context, oldParent util.FullPath, oldName string, newParent util.FullPath, newName string) error {
+
+ currentDirPath := oldParent.Child(oldName)
+ newDirPath := newParent.Child(newName)
+
+ glog.V(1).Infof("moving folder %s => %s", currentDirPath, newDirPath)
+
+ var moveErr error
+ listErr := dir.wfs.metaCache.ListDirectoryEntries(ctx, currentDirPath, "", false, int64(math.MaxInt32), func(item *filer.Entry) bool {
+ moveErr = dir.moveEntry(ctx, currentDirPath, item, newDirPath, item.Name())
+ if moveErr != nil {
+ return false
+ }
+ return true
+ })
+ if listErr != nil {
+ return listErr
+ }
+ if moveErr != nil {
+ return moveErr
+ }
+
+ return nil
+}
+
+func (dir *Dir) moveSelfEntry(ctx context.Context, oldParent util.FullPath, entry *filer.Entry, newParent util.FullPath, newName string, moveFolderSubEntries func() error) error {
+
+ newPath := newParent.Child(newName)
+ oldPath := oldParent.Child(entry.Name())
+
+ entry.FullPath = newPath
+ if err := dir.wfs.metaCache.InsertEntry(ctx, entry); err != nil {
+ glog.V(0).Infof("dir Rename insert local %s => %s : %v", oldPath, newPath, err)
+ return fuse.EIO
+ }
+
+ if moveFolderSubEntries != nil {
+ if moveChildrenErr := moveFolderSubEntries(); moveChildrenErr != nil {
+ return moveChildrenErr
+ }
+ }
+
+ if err := dir.wfs.metaCache.DeleteEntry(ctx, oldPath); err != nil {
+ glog.V(0).Infof("dir Rename delete local %s => %s : %v", oldPath, newPath, err)
+ return fuse.EIO
+ }
+
+ return nil
+}
diff --git a/weed/filesys/dirty_pages_temp_file.go b/weed/filesys/dirty_pages_temp_file.go
index 3826008b7..9fa7c0c8e 100644
--- a/weed/filesys/dirty_pages_temp_file.go
+++ b/weed/filesys/dirty_pages_temp_file.go
@@ -97,7 +97,7 @@ func (pages *TempFileDirtyPages) saveExistingPagesToStorage() {
for _, list := range pages.writtenIntervals.lists {
listStopOffset := list.Offset() + list.Size()
- for uploadedOffset:=int64(0); uploadedOffset < listStopOffset; uploadedOffset += pageSize {
+ for uploadedOffset := int64(0); uploadedOffset < listStopOffset; uploadedOffset += pageSize {
start, stop := max(list.Offset(), uploadedOffset), min(listStopOffset, uploadedOffset+pageSize)
if start >= stop {
continue
diff --git a/weed/filesys/dirty_pages_temp_interval.go b/weed/filesys/dirty_pages_temp_interval.go
index 2030178be..42c4b5a3b 100644
--- a/weed/filesys/dirty_pages_temp_interval.go
+++ b/weed/filesys/dirty_pages_temp_interval.go
@@ -54,7 +54,7 @@ func (list *WrittenIntervalLinkedList) ReadData(buf []byte, start, stop int64) {
nodeStart, nodeStop := max(start, t.DataOffset), min(stop, t.DataOffset+t.Size)
if nodeStart < nodeStop {
// glog.V(4).Infof("copying start=%d stop=%d t=[%d,%d) => bufSize=%d nodeStart=%d, nodeStop=%d", start, stop, t.DataOffset, t.DataOffset+t.Size, len(buf), nodeStart, nodeStop)
- list.tempFile.ReadAt(buf[nodeStart-start:nodeStop-start], t.TempOffset + nodeStart - t.DataOffset)
+ list.tempFile.ReadAt(buf[nodeStart-start:nodeStop-start], t.TempOffset+nodeStart-t.DataOffset)
}
if t.Next == nil {
diff --git a/weed/filesys/file.go b/weed/filesys/file.go
index c50ac0549..7f021619c 100644
--- a/weed/filesys/file.go
+++ b/weed/filesys/file.go
@@ -144,17 +144,17 @@ func (file *File) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *f
file.dirtyMetadata = true
}
- if req.Valid.Mode() {
+ if req.Valid.Mode() && entry.Attributes.FileMode != uint32(req.Mode) {
entry.Attributes.FileMode = uint32(req.Mode)
file.dirtyMetadata = true
}
- if req.Valid.Uid() {
+ if req.Valid.Uid() && entry.Attributes.Uid != req.Uid {
entry.Attributes.Uid = req.Uid
file.dirtyMetadata = true
}
- if req.Valid.Gid() {
+ if req.Valid.Gid() && entry.Attributes.Gid != req.Gid {
entry.Attributes.Gid = req.Gid
file.dirtyMetadata = true
}
@@ -164,7 +164,7 @@ func (file *File) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *f
file.dirtyMetadata = true
}
- if req.Valid.Mtime() {
+ if req.Valid.Mtime() && entry.Attributes.Mtime != req.Mtime.Unix() {
entry.Attributes.Mtime = req.Mtime.Unix()
file.dirtyMetadata = true
}
diff --git a/weed/filesys/meta_cache/meta_cache.go b/weed/filesys/meta_cache/meta_cache.go
index 3f6391c39..69d1655ee 100644
--- a/weed/filesys/meta_cache/meta_cache.go
+++ b/weed/filesys/meta_cache/meta_cache.go
@@ -3,14 +3,12 @@ package meta_cache
import (
"context"
"fmt"
- "os"
- "sync"
-
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/filer/leveldb"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/chrislusf/seaweedfs/weed/util/bounded_tree"
+ "os"
)
// need to have logic similar to FilerStoreWrapper
@@ -18,7 +16,7 @@ import (
type MetaCache struct {
localStore filer.VirtualFilerStore
- sync.RWMutex
+ // sync.RWMutex
visitedBoundary *bounded_tree.BoundedTree
uidGidMapper *UidGidMapper
invalidateFunc func(util.FullPath)
@@ -54,8 +52,8 @@ func openMetaStore(dbFolder string) filer.VirtualFilerStore {
}
func (mc *MetaCache) InsertEntry(ctx context.Context, entry *filer.Entry) error {
- mc.Lock()
- defer mc.Unlock()
+ //mc.Lock()
+ //defer mc.Unlock()
return mc.doInsertEntry(ctx, entry)
}
@@ -64,8 +62,8 @@ func (mc *MetaCache) doInsertEntry(ctx context.Context, entry *filer.Entry) erro
}
func (mc *MetaCache) AtomicUpdateEntryFromFiler(ctx context.Context, oldPath util.FullPath, newEntry *filer.Entry) error {
- mc.Lock()
- defer mc.Unlock()
+ //mc.Lock()
+ //defer mc.Unlock()
oldDir, _ := oldPath.DirAndName()
if mc.visitedBoundary.HasVisited(util.FullPath(oldDir)) {
@@ -97,14 +95,14 @@ func (mc *MetaCache) AtomicUpdateEntryFromFiler(ctx context.Context, oldPath uti
}
func (mc *MetaCache) UpdateEntry(ctx context.Context, entry *filer.Entry) error {
- mc.Lock()
- defer mc.Unlock()
+ //mc.Lock()
+ //defer mc.Unlock()
return mc.localStore.UpdateEntry(ctx, entry)
}
func (mc *MetaCache) FindEntry(ctx context.Context, fp util.FullPath) (entry *filer.Entry, err error) {
- mc.RLock()
- defer mc.RUnlock()
+ //mc.RLock()
+ //defer mc.RUnlock()
entry, err = mc.localStore.FindEntry(ctx, fp)
if err != nil {
return nil, err
@@ -114,14 +112,14 @@ func (mc *MetaCache) FindEntry(ctx context.Context, fp util.FullPath) (entry *fi
}
func (mc *MetaCache) DeleteEntry(ctx context.Context, fp util.FullPath) (err error) {
- mc.Lock()
- defer mc.Unlock()
+ //mc.Lock()
+ //defer mc.Unlock()
return mc.localStore.DeleteEntry(ctx, fp)
}
func (mc *MetaCache) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) error {
- mc.RLock()
- defer mc.RUnlock()
+ //mc.RLock()
+ //defer mc.RUnlock()
if !mc.visitedBoundary.HasVisited(dirPath) {
return fmt.Errorf("unsynchronized dir: %v", dirPath)
@@ -138,8 +136,8 @@ func (mc *MetaCache) ListDirectoryEntries(ctx context.Context, dirPath util.Full
}
func (mc *MetaCache) Shutdown() {
- mc.Lock()
- defer mc.Unlock()
+ //mc.Lock()
+ //defer mc.Unlock()
mc.localStore.Shutdown()
}
diff --git a/weed/pb/filer.proto b/weed/pb/filer.proto
index cdbba7eb1..f49c1218f 100644
--- a/weed/pb/filer.proto
+++ b/weed/pb/filer.proto
@@ -208,6 +208,7 @@ message AtomicRenameEntryRequest {
string old_name = 2;
string new_directory = 3;
string new_name = 4;
+ repeated int32 signatures = 5;
}
message AtomicRenameEntryResponse {
diff --git a/weed/pb/filer_pb/filer.pb.go b/weed/pb/filer_pb/filer.pb.go
index 89fc448f4..2a7f3d041 100644
--- a/weed/pb/filer_pb/filer.pb.go
+++ b/weed/pb/filer_pb/filer.pb.go
@@ -1382,10 +1382,11 @@ type AtomicRenameEntryRequest struct {
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
- OldDirectory string `protobuf:"bytes,1,opt,name=old_directory,json=oldDirectory,proto3" json:"old_directory,omitempty"`
- OldName string `protobuf:"bytes,2,opt,name=old_name,json=oldName,proto3" json:"old_name,omitempty"`
- NewDirectory string `protobuf:"bytes,3,opt,name=new_directory,json=newDirectory,proto3" json:"new_directory,omitempty"`
- NewName string `protobuf:"bytes,4,opt,name=new_name,json=newName,proto3" json:"new_name,omitempty"`
+ OldDirectory string `protobuf:"bytes,1,opt,name=old_directory,json=oldDirectory,proto3" json:"old_directory,omitempty"`
+ OldName string `protobuf:"bytes,2,opt,name=old_name,json=oldName,proto3" json:"old_name,omitempty"`
+ NewDirectory string `protobuf:"bytes,3,opt,name=new_directory,json=newDirectory,proto3" json:"new_directory,omitempty"`
+ NewName string `protobuf:"bytes,4,opt,name=new_name,json=newName,proto3" json:"new_name,omitempty"`
+ Signatures []int32 `protobuf:"varint,5,rep,packed,name=signatures,proto3" json:"signatures,omitempty"`
}
func (x *AtomicRenameEntryRequest) Reset() {
@@ -1448,6 +1449,13 @@ func (x *AtomicRenameEntryRequest) GetNewName() string {
return ""
}
+func (x *AtomicRenameEntryRequest) GetSignatures() []int32 {
+ if x != nil {
+ return x.Signatures
+ }
+ return nil
+}
+
type AtomicRenameEntryResponse struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
@@ -3418,7 +3426,7 @@ var file_filer_proto_rawDesc = []byte{
0x72, 0x65, 0x73, 0x22, 0x2b, 0x0a, 0x13, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x45, 0x6e, 0x74,
0x72, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x65, 0x72,
0x72, 0x6f, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72,
- 0x22, 0x9a, 0x01, 0x0a, 0x18, 0x41, 0x74, 0x6f, 0x6d, 0x69, 0x63, 0x52, 0x65, 0x6e, 0x61, 0x6d,
+ 0x22, 0xba, 0x01, 0x0a, 0x18, 0x41, 0x74, 0x6f, 0x6d, 0x69, 0x63, 0x52, 0x65, 0x6e, 0x61, 0x6d,
0x65, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x23, 0x0a,
0x0d, 0x6f, 0x6c, 0x64, 0x5f, 0x64, 0x69, 0x72, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x79, 0x18, 0x01,
0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x6f, 0x6c, 0x64, 0x44, 0x69, 0x72, 0x65, 0x63, 0x74, 0x6f,
@@ -3427,7 +3435,9 @@ var file_filer_proto_rawDesc = []byte{
0x0d, 0x6e, 0x65, 0x77, 0x5f, 0x64, 0x69, 0x72, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x79, 0x18, 0x03,
0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x6e, 0x65, 0x77, 0x44, 0x69, 0x72, 0x65, 0x63, 0x74, 0x6f,
0x72, 0x79, 0x12, 0x19, 0x0a, 0x08, 0x6e, 0x65, 0x77, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x04,
- 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x6e, 0x65, 0x77, 0x4e, 0x61, 0x6d, 0x65, 0x22, 0x1b, 0x0a,
+ 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x6e, 0x65, 0x77, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x1e, 0x0a,
+ 0x0a, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x74, 0x75, 0x72, 0x65, 0x73, 0x18, 0x05, 0x20, 0x03, 0x28,
+ 0x05, 0x52, 0x0a, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x74, 0x75, 0x72, 0x65, 0x73, 0x22, 0x1b, 0x0a,
0x19, 0x41, 0x74, 0x6f, 0x6d, 0x69, 0x63, 0x52, 0x65, 0x6e, 0x61, 0x6d, 0x65, 0x45, 0x6e, 0x74,
0x72, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0xec, 0x01, 0x0a, 0x13, 0x41,
0x73, 0x73, 0x69, 0x67, 0x6e, 0x56, 0x6f, 0x6c, 0x75, 0x6d, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65,
diff --git a/weed/pb/filer_pb/filer_client.go b/weed/pb/filer_pb/filer_client.go
index 65bd85c84..12f918137 100644
--- a/weed/pb/filer_pb/filer_client.go
+++ b/weed/pb/filer_pb/filer_client.go
@@ -236,7 +236,7 @@ func Mkdir(filerClient FilerClient, parentDirectoryPath string, dirName string,
})
}
-func MkFile(filerClient FilerClient, parentDirectoryPath string, fileName string, chunks []*FileChunk) error {
+func MkFile(filerClient FilerClient, parentDirectoryPath string, fileName string, chunks []*FileChunk, fn func(entry *Entry)) error {
return filerClient.WithFilerClient(func(client SeaweedFilerClient) error {
entry := &Entry{
@@ -252,6 +252,10 @@ func MkFile(filerClient FilerClient, parentDirectoryPath string, fileName string
Chunks: chunks,
}
+ if fn != nil {
+ fn(entry)
+ }
+
request := &CreateEntryRequest{
Directory: parentDirectoryPath,
Entry: entry,
diff --git a/weed/s3api/filer_multipart.go b/weed/s3api/filer_multipart.go
index f882592c1..5d7bf2ac3 100644
--- a/weed/s3api/filer_multipart.go
+++ b/weed/s3api/filer_multipart.go
@@ -35,6 +35,9 @@ func (s3a *S3ApiServer) createMultipartUpload(input *s3.CreateMultipartUploadInp
entry.Extended = make(map[string][]byte)
}
entry.Extended["key"] = []byte(*input.Key)
+ for k, v := range input.Metadata {
+ entry.Extended[k] = []byte(*v)
+ }
}); err != nil {
glog.Errorf("NewMultipartUpload error: %v", err)
return nil, s3err.ErrInternalError
@@ -68,6 +71,12 @@ func (s3a *S3ApiServer) completeMultipartUpload(input *s3.CompleteMultipartUploa
return nil, s3err.ErrNoSuchUpload
}
+ pentry, err := s3a.getEntry(s3a.genUploadsFolder(*input.Bucket),*input.UploadId)
+ if err != nil {
+ glog.Errorf("completeMultipartUpload %s %s error: %v", *input.Bucket, *input.UploadId, err)
+ return nil, s3err.ErrNoSuchUpload
+ }
+
var finalParts []*filer_pb.FileChunk
var offset int64
@@ -103,7 +112,16 @@ func (s3a *S3ApiServer) completeMultipartUpload(input *s3.CompleteMultipartUploa
dirName = dirName[:len(dirName)-1]
}
- err = s3a.mkFile(dirName, entryName, finalParts)
+ err = s3a.mkFile(dirName, entryName, finalParts,func(entry *filer_pb.Entry) {
+ if entry.Extended == nil {
+ entry.Extended = make(map[string][]byte)
+ }
+ for k,v := range pentry.Extended{
+ if k != "key" {
+ entry.Extended[k] = v
+ }
+ }
+ })
if err != nil {
glog.Errorf("completeMultipartUpload %s/%s error: %v", dirName, entryName, err)
diff --git a/weed/s3api/filer_util.go b/weed/s3api/filer_util.go
index 1803332a3..92267a154 100644
--- a/weed/s3api/filer_util.go
+++ b/weed/s3api/filer_util.go
@@ -15,9 +15,9 @@ func (s3a *S3ApiServer) mkdir(parentDirectoryPath string, dirName string, fn fun
}
-func (s3a *S3ApiServer) mkFile(parentDirectoryPath string, fileName string, chunks []*filer_pb.FileChunk) error {
+func (s3a *S3ApiServer) mkFile(parentDirectoryPath string, fileName string, chunks []*filer_pb.FileChunk, fn func(entry *filer_pb.Entry)) error {
- return filer_pb.MkFile(s3a, parentDirectoryPath, fileName, chunks)
+ return filer_pb.MkFile(s3a, parentDirectoryPath, fileName, chunks,fn)
}
diff --git a/weed/s3api/s3api_object_multipart_handlers.go b/weed/s3api/s3api_object_multipart_handlers.go
index de3faaaaa..a4daea7a2 100644
--- a/weed/s3api/s3api_object_multipart_handlers.go
+++ b/weed/s3api/s3api_object_multipart_handlers.go
@@ -4,6 +4,7 @@ import (
"fmt"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/s3api/s3err"
+ weed_server "github.com/chrislusf/seaweedfs/weed/server"
"net/http"
"net/url"
"strconv"
@@ -24,10 +25,18 @@ const (
func (s3a *S3ApiServer) NewMultipartUploadHandler(w http.ResponseWriter, r *http.Request) {
bucket, object := getBucketAndObject(r)
- response, errCode := s3a.createMultipartUpload(&s3.CreateMultipartUploadInput{
- Bucket: aws.String(bucket),
- Key: objectKey(aws.String(object)),
- })
+ createMultipartUploadInput := &s3.CreateMultipartUploadInput{
+ Bucket: aws.String(bucket),
+ Key: objectKey(aws.String(object)),
+ Metadata: make(map[string]*string),
+ }
+
+ metadata := weed_server.SaveAmzMetaData(r, nil, false)
+ for k, v := range metadata {
+ createMultipartUploadInput.Metadata[k] = aws.String(string(v))
+ }
+
+ response, errCode := s3a.createMultipartUpload(createMultipartUploadInput)
glog.V(2).Info("NewMultipartUploadHandler", s3err.EncodeXMLResponse(response), errCode)
diff --git a/weed/server/common.go b/weed/server/common.go
index 2e0ae4058..2cd2276eb 100644
--- a/weed/server/common.go
+++ b/weed/server/common.go
@@ -280,6 +280,7 @@ func processRangeRequest(r *http.Request, w http.ResponseWriter, totalSize int64
w.Header().Set("Content-Length", strconv.FormatInt(ra.length, 10))
w.Header().Set("Content-Range", ra.contentRange(totalSize))
+ w.WriteHeader(http.StatusPartialContent)
err = writeFn(w, ra.start, ra.length)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
diff --git a/weed/server/filer_grpc_server_rename.go b/weed/server/filer_grpc_server_rename.go
index ba9f15370..8a11c91e3 100644
--- a/weed/server/filer_grpc_server_rename.go
+++ b/weed/server/filer_grpc_server_rename.go
@@ -33,7 +33,7 @@ func (fs *FilerServer) AtomicRenameEntry(ctx context.Context, req *filer_pb.Atom
return nil, fmt.Errorf("%s/%s not found: %v", req.OldDirectory, req.OldName, err)
}
- moveErr := fs.moveEntry(ctx, oldParent, oldEntry, newParent, req.NewName)
+ moveErr := fs.moveEntry(ctx, oldParent, oldEntry, newParent, req.NewName, req.Signatures)
if moveErr != nil {
fs.filer.RollbackTransaction(ctx)
return nil, fmt.Errorf("%s/%s move error: %v", req.OldDirectory, req.OldName, moveErr)
@@ -47,23 +47,23 @@ func (fs *FilerServer) AtomicRenameEntry(ctx context.Context, req *filer_pb.Atom
return &filer_pb.AtomicRenameEntryResponse{}, nil
}
-func (fs *FilerServer) moveEntry(ctx context.Context, oldParent util.FullPath, entry *filer.Entry, newParent util.FullPath, newName string) error {
+func (fs *FilerServer) moveEntry(ctx context.Context, oldParent util.FullPath, entry *filer.Entry, newParent util.FullPath, newName string, signatures []int32) error {
if err := fs.moveSelfEntry(ctx, oldParent, entry, newParent, newName, func() error {
if entry.IsDirectory() {
- if err := fs.moveFolderSubEntries(ctx, oldParent, entry, newParent, newName); err != nil {
+ if err := fs.moveFolderSubEntries(ctx, oldParent, entry, newParent, newName, signatures); err != nil {
return err
}
}
return nil
- }); err != nil {
+ }, signatures); err != nil {
return fmt.Errorf("fail to move %s => %s: %v", oldParent.Child(entry.Name()), newParent.Child(newName), err)
}
return nil
}
-func (fs *FilerServer) moveFolderSubEntries(ctx context.Context, oldParent util.FullPath, entry *filer.Entry, newParent util.FullPath, newName string) error {
+func (fs *FilerServer) moveFolderSubEntries(ctx context.Context, oldParent util.FullPath, entry *filer.Entry, newParent util.FullPath, newName string, signatures []int32) error {
currentDirPath := oldParent.Child(entry.Name())
newDirPath := newParent.Child(newName)
@@ -84,7 +84,7 @@ func (fs *FilerServer) moveFolderSubEntries(ctx context.Context, oldParent util.
for _, item := range entries {
lastFileName = item.Name()
// println("processing", lastFileName)
- err := fs.moveEntry(ctx, currentDirPath, item, newDirPath, item.Name())
+ err := fs.moveEntry(ctx, currentDirPath, item, newDirPath, item.Name(), signatures)
if err != nil {
return err
}
@@ -96,7 +96,7 @@ func (fs *FilerServer) moveFolderSubEntries(ctx context.Context, oldParent util.
return nil
}
-func (fs *FilerServer) moveSelfEntry(ctx context.Context, oldParent util.FullPath, entry *filer.Entry, newParent util.FullPath, newName string, moveFolderSubEntries func() error) error {
+func (fs *FilerServer) moveSelfEntry(ctx context.Context, oldParent util.FullPath, entry *filer.Entry, newParent util.FullPath, newName string, moveFolderSubEntries func() error, signatures []int32) error {
oldPath, newPath := oldParent.Child(entry.Name()), newParent.Child(newName)
@@ -115,7 +115,7 @@ func (fs *FilerServer) moveSelfEntry(ctx context.Context, oldParent util.FullPat
Extended: entry.Extended,
Content: entry.Content,
}
- if createErr := fs.filer.CreateEntry(ctx, newEntry, false, false, nil); createErr != nil {
+ if createErr := fs.filer.CreateEntry(ctx, newEntry, false, false, signatures); createErr != nil {
return createErr
}
@@ -126,7 +126,7 @@ func (fs *FilerServer) moveSelfEntry(ctx context.Context, oldParent util.FullPat
}
// delete old entry
- deleteErr := fs.filer.DeleteEntryMetaAndData(ctx, oldPath, false, false, false, false, nil)
+ deleteErr := fs.filer.DeleteEntryMetaAndData(ctx, oldPath, false, false, false, false, signatures)
if deleteErr != nil {
return deleteErr
}
diff --git a/weed/server/filer_grpc_server_sub_meta.go b/weed/server/filer_grpc_server_sub_meta.go
index 18505a95f..624069b7e 100644
--- a/weed/server/filer_grpc_server_sub_meta.go
+++ b/weed/server/filer_grpc_server_sub_meta.go
@@ -30,44 +30,49 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest,
eachLogEntryFn := eachLogEntryFn(eachEventNotificationFn)
var processedTsNs int64
- var err error
+ var readPersistedLogErr error
+ var readInMemoryLogErr error
for {
glog.V(4).Infof("read on disk %v aggregated subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime)
- processedTsNs, err = fs.filer.ReadPersistedLogBuffer(lastReadTime, eachLogEntryFn)
- if err != nil {
- return fmt.Errorf("reading from persisted logs: %v", err)
+ processedTsNs, readPersistedLogErr = fs.filer.ReadPersistedLogBuffer(lastReadTime, eachLogEntryFn)
+ if readPersistedLogErr != nil {
+ return fmt.Errorf("reading from persisted logs: %v", readPersistedLogErr)
}
if processedTsNs != 0 {
lastReadTime = time.Unix(0, processedTsNs)
+ } else {
+ if readInMemoryLogErr == log_buffer.ResumeFromDiskError {
+ time.Sleep(1127 * time.Millisecond)
+ continue
+ }
}
glog.V(4).Infof("read in memory %v aggregated subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime)
- lastReadTime, err = fs.filer.MetaAggregator.MetaLogBuffer.LoopProcessLogData("aggMeta:"+clientName, lastReadTime, func() bool {
+ lastReadTime, readInMemoryLogErr = fs.filer.MetaAggregator.MetaLogBuffer.LoopProcessLogData("aggMeta:"+clientName, lastReadTime, func() bool {
fs.filer.MetaAggregator.ListenersLock.Lock()
fs.filer.MetaAggregator.ListenersCond.Wait()
fs.filer.MetaAggregator.ListenersLock.Unlock()
return true
}, eachLogEntryFn)
- if err != nil {
- if err == log_buffer.ResumeFromDiskError {
- time.Sleep(5127 * time.Millisecond)
+ if readInMemoryLogErr != nil {
+ if readInMemoryLogErr == log_buffer.ResumeFromDiskError {
continue
}
- glog.Errorf("processed to %v: %v", lastReadTime, err)
- if err != log_buffer.ResumeError {
+ glog.Errorf("processed to %v: %v", lastReadTime, readInMemoryLogErr)
+ if readInMemoryLogErr != log_buffer.ResumeError {
break
}
}
- time.Sleep(5127 * time.Millisecond)
+ time.Sleep(1127 * time.Millisecond)
}
- return err
+ return readInMemoryLogErr
}
@@ -87,41 +92,47 @@ func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataReq
eachLogEntryFn := eachLogEntryFn(eachEventNotificationFn)
var processedTsNs int64
- var err error
+ var readPersistedLogErr error
+ var readInMemoryLogErr error
for {
// println("reading from persisted logs ...")
glog.V(0).Infof("read on disk %v local subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime)
- processedTsNs, err = fs.filer.ReadPersistedLogBuffer(lastReadTime, eachLogEntryFn)
- if err != nil {
- return fmt.Errorf("reading from persisted logs: %v", err)
+ processedTsNs, readPersistedLogErr = fs.filer.ReadPersistedLogBuffer(lastReadTime, eachLogEntryFn)
+ if readPersistedLogErr != nil {
+ return fmt.Errorf("reading from persisted logs: %v", readPersistedLogErr)
}
if processedTsNs != 0 {
lastReadTime = time.Unix(0, processedTsNs)
+ } else {
+ if readInMemoryLogErr == log_buffer.ResumeFromDiskError {
+ time.Sleep(1127 * time.Millisecond)
+ continue
+ }
}
glog.V(0).Infof("read in memory %v local subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime)
- lastReadTime, err = fs.filer.LocalMetaLogBuffer.LoopProcessLogData("localMeta:"+clientName, lastReadTime, func() bool {
+ lastReadTime, readInMemoryLogErr = fs.filer.LocalMetaLogBuffer.LoopProcessLogData("localMeta:"+clientName, lastReadTime, func() bool {
fs.listenersLock.Lock()
fs.listenersCond.Wait()
fs.listenersLock.Unlock()
return true
}, eachLogEntryFn)
- if err != nil {
- if err == log_buffer.ResumeFromDiskError {
+ if readInMemoryLogErr != nil {
+ if readInMemoryLogErr == log_buffer.ResumeFromDiskError {
continue
}
- glog.Errorf("processed to %v: %v", lastReadTime, err)
- time.Sleep(3127 * time.Millisecond)
- if err != log_buffer.ResumeError {
+ glog.Errorf("processed to %v: %v", lastReadTime, readInMemoryLogErr)
+ time.Sleep(1127 * time.Millisecond)
+ if readInMemoryLogErr != log_buffer.ResumeError {
break
}
}
}
- return err
+ return readInMemoryLogErr
}
diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go
index dfb43c706..d7afaa65a 100644
--- a/weed/server/filer_server.go
+++ b/weed/server/filer_server.go
@@ -30,11 +30,11 @@ import (
_ "github.com/chrislusf/seaweedfs/weed/filer/mongodb"
_ "github.com/chrislusf/seaweedfs/weed/filer/mysql"
_ "github.com/chrislusf/seaweedfs/weed/filer/mysql2"
- _ "github.com/chrislusf/seaweedfs/weed/filer/sqlite"
_ "github.com/chrislusf/seaweedfs/weed/filer/postgres"
_ "github.com/chrislusf/seaweedfs/weed/filer/postgres2"
_ "github.com/chrislusf/seaweedfs/weed/filer/redis"
_ "github.com/chrislusf/seaweedfs/weed/filer/redis2"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/sqlite"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/notification"
_ "github.com/chrislusf/seaweedfs/weed/notification/aws_sqs"
diff --git a/weed/server/filer_server_handlers_write_autochunk.go b/weed/server/filer_server_handlers_write_autochunk.go
index fcb92d8ec..a42e0fc97 100644
--- a/weed/server/filer_server_handlers_write_autochunk.go
+++ b/weed/server/filer_server_handlers_write_autochunk.go
@@ -214,10 +214,6 @@ func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileNa
Size: int64(entry.FileSize),
}
- if entry.Extended == nil {
- entry.Extended = make(map[string][]byte)
- }
-
entry.Extended = SaveAmzMetaData(r, entry.Extended, false)
for k, v := range r.Header {
diff --git a/weed/server/master_grpc_server_volume.go b/weed/server/master_grpc_server_volume.go
index 3a4951cc5..4132ce690 100644
--- a/weed/server/master_grpc_server_volume.go
+++ b/weed/server/master_grpc_server_volume.go
@@ -143,7 +143,7 @@ func (ms *MasterServer) Assign(ctx context.Context, req *master_pb.AssignRequest
maxTimeout = time.Second * 10
startTime = time.Now()
)
-
+
for time.Now().Sub(startTime) < maxTimeout {
fid, count, dn, err := ms.Topo.PickForWrite(req.Count, option)
if err == nil {
diff --git a/weed/server/master_server.go b/weed/server/master_server.go
index 838803908..11dc95ded 100644
--- a/weed/server/master_server.go
+++ b/weed/server/master_server.go
@@ -97,7 +97,7 @@ func NewMasterServer(r *mux.Router, option *MasterOption, peers []string) *Maste
ms := &MasterServer{
option: option,
preallocateSize: preallocateSize,
- vgCh: make(chan *topology.VolumeGrowRequest, 1 << 6),
+ vgCh: make(chan *topology.VolumeGrowRequest, 1<<6),
clientChans: make(map[string]chan *master_pb.VolumeLocation),
grpcDialOption: grpcDialOption,
MasterClient: wdclient.NewMasterClient(grpcDialOption, "master", option.Host, 0, "", peers),
diff --git a/weed/server/master_server_handlers.go b/weed/server/master_server_handlers.go
index 974b3308f..0609732c7 100644
--- a/weed/server/master_server_handlers.go
+++ b/weed/server/master_server_handlers.go
@@ -123,7 +123,7 @@ func (ms *MasterServer) dirAssignHandler(w http.ResponseWriter, r *http.Request)
Count: writableVolumeCount,
ErrCh: errCh,
}
- if err := <- errCh; err != nil {
+ if err := <-errCh; err != nil {
writeJsonError(w, r, http.StatusInternalServerError, fmt.Errorf("cannot grow volume group! %v", err))
return
}
diff --git a/weed/server/volume_server.go b/weed/server/volume_server.go
index f7359ea6b..0d765a253 100644
--- a/weed/server/volume_server.go
+++ b/weed/server/volume_server.go
@@ -28,7 +28,7 @@ type VolumeServer struct {
needleMapKind storage.NeedleMapKind
FixJpgOrientation bool
- ReadRedirect bool
+ ReadMode string
compactionBytePerSecond int64
metricsAddress string
metricsIntervalSec int
@@ -50,7 +50,7 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
dataCenter string, rack string,
whiteList []string,
fixJpgOrientation bool,
- readRedirect bool,
+ readMode string,
compactionMBPerSecond int,
fileSizeLimitMB int,
concurrentUploadLimit int64,
@@ -72,7 +72,7 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
rack: rack,
needleMapKind: needleMapKind,
FixJpgOrientation: fixJpgOrientation,
- ReadRedirect: readRedirect,
+ ReadMode: readMode,
grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.volume"),
compactionBytePerSecond: int64(compactionMBPerSecond) * 1024 * 1024,
fileSizeLimitBytes: int64(fileSizeLimitMB) * 1024 * 1024,
diff --git a/weed/server/volume_server_handlers_read.go b/weed/server/volume_server_handlers_read.go
index 3e977cfd4..c5afd9545 100644
--- a/weed/server/volume_server_handlers_read.go
+++ b/weed/server/volume_server_handlers_read.go
@@ -58,14 +58,53 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
hasVolume := vs.store.HasVolume(volumeId)
_, hasEcVolume := vs.store.FindEcVolume(volumeId)
if !hasVolume && !hasEcVolume {
- if !vs.ReadRedirect {
- glog.V(2).Infoln("volume is not local:", err, r.URL.Path)
+ if vs.ReadMode == "local" {
+ glog.V(0).Infoln("volume is not local:", err, r.URL.Path)
w.WriteHeader(http.StatusNotFound)
return
}
lookupResult, err := operation.Lookup(vs.GetMaster, volumeId.String())
glog.V(2).Infoln("volume", volumeId, "found on", lookupResult, "error", err)
- if err == nil && len(lookupResult.Locations) > 0 {
+ if err != nil || len(lookupResult.Locations) <= 0 {
+ glog.V(0).Infoln("lookup error:", err, r.URL.Path)
+ w.WriteHeader(http.StatusNotFound)
+ return
+ }
+ if vs.ReadMode == "proxy" {
+ // proxy client request to target server
+ u, _ := url.Parse(util.NormalizeUrl(lookupResult.Locations[0].Url))
+ r.URL.Host = u.Host
+ r.URL.Scheme = u.Scheme
+ request, err := http.NewRequest("GET", r.URL.String(), nil)
+ if err != nil {
+ glog.V(0).Infof("failed to instance http request of url %s: %v", r.URL.String(), err)
+ w.WriteHeader(http.StatusInternalServerError)
+ return
+ }
+ for k, vv := range r.Header {
+ for _, v := range vv {
+ request.Header.Add(k, v)
+ }
+ }
+
+ response, err := client.Do(request)
+ if err != nil {
+ glog.V(0).Infof("request remote url %s: %v", r.URL.String(), err)
+ w.WriteHeader(http.StatusInternalServerError)
+ return
+ }
+ defer util.CloseResponse(response)
+ // proxy target response to client
+ for k, vv := range response.Header {
+ for _, v := range vv {
+ w.Header().Add(k, v)
+ }
+ }
+ w.WriteHeader(response.StatusCode)
+ io.Copy(w, response.Body)
+ return
+ } else {
+ // redirect
u, _ := url.Parse(util.NormalizeUrl(lookupResult.Locations[0].PublicUrl))
u.Path = fmt.Sprintf("%s/%s,%s", u.Path, vid, fid)
arg := url.Values{}
@@ -74,12 +113,8 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
}
u.RawQuery = arg.Encode()
http.Redirect(w, r, u.String(), http.StatusMovedPermanently)
-
- } else {
- glog.V(2).Infoln("lookup error:", err, r.URL.Path)
- w.WriteHeader(http.StatusNotFound)
+ return
}
- return
}
cookie := n.Cookie
diff --git a/weed/shell/command_volume_fsck.go b/weed/shell/command_volume_fsck.go
index 400e96fe7..64389fdb5 100644
--- a/weed/shell/command_volume_fsck.go
+++ b/weed/shell/command_volume_fsck.go
@@ -62,7 +62,8 @@ func (c *commandVolumeFsck) Do(args []string, commandEnv *CommandEnv, writer io.
fsckCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
verbose := fsckCommand.Bool("v", false, "verbose mode")
- findMissingChunksInFiler := fsckCommand.Bool("findMissingChunksInFiler", false, "see help volume.fsck")
+ findMissingChunksInFiler := fsckCommand.Bool("findMissingChunksInFiler", false, "see \"help volume.fsck\"")
+ findMissingChunksInFilerPath := fsckCommand.String("findMissingChunksInFilerPath", "/", "used together with findMissingChunksInFiler")
applyPurging := fsckCommand.Bool("reallyDeleteFromVolume", false, "<expert only> delete data not referenced by the filer")
if err = fsckCommand.Parse(args); err != nil {
return nil
@@ -96,7 +97,7 @@ func (c *commandVolumeFsck) Do(args []string, commandEnv *CommandEnv, writer io.
if *findMissingChunksInFiler {
// collect all filer file ids and paths
- if err = c.collectFilerFileIdAndPaths(volumeIdToVInfo, tempFolder, writer, *verbose, applyPurging); err != nil {
+ if err = c.collectFilerFileIdAndPaths(volumeIdToVInfo, tempFolder, writer, *findMissingChunksInFilerPath, *verbose, applyPurging); err != nil {
return fmt.Errorf("collectFilerFileIdAndPaths: %v", err)
}
// for each volume, check filer file ids
@@ -117,7 +118,7 @@ func (c *commandVolumeFsck) Do(args []string, commandEnv *CommandEnv, writer io.
return nil
}
-func (c *commandVolumeFsck) collectFilerFileIdAndPaths(volumeIdToServer map[uint32]VInfo, tempFolder string, writer io.Writer, verbose bool, applyPurging *bool) error {
+func (c *commandVolumeFsck) collectFilerFileIdAndPaths(volumeIdToServer map[uint32]VInfo, tempFolder string, writer io.Writer, filerPath string, verbose bool, applyPurging *bool) error {
if verbose {
fmt.Fprintf(writer, "checking each file from filer ...\n")
@@ -143,22 +144,25 @@ func (c *commandVolumeFsck) collectFilerFileIdAndPaths(volumeIdToServer map[uint
cookie uint32
path util.FullPath
}
- return doTraverseBfsAndSaving(c.env, nil, "/", false, func(outputChan chan interface{}) {
- buffer := make([]byte, 8)
+ return doTraverseBfsAndSaving(c.env, nil, filerPath, false, func(outputChan chan interface{}) {
+ buffer := make([]byte, 16)
for item := range outputChan {
i := item.(*Item)
if f, ok := files[i.vid]; ok {
util.Uint64toBytes(buffer, i.fileKey)
- f.Write(buffer)
- util.Uint32toBytes(buffer, i.cookie)
- util.Uint32toBytes(buffer[4:], uint32(len(i.path)))
+ util.Uint32toBytes(buffer[8:], i.cookie)
+ util.Uint32toBytes(buffer[12:], uint32(len(i.path)))
f.Write(buffer)
f.Write([]byte(i.path))
+ // fmt.Fprintf(writer, "%d,%x%08x %d %s\n", i.vid, i.fileKey, i.cookie, len(i.path), i.path)
} else {
fmt.Fprintf(writer, "%d,%x%08x %s volume not found\n", i.vid, i.fileKey, i.cookie, i.path)
}
}
}, func(entry *filer_pb.FullEntry, outputChan chan interface{}) (err error) {
+ if verbose && entry.Entry.IsDirectory {
+ fmt.Fprintf(writer, "checking directory %s\n", util.NewFullPath(entry.Dir, entry.Entry.Name))
+ }
dChunks, mChunks, resolveErr := filer.ResolveChunkManifest(filer.LookupFn(c.env), entry.Entry.Chunks)
if resolveErr != nil {
return nil
@@ -317,6 +321,10 @@ func (c *commandVolumeFsck) collectFilerFileIds(tempFolder string, volumeIdToSer
func (c *commandVolumeFsck) oneVolumeFileIdsCheckOneVolume(tempFolder string, volumeId uint32, writer io.Writer, verbose bool) (err error) {
+ if verbose {
+ fmt.Fprintf(writer, "find missing file chuns in volume %d ...\n", volumeId)
+ }
+
db := needle_map.NewMemDb()
defer db.Close()
@@ -342,7 +350,7 @@ func (c *commandVolumeFsck) oneVolumeFileIdsCheckOneVolume(tempFolder string, vo
item := &Item{}
var readSize int
for {
- readSize, err = br.Read(buffer)
+ readSize, err = io.ReadFull(br, buffer)
if err != nil || readSize != 16 {
if err == io.EOF {
return nil
@@ -355,11 +363,17 @@ func (c *commandVolumeFsck) oneVolumeFileIdsCheckOneVolume(tempFolder string, vo
item.cookie = util.BytesToUint32(buffer[8:12])
pathSize := util.BytesToUint32(buffer[12:16])
pathBytes := make([]byte, int(pathSize))
- _, err = br.Read(pathBytes)
+ n, err := io.ReadFull(br, pathBytes)
+ if err != nil {
+ fmt.Fprintf(writer, "%d,%x%08x in unexpected error: %v\n", volumeId, item.fileKey, item.cookie, err)
+ }
+ if n != int(pathSize) {
+ fmt.Fprintf(writer, "%d,%x%08x %d unexpected file name size %d\n", volumeId, item.fileKey, item.cookie, pathSize, n)
+ }
item.path = util.FullPath(string(pathBytes))
if _, found := db.Get(types.NeedleId(item.fileKey)); !found {
- fmt.Fprintf(writer, "%d,%x%08x in %s not found\n", volumeId, item.fileKey, item.cookie, item.path)
+ fmt.Fprintf(writer, "%d,%x%08x in %s %d not found\n", volumeId, item.fileKey, item.cookie, item.path, pathSize)
}
}
diff --git a/weed/storage/needle/needle_read_write.go b/weed/storage/needle/needle_read_write.go
index d208404a8..84e00374a 100644
--- a/weed/storage/needle/needle_read_write.go
+++ b/weed/storage/needle/needle_read_write.go
@@ -52,7 +52,7 @@ func (n *Needle) prepareWriteBuffer(version Version, writeBytes *bytes.Buffer) (
writeBytes.Write(n.Data)
padding := PaddingLength(n.Size, version)
util.Uint32toBytes(header[0:NeedleChecksumSize], n.Checksum.Value())
- writeBytes.Write(header[0:NeedleChecksumSize+padding])
+ writeBytes.Write(header[0 : NeedleChecksumSize+padding])
return size, actualSize, nil
case Version2, Version3:
header := make([]byte, NeedleHeaderSize+TimestampSize) // adding timestamp to reuse it and avoid extra allocation
@@ -104,7 +104,7 @@ func (n *Needle) prepareWriteBuffer(version Version, writeBytes *bytes.Buffer) (
}
if n.HasLastModifiedDate() {
util.Uint64toBytes(header[0:8], n.LastModified)
- writeBytes.Write(header[8-LastModifiedBytesLength:8])
+ writeBytes.Write(header[8-LastModifiedBytesLength : 8])
}
if n.HasTtl() && n.Ttl != nil {
n.Ttl.ToBytes(header[0:TtlBytesLength])
@@ -119,11 +119,11 @@ func (n *Needle) prepareWriteBuffer(version Version, writeBytes *bytes.Buffer) (
padding := PaddingLength(n.Size, version)
util.Uint32toBytes(header[0:NeedleChecksumSize], n.Checksum.Value())
if version == Version2 {
- writeBytes.Write(header[0:NeedleChecksumSize+padding])
+ writeBytes.Write(header[0 : NeedleChecksumSize+padding])
} else {
// version3
util.Uint64toBytes(header[NeedleChecksumSize:NeedleChecksumSize+TimestampSize], n.AppendAtNs)
- writeBytes.Write(header[0:NeedleChecksumSize+TimestampSize+padding])
+ writeBytes.Write(header[0 : NeedleChecksumSize+TimestampSize+padding])
}
return Size(n.DataSize), GetActualSize(n.Size, version), nil
diff --git a/weed/storage/needle_map/compact_map.go b/weed/storage/needle_map/compact_map.go
index 2b1a471bc..3d2047f99 100644
--- a/weed/storage/needle_map/compact_map.go
+++ b/weed/storage/needle_map/compact_map.go
@@ -64,11 +64,31 @@ func (cs *CompactSection) Set(key NeedleId, offset Offset, size Size) (oldOffset
needOverflow := cs.counter >= batch
needOverflow = needOverflow || cs.counter > 0 && cs.values[cs.counter-1].Key > skey
if needOverflow {
- //println("start", cs.start, "counter", cs.counter, "key", key)
- if oldValueExtra, oldValue, found := cs.findOverflowEntry(skey); found {
- oldOffset.OffsetHigher, oldOffset.OffsetLower, oldSize = oldValueExtra.OffsetHigher, oldValue.OffsetLower, oldValue.Size
+ lookBackIndex := cs.counter - 128
+ if lookBackIndex < 0 {
+ lookBackIndex = 0
+ }
+ if cs.counter < batch && cs.values[lookBackIndex].Key < skey {
+ // still has capacity and only partially out of order
+ p := &cs.values[cs.counter]
+ p.Key, cs.valuesExtra[cs.counter].OffsetHigher, p.OffsetLower, p.Size = skey, offset.OffsetHigher, offset.OffsetLower, size
+ //println("added index", cs.counter, "key", key, cs.values[cs.counter].Key)
+ for x := cs.counter - 1; x >= lookBackIndex; x-- {
+ if cs.values[x].Key > cs.values[x+1].Key {
+ cs.values[x], cs.values[x+1] = cs.values[x+1], cs.values[x]
+ cs.valuesExtra[x], cs.valuesExtra[x+1] = cs.valuesExtra[x+1], cs.valuesExtra[x]
+ } else {
+ break
+ }
+ }
+ cs.counter++
+ } else {
+ //println("start", cs.start, "counter", cs.counter, "key", key)
+ if oldValueExtra, oldValue, found := cs.findOverflowEntry(skey); found {
+ oldOffset.OffsetHigher, oldOffset.OffsetLower, oldSize = oldValueExtra.OffsetHigher, oldValue.OffsetLower, oldValue.Size
+ }
+ cs.setOverflowEntry(skey, offset, size)
}
- cs.setOverflowEntry(skey, offset, size)
} else {
p := &cs.values[cs.counter]
p.Key, cs.valuesExtra[cs.counter].OffsetHigher, p.OffsetLower, p.Size = skey, offset.OffsetHigher, offset.OffsetLower, size
@@ -96,6 +116,7 @@ func (cs *CompactSection) setOverflowEntry(skey SectionalNeedleId, offset Offset
cs.overflowExtra[i] = cs.overflowExtra[i-1]
}
cs.overflow[insertCandidate] = needleValue
+ cs.overflowExtra[insertCandidate] = needleValueExtra
}
}
diff --git a/weed/storage/needle_map/compact_map_cases_test.go b/weed/storage/needle_map/compact_map_cases_test.go
new file mode 100644
index 000000000..305925699
--- /dev/null
+++ b/weed/storage/needle_map/compact_map_cases_test.go
@@ -0,0 +1,33 @@
+// +build 5BytesOffset
+
+package needle_map
+
+import (
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/storage/types"
+ "github.com/stretchr/testify/assert"
+ "log"
+ "os"
+ "testing"
+)
+
+func Test5bytesIndexLoading(t *testing.T) {
+
+ indexFile, ie := os.OpenFile("../../../test/data/187.idx", os.O_RDWR|os.O_RDONLY, 0644)
+ if ie != nil {
+ log.Fatalln(ie)
+ }
+ defer indexFile.Close()
+ m, rowCount := loadNewNeedleMap(indexFile)
+
+ println("total entries:", rowCount)
+
+ key := types.NeedleId(0x671b905) // 108116229
+
+ needle, found := m.Get(types.NeedleId(0x671b905))
+
+ fmt.Printf("%v key:%v offset:%v size:%v\n", found, key, needle.Offset, needle.Size)
+
+ assert.Equal(t, int64(12884911892)*8, needle.Offset.ToActualOffset(), "offset")
+
+}
diff --git a/weed/topology/node.go b/weed/topology/node.go
index 4556b4165..4772cb411 100644
--- a/weed/topology/node.go
+++ b/weed/topology/node.go
@@ -243,7 +243,7 @@ func (n *NodeImpl) CollectDeadNodeAndFullVolumes(freshThreshHold int64, volumeSi
if v.Size >= volumeSizeLimit {
//fmt.Println("volume",v.Id,"size",v.Size,">",volumeSizeLimit)
n.GetTopology().chanFullVolumes <- v
- }else if float64(v.Size) > float64(volumeSizeLimit) * growThreshold {
+ } else if float64(v.Size) > float64(volumeSizeLimit)*growThreshold {
n.GetTopology().chanCrowdedVolumes <- v
}
}
diff --git a/weed/util/constants.go b/weed/util/constants.go
index ac80b1737..4d7082af0 100644
--- a/weed/util/constants.go
+++ b/weed/util/constants.go
@@ -5,7 +5,7 @@ import (
)
var (
- VERSION = fmt.Sprintf("%s %d.%02d", sizeLimit, 2, 55)
+ VERSION = fmt.Sprintf("%s %d.%02d", sizeLimit, 2, 56)
COMMIT = ""
)
diff --git a/weed/util/grace/pprof.go b/weed/util/grace/pprof.go
index 0406b762c..28bf6d553 100644
--- a/weed/util/grace/pprof.go
+++ b/weed/util/grace/pprof.go
@@ -21,21 +21,21 @@ func SetupProfiling(cpuProfile, memProfile string) {
pprof.StopCPUProfile()
// write block pprof
- blockF, err := os.Create(cpuProfile+".block")
+ blockF, err := os.Create(cpuProfile + ".block")
if err != nil {
return
}
p := pprof.Lookup("block")
- p.WriteTo(blockF,0)
+ p.WriteTo(blockF, 0)
blockF.Close()
// write mutex pprof
- mutexF, err := os.Create(cpuProfile+".mutex")
+ mutexF, err := os.Create(cpuProfile + ".mutex")
if err != nil {
return
}
p = pprof.Lookup("mutex")
- p.WriteTo(mutexF,0)
+ p.WriteTo(mutexF, 0)
mutexF.Close()
})
diff --git a/weed/util/log_buffer/log_buffer.go b/weed/util/log_buffer/log_buffer.go
index 12840a88a..4742c2b7c 100644
--- a/weed/util/log_buffer/log_buffer.go
+++ b/weed/util/log_buffer/log_buffer.go
@@ -170,6 +170,8 @@ func (m *LogBuffer) copyToFlush() *dataToFlush {
m.lastFlushTime = m.stopTime
}
m.buf = m.prevBuffers.SealBuffer(m.startTime, m.stopTime, m.buf, m.pos)
+ m.startTime = time.Unix(0,0)
+ m.stopTime = time.Unix(0,0)
m.pos = 0
m.idx = m.idx[:0]
return d