author     hilimd <68371223+hilimd@users.noreply.github.com>  2020-11-05 12:02:47 +0800
committer  GitHub <noreply@github.com>  2020-11-05 12:02:47 +0800
commit     546f1bcb903dd26ba447cdbedb972736fdb31b42 (patch)
tree       09b8119faa7162acaa7240de5af6fd0bebe96c2f /weed
parent     843865f2ca534bb6286b7a3d79c436384d875608 (diff)
parent     75887ba2a20b9f3f7ff9c4b8998cf3af0c0f48c2 (diff)
Merge pull request #34 from chrislusf/master
sync
Diffstat (limited to 'weed')
-rw-r--r--  weed/command/filer_copy.go | 4
-rw-r--r--  weed/command/mount.go | 2
-rw-r--r--  weed/command/mount_std.go | 7
-rw-r--r--  weed/command/server.go | 14
-rw-r--r--  weed/command/volume.go | 2
-rw-r--r--  weed/filer/filechunk_manifest.go | 2
-rw-r--r--  weed/filer/reader_at.go | 60
-rw-r--r--  weed/filesys/dir.go | 29
-rw-r--r--  weed/filesys/dir_link.go | 13
-rw-r--r--  weed/filesys/dirty_page.go | 28
-rw-r--r--  weed/filesys/file.go | 125
-rw-r--r--  weed/filesys/filehandle.go | 15
-rw-r--r--  weed/filesys/meta_cache/meta_cache_init.go | 18
-rw-r--r--  weed/filesys/wfs.go | 10
-rw-r--r--  weed/filesys/wfs_filer_client.go | 11
-rw-r--r--  weed/ftpd/ftp_server.go | 81
-rw-r--r--  weed/operation/upload_content.go | 7
-rw-r--r--  weed/pb/grpc_client_server.go | 1
-rw-r--r--  weed/s3api/filer_util_tags.go | 3
-rw-r--r--  weed/s3api/http/header.go | 30
-rw-r--r--  weed/s3api/s3api_object_handlers.go | 2
-rw-r--r--  weed/s3api/s3api_objects_list_handlers.go | 9
-rw-r--r--  weed/server/filer_server_handlers.go | 25
-rw-r--r--  weed/server/filer_server_handlers_read.go | 19
-rw-r--r--  weed/server/filer_server_handlers_write_autochunk.go | 38
-rw-r--r--  weed/server/filer_ui/templates.go | 2
-rw-r--r--  weed/server/volume_grpc_client_to_master.go | 18
-rw-r--r--  weed/server/volume_grpc_copy.go | 10
-rw-r--r--  weed/server/volume_server_handlers.go | 19
-rw-r--r--  weed/shell/command_volume_mark.go | 55
-rw-r--r--  weed/shell/command_volume_move.go | 15
-rw-r--r--  weed/storage/disk_location.go | 24
-rw-r--r--  weed/storage/needle/needle_read_write.go | 7
-rw-r--r--  weed/storage/store.go | 40
-rw-r--r--  weed/storage/volume_checking.go | 84
-rw-r--r--  weed/storage/volume_loading.go | 2
-rw-r--r--  weed/storage/volume_read_write.go | 23
-rw-r--r--  weed/util/constants.go | 2
-rw-r--r--  weed/util/retry.go | 31
-rw-r--r--  weed/wdclient/masterclient.go | 13
40 files changed, 716 insertions, 184 deletions
diff --git a/weed/command/filer_copy.go b/weed/command/filer_copy.go
index 2295faa8a..9afa65d23 100644
--- a/weed/command/filer_copy.go
+++ b/weed/command/filer_copy.go
@@ -219,7 +219,7 @@ func genFileCopyTask(fileOrDir string, destPath string, fileCopyTaskChan chan Fi
fileCopyTaskChan <- FileCopyTask{
sourceLocation: fileOrDir,
- destinationUrlPath: destPath+fi.Name(),
+ destinationUrlPath: destPath,
fileSize: fi.Size(),
fileMode: fi.Mode(),
uid: uid,
@@ -405,7 +405,7 @@ func (worker *FileCopyWorker) uploadFileInChunks(task FileCopyTask, f *os.File,
Replication: *worker.options.replication,
Collection: *worker.options.collection,
TtlSec: worker.options.ttlSec,
- Path: task.destinationUrlPath+fileName,
+ Path: task.destinationUrlPath + fileName,
}
assignResult, assignError = client.AssignVolume(context.Background(), request)
diff --git a/weed/command/mount.go b/weed/command/mount.go
index 7fdb21254..f325cb0a5 100644
--- a/weed/command/mount.go
+++ b/weed/command/mount.go
@@ -14,6 +14,7 @@ type MountOptions struct {
replication *string
ttlSec *int
chunkSizeLimitMB *int
+ concurrentWriters *int
cacheDir *string
cacheSizeMB *int64
dataCenter *string
@@ -42,6 +43,7 @@ func init() {
mountOptions.replication = cmdMount.Flag.String("replication", "", "replication(e.g. 000, 001) to create to files. If empty, let filer decide.")
mountOptions.ttlSec = cmdMount.Flag.Int("ttl", 0, "file ttl in seconds")
mountOptions.chunkSizeLimitMB = cmdMount.Flag.Int("chunkSizeLimitMB", 2, "local write buffer size, also chunk large files")
+ mountOptions.concurrentWriters = cmdMount.Flag.Int("concurrentWriters", 0, "limit concurrent goroutine writers if not 0")
mountOptions.cacheDir = cmdMount.Flag.String("cacheDir", os.TempDir(), "local cache directory for file chunks and meta data")
mountOptions.cacheSizeMB = cmdMount.Flag.Int64("cacheCapacityMB", 1000, "local file chunk cache capacity in MB (0 will disable cache)")
mountOptions.dataCenter = cmdMount.Flag.String("dataCenter", "", "prefer to write to the data center")
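
The new flag feeds the throttled writer pool introduced in the dirty_page.go and wfs.go hunks below. As a hypothetical invocation, `weed mount -filer=localhost:8888 -dir=/mnt/weed -concurrentWriters=32` would cap chunk flushes at 32 goroutines, while the default of 0 keeps the previous unbounded goroutine-per-chunk behavior.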
diff --git a/weed/command/mount_std.go b/weed/command/mount_std.go
index 20d08314c..83cb352ff 100644
--- a/weed/command/mount_std.go
+++ b/weed/command/mount_std.go
@@ -5,8 +5,6 @@ package command
import (
"context"
"fmt"
- "github.com/chrislusf/seaweedfs/weed/filer"
- "github.com/chrislusf/seaweedfs/weed/filesys/meta_cache"
"os"
"os/user"
"path"
@@ -15,6 +13,8 @@ import (
"strings"
"time"
+ "github.com/chrislusf/seaweedfs/weed/filesys/meta_cache"
+
"github.com/seaweedfs/fuse"
"github.com/seaweedfs/fuse/fs"
@@ -33,7 +33,7 @@ func runMount(cmd *Command, args []string) bool {
if *mountReadRetryTime < time.Second {
*mountReadRetryTime = time.Second
}
- filer.ReadWaitTime = *mountReadRetryTime
+ util.RetryWaitTime = *mountReadRetryTime
umask, umaskErr := strconv.ParseUint(*mountOptions.umaskString, 8, 64)
if umaskErr != nil {
@@ -175,6 +175,7 @@ func RunMount(option *MountOptions, umask os.FileMode) bool {
Replication: *option.replication,
TtlSec: int32(*option.ttlSec),
ChunkSizeLimit: int64(chunkSizeLimitMB) * 1024 * 1024,
+ ConcurrentWriters: *option.concurrentWriters,
CacheDir: *option.cacheDir,
CacheSizeMB: *option.cacheSizeMB,
DataCenter: *option.dataCenter,
diff --git a/weed/command/server.go b/weed/command/server.go
index 6a78fb3f4..4a9c2411a 100644
--- a/weed/command/server.go
+++ b/weed/command/server.go
@@ -2,13 +2,14 @@ package command
import (
"fmt"
- stats_collect "github.com/chrislusf/seaweedfs/weed/stats"
"os"
"runtime"
"runtime/pprof"
"strings"
"time"
+ stats_collect "github.com/chrislusf/seaweedfs/weed/stats"
+
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/util"
)
@@ -60,9 +61,10 @@ var (
serverMetricsHttpPort = cmdServer.Flag.Int("metricsPort", 0, "Prometheus metrics listen port")
// pulseSeconds = cmdServer.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats")
- isStartingFiler = cmdServer.Flag.Bool("filer", false, "whether to start filer")
- isStartingS3 = cmdServer.Flag.Bool("s3", false, "whether to start S3 gateway")
- isStartingMsgBroker = cmdServer.Flag.Bool("msgBroker", false, "whether to start message broker")
+ isStartingVolumeServer = cmdServer.Flag.Bool("volume", true, "whether to start volume server")
+ isStartingFiler = cmdServer.Flag.Bool("filer", false, "whether to start filer")
+ isStartingS3 = cmdServer.Flag.Bool("s3", false, "whether to start S3 gateway")
+ isStartingMsgBroker = cmdServer.Flag.Bool("msgBroker", false, "whether to start message broker")
serverWhiteList []string
@@ -99,7 +101,7 @@ func init() {
serverOptions.v.fixJpgOrientation = cmdServer.Flag.Bool("volume.images.fix.orientation", false, "Adjust jpg orientation when uploading.")
serverOptions.v.readRedirect = cmdServer.Flag.Bool("volume.read.redirect", true, "Redirect moved or non-local volumes.")
serverOptions.v.compactionMBPerSecond = cmdServer.Flag.Int("volume.compactionMBps", 0, "limit compaction speed in mega bytes per second")
- serverOptions.v.fileSizeLimitMB = cmdServer.Flag.Int("volume.fileSizeLimitMB", 1024, "limit file size to avoid out of memory")
+ serverOptions.v.fileSizeLimitMB = cmdServer.Flag.Int("volume.fileSizeLimitMB", 256, "limit file size to avoid out of memory")
serverOptions.v.publicUrl = cmdServer.Flag.String("volume.publicUrl", "", "publicly accessible address")
serverOptions.v.preStopSeconds = cmdServer.Flag.Int("volume.preStopSeconds", 10, "number of seconds between stop send heartbeats and stop volume server")
serverOptions.v.pprof = cmdServer.Flag.Bool("volume.pprof", false, "enable pprof http handlers. precludes --memprofile and --cpuprofile")
@@ -214,7 +216,7 @@ func runServer(cmd *Command, args []string) bool {
}
// start volume server
- {
+ if *isStartingVolumeServer {
go serverOptions.v.startVolumeServer(*volumeDataFolders, *volumeMaxDataVolumeCounts, *serverWhiteListOption, *volumeMinFreeSpacePercent)
}
diff --git a/weed/command/volume.go b/weed/command/volume.go
index d73c24ed1..ce5992665 100644
--- a/weed/command/volume.go
+++ b/weed/command/volume.go
@@ -80,7 +80,7 @@ func init() {
v.cpuProfile = cmdVolume.Flag.String("cpuprofile", "", "cpu profile output file")
v.memProfile = cmdVolume.Flag.String("memprofile", "", "memory profile output file")
v.compactionMBPerSecond = cmdVolume.Flag.Int("compactionMBps", 0, "limit background compaction or copying speed in mega bytes per second")
- v.fileSizeLimitMB = cmdVolume.Flag.Int("fileSizeLimitMB", 1024, "limit file size to avoid out of memory")
+ v.fileSizeLimitMB = cmdVolume.Flag.Int("fileSizeLimitMB", 256, "limit file size to avoid out of memory")
v.pprof = cmdVolume.Flag.Bool("pprof", false, "enable pprof http handlers. precludes --memprofile and --cpuprofile")
v.metricsHttpPort = cmdVolume.Flag.Int("metricsPort", 0, "Prometheus metrics listen port")
}
diff --git a/weed/filer/filechunk_manifest.go b/weed/filer/filechunk_manifest.go
index 0d01a4a36..f5ab36d37 100644
--- a/weed/filer/filechunk_manifest.go
+++ b/weed/filer/filechunk_manifest.go
@@ -99,7 +99,7 @@ func retriedFetchChunkData(urlStrings []string, cipherKey []byte, isGzipped bool
var buffer bytes.Buffer
var shouldRetry bool
- for waitTime := time.Second; waitTime < ReadWaitTime; waitTime += waitTime / 2 {
+ for waitTime := time.Second; waitTime < util.RetryWaitTime; waitTime += waitTime / 2 {
for _, urlString := range urlStrings {
shouldRetry, err = util.ReadUrlAsStream(urlString+"?readDeleted=true", cipherKey, isGzipped, isFullChunk, offset, size, func(data []byte) {
buffer.Write(data)
diff --git a/weed/filer/reader_at.go b/weed/filer/reader_at.go
index 04c64d449..ccc746b90 100644
--- a/weed/filer/reader_at.go
+++ b/weed/filer/reader_at.go
@@ -3,19 +3,16 @@ package filer
import (
"context"
"fmt"
+ "io"
+ "math/rand"
+ "sync"
+
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
"github.com/chrislusf/seaweedfs/weed/util/chunk_cache"
"github.com/chrislusf/seaweedfs/weed/wdclient"
"github.com/golang/groupcache/singleflight"
- "io"
- "math/rand"
- "sync"
- "time"
-)
-
-var (
- ReadWaitTime = 6 * time.Second
)
type ChunkReadAt struct {
@@ -45,34 +42,29 @@ func LookupFn(filerClient filer_pb.FilerClient) LookupFileIdFunctionType {
locations, found := vidCache[vid]
vicCacheLock.RUnlock()
- waitTime := time.Second
- for !found && waitTime < ReadWaitTime {
- // println("looking up volume", vid)
- err = filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
- resp, err := client.LookupVolume(context.Background(), &filer_pb.LookupVolumeRequest{
- VolumeIds: []string{vid},
+ if !found {
+ util.Retry("lookup volume "+vid, func() error {
+ err = filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ resp, err := client.LookupVolume(context.Background(), &filer_pb.LookupVolumeRequest{
+ VolumeIds: []string{vid},
+ })
+ if err != nil {
+ return err
+ }
+
+ locations = resp.LocationsMap[vid]
+ if locations == nil || len(locations.Locations) == 0 {
+ glog.V(0).Infof("failed to locate %s", fileId)
+ return fmt.Errorf("failed to locate %s", fileId)
+ }
+ vicCacheLock.Lock()
+ vidCache[vid] = locations
+ vicCacheLock.Unlock()
+
+ return nil
})
- if err != nil {
- return err
- }
-
- locations = resp.LocationsMap[vid]
- if locations == nil || len(locations.Locations) == 0 {
- glog.V(0).Infof("failed to locate %s", fileId)
- return fmt.Errorf("failed to locate %s", fileId)
- }
- vicCacheLock.Lock()
- vidCache[vid] = locations
- vicCacheLock.Unlock()
-
- return nil
+ return err
})
- if err == nil {
- break
- }
- glog.V(1).Infof("wait for volume %s", vid)
- time.Sleep(waitTime)
- waitTime += waitTime / 2
}
if err != nil {
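
weed/util/retry.go (31 new lines per the diffstat) holds the retry helper these hunks now call, but its hunk falls outside the portion reproduced here. Below is a minimal sketch of what the call sites imply, reusing the 6-second ceiling from the deleted ReadWaitTime and the transport-only check removed from meta_cache_init.go further down; the exact body is an assumption, not the committed code.

    // Sketch only: the committed implementation lives in weed/util/retry.go,
    // which is not shown in this excerpt.
    package util

    import (
        "strings"
        "time"

        "github.com/chrislusf/seaweedfs/weed/glog"
    )

    // RetryWaitTime replaces the deleted filer.ReadWaitTime; mount_std.go
    // above overwrites it from the mount's read retry flag.
    var RetryWaitTime = 6 * time.Second

    // Retry runs job with exponential backoff until it succeeds, hits a
    // non-retryable error, or the backoff exceeds RetryWaitTime.
    func Retry(name string, job func() error) (err error) {
        for waitTime := time.Second; waitTime < RetryWaitTime; waitTime += waitTime / 2 {
            err = job()
            if err == nil {
                return nil
            }
            // the loop deleted from meta_cache_init.go retried only transport errors
            if !strings.Contains(err.Error(), "transport") {
                return err
            }
            glog.V(0).Infof("retry %s: %v, will retry in %v", name, err, waitTime)
            time.Sleep(waitTime)
        }
        return err
    }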
diff --git a/weed/filesys/dir.go b/weed/filesys/dir.go
index ae2ae3418..a8481a435 100644
--- a/weed/filesys/dir.go
+++ b/weed/filesys/dir.go
@@ -27,6 +27,7 @@ type Dir struct {
var _ = fs.Node(&Dir{})
var _ = fs.NodeCreater(&Dir{})
+var _ = fs.NodeMknoder(&Dir{})
var _ = fs.NodeMkdirer(&Dir{})
var _ = fs.NodeFsyncer(&Dir{})
var _ = fs.NodeRequestLookuper(&Dir{})
@@ -179,6 +180,20 @@ func (dir *Dir) Create(ctx context.Context, req *fuse.CreateRequest,
}
+func (dir *Dir) Mknod(ctx context.Context, req *fuse.MknodRequest) (fs.Node, error) {
+ if req.Mode&os.ModeNamedPipe != 0 {
+ glog.V(1).Infof("mknod named pipe %s", req.String())
+ return nil, fuse.ENOSYS
+ }
+ if req.Mode&os.ModeSocket != 0 {
+ glog.V(1).Infof("mknod socket %s", req.String())
+ return nil, fuse.ENOSYS
+ }
+ // not going to support mknod for normal files either
+ glog.V(1).Infof("mknod %s", req.String())
+ return nil, fuse.ENOSYS
+}
+
func (dir *Dir) Mkdir(ctx context.Context, req *fuse.MkdirRequest) (fs.Node, error) {
glog.V(4).Infof("mkdir %s: %s", dir.FullPath(), req.Name)
@@ -347,8 +362,22 @@ func (dir *Dir) removeOneFile(req *fuse.RemoveRequest) error {
// then, delete meta cache and fsNode cache
dir.wfs.metaCache.DeleteEntry(context.Background(), filePath)
+
+ // clear entry inside the file
+ fsNode := dir.wfs.fsNodeCache.GetFsNode(filePath)
+ if fsNode != nil {
+ if file, ok := fsNode.(*File); ok {
+ file.clearEntry()
+ }
+ }
dir.wfs.fsNodeCache.DeleteFsNode(filePath)
+ // remove current file handle if any
+ dir.wfs.handlesLock.Lock()
+ defer dir.wfs.handlesLock.Unlock()
+ inodeId := util.NewFullPath(dir.FullPath(), req.Name).AsInode()
+ delete(dir.wfs.handles, inodeId)
+
// delete the chunks last
if isDeleteData {
dir.wfs.deleteFileChunks(entry.Chunks)
diff --git a/weed/filesys/dir_link.go b/weed/filesys/dir_link.go
index f6bc41b56..ba3280f03 100644
--- a/weed/filesys/dir_link.go
+++ b/weed/filesys/dir_link.go
@@ -31,7 +31,7 @@ func (dir *Dir) Link(ctx context.Context, req *fuse.LinkRequest, old fs.Node) (f
glog.V(4).Infof("Link: %v/%v -> %v/%v", oldFile.dir.FullPath(), oldFile.Name, dir.FullPath(), req.NewName)
- if err := oldFile.maybeLoadEntry(ctx); err != nil {
+ if _, err := oldFile.maybeLoadEntry(ctx); err != nil {
return nil, err
}
@@ -86,7 +86,7 @@ func (dir *Dir) Link(ctx context.Context, req *fuse.LinkRequest, old fs.Node) (f
// create new file node
newNode := dir.newFile(req.NewName, request.Entry)
newFile := newNode.(*File)
- if err := newFile.maybeLoadEntry(ctx); err != nil {
+ if _, err := newFile.maybeLoadEntry(ctx); err != nil {
return nil, err
}
@@ -138,16 +138,17 @@ func (dir *Dir) Symlink(ctx context.Context, req *fuse.SymlinkRequest) (fs.Node,
func (file *File) Readlink(ctx context.Context, req *fuse.ReadlinkRequest) (string, error) {
- if err := file.maybeLoadEntry(ctx); err != nil {
+ entry, err := file.maybeLoadEntry(ctx)
+ if err != nil {
return "", err
}
- if os.FileMode(file.entry.Attributes.FileMode)&os.ModeSymlink == 0 {
+ if os.FileMode(entry.Attributes.FileMode)&os.ModeSymlink == 0 {
return "", fuse.Errno(syscall.EINVAL)
}
- glog.V(4).Infof("Readlink: %v/%v => %v", file.dir.FullPath(), file.Name, file.entry.Attributes.SymlinkTarget)
+ glog.V(4).Infof("Readlink: %v/%v => %v", file.dir.FullPath(), file.Name, entry.Attributes.SymlinkTarget)
- return file.entry.Attributes.SymlinkTarget, nil
+ return entry.Attributes.SymlinkTarget, nil
}
diff --git a/weed/filesys/dirty_page.go b/weed/filesys/dirty_page.go
index dd0c48796..11089186f 100644
--- a/weed/filesys/dirty_page.go
+++ b/weed/filesys/dirty_page.go
@@ -9,22 +9,16 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
- "github.com/chrislusf/seaweedfs/weed/util"
-)
-
-var (
- concurrentWriterLimit = runtime.NumCPU()
- concurrentWriters = util.NewLimitedConcurrentExecutor(4 * concurrentWriterLimit)
)
type ContinuousDirtyPages struct {
intervals *ContinuousIntervals
f *File
writeWaitGroup sync.WaitGroup
+ chunkAddLock sync.Mutex
chunkSaveErrChan chan error
chunkSaveErrChanClosed bool
lastErr error
- lock sync.Mutex
collection string
replication string
}
@@ -33,7 +27,7 @@ func newDirtyPages(file *File) *ContinuousDirtyPages {
dirtyPages := &ContinuousDirtyPages{
intervals: &ContinuousIntervals{},
f: file,
- chunkSaveErrChan: make(chan error, concurrentWriterLimit),
+ chunkSaveErrChan: make(chan error, runtime.NumCPU()),
}
go func() {
for t := range dirtyPages.chunkSaveErrChan {
@@ -100,14 +94,18 @@ func (pages *ContinuousDirtyPages) saveExistingLargestPageToStorage() (hasSavedD
func (pages *ContinuousDirtyPages) saveToStorage(reader io.Reader, offset int64, size int64) {
+ errChanSize := pages.f.wfs.option.ConcurrentWriters
+ if errChanSize == 0 {
+ errChanSize = runtime.NumCPU()
+ }
if pages.chunkSaveErrChanClosed {
- pages.chunkSaveErrChan = make(chan error, concurrentWriterLimit)
+ pages.chunkSaveErrChan = make(chan error, errChanSize)
pages.chunkSaveErrChanClosed = false
}
mtime := time.Now().UnixNano()
pages.writeWaitGroup.Add(1)
- go func() {
+ writer := func() {
defer pages.writeWaitGroup.Done()
reader = io.LimitReader(reader, size)
@@ -119,9 +117,17 @@ func (pages *ContinuousDirtyPages) saveToStorage(reader io.Reader, offset int64,
}
chunk.Mtime = mtime
pages.collection, pages.replication = collection, replication
+ pages.chunkAddLock.Lock()
+ defer pages.chunkAddLock.Unlock()
pages.f.addChunks([]*filer_pb.FileChunk{chunk})
glog.V(3).Infof("%s saveToStorage [%d,%d)", pages.f.fullpath(), offset, offset+size)
- }()
+ }
+
+ if pages.f.wfs.concurrentWriters != nil {
+ pages.f.wfs.concurrentWriters.Execute(writer)
+ } else {
+ go writer()
+ }
}
func max(x, y int64) int64 {
diff --git a/weed/filesys/file.go b/weed/filesys/file.go
index 7aa1016d7..9e1342370 100644
--- a/weed/filesys/file.go
+++ b/weed/filesys/file.go
@@ -43,32 +43,33 @@ func (file *File) fullpath() util.FullPath {
return util.NewFullPath(file.dir.FullPath(), file.Name)
}
-func (file *File) Attr(ctx context.Context, attr *fuse.Attr) error {
+func (file *File) Attr(ctx context.Context, attr *fuse.Attr) (err error) {
glog.V(4).Infof("file Attr %s, open:%v, existing attr: %+v", file.fullpath(), file.isOpen, attr)
- if file.isOpen <= 0 {
- if err := file.maybeLoadEntry(ctx); err != nil {
+ entry := file.entry
+ if file.isOpen <= 0 || entry == nil {
+ if entry, err = file.maybeLoadEntry(ctx); err != nil {
return err
}
}
attr.Inode = file.fullpath().AsInode()
attr.Valid = time.Second
- attr.Mode = os.FileMode(file.entry.Attributes.FileMode)
- attr.Size = filer.FileSize(file.entry)
+ attr.Mode = os.FileMode(entry.Attributes.FileMode)
+ attr.Size = filer.FileSize(entry)
if file.isOpen > 0 {
- attr.Size = file.entry.Attributes.FileSize
+ attr.Size = entry.Attributes.FileSize
glog.V(4).Infof("file Attr %s, open:%v, size: %d", file.fullpath(), file.isOpen, attr.Size)
}
- attr.Crtime = time.Unix(file.entry.Attributes.Crtime, 0)
- attr.Mtime = time.Unix(file.entry.Attributes.Mtime, 0)
- attr.Gid = file.entry.Attributes.Gid
- attr.Uid = file.entry.Attributes.Uid
+ attr.Crtime = time.Unix(entry.Attributes.Crtime, 0)
+ attr.Mtime = time.Unix(entry.Attributes.Mtime, 0)
+ attr.Gid = entry.Attributes.Gid
+ attr.Uid = entry.Attributes.Uid
attr.Blocks = attr.Size/blockSize + 1
attr.BlockSize = uint32(file.wfs.option.ChunkSizeLimit)
- if file.entry.HardLinkCounter > 0 {
- attr.Nlink = uint32(file.entry.HardLinkCounter)
+ if entry.HardLinkCounter > 0 {
+ attr.Nlink = uint32(entry.HardLinkCounter)
}
return nil
@@ -79,11 +80,12 @@ func (file *File) Getxattr(ctx context.Context, req *fuse.GetxattrRequest, resp
glog.V(4).Infof("file Getxattr %s", file.fullpath())
- if err := file.maybeLoadEntry(ctx); err != nil {
+ entry, err := file.maybeLoadEntry(ctx)
+ if err != nil {
return err
}
- return getxattr(file.entry, req, resp)
+ return getxattr(entry, req, resp)
}
func (file *File) Open(ctx context.Context, req *fuse.OpenRequest, resp *fuse.OpenResponse) (fs.Handle, error) {
@@ -104,7 +106,8 @@ func (file *File) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *f
glog.V(4).Infof("%v file setattr %+v", file.fullpath(), req)
- if err := file.maybeLoadEntry(ctx); err != nil {
+ _, err := file.maybeLoadEntry(ctx)
+ if err != nil {
return err
}
if file.isOpen > 0 {
@@ -141,7 +144,7 @@ func (file *File) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *f
}
}
file.entry.Chunks = chunks
- file.entryViewCache = nil
+ file.entryViewCache, _ = filer.NonOverlappingVisibleIntervals(filer.LookupFn(file.wfs), chunks)
file.reader = nil
file.wfs.deleteFileChunks(truncatedChunks)
}
@@ -186,7 +189,7 @@ func (file *File) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *f
return nil
}
- return file.saveEntry()
+ return file.saveEntry(file.entry)
}
@@ -194,15 +197,16 @@ func (file *File) Setxattr(ctx context.Context, req *fuse.SetxattrRequest) error
glog.V(4).Infof("file Setxattr %s: %s", file.fullpath(), req.Name)
- if err := file.maybeLoadEntry(ctx); err != nil {
+ entry, err := file.maybeLoadEntry(ctx)
+ if err != nil {
return err
}
- if err := setxattr(file.entry, req); err != nil {
+ if err := setxattr(entry, req); err != nil {
return err
}
- return file.saveEntry()
+ return file.saveEntry(entry)
}
@@ -210,15 +214,16 @@ func (file *File) Removexattr(ctx context.Context, req *fuse.RemovexattrRequest)
glog.V(4).Infof("file Removexattr %s: %s", file.fullpath(), req.Name)
- if err := file.maybeLoadEntry(ctx); err != nil {
+ entry, err := file.maybeLoadEntry(ctx)
+ if err != nil {
return err
}
- if err := removexattr(file.entry, req); err != nil {
+ if err := removexattr(entry, req); err != nil {
return err
}
- return file.saveEntry()
+ return file.saveEntry(entry)
}
@@ -226,11 +231,12 @@ func (file *File) Listxattr(ctx context.Context, req *fuse.ListxattrRequest, res
glog.V(4).Infof("file Listxattr %s", file.fullpath())
- if err := file.maybeLoadEntry(ctx); err != nil {
+ entry, err := file.maybeLoadEntry(ctx)
+ if err != nil {
return err
}
- if err := listxattr(file.entry, req, resp); err != nil {
+ if err := listxattr(entry, req, resp); err != nil {
return err
}
@@ -252,30 +258,61 @@ func (file *File) Forget() {
file.wfs.fsNodeCache.DeleteFsNode(t)
}
-func (file *File) maybeLoadEntry(ctx context.Context) error {
- if (file.entry != nil && len(file.entry.HardLinkId) != 0) || file.isOpen > 0 {
- return nil
+func (file *File) maybeLoadEntry(ctx context.Context) (entry *filer_pb.Entry, err error) {
+ entry = file.entry
+ if file.isOpen > 0 {
+ return entry, nil
}
- entry, err := file.wfs.maybeLoadEntry(file.dir.FullPath(), file.Name)
+ if entry != nil {
+ if len(entry.HardLinkId) == 0 {
+ // no hard link: the cached entry can be reused; hard links must always be reloaded
+ return entry, nil
+ }
+ }
+ entry, err = file.wfs.maybeLoadEntry(file.dir.FullPath(), file.Name)
if err != nil {
glog.V(3).Infof("maybeLoadEntry file %s/%s: %v", file.dir.FullPath(), file.Name, err)
- return err
+ return entry, err
}
if entry != nil {
file.setEntry(entry)
+ } else {
+ glog.Warningf("maybeLoadEntry not found entry %s/%s: %v", file.dir.FullPath(), file.Name, err)
}
- return nil
+ return entry, nil
+}
+
+func lessThan(a, b *filer_pb.FileChunk) bool {
+ if a.Mtime == b.Mtime {
+ return a.Fid.FileKey < b.Fid.FileKey
+ }
+ return a.Mtime < b.Mtime
}
func (file *File) addChunks(chunks []*filer_pb.FileChunk) {
- sort.Slice(chunks, func(i, j int) bool {
- if chunks[i].Mtime == chunks[j].Mtime {
- return chunks[i].Fid.FileKey < chunks[j].Fid.FileKey
+ // find the earliest incoming chunk
+ newChunks := chunks
+ earliestChunk := newChunks[0]
+ for i := 1; i < len(newChunks); i++ {
+ if lessThan(newChunks[i], earliestChunk) {
+ earliestChunk = newChunks[i]
+ }
+ }
+
+ // pick out-of-order chunks from existing chunks
+ for _, chunk := range file.entry.Chunks {
+ if lessThan(earliestChunk, chunk) {
+ chunks = append(chunks, chunk)
}
- return chunks[i].Mtime < chunks[j].Mtime
+ }
+
+ // sort incoming chunks
+ sort.Slice(chunks, func(i, j int) bool {
+ return lessThan(chunks[i], chunks[j])
})
+ // add to entry view cache
for _, chunk := range chunks {
file.entryViewCache = filer.MergeIntoVisibles(file.entryViewCache, chunk)
}
@@ -284,24 +321,30 @@ func (file *File) addChunks(chunks []*filer_pb.FileChunk) {
glog.V(4).Infof("%s existing %d chunks adds %d more", file.fullpath(), len(file.entry.Chunks), len(chunks))
- file.entry.Chunks = append(file.entry.Chunks, chunks...)
+ file.entry.Chunks = append(file.entry.Chunks, newChunks...)
}
func (file *File) setEntry(entry *filer_pb.Entry) {
file.entry = entry
- file.entryViewCache, _ = filer.NonOverlappingVisibleIntervals(filer.LookupFn(file.wfs), file.entry.Chunks)
+ file.entryViewCache, _ = filer.NonOverlappingVisibleIntervals(filer.LookupFn(file.wfs), entry.Chunks)
+ file.reader = nil
+}
+
+func (file *File) clearEntry() {
+ file.entry = nil
+ file.entryViewCache = nil
file.reader = nil
}
-func (file *File) saveEntry() error {
+func (file *File) saveEntry(entry *filer_pb.Entry) error {
return file.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
- file.wfs.mapPbIdFromLocalToFiler(file.entry)
- defer file.wfs.mapPbIdFromFilerToLocal(file.entry)
+ file.wfs.mapPbIdFromLocalToFiler(entry)
+ defer file.wfs.mapPbIdFromFilerToLocal(entry)
request := &filer_pb.UpdateEntryRequest{
Directory: file.dir.FullPath(),
- Entry: file.entry,
+ Entry: entry,
Signatures: []int32{file.wfs.signature},
}
diff --git a/weed/filesys/filehandle.go b/weed/filesys/filehandle.go
index 54bde3494..54410a0ba 100644
--- a/weed/filesys/filehandle.go
+++ b/weed/filesys/filehandle.go
@@ -183,16 +183,18 @@ func (fh *FileHandle) Release(ctx context.Context, req *fuse.ReleaseRequest) err
}
if fh.f.isOpen == 0 {
+
if err := fh.doFlush(ctx, req.Header); err != nil {
glog.Errorf("Release doFlush %s: %v", fh.f.Name, err)
}
- fh.f.wfs.ReleaseHandle(fh.f.fullpath(), fuse.HandleID(fh.handle))
- }
- // stop the goroutine
- if !fh.dirtyPages.chunkSaveErrChanClosed {
- fh.dirtyPages.chunkSaveErrChanClosed = true
- close(fh.dirtyPages.chunkSaveErrChan)
+ // stop the goroutine
+ if !fh.dirtyPages.chunkSaveErrChanClosed {
+ fh.dirtyPages.chunkSaveErrChanClosed = true
+ close(fh.dirtyPages.chunkSaveErrChan)
+ }
+
+ fh.f.wfs.ReleaseHandle(fh.f.fullpath(), fuse.HandleID(fh.handle))
}
return nil
@@ -262,7 +264,6 @@ func (fh *FileHandle) doFlush(ctx context.Context, header fuse.Header) error {
glog.V(0).Infof("MaybeManifestize: %v", manifestErr)
}
fh.f.entry.Chunks = append(chunks, manifestChunks...)
- fh.f.entryViewCache = nil
fh.f.wfs.mapPbIdFromLocalToFiler(request.Entry)
defer fh.f.wfs.mapPbIdFromFilerToLocal(request.Entry)
diff --git a/weed/filesys/meta_cache/meta_cache_init.go b/weed/filesys/meta_cache/meta_cache_init.go
index f42d61230..4089cea28 100644
--- a/weed/filesys/meta_cache/meta_cache_init.go
+++ b/weed/filesys/meta_cache/meta_cache_init.go
@@ -3,8 +3,6 @@ package meta_cache
import (
"context"
"fmt"
- "strings"
- "time"
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/glog"
@@ -18,7 +16,7 @@ func EnsureVisited(mc *MetaCache, client filer_pb.FilerClient, dirPath util.Full
glog.V(4).Infof("ReadDirAllEntries %s ...", path)
- for waitTime := time.Second; waitTime < filer.ReadWaitTime; waitTime += waitTime / 2 {
+ util.Retry("ReadDirAllEntries", func() error {
err = filer_pb.ReadDirAllEntries(client, dirPath, "", func(pbEntry *filer_pb.Entry, isLast bool) error {
entry := filer.FromPbEntry(string(dirPath), pbEntry)
if err := mc.doInsertEntry(context.Background(), entry); err != nil {
@@ -30,17 +28,13 @@ func EnsureVisited(mc *MetaCache, client filer_pb.FilerClient, dirPath util.Full
}
return nil
})
- if err == nil {
- break
- }
- if strings.Contains(err.Error(), "transport: ") {
- glog.V(0).Infof("ReadDirAllEntries %s: %v. Retry in %v", path, err, waitTime)
- time.Sleep(waitTime)
- continue
- }
+ return err
+ })
+
+ if err != nil {
err = fmt.Errorf("list %s: %v", dirPath, err)
- break
}
+
return
})
}
diff --git a/weed/filesys/wfs.go b/weed/filesys/wfs.go
index 759e21b15..cd14e8032 100644
--- a/weed/filesys/wfs.go
+++ b/weed/filesys/wfs.go
@@ -31,6 +31,7 @@ type Option struct {
Replication string
TtlSec int32
ChunkSizeLimit int64
+ ConcurrentWriters int
CacheDir string
CacheSizeMB int64
DataCenter string
@@ -68,6 +69,9 @@ type WFS struct {
chunkCache *chunk_cache.TieredChunkCache
metaCache *meta_cache.MetaCache
signature int32
+
+ // throttle writers
+ concurrentWriters *util.LimitedConcurrentExecutor
}
type statsCache struct {
filer_pb.StatisticsResponse
@@ -96,7 +100,7 @@ func NewSeaweedFileSystem(option *Option) *WFS {
fsNode := wfs.fsNodeCache.GetFsNode(filePath)
if fsNode != nil {
if file, ok := fsNode.(*File); ok {
- file.entry = nil
+ file.clearEntry()
}
}
})
@@ -110,6 +114,10 @@ func NewSeaweedFileSystem(option *Option) *WFS {
wfs.root = &Dir{name: wfs.option.FilerMountRootPath, wfs: wfs, entry: entry}
wfs.fsNodeCache = newFsCache(wfs.root)
+ if wfs.option.ConcurrentWriters > 0 {
+ wfs.concurrentWriters = util.NewLimitedConcurrentExecutor(wfs.option.ConcurrentWriters)
+ }
+
return wfs
}
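
LimitedConcurrentExecutor itself predates this commit; the deleted dirty_page.go globals already constructed one via NewLimitedConcurrentExecutor(4 * concurrentWriterLimit). A counting-semaphore sketch consistent with the Execute(func()) call sites follows; the real type may differ in detail.

    // Sketch only: behavior inferred from the NewLimitedConcurrentExecutor
    // and Execute call sites in this diff.
    package util

    type LimitedConcurrentExecutor struct {
        tokens chan struct{} // one slot per allowed in-flight job
    }

    func NewLimitedConcurrentExecutor(limit int) *LimitedConcurrentExecutor {
        return &LimitedConcurrentExecutor{tokens: make(chan struct{}, limit)}
    }

    // Execute blocks until a slot is free, then runs job on its own goroutine.
    func (e *LimitedConcurrentExecutor) Execute(job func()) {
        e.tokens <- struct{}{} // acquire
        go func() {
            defer func() { <-e.tokens }() // release
            job()
        }()
    }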
diff --git a/weed/filesys/wfs_filer_client.go b/weed/filesys/wfs_filer_client.go
index 096ee555f..dd76f5669 100644
--- a/weed/filesys/wfs_filer_client.go
+++ b/weed/filesys/wfs_filer_client.go
@@ -1,6 +1,7 @@
package filesys
import (
+ "github.com/chrislusf/seaweedfs/weed/util"
"google.golang.org/grpc"
"github.com/chrislusf/seaweedfs/weed/pb"
@@ -11,10 +12,12 @@ var _ = filer_pb.FilerClient(&WFS{})
func (wfs *WFS) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error {
- err := pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
- client := filer_pb.NewSeaweedFilerClient(grpcConnection)
- return fn(client)
- }, wfs.option.FilerGrpcAddress, wfs.option.GrpcDialOption)
+ err := util.Retry("filer grpc", func() error {
+ return pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
+ client := filer_pb.NewSeaweedFilerClient(grpcConnection)
+ return fn(client)
+ }, wfs.option.FilerGrpcAddress, wfs.option.GrpcDialOption)
+ })
if err == nil {
return nil
diff --git a/weed/ftpd/ftp_server.go b/weed/ftpd/ftp_server.go
new file mode 100644
index 000000000..4a0dca2c3
--- /dev/null
+++ b/weed/ftpd/ftp_server.go
@@ -0,0 +1,81 @@
+package ftpd
+
+import (
+ "crypto/tls"
+ "errors"
+ "fmt"
+ "net"
+
+ ftpserver "github.com/fclairamb/ftpserverlib"
+ "google.golang.org/grpc"
+)
+
+type FtpServerOption struct {
+ Filer string
+ IP string
+ IpBind string
+ Port int
+ FilerGrpcAddress string
+ FtpRoot string
+ GrpcDialOption grpc.DialOption
+ PassivePortStart int
+ PassivePortStop int
+}
+
+type SftpServer struct {
+ option *FtpServerOption
+ ftpListener net.Listener
+}
+
+var _ = ftpserver.MainDriver(&SftpServer{})
+
+// NewServer returns a new FTP server driver
+func NewFtpServer(ftpListener net.Listener, option *FtpServerOption) (*SftpServer, error) {
+ var err error
+ server := &SftpServer{
+ option: option,
+ ftpListener: ftpListener,
+ }
+ return server, err
+}
+
+// GetSettings returns some general settings around the server setup
+func (s *SftpServer) GetSettings() (*ftpserver.Settings, error) {
+ var portRange *ftpserver.PortRange
+ if s.option.PassivePortStart > 0 && s.option.PassivePortStop > s.option.PassivePortStart {
+ portRange = &ftpserver.PortRange{
+ Start: s.option.PassivePortStart,
+ End: s.option.PassivePortStop,
+ }
+ }
+
+ return &ftpserver.Settings{
+ Listener: s.ftpListener,
+ ListenAddr: fmt.Sprintf("%s:%d", s.option.IpBind, s.option.Port),
+ PublicHost: s.option.IP,
+ PassiveTransferPortRange: portRange,
+ ActiveTransferPortNon20: true,
+ IdleTimeout: -1,
+ ConnectionTimeout: 20,
+ }, nil
+}
+
+// ClientConnected is called to send the very first welcome message
+func (s *SftpServer) ClientConnected(cc ftpserver.ClientContext) (string, error) {
+ return "Welcome to SeaweedFS FTP Server", nil
+}
+
+// ClientDisconnected is called when the user disconnects, even if he never authenticated
+func (s *SftpServer) ClientDisconnected(cc ftpserver.ClientContext) {
+}
+
+// AuthUser authenticates the user and selects an handling driver
+func (s *SftpServer) AuthUser(cc ftpserver.ClientContext, username, password string) (ftpserver.ClientDriver, error) {
+ return nil, nil
+}
+
+// GetTLSConfig returns a TLS Certificate to use
+// The certificate could frequently change if we use something like "let's encrypt"
+func (s *SftpServer) GetTLSConfig() (*tls.Config, error) {
+ return nil, errors.New("no TLS certificate configured")
+}
diff --git a/weed/operation/upload_content.go b/weed/operation/upload_content.go
index a4148cb22..ac0b477cb 100644
--- a/weed/operation/upload_content.go
+++ b/weed/operation/upload_content.go
@@ -81,21 +81,18 @@ func doUpload(uploadUrl string, filename string, cipher bool, reader io.Reader,
if ok {
data = bytesReader.Bytes
} else {
- buf := bytebufferpool.Get()
- _, err = buf.ReadFrom(reader)
- defer bytebufferpool.Put(buf)
+ data, err = ioutil.ReadAll(reader)
if err != nil {
err = fmt.Errorf("read input: %v", err)
return
}
- data = buf.Bytes()
}
uploadResult, uploadErr := retriedUploadData(uploadUrl, filename, cipher, data, isInputCompressed, mtype, pairMap, jwt)
return uploadResult, uploadErr, data
}
func retriedUploadData(uploadUrl string, filename string, cipher bool, data []byte, isInputCompressed bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (uploadResult *UploadResult, err error) {
- for i := 0; i < 1; i++ {
+ for i := 0; i < 3; i++ {
uploadResult, err = doUploadData(uploadUrl, filename, cipher, data, isInputCompressed, mtype, pairMap, jwt)
if err == nil {
return
diff --git a/weed/pb/grpc_client_server.go b/weed/pb/grpc_client_server.go
index ce706e282..f19af43b2 100644
--- a/weed/pb/grpc_client_server.go
+++ b/weed/pb/grpc_client_server.go
@@ -62,6 +62,7 @@ func GrpcDial(ctx context.Context, address string, opts ...grpc.DialOption) (*gr
grpc.WithDefaultCallOptions(
grpc.MaxCallSendMsgSize(Max_Message_Size),
grpc.MaxCallRecvMsgSize(Max_Message_Size),
+ grpc.WaitForReady(true),
),
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: 30 * time.Second, // client ping server if no activity for this long
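
grpc.WaitForReady(true) makes RPCs on this connection block while the channel is connecting or in transient failure, instead of failing fast; together with the util.Retry wrappers above, this trades quick errors for resilience across filer or master restarts.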
diff --git a/weed/s3api/filer_util_tags.go b/weed/s3api/filer_util_tags.go
index 3d4da7825..75d3b37d0 100644
--- a/weed/s3api/filer_util_tags.go
+++ b/weed/s3api/filer_util_tags.go
@@ -4,10 +4,11 @@ import (
"strings"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ xhttp "github.com/chrislusf/seaweedfs/weed/s3api/http"
)
const (
- S3TAG_PREFIX = "s3-"
+ S3TAG_PREFIX = xhttp.AmzObjectTagging + "-"
)
func (s3a *S3ApiServer) getTags(parentDirectoryPath string, entryName string) (tags map[string]string, err error) {
diff --git a/weed/s3api/http/header.go b/weed/s3api/http/header.go
new file mode 100644
index 000000000..2802b560f
--- /dev/null
+++ b/weed/s3api/http/header.go
@@ -0,0 +1,30 @@
+/*
+ * MinIO Cloud Storage, (C) 2019 MinIO, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package http
+
+// Standard S3 HTTP request constants
+const (
+ // S3 storage class
+ AmzStorageClass = "x-amz-storage-class"
+
+ // S3 user-defined metadata
+ AmzUserMetaPrefix = "X-Amz-Meta-"
+
+ // S3 object tagging
+ AmzObjectTagging = "X-Amz-Tagging"
+ AmzTagCount = "x-amz-tagging-count"
+)
diff --git a/weed/s3api/s3api_object_handlers.go b/weed/s3api/s3api_object_handlers.go
index fa628f44e..fe134c102 100644
--- a/weed/s3api/s3api_object_handlers.go
+++ b/weed/s3api/s3api_object_handlers.go
@@ -266,7 +266,7 @@ func (s3a *S3ApiServer) proxyToFiler(w http.ResponseWriter, r *http.Request, des
resp, postErr := client.Do(proxyReq)
- if resp.ContentLength == -1 && !strings.HasSuffix(destUrl, "/") {
+ if (resp.ContentLength == -1 || resp.StatusCode == 404) && !strings.HasSuffix(destUrl, "/") {
writeErrorResponse(w, s3err.ErrNoSuchKey, r.URL)
return
}
diff --git a/weed/s3api/s3api_objects_list_handlers.go b/weed/s3api/s3api_objects_list_handlers.go
index 23406d6df..5d63f1039 100644
--- a/weed/s3api/s3api_objects_list_handlers.go
+++ b/weed/s3api/s3api_objects_list_handlers.go
@@ -4,7 +4,6 @@ import (
"context"
"encoding/xml"
"fmt"
- "github.com/chrislusf/seaweedfs/weed/s3api/s3err"
"io"
"net/http"
"net/url"
@@ -15,6 +14,8 @@ import (
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ xhttp "github.com/chrislusf/seaweedfs/weed/s3api/http"
+ "github.com/chrislusf/seaweedfs/weed/s3api/s3err"
)
type ListBucketResultV2 struct {
@@ -137,6 +138,10 @@ func (s3a *S3ApiServer) listFilerEntries(bucket string, originalPrefix string, m
})
}
} else {
+ storageClass := "STANDARD"
+ if v, ok := entry.Extended[xhttp.AmzStorageClass]; ok {
+ storageClass = string(v)
+ }
contents = append(contents, ListEntry{
Key: fmt.Sprintf("%s/%s", dir, entry.Name)[len(bucketPrefix):],
LastModified: time.Unix(entry.Attributes.Mtime, 0).UTC(),
@@ -146,7 +151,7 @@ func (s3a *S3ApiServer) listFilerEntries(bucket string, originalPrefix string, m
ID: fmt.Sprintf("%x", entry.Attributes.Uid),
DisplayName: entry.Attributes.UserName,
},
- StorageClass: "STANDARD",
+ StorageClass: StorageClass(storageClass),
})
}
})
diff --git a/weed/server/filer_server_handlers.go b/weed/server/filer_server_handlers.go
index 18f78881c..555036feb 100644
--- a/weed/server/filer_server_handlers.go
+++ b/weed/server/filer_server_handlers.go
@@ -10,6 +10,10 @@ import (
func (fs *FilerServer) filerHandler(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Server", "SeaweedFS Filer "+util.VERSION)
+ if r.Header.Get("Origin") != "" {
+ w.Header().Set("Access-Control-Allow-Origin", "*")
+ w.Header().Set("Access-Control-Allow-Credentials", "true")
+ }
start := time.Now()
switch r.Method {
case "GET":
@@ -32,11 +36,19 @@ func (fs *FilerServer) filerHandler(w http.ResponseWriter, r *http.Request) {
stats.FilerRequestCounter.WithLabelValues("post").Inc()
fs.PostHandler(w, r)
stats.FilerRequestHistogram.WithLabelValues("post").Observe(time.Since(start).Seconds())
+ case "OPTIONS":
+ stats.FilerRequestCounter.WithLabelValues("options").Inc()
+ OptionsHandler(w, r, false)
+ stats.FilerRequestHistogram.WithLabelValues("options").Observe(time.Since(start).Seconds())
}
}
func (fs *FilerServer) readonlyFilerHandler(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Server", "SeaweedFS Filer "+util.VERSION)
+ if r.Header.Get("Origin") != "" {
+ w.Header().Set("Access-Control-Allow-Origin", "*")
+ w.Header().Set("Access-Control-Allow-Credentials", "true")
+ }
start := time.Now()
switch r.Method {
case "GET":
@@ -47,5 +59,18 @@ func (fs *FilerServer) readonlyFilerHandler(w http.ResponseWriter, r *http.Reque
stats.FilerRequestCounter.WithLabelValues("head").Inc()
fs.GetOrHeadHandler(w, r, false)
stats.FilerRequestHistogram.WithLabelValues("head").Observe(time.Since(start).Seconds())
+ case "OPTIONS":
+ stats.FilerRequestCounter.WithLabelValues("options").Inc()
+ OptionsHandler(w, r, true)
+ stats.FilerRequestHistogram.WithLabelValues("options").Observe(time.Since(start).Seconds())
+ }
+}
+
+func OptionsHandler(w http.ResponseWriter, r *http.Request, isReadOnly bool) {
+ if isReadOnly {
+ w.Header().Add("Access-Control-Allow-Methods", "GET, OPTIONS")
+ } else {
+ w.Header().Add("Access-Control-Allow-Methods", "PUT, POST, GET, DELETE, OPTIONS")
}
+ w.Header().Add("Access-Control-Allow-Headers", "*")
}
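
Net effect: any filer request carrying an Origin header is answered with Access-Control-Allow-Origin: * and credentials allowed, and browser preflight OPTIONS requests receive the permitted method list, with read-only servers advertising only GET and OPTIONS.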
diff --git a/weed/server/filer_server_handlers_read.go b/weed/server/filer_server_handlers_read.go
index fbd45d6b9..69d485e90 100644
--- a/weed/server/filer_server_handlers_read.go
+++ b/weed/server/filer_server_handlers_read.go
@@ -15,6 +15,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/images"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ xhttp "github.com/chrislusf/seaweedfs/weed/s3api/http"
"github.com/chrislusf/seaweedfs/weed/stats"
"github.com/chrislusf/seaweedfs/weed/util"
)
@@ -93,6 +94,24 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request,
}
}
+ // relay extended properties as response headers
+ for k, v := range entry.Extended {
+ w.Header().Set(k, string(v))
+ }
+
+ // set tag count
+ if r.Method == "GET" {
+ tagCount := 0
+ for k := range entry.Extended {
+ if strings.HasPrefix(k, xhttp.AmzObjectTagging+"-") {
+ tagCount++
+ }
+ }
+ if tagCount > 0 {
+ w.Header().Set(xhttp.AmzTagCount, strconv.Itoa(tagCount))
+ }
+ }
+
// set etag
etag := filer.ETagEntry(entry)
if inm := r.Header.Get("If-None-Match"); inm == "\""+etag+"\"" {
diff --git a/weed/server/filer_server_handlers_write_autochunk.go b/weed/server/filer_server_handlers_write_autochunk.go
index d86d49b2a..ee0af9aab 100644
--- a/weed/server/filer_server_handlers_write_autochunk.go
+++ b/weed/server/filer_server_handlers_write_autochunk.go
@@ -18,8 +18,10 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ xhttp "github.com/chrislusf/seaweedfs/weed/s3api/http"
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/stats"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/util"
)
@@ -176,6 +178,18 @@ func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileNa
Size: chunkOffset,
}
+ if entry.Extended == nil {
+ entry.Extended = make(map[string][]byte)
+ }
+
+ fs.saveAmzMetaData(r, entry)
+
+ for k, v := range r.Header {
+ if len(v) > 0 && strings.HasPrefix(k, needle.PairNamePrefix) {
+ entry.Extended[k] = []byte(v[0])
+ }
+ }
+
if dbErr := fs.filer.CreateEntry(ctx, entry, false, false, nil); dbErr != nil {
fs.filer.DeleteChunks(entry.Chunks)
replyerr = dbErr
@@ -308,3 +322,27 @@ func (fs *FilerServer) mkdir(ctx context.Context, w http.ResponseWriter, r *http
}
return filerResult, replyerr
}
+
+func (fs *FilerServer) saveAmzMetaData(r *http.Request, entry *filer.Entry) {
+
+ if sc := r.Header.Get(xhttp.AmzStorageClass); sc != "" {
+ entry.Extended[xhttp.AmzStorageClass] = []byte(sc)
+ }
+
+ if tags := r.Header.Get(xhttp.AmzObjectTagging); tags != "" {
+ for _, v := range strings.Split(tags, "&") {
+ tag := strings.Split(v, "=")
+ if len(tag) == 2 {
+ entry.Extended[xhttp.AmzObjectTagging+"-"+tag[0]] = []byte(tag[1])
+ }
+ }
+ }
+
+ for header, values := range r.Header {
+ if strings.HasPrefix(header, xhttp.AmzUserMetaPrefix) {
+ for _, value := range values {
+ entry.Extended[header] = []byte(value)
+ }
+ }
+ }
+}
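
For example, a PUT carrying `X-Amz-Tagging: project=alpha&owner=bob` (hypothetical values) is stored as the extended attributes X-Amz-Tagging-project and X-Amz-Tagging-owner; the read handler above then echoes them back as headers and reports x-amz-tagging-count: 2 on GET.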
diff --git a/weed/server/filer_ui/templates.go b/weed/server/filer_ui/templates.go
index f86dde5b1..3f0647119 100644
--- a/weed/server/filer_ui/templates.go
+++ b/weed/server/filer_ui/templates.go
@@ -25,7 +25,7 @@ var StatusTpl = template.Must(template.New("status").Funcs(funcMap).Parse(`<!DOC
<meta name="viewport" content="width=device-width, initial-scale=1">
<link rel="stylesheet" href="/seaweedfsstatic/bootstrap/3.3.1/css/bootstrap.min.css">
<style>
-body { padding-bottom: 70px; }
+body { padding-bottom: 128px; }
#drop-area {
border: 1px transparent;
}
diff --git a/weed/server/volume_grpc_client_to_master.go b/weed/server/volume_grpc_client_to_master.go
index 199f8faba..2f594fa2b 100644
--- a/weed/server/volume_grpc_client_to_master.go
+++ b/weed/server/volume_grpc_client_to_master.go
@@ -203,6 +203,7 @@ func (vs *VolumeServer) doHeartbeat(masterNode, masterGrpcAddress string, grpcDi
}
case <-volumeTickChan:
glog.V(4).Infof("volume server %s:%d heartbeat", vs.store.Ip, vs.store.Port)
+ vs.store.MaybeAdjustVolumeMax()
if err = stream.Send(vs.store.CollectHeartbeat()); err != nil {
glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", masterNode, err)
return "", err
@@ -216,6 +217,23 @@ func (vs *VolumeServer) doHeartbeat(masterNode, masterGrpcAddress string, grpcDi
case err = <-doneChan:
return
case <-vs.stopChan:
+ var volumeMessages []*master_pb.VolumeInformationMessage
+ emptyBeat := &master_pb.Heartbeat{
+ Ip: vs.store.Ip,
+ Port: uint32(vs.store.Port),
+ PublicUrl: vs.store.PublicUrl,
+ MaxVolumeCount: uint32(0),
+ MaxFileKey: uint64(0),
+ DataCenter: vs.store.GetDataCenter(),
+ Rack: vs.store.GetRack(),
+ Volumes: volumeMessages,
+ HasNoVolumes: len(volumeMessages) == 0,
+ }
+ glog.V(1).Infof("volume server %s:%d stops and deletes all volumes", vs.store.Ip, vs.store.Port)
+ if err = stream.Send(emptyBeat); err != nil {
+ glog.V(0).Infof("Volume Server Failed to update to master %s: %v", masterNode, err)
+ return "", err
+ }
return
}
}
diff --git a/weed/server/volume_grpc_copy.go b/weed/server/volume_grpc_copy.go
index cd2b53c8a..2aecb140f 100644
--- a/weed/server/volume_grpc_copy.go
+++ b/weed/server/volume_grpc_copy.go
@@ -4,6 +4,7 @@ import (
"context"
"fmt"
"io"
+ "io/ioutil"
"math"
"os"
"time"
@@ -60,13 +61,14 @@ func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.Vo
volumeFileName = storage.VolumeFileName(location.Directory, volFileInfoResp.Collection, int(req.VolumeId))
+ ioutil.WriteFile(volumeFileName+".note", []byte(fmt.Sprintf("copying from %s", req.SourceDataNode)), 0755)
+
// println("source:", volFileInfoResp.String())
- // copy ecx file
- if err := vs.doCopyFile(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.IdxFileSize, volumeFileName, ".idx", false, false); err != nil {
+ if err := vs.doCopyFile(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.DatFileSize, volumeFileName, ".dat", false, true); err != nil {
return err
}
- if err := vs.doCopyFile(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.DatFileSize, volumeFileName, ".dat", false, true); err != nil {
+ if err := vs.doCopyFile(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.IdxFileSize, volumeFileName, ".idx", false, false); err != nil {
return err
}
@@ -74,6 +76,8 @@ func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.Vo
return err
}
+ os.Remove(volumeFileName + ".note")
+
return nil
})
diff --git a/weed/server/volume_server_handlers.go b/weed/server/volume_server_handlers.go
index ad13cdf3b..7852c950a 100644
--- a/weed/server/volume_server_handlers.go
+++ b/weed/server/volume_server_handlers.go
@@ -1,10 +1,11 @@
package weed_server
import (
- "github.com/chrislusf/seaweedfs/weed/util"
"net/http"
"strings"
+ "github.com/chrislusf/seaweedfs/weed/util"
+
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/stats"
@@ -27,6 +28,10 @@ security settings:
func (vs *VolumeServer) privateStoreHandler(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Server", "SeaweedFS Volume "+util.VERSION)
+ if r.Header.Get("Origin") != "" {
+ w.Header().Set("Access-Control-Allow-Origin", "*")
+ w.Header().Set("Access-Control-Allow-Credentials", "true")
+ }
switch r.Method {
case "GET", "HEAD":
stats.ReadRequest()
@@ -37,11 +42,19 @@ func (vs *VolumeServer) privateStoreHandler(w http.ResponseWriter, r *http.Reque
case "PUT", "POST":
stats.WriteRequest()
vs.guard.WhiteList(vs.PostHandler)(w, r)
+ case "OPTIONS":
+ stats.ReadRequest()
+ w.Header().Add("Access-Control-Allow-Methods", "PUT, POST, GET, DELETE, OPTIONS")
+ w.Header().Add("Access-Control-Allow-Headers", "*")
}
}
func (vs *VolumeServer) publicReadOnlyHandler(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Server", "SeaweedFS Volume "+util.VERSION)
+ if r.Header.Get("Origin") != "" {
+ w.Header().Set("Access-Control-Allow-Origin", "*")
+ w.Header().Set("Access-Control-Allow-Credentials", "true")
+ }
switch r.Method {
case "GET":
stats.ReadRequest()
@@ -49,6 +62,10 @@ func (vs *VolumeServer) publicReadOnlyHandler(w http.ResponseWriter, r *http.Req
case "HEAD":
stats.ReadRequest()
vs.GetOrHeadHandler(w, r)
+ case "OPTIONS":
+ stats.ReadRequest()
+ w.Header().Add("Access-Control-Allow-Methods", "GET, OPTIONS")
+ w.Header().Add("Access-Control-Allow-Headers", "*")
}
}
diff --git a/weed/shell/command_volume_mark.go b/weed/shell/command_volume_mark.go
new file mode 100644
index 000000000..19b614310
--- /dev/null
+++ b/weed/shell/command_volume_mark.go
@@ -0,0 +1,55 @@
+package shell
+
+import (
+ "flag"
+ "fmt"
+ "io"
+
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
+)
+
+func init() {
+ Commands = append(Commands, &commandVolumeMark{})
+}
+
+type commandVolumeMark struct {
+}
+
+func (c *commandVolumeMark) Name() string {
+ return "volume.mark"
+}
+
+func (c *commandVolumeMark) Help() string {
+ return `Mark volume writable or readonly from one volume server
+
+ volume.mark -node <volume server host:port> -volumeId <volume id> -writable or -readonly
+`
+}
+
+func (c *commandVolumeMark) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
+
+ if err = commandEnv.confirmIsLocked(); err != nil {
+ return
+ }
+
+ volMarkCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
+ volumeIdInt := volMarkCommand.Int("volumeId", 0, "the volume id")
+ nodeStr := volMarkCommand.String("node", "", "the volume server <host>:<port>")
+ writable := volMarkCommand.Bool("writable", false, "volume mark writable")
+ readonly := volMarkCommand.Bool("readonly", false, "volume mark readonly")
+ if err = volMarkCommand.Parse(args); err != nil {
+ return nil
+ }
+ markWritable := false
+ if (*writable && *readonly) || (!*writable && !*readonly) {
+ return fmt.Errorf("use -readonly or -writable")
+ } else if *writable {
+ markWritable = true
+ }
+
+ sourceVolumeServer := *nodeStr
+
+ volumeId := needle.VolumeId(*volumeIdInt)
+
+ return markVolumeWritable(commandEnv.option.GrpcDialOption, volumeId, sourceVolumeServer, markWritable)
+}
diff --git a/weed/shell/command_volume_move.go b/weed/shell/command_volume_move.go
index b136604e5..2bc8dfad8 100644
--- a/weed/shell/command_volume_move.go
+++ b/weed/shell/command_volume_move.go
@@ -166,3 +166,18 @@ func deleteVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sour
return deleteErr
})
}
+
+func markVolumeWritable(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceVolumeServer string, writable bool) (err error) {
+ return operation.WithVolumeServerClient(sourceVolumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ if writable {
+ _, err = volumeServerClient.VolumeMarkWritable(context.Background(), &volume_server_pb.VolumeMarkWritableRequest{
+ VolumeId: uint32(volumeId),
+ })
+ } else {
+ _, err = volumeServerClient.VolumeMarkReadonly(context.Background(), &volume_server_pb.VolumeMarkReadonlyRequest{
+ VolumeId: uint32(volumeId),
+ })
+ }
+ return err
+ })
+}
diff --git a/weed/storage/disk_location.go b/weed/storage/disk_location.go
index ed57aa54b..775ebf092 100644
--- a/weed/storage/disk_location.go
+++ b/weed/storage/disk_location.go
@@ -13,14 +13,16 @@ import (
"github.com/chrislusf/seaweedfs/weed/stats"
"github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "github.com/chrislusf/seaweedfs/weed/util"
)
type DiskLocation struct {
- Directory string
- MaxVolumeCount int
- MinFreeSpacePercent float32
- volumes map[needle.VolumeId]*Volume
- volumesLock sync.RWMutex
+ Directory string
+ MaxVolumeCount int
+ OriginalMaxVolumeCount int
+ MinFreeSpacePercent float32
+ volumes map[needle.VolumeId]*Volume
+ volumesLock sync.RWMutex
// erasure coding
ecVolumes map[needle.VolumeId]*erasure_coding.EcVolume
@@ -30,7 +32,7 @@ type DiskLocation struct {
}
func NewDiskLocation(dir string, maxVolumeCount int, minFreeSpacePercent float32) *DiskLocation {
- location := &DiskLocation{Directory: dir, MaxVolumeCount: maxVolumeCount, MinFreeSpacePercent: minFreeSpacePercent}
+ location := &DiskLocation{Directory: dir, MaxVolumeCount: maxVolumeCount, OriginalMaxVolumeCount: maxVolumeCount, MinFreeSpacePercent: minFreeSpacePercent}
location.volumes = make(map[needle.VolumeId]*Volume)
location.ecVolumes = make(map[needle.VolumeId]*erasure_coding.EcVolume)
go location.CheckDiskSpace()
@@ -60,6 +62,14 @@ func parseCollectionVolumeId(base string) (collection string, vid needle.VolumeI
func (l *DiskLocation) loadExistingVolume(fileInfo os.FileInfo, needleMapKind NeedleMapType) bool {
name := fileInfo.Name()
if !fileInfo.IsDir() && strings.HasSuffix(name, ".idx") {
+ name := name[:len(name)-len(".idx")]
+ noteFile := l.Directory + "/" + name + ".note"
+ if util.FileExists(noteFile) {
+ note, _ := ioutil.ReadFile(noteFile)
+ glog.Warningf("volume %s was not completed: %s", name, string(note))
+ removeVolumeFiles(l.Directory + "/" + name)
+ return false
+ }
vid, collection, err := l.volumeIdFromPath(fileInfo)
if err != nil {
glog.Warningf("get volume id failed, %s, err : %s", name, err)
@@ -85,7 +95,7 @@ func (l *DiskLocation) loadExistingVolume(fileInfo os.FileInfo, needleMapKind Ne
size, _, _ := v.FileStat()
glog.V(0).Infof("data file %s, replicaPlacement=%s v=%d size=%d ttl=%s",
- l.Directory+"/"+name, v.ReplicaPlacement, v.Version(), size, v.Ttl.String())
+ l.Directory+"/"+name+".dat", v.ReplicaPlacement, v.Version(), size, v.Ttl.String())
return true
}
return false
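
This is the recovery half of the .note protocol: volume_grpc_copy.go earlier in this diff writes <volume>.note before streaming the .dat and .idx files and deletes it on success, so an interrupted copy leaves the note behind and the next startup discards the partial volume files instead of loading them.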
diff --git a/weed/storage/needle/needle_read_write.go b/weed/storage/needle/needle_read_write.go
index 89fc85b0d..e758a6fee 100644
--- a/weed/storage/needle/needle_read_write.go
+++ b/weed/storage/needle/needle_read_write.go
@@ -24,6 +24,8 @@ const (
TtlBytesLength = 2
)
+var ErrorSizeMismatch = errors.New("size mismatch")
+
func (n *Needle) DiskSize(version Version) int64 {
return GetActualSize(n.Size, version)
}
@@ -168,6 +170,11 @@ func ReadNeedleBlob(r backend.BackendStorageFile, offset int64, size Size, versi
func (n *Needle) ReadBytes(bytes []byte, offset int64, size Size, version Version) (err error) {
n.ParseNeedleHeader(bytes)
if n.Size != size {
+ // cookie is not always passed in for this API. Use size to do preliminary checking.
+ if OffsetSize == 4 && offset < int64(MaxPossibleVolumeSize) {
+ glog.Errorf("entry not found1: offset %d found id %x size %d, expected size %d", offset, n.Id, n.Size, size)
+ return ErrorSizeMismatch
+ }
return fmt.Errorf("entry not found: offset %d found id %x size %d, expected size %d", offset, n.Id, n.Size, size)
}
switch version {
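
ErrorSizeMismatch is the sentinel that CheckAndFixVolumeDataIntegrity in volume_checking.go below keys on: it keeps scanning backwards through up to ten trailing index entries while the mismatch persists, then truncates the .idx file down to the newest entry whose data actually exists in the .dat file.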
diff --git a/weed/storage/store.go b/weed/storage/store.go
index b9fcfcba9..38f167cef 100644
--- a/weed/storage/store.go
+++ b/weed/storage/store.go
@@ -194,13 +194,19 @@ func (s *Store) SetDataCenter(dataCenter string) {
func (s *Store) SetRack(rack string) {
s.rack = rack
}
+func (s *Store) GetDataCenter() string {
+ return s.dataCenter
+}
+func (s *Store) GetRack() string {
+ return s.rack
+}
func (s *Store) CollectHeartbeat() *master_pb.Heartbeat {
var volumeMessages []*master_pb.VolumeInformationMessage
maxVolumeCount := 0
var maxFileKey NeedleId
collectionVolumeSize := make(map[string]uint64)
- collectionVolumeReadOnlyCount := make(map[string]uint8)
+ collectionVolumeReadOnlyCount := make(map[string]map[string]uint8)
for _, location := range s.Locations {
var deleteVids []needle.VolumeId
maxVolumeCount = maxVolumeCount + location.MaxVolumeCount
@@ -220,11 +226,24 @@ func (s *Store) CollectHeartbeat() *master_pb.Heartbeat {
}
}
collectionVolumeSize[v.Collection] += volumeMessage.Size
+ if _, exist := collectionVolumeReadOnlyCount[v.Collection]; !exist {
+ collectionVolumeReadOnlyCount[v.Collection] = map[string]uint8{
+ "IsReadOnly": 0,
+ "noWriteOrDelete": 0,
+ "noWriteCanDelete": 0,
+ "isDiskSpaceLow": 0,
+ }
+ }
if v.IsReadOnly() {
- collectionVolumeReadOnlyCount[v.Collection] += 1
- } else {
- if _, exist := collectionVolumeReadOnlyCount[v.Collection]; !exist {
- collectionVolumeReadOnlyCount[v.Collection] = 0
+ collectionVolumeReadOnlyCount[v.Collection]["IsReadOnly"] += 1
+ if v.noWriteOrDelete {
+ collectionVolumeReadOnlyCount[v.Collection]["noWriteOrDelete"] += 1
+ }
+ if v.noWriteCanDelete {
+ collectionVolumeReadOnlyCount[v.Collection]["noWriteCanDelete"] += 1
+ }
+ if v.location.isDiskSpaceLow {
+ collectionVolumeReadOnlyCount[v.Collection]["isDiskSpaceLow"] += 1
}
}
}
@@ -251,8 +270,10 @@ func (s *Store) CollectHeartbeat() *master_pb.Heartbeat {
stats.VolumeServerDiskSizeGauge.WithLabelValues(col, "normal").Set(float64(size))
}
- for col, count := range collectionVolumeReadOnlyCount {
- stats.VolumeServerReadOnlyVolumeGauge.WithLabelValues(col, "normal").Set(float64(count))
+ for col, types := range collectionVolumeReadOnlyCount {
+ for t, count := range types {
+ stats.VolumeServerReadOnlyVolumeGauge.WithLabelValues(col, t).Set(float64(count))
+ }
}
return &master_pb.Heartbeat{
@@ -440,7 +461,8 @@ func (s *Store) GetVolumeSizeLimit() uint64 {
func (s *Store) MaybeAdjustVolumeMax() (hasChanges bool) {
volumeSizeLimit := s.GetVolumeSizeLimit()
for _, diskLocation := range s.Locations {
- if diskLocation.MaxVolumeCount == 0 {
+ if diskLocation.OriginalMaxVolumeCount == 0 {
+ currentMaxVolumeCount := diskLocation.MaxVolumeCount
diskStatus := stats.NewDiskStatus(diskLocation.Directory)
unusedSpace := diskLocation.UnUsedSpace(volumeSizeLimit)
unclaimedSpaces := int64(diskStatus.Free) - int64(unusedSpace)
@@ -452,7 +474,7 @@ func (s *Store) MaybeAdjustVolumeMax() (hasChanges bool) {
diskLocation.MaxVolumeCount = maxVolumeCount
glog.V(0).Infof("disk %s max %d unclaimedSpace:%dMB, unused:%dMB volumeSizeLimit:%dMB",
diskLocation.Directory, maxVolumeCount, unclaimedSpaces/1024/1024, unusedSpace/1024/1024, volumeSizeLimit/1024/1024)
- hasChanges = true
+ hasChanges = hasChanges || currentMaxVolumeCount != diskLocation.MaxVolumeCount
}
}
return
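
The heartbeat change above replaces the single per-collection read-only counter with a collection -> cause map, pre-seeded with all four causes so a count that drops back to zero is still exported rather than going stale (note the mixed-case "IsReadOnly" key alongside the camelCase ones). MaybeAdjustVolumeMax now keys off OriginalMaxVolumeCount and only reports a change when the computed maximum actually moved. A sketch of the counting pattern with the same keys; the recording helper is illustrative:

    package main

    import "fmt"

    func main() {
        // collection -> cause -> count, as in CollectHeartbeat
        readOnly := make(map[string]map[string]uint8)
        record := func(collection string, causes ...string) {
            if _, ok := readOnly[collection]; !ok {
                // pre-seed every cause so zeros are exported, not omitted
                readOnly[collection] = map[string]uint8{
                    "IsReadOnly":       0,
                    "noWriteOrDelete":  0,
                    "noWriteCanDelete": 0,
                    "isDiskSpaceLow":   0,
                }
            }
            for _, c := range causes {
                readOnly[collection][c]++
            }
        }
        record("pictures", "IsReadOnly", "isDiskSpaceLow")
        record("logs", "IsReadOnly", "noWriteOrDelete")
        for col, causes := range readOnly {
            for cause, n := range causes {
                // the real code feeds these pairs into
                // VolumeServerReadOnlyVolumeGauge.WithLabelValues(col, cause)
                fmt.Printf("collection=%s cause=%s count=%d\n", col, cause, n)
            }
        }
    }
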
diff --git a/weed/storage/volume_checking.go b/weed/storage/volume_checking.go
index e42fb238b..00e04047f 100644
--- a/weed/storage/volume_checking.go
+++ b/weed/storage/volume_checking.go
@@ -2,8 +2,10 @@ package storage
import (
"fmt"
+ "io"
"os"
+ "github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/storage/backend"
"github.com/chrislusf/seaweedfs/weed/storage/idx"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
@@ -11,17 +13,40 @@ import (
"github.com/chrislusf/seaweedfs/weed/util"
)
-func CheckVolumeDataIntegrity(v *Volume, indexFile *os.File) (lastAppendAtNs uint64, e error) {
+func CheckAndFixVolumeDataIntegrity(v *Volume, indexFile *os.File) (lastAppendAtNs uint64, err error) {
var indexSize int64
- if indexSize, e = verifyIndexFileIntegrity(indexFile); e != nil {
- return 0, fmt.Errorf("verifyIndexFileIntegrity %s failed: %v", indexFile.Name(), e)
+ if indexSize, err = verifyIndexFileIntegrity(indexFile); err != nil {
+ return 0, fmt.Errorf("verifyIndexFileIntegrity %s failed: %v", indexFile.Name(), err)
}
if indexSize == 0 {
return 0, nil
}
+ healthyIndexSize := indexSize
+ for i := 1; i <= 10 && indexSize >= int64(i)*NeedleMapEntrySize; i++ {
+ // check and fix last 10 entries
+ lastAppendAtNs, err = doCheckAndFixVolumeData(v, indexFile, indexSize-int64(i)*NeedleMapEntrySize)
+ if err == io.EOF {
+ healthyIndexSize = indexSize - int64(i)*NeedleMapEntrySize
+ continue
+ }
+ if err != ErrorSizeMismatch {
+ break
+ }
+ }
+ if healthyIndexSize < indexSize {
+ glog.Warningf("CheckAndFixVolumeDataIntegrity truncate idx file %s from %d to %d", indexFile.Name(), indexSize, healthyIndexSize)
+ err = indexFile.Truncate(healthyIndexSize)
+ if err != nil {
+ glog.Warningf("CheckAndFixVolumeDataIntegrity truncate idx file %s from %d to %d: %v", indexFile.Name(), indexSize, healthyIndexSize, err)
+ }
+ }
+ return
+}
+
+func doCheckAndFixVolumeData(v *Volume, indexFile *os.File, indexOffset int64) (lastAppendAtNs uint64, err error) {
var lastIdxEntry []byte
- if lastIdxEntry, e = readIndexEntryAtOffset(indexFile, indexSize-NeedleMapEntrySize); e != nil {
- return 0, fmt.Errorf("readLastIndexEntry %s failed: %v", indexFile.Name(), e)
+ if lastIdxEntry, err = readIndexEntryAtOffset(indexFile, indexOffset); err != nil {
+ return 0, fmt.Errorf("readLastIndexEntry %s failed: %v", indexFile.Name(), err)
}
key, offset, size := idx.IdxFileEntry(lastIdxEntry)
if offset.IsZero() {
@@ -29,15 +54,15 @@ func CheckVolumeDataIntegrity(v *Volume, indexFile *os.File) (lastAppendAtNs uin
}
if size < 0 {
// read the deletion entry
- if lastAppendAtNs, e = verifyDeletedNeedleIntegrity(v.DataBackend, v.Version(), key); e != nil {
- return lastAppendAtNs, fmt.Errorf("verifyNeedleIntegrity %s failed: %v", indexFile.Name(), e)
+ if lastAppendAtNs, err = verifyDeletedNeedleIntegrity(v.DataBackend, v.Version(), key); err != nil {
+ return lastAppendAtNs, fmt.Errorf("verifyNeedleIntegrity %s failed: %v", indexFile.Name(), err)
}
} else {
- if lastAppendAtNs, e = verifyNeedleIntegrity(v.DataBackend, v.Version(), offset.ToAcutalOffset(), key, size); e != nil {
- return lastAppendAtNs, fmt.Errorf("verifyNeedleIntegrity %s failed: %v", indexFile.Name(), e)
+ if lastAppendAtNs, err = verifyNeedleIntegrity(v.DataBackend, v.Version(), offset.ToAcutalOffset(), key, size); err != nil {
+ return lastAppendAtNs, err
}
}
- return
+ return lastAppendAtNs, nil
}
func verifyIndexFileIntegrity(indexFile *os.File) (indexSize int64, err error) {
@@ -60,7 +85,44 @@ func readIndexEntryAtOffset(indexFile *os.File, offset int64) (bytes []byte, err
}
func verifyNeedleIntegrity(datFile backend.BackendStorageFile, v needle.Version, offset int64, key NeedleId, size Size) (lastAppendAtNs uint64, err error) {
- n := new(needle.Needle)
+ n, _, _, err := needle.ReadNeedleHeader(datFile, v, offset)
+ if err == io.EOF {
+ return 0, err
+ }
+ if err != nil {
+ return 0, fmt.Errorf("read %s at %d", datFile.Name(), offset)
+ }
+ if n.Size != size {
+ return 0, ErrorSizeMismatch
+ }
+ if v == needle.Version3 {
+ bytes := make([]byte, TimestampSize)
+ _, err = datFile.ReadAt(bytes, offset+NeedleHeaderSize+int64(size)+needle.NeedleChecksumSize)
+ if err == io.EOF {
+ return 0, err
+ }
+ if err != nil {
+ return 0, fmt.Errorf("verifyNeedleIntegrity check %s entry offset %d size %d: %v", datFile.Name(), offset, size, err)
+ }
+ n.AppendAtNs = util.BytesToUint64(bytes)
+ fileTailOffset := offset + needle.GetActualSize(size, v)
+ fileSize, _, err := datFile.GetStat()
+ if err != nil {
+ return 0, fmt.Errorf("stat file %s: %v", datFile.Name(), err)
+ }
+ if fileSize == fileTailOffset {
+ return n.AppendAtNs, nil
+ }
+ if fileSize > fileTailOffset {
+ glog.Warningf("Truncate %s from %d bytes to %d bytes!", datFile.Name(), fileSize, fileTailOffset)
+ err = datFile.Truncate(fileTailOffset)
+ if err == nil {
+ return n.AppendAtNs, nil
+ }
+ return n.AppendAtNs, fmt.Errorf("truncate file %s: %v", datFile.Name(), err)
+ }
+ glog.Warningf("data file %s has %d bytes, less than expected %d bytes!", datFile.Name(), fileSize, fileTailOffset)
+ }
if err = n.ReadData(datFile, offset, size, v); err != nil {
return n.AppendAtNs, fmt.Errorf("read data [%d,%d) : %v", offset, offset+int64(size), err)
}
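
CheckAndFixVolumeDataIntegrity (previously CheckVolumeDataIntegrity, which only checked) walks backwards over up to ten trailing index entries: io.EOF means the .dat file ends before that entry's data, so the healthy boundary moves down; a size mismatch keeps scanning; the first intact entry stops the scan. Whatever lies past the healthy boundary is truncated from the .idx file, and verifyNeedleIntegrity applies the same idea to a .dat file with trailing bytes past the last valid needle. A compact sketch of the scan-and-truncate idea over fixed-size records; the record size and validity rule are placeholders, not the needle format:

    package main

    import (
        "fmt"
        "log"
        "os"
    )

    const recordSize = 16 // placeholder, not the real NeedleMapEntrySize

    // validRecord stands in for doCheckAndFixVolumeData: does the entry at
    // this offset still describe intact data?
    func validRecord(f *os.File, off int64) bool {
        buf := make([]byte, recordSize)
        _, err := f.ReadAt(buf, off)
        return err == nil && buf[0] != 0 // toy validity rule
    }

    // checkAndFix scans up to the last 10 records backwards and truncates
    // the file after the newest record that still checks out.
    func checkAndFix(f *os.File, size int64) error {
        healthy := size
        for i := int64(1); i <= 10 && size >= i*recordSize; i++ {
            if validRecord(f, size-i*recordSize) {
                break // newest intact entry found; older ones are trusted
            }
            healthy = size - i*recordSize // the file is only healthy up to here
        }
        if healthy == size {
            return nil
        }
        fmt.Printf("truncating from %d to %d bytes\n", size, healthy)
        return f.Truncate(healthy)
    }

    func main() {
        f, err := os.CreateTemp("", "idx")
        if err != nil {
            log.Fatal(err)
        }
        defer os.Remove(f.Name())
        good := make([]byte, recordSize)
        good[0] = 1
        f.Write(good)                     // one intact record
        f.Write(make([]byte, recordSize)) // one zeroed, "corrupt" record
        st, _ := f.Stat()
        if err := checkAndFix(f, st.Size()); err != nil {
            log.Fatal(err)
        }
    }
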
diff --git a/weed/storage/volume_loading.go b/weed/storage/volume_loading.go
index 73e2de02b..05684cbdb 100644
--- a/weed/storage/volume_loading.go
+++ b/weed/storage/volume_loading.go
@@ -89,7 +89,7 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind
return fmt.Errorf("cannot write Volume Index %s.idx: %v", fileName, err)
}
}
- if v.lastAppendAtNs, err = CheckVolumeDataIntegrity(v, indexFile); err != nil {
+ if v.lastAppendAtNs, err = CheckAndFixVolumeDataIntegrity(v, indexFile); err != nil {
v.noWriteOrDelete = true
glog.V(0).Infof("volumeDataIntegrityChecking failed %v", err)
}
diff --git a/weed/storage/volume_read_write.go b/weed/storage/volume_read_write.go
index 94c1d0ea1..869796a3f 100644
--- a/weed/storage/volume_read_write.go
+++ b/weed/storage/volume_read_write.go
@@ -17,6 +17,7 @@ import (
var ErrorNotFound = errors.New("not found")
var ErrorDeleted = errors.New("already deleted")
+var ErrorSizeMismatch = errors.New("size mismatch")
// isFileUnchanged checks whether this needle to write is same as last one.
// It requires serialized access in the same volume.
@@ -55,16 +56,21 @@ func (v *Volume) Destroy() (err error) {
}
}
v.Close()
- os.Remove(v.FileName() + ".dat")
- os.Remove(v.FileName() + ".idx")
- os.Remove(v.FileName() + ".vif")
- os.Remove(v.FileName() + ".sdx")
- os.Remove(v.FileName() + ".cpd")
- os.Remove(v.FileName() + ".cpx")
- os.RemoveAll(v.FileName() + ".ldb")
+ removeVolumeFiles(v.FileName())
return
}
+func removeVolumeFiles(filename string) {
+ os.Remove(filename + ".dat")
+ os.Remove(filename + ".idx")
+ os.Remove(filename + ".vif")
+ os.Remove(filename + ".sdx")
+ os.Remove(filename + ".cpd")
+ os.Remove(filename + ".cpx")
+ os.RemoveAll(filename + ".ldb")
+ os.Remove(filename + ".note")
+}
+
func (v *Volume) asyncRequestAppend(request *needle.AsyncRequest) {
v.asyncRequestsChan <- request
}
@@ -274,6 +280,9 @@ func (v *Volume) readNeedle(n *needle.Needle, readOption *ReadOption) (int, erro
return 0, nil
}
err := n.ReadData(v.DataBackend, nv.Offset.ToAcutalOffset(), readSize, v.Version())
+ if err == needle.ErrorSizeMismatch && OffsetSize == 4 {
+ err = n.ReadData(v.DataBackend, nv.Offset.ToAcutalOffset()+int64(MaxPossibleVolumeSize), readSize, v.Version())
+ }
if err != nil {
return 0, err
}
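
The readNeedle fallback above pairs with the ErrorSizeMismatch check in needle_read_write.go: when offsets are stored in 4 bytes, an entry that ended up past MaxPossibleVolumeSize apparently wraps, so the first read lands one window too low and sees a foreign header. Retrying at offset + MaxPossibleVolumeSize recovers the real location. The arithmetic, assuming the 32GiB window that 4-byte offsets with 8-byte padding imply:

    package main

    import "fmt"

    // Assumed window: 4-byte offsets scaled by 8-byte padding address 32GiB.
    const maxPossibleVolumeSize = int64(4) * 1024 * 1024 * 1024 * 8

    func main() {
        actual := maxPossibleVolumeSize + 4096   // where the needle really sits
        stored := actual % maxPossibleVolumeSize // what the wrapped 4-byte offset records
        // the first read at `stored` hits a different needle and reports a
        // size mismatch; the fallback probes exactly one window higher
        fmt.Println(stored+maxPossibleVolumeSize == actual) // true
    }
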
diff --git a/weed/util/constants.go b/weed/util/constants.go
index 177c20a60..498ef11a2 100644
--- a/weed/util/constants.go
+++ b/weed/util/constants.go
@@ -5,7 +5,7 @@ import (
)
var (
- VERSION = fmt.Sprintf("%s %d.%02d", sizeLimit, 2, 07)
+ VERSION = fmt.Sprintf("%s %d.%02d", sizeLimit, 2, 8)
COMMIT = ""
)
diff --git a/weed/util/retry.go b/weed/util/retry.go
new file mode 100644
index 000000000..faaab0351
--- /dev/null
+++ b/weed/util/retry.go
@@ -0,0 +1,34 @@
+package util
+
+import (
+ "strings"
+ "time"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+)
+
+var RetryWaitTime = 6 * time.Second
+
+func Retry(name string, job func() error) (err error) {
+ waitTime := time.Second
+ hasErr := false
+ for waitTime < RetryWaitTime {
+ err = job()
+ if err == nil {
+ if hasErr {
+ glog.V(0).Infof("retry %s successfully", name)
+ }
+ break
+ }
+ if strings.Contains(err.Error(), "transport") {
+ hasErr = true
+ glog.V(0).Infof("retry %s", name)
+ time.Sleep(waitTime)
+ waitTime += waitTime / 2
+ } else {
+ // a non-"transport" error is permanent; return it instead of spinning without sleeping
+ break
+ }
+ }
+ return err
+}
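
util.Retry retries only failures whose message contains "transport" (gRPC connection churn), sleeping with a backoff that grows by half each attempt until it exceeds RetryWaitTime; any other error returns to the caller. A usage sketch with a job that needs three attempts:

    package main

    import (
        "errors"
        "fmt"

        "github.com/chrislusf/seaweedfs/weed/util"
    )

    func main() {
        attempts := 0
        err := util.Retry("demo job", func() error {
            attempts++
            if attempts < 3 {
                // the "transport" substring is what marks an error retriable
                return errors.New("transport is closing")
            }
            return nil
        })
        fmt.Printf("attempts=%d err=%v\n", attempts, err)
    }
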
diff --git a/weed/wdclient/masterclient.go b/weed/wdclient/masterclient.go
index 60753e582..df8c186f2 100644
--- a/weed/wdclient/masterclient.go
+++ b/weed/wdclient/masterclient.go
@@ -5,6 +5,7 @@ import (
"math/rand"
"time"
+ "github.com/chrislusf/seaweedfs/weed/util"
"google.golang.org/grpc"
"github.com/chrislusf/seaweedfs/weed/glog"
@@ -150,10 +151,12 @@ func (mc *MasterClient) tryConnectToMaster(master string) (nextHintedLeader stri
}
func (mc *MasterClient) WithClient(fn func(client master_pb.SeaweedClient) error) error {
- for mc.currentMaster == "" {
- time.Sleep(3 * time.Second)
- }
- return pb.WithMasterClient(mc.currentMaster, mc.grpcDialOption, func(client master_pb.SeaweedClient) error {
- return fn(client)
+ return util.Retry("master grpc", func() error {
+ for mc.currentMaster == "" {
+ time.Sleep(3 * time.Second)
+ }
+ return pb.WithMasterClient(mc.currentMaster, mc.grpcDialOption, func(client master_pb.SeaweedClient) error {
+ return fn(client)
+ })
})
}
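
Wrapping WithClient in util.Retry means a transport error re-enters the closure, which re-reads currentMaster; if a new leader was elected in the meantime, the retry dials it instead of the dead one. A sketch of that re-read-inside-the-retry shape, with an atomic stand-in for the currentMaster field and shortened sleeps:

    package main

    import (
        "fmt"
        "sync/atomic"
        "time"

        "github.com/chrislusf/seaweedfs/weed/util"
    )

    var currentMaster atomic.Value // stand-in for MasterClient.currentMaster

    func main() {
        currentMaster.Store("")
        go func() {
            time.Sleep(50 * time.Millisecond) // a leader appears after a short election
            currentMaster.Store("localhost:9333")
        }()
        err := util.Retry("master grpc", func() error {
            for currentMaster.Load().(string) == "" {
                time.Sleep(10 * time.Millisecond) // the real loop sleeps 3s
            }
            fmt.Println("dialing", currentMaster.Load()) // pb.WithMasterClient in the real code
            return nil
        })
        _ = err
    }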