aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--.github/ISSUE_TEMPLATE/bug_report.md1
-rw-r--r--go.mod2
-rw-r--r--go.sum2
-rw-r--r--weed/command/filer_replication.go14
-rw-r--r--weed/command/mount_std.go4
-rw-r--r--weed/filesys/dir.go16
-rw-r--r--weed/filesys/file.go7
-rw-r--r--weed/filesys/file_darwin.go8
-rw-r--r--weed/filesys/file_other.go7
-rw-r--r--weed/filesys/wfs.go14
-rw-r--r--weed/replication/sub/notification_aws_sqs.go2
-rw-r--r--weed/replication/sub/notification_gocdk_pub_sub.go17
-rw-r--r--weed/replication/sub/notification_google_pub_sub.go10
-rw-r--r--weed/replication/sub/notification_kafka.go2
-rw-r--r--weed/replication/sub/notifications.go2
-rw-r--r--weed/shell/command_volume_fix_replication.go30
16 files changed, 96 insertions, 42 deletions
diff --git a/.github/ISSUE_TEMPLATE/bug_report.md b/.github/ISSUE_TEMPLATE/bug_report.md
index 53423f4b7..edb95fac1 100644
--- a/.github/ISSUE_TEMPLATE/bug_report.md
+++ b/.github/ISSUE_TEMPLATE/bug_report.md
@@ -9,6 +9,7 @@ assignees: ''
Sponsors SeaweedFS via Patreon https://www.patreon.com/seaweedfs
Report issues here. Ask questions here https://stackoverflow.com/questions/tagged/seaweedfs
+Please ask questions in https://github.com/chrislusf/seaweedfs/discussions
example of a good issue report:
https://github.com/chrislusf/seaweedfs/issues/1005
diff --git a/go.mod b/go.mod
index 44950d25c..39acd0e9b 100644
--- a/go.mod
+++ b/go.mod
@@ -56,7 +56,7 @@ require (
github.com/prometheus/client_golang v1.3.0
github.com/rakyll/statik v0.1.7
github.com/rcrowley/go-metrics v0.0.0-20190826022208-cac0b30c2563 // indirect
- github.com/seaweedfs/fuse v1.0.9
+ github.com/seaweedfs/fuse v1.1.0
github.com/seaweedfs/goexif v1.0.2
github.com/skip2/go-qrcode v0.0.0-20200617195104-da1b6568686e
github.com/spaolacci/murmur3 v1.1.0 // indirect
diff --git a/go.sum b/go.sum
index f0b86cb62..eb16fac4c 100644
--- a/go.sum
+++ b/go.sum
@@ -577,6 +577,8 @@ github.com/seaweedfs/fuse v1.0.8 h1:HBPJTC77OlxwSd2JiTwvLPn8bWTElqQp3xs9vf3C15s=
github.com/seaweedfs/fuse v1.0.8/go.mod h1:W7ubwr1l7KQsMeUpxFFOFOSxUL/ucTRMAlVYs4xdfQ8=
github.com/seaweedfs/fuse v1.0.9 h1:3JZoGsW7cmmrd/U5KYcIGR2+EqyBvCYCoCpEdZAz/Dc=
github.com/seaweedfs/fuse v1.0.9/go.mod h1:+PP6WlkrRUG6KPE+Th2EX5To/PjHaFsvqg/UgQ39aj8=
+github.com/seaweedfs/fuse v1.1.0 h1:cL1qPHFNtFv0UuJTLjKKgWDzfJ4iZzTa4Y7ipc2acGw=
+github.com/seaweedfs/fuse v1.1.0/go.mod h1:+PP6WlkrRUG6KPE+Th2EX5To/PjHaFsvqg/UgQ39aj8=
github.com/seaweedfs/goexif v1.0.2 h1:p+rTXYdQ2mgxd+1JaTrQ9N8DvYuw9UH9xgYmJ+Bb29E=
github.com/seaweedfs/goexif v1.0.2/go.mod h1:MrKs5LK0HXdffrdCZrW3OIMegL2xXpC6ThLyXMyjdrk=
github.com/secsy/goftp v0.0.0-20190720192957-f31499d7c79a h1:C6IhVTxNkhlb0tlCB6JfHOUv1f0xHPK7V8X4HlJZEJw=
diff --git a/weed/command/filer_replication.go b/weed/command/filer_replication.go
index b6515e505..e8c06b208 100644
--- a/weed/command/filer_replication.go
+++ b/weed/command/filer_replication.go
@@ -98,13 +98,19 @@ func runFilerReplicate(cmd *Command, args []string) bool {
replicator := replication.NewReplicator(config, "source.filer.", dataSink)
for {
- key, m, err := notificationInput.ReceiveMessage()
+ key, m, onSuccessFn, onFailureFn, err := notificationInput.ReceiveMessage()
if err != nil {
glog.Errorf("receive %s: %+v", key, err)
+ if onFailureFn != nil {
+ onFailureFn()
+ }
continue
}
if key == "" {
// long poll received no messages
+ if onSuccessFn != nil {
+ onSuccessFn()
+ }
continue
}
if m.OldEntry != nil && m.NewEntry == nil {
@@ -116,8 +122,14 @@ func runFilerReplicate(cmd *Command, args []string) bool {
}
if err = replicator.Replicate(context.Background(), key, m); err != nil {
glog.Errorf("replicate %s: %+v", key, err)
+ if onFailureFn != nil {
+ onFailureFn()
+ }
} else {
glog.V(1).Infof("replicated %s", key)
+ if onSuccessFn != nil {
+ onSuccessFn()
+ }
}
}
diff --git a/weed/command/mount_std.go b/weed/command/mount_std.go
index 23f224cc2..eaa66519e 100644
--- a/weed/command/mount_std.go
+++ b/weed/command/mount_std.go
@@ -208,7 +208,9 @@ func RunMount(option *MountOptions, umask os.FileMode) bool {
})
glog.V(0).Infof("mounted %s%s to %s", filer, mountRoot, dir)
- err = fs.Serve(c, seaweedFileSystem)
+ server := fs.New(c, nil)
+ seaweedFileSystem.Server = server
+ err = server.Serve(seaweedFileSystem)
// check if the mount process has an error to report
<-c.Ready
diff --git a/weed/filesys/dir.go b/weed/filesys/dir.go
index 3d0a00a8b..e3d0057db 100644
--- a/weed/filesys/dir.go
+++ b/weed/filesys/dir.go
@@ -58,7 +58,7 @@ func (dir *Dir) Attr(ctx context.Context, attr *fuse.Attr) error {
return err
}
- attr.Inode = util.FullPath(dir.FullPath()).AsInode()
+ // attr.Inode = util.FullPath(dir.FullPath()).AsInode()
attr.Mode = os.FileMode(dir.entry.Attributes.FileMode) | os.ModeDir
attr.Mtime = time.Unix(dir.entry.Attributes.Mtime, 0)
attr.Crtime = time.Unix(dir.entry.Attributes.Crtime, 0)
@@ -82,8 +82,8 @@ func (dir *Dir) Getxattr(ctx context.Context, req *fuse.GetxattrRequest, resp *f
}
func (dir *Dir) setRootDirAttributes(attr *fuse.Attr) {
- attr.Inode = 1 // filer2.FullPath(dir.Path).AsInode()
- attr.Valid = time.Hour
+ // attr.Inode = 1 // filer2.FullPath(dir.Path).AsInode()
+ attr.Valid = time.Second
attr.Uid = dir.wfs.option.MountUid
attr.Gid = dir.wfs.option.MountGid
attr.Mode = dir.wfs.option.MountMode
@@ -91,7 +91,7 @@ func (dir *Dir) setRootDirAttributes(attr *fuse.Attr) {
attr.Ctime = dir.wfs.option.MountCtime
attr.Mtime = dir.wfs.option.MountMtime
attr.Atime = dir.wfs.option.MountMtime
- attr.BlockSize = 1024 * 1024
+ attr.BlockSize = blockSize
}
func (dir *Dir) Fsync(ctx context.Context, req *fuse.FsyncRequest) error {
@@ -285,7 +285,7 @@ func (dir *Dir) Lookup(ctx context.Context, req *fuse.LookupRequest, resp *fuse.
}
// resp.EntryValid = time.Second
- resp.Attr.Inode = fullFilePath.AsInode()
+ // resp.Attr.Inode = fullFilePath.AsInode()
resp.Attr.Valid = time.Second
resp.Attr.Mtime = time.Unix(entry.Attributes.Mtime, 0)
resp.Attr.Crtime = time.Unix(entry.Attributes.Crtime, 0)
@@ -308,13 +308,11 @@ func (dir *Dir) ReadDirAll(ctx context.Context) (ret []fuse.Dirent, err error) {
glog.V(4).Infof("dir ReadDirAll %s", dir.FullPath())
processEachEntryFn := func(entry *filer_pb.Entry, isLast bool) error {
- fullpath := util.NewFullPath(dir.FullPath(), entry.Name)
- inode := fullpath.AsInode()
if entry.IsDirectory {
- dirent := fuse.Dirent{Inode: inode, Name: entry.Name, Type: fuse.DT_Dir}
+ dirent := fuse.Dirent{Name: entry.Name, Type: fuse.DT_Dir}
ret = append(ret, dirent)
} else {
- dirent := fuse.Dirent{Inode: inode, Name: entry.Name, Type: findFileType(uint16(entry.Attributes.FileMode))}
+ dirent := fuse.Dirent{Name: entry.Name, Type: findFileType(uint16(entry.Attributes.FileMode))}
ret = append(ret, dirent)
}
return nil
diff --git a/weed/filesys/file.go b/weed/filesys/file.go
index a8d6dac29..a210c5152 100644
--- a/weed/filesys/file.go
+++ b/weed/filesys/file.go
@@ -45,7 +45,7 @@ func (file *File) fullpath() util.FullPath {
func (file *File) Attr(ctx context.Context, attr *fuse.Attr) (err error) {
- glog.V(4).Infof("file Attr %s, open:%v, existing attr: %+v", file.fullpath(), file.isOpen, attr)
+ glog.V(4).Infof("file Attr %s, open:%v existing:%v", file.fullpath(), file.isOpen, attr)
entry := file.entry
if file.isOpen <= 0 || entry == nil {
@@ -54,7 +54,7 @@ func (file *File) Attr(ctx context.Context, attr *fuse.Attr) (err error) {
}
}
- attr.Inode = file.fullpath().AsInode()
+ // attr.Inode = file.fullpath().AsInode()
attr.Valid = time.Second
attr.Mode = os.FileMode(entry.Attributes.FileMode)
attr.Size = filer.FileSize(entry)
@@ -91,9 +91,6 @@ func (file *File) Getxattr(ctx context.Context, req *fuse.GetxattrRequest, resp
func (file *File) Open(ctx context.Context, req *fuse.OpenRequest, resp *fuse.OpenResponse) (fs.Handle, error) {
glog.V(4).Infof("file %v open %+v", file.fullpath(), req)
- if USE_DIRECT_IO {
- resp.Flags |= fuse.OpenDirectIO
- }
handle := file.wfs.AcquireHandle(file, req.Uid, req.Gid)
diff --git a/weed/filesys/file_darwin.go b/weed/filesys/file_darwin.go
deleted file mode 100644
index b8806cff7..000000000
--- a/weed/filesys/file_darwin.go
+++ /dev/null
@@ -1,8 +0,0 @@
-//+build darwin
-
-package filesys
-
-const (
- USE_DIRECT_IO = false
-)
-
diff --git a/weed/filesys/file_other.go b/weed/filesys/file_other.go
deleted file mode 100644
index 8a8693511..000000000
--- a/weed/filesys/file_other.go
+++ /dev/null
@@ -1,7 +0,0 @@
-//+build !darwin
-
-package filesys
-
-const (
- USE_DIRECT_IO = true
-)
diff --git a/weed/filesys/wfs.go b/weed/filesys/wfs.go
index 6cfadcc18..108e23c85 100644
--- a/weed/filesys/wfs.go
+++ b/weed/filesys/wfs.go
@@ -76,6 +76,7 @@ type WFS struct {
// throttle writers
concurrentWriters *util.LimitedConcurrentExecutor
+ Server *fs.Server
}
type statsCache struct {
filer_pb.StatisticsResponse
@@ -104,9 +105,22 @@ func NewSeaweedFileSystem(option *Option) *WFS {
fsNode := wfs.fsNodeCache.GetFsNode(filePath)
if fsNode != nil {
if file, ok := fsNode.(*File); ok {
+ if err := wfs.Server.InvalidateNodeData(file); err != nil {
+ glog.V(4).Infof("InvalidateNodeData %s : %v", filePath, err)
+ }
file.clearEntry()
}
}
+ dir, name := filePath.DirAndName()
+ parent := wfs.root
+ if dir != "/" {
+ parent = wfs.fsNodeCache.GetFsNode(util.FullPath(dir))
+ }
+ if parent != nil {
+ if err := wfs.Server.InvalidateEntry(parent, name); err != nil {
+ glog.V(4).Infof("InvalidateEntry %s : %v", filePath, err)
+ }
+ }
})
startTime := time.Now()
go meta_cache.SubscribeMetaEvents(wfs.metaCache, wfs.signature, wfs, wfs.option.FilerMountRootPath, startTime.UnixNano())
diff --git a/weed/replication/sub/notification_aws_sqs.go b/weed/replication/sub/notification_aws_sqs.go
index 1dd386ba7..642834c72 100644
--- a/weed/replication/sub/notification_aws_sqs.go
+++ b/weed/replication/sub/notification_aws_sqs.go
@@ -68,7 +68,7 @@ func (k *AwsSqsInput) initialize(awsAccessKeyId, awsSecretAccessKey, region, que
return nil
}
-func (k *AwsSqsInput) ReceiveMessage() (key string, message *filer_pb.EventNotification, err error) {
+func (k *AwsSqsInput) ReceiveMessage() (key string, message *filer_pb.EventNotification, onSuccessFn func(), onFailureFn func(), err error) {
// receive message
result, err := k.svc.ReceiveMessage(&sqs.ReceiveMessageInput{
diff --git a/weed/replication/sub/notification_gocdk_pub_sub.go b/weed/replication/sub/notification_gocdk_pub_sub.go
index 9726096e5..09a96d5d5 100644
--- a/weed/replication/sub/notification_gocdk_pub_sub.go
+++ b/weed/replication/sub/notification_gocdk_pub_sub.go
@@ -38,13 +38,24 @@ func (k *GoCDKPubSubInput) Initialize(configuration util.Configuration, prefix s
return nil
}
-func (k *GoCDKPubSubInput) ReceiveMessage() (key string, message *filer_pb.EventNotification, err error) {
+func (k *GoCDKPubSubInput) ReceiveMessage() (key string, message *filer_pb.EventNotification, onSuccessFn func(), onFailureFn func(), err error) {
msg, err := k.sub.Receive(context.Background())
+ if err != nil {
+ return
+ }
+ onSuccessFn = func() {
+ msg.Ack()
+ }
+ onFailureFn = func() {
+ if msg.Nackable() {
+ msg.Nack()
+ }
+ }
key = msg.Metadata["key"]
message = &filer_pb.EventNotification{}
err = proto.Unmarshal(msg.Body, message)
if err != nil {
- return "", nil, err
+ return "", nil, onSuccessFn, onFailureFn, err
}
- return key, message, nil
+ return key, message, onSuccessFn, onFailureFn, nil
}
diff --git a/weed/replication/sub/notification_google_pub_sub.go b/weed/replication/sub/notification_google_pub_sub.go
index a950bb42b..f7c767d4a 100644
--- a/weed/replication/sub/notification_google_pub_sub.go
+++ b/weed/replication/sub/notification_google_pub_sub.go
@@ -85,16 +85,22 @@ func (k *GooglePubSubInput) initialize(google_application_credentials, projectId
go k.sub.Receive(ctx, func(ctx context.Context, m *pubsub.Message) {
k.messageChan <- m
- m.Ack()
})
return err
}
-func (k *GooglePubSubInput) ReceiveMessage() (key string, message *filer_pb.EventNotification, err error) {
+func (k *GooglePubSubInput) ReceiveMessage() (key string, message *filer_pb.EventNotification, onSuccessFn func(), onFailureFn func(), err error) {
m := <-k.messageChan
+ onSuccessFn = func() {
+ m.Ack()
+ }
+ onFailureFn = func() {
+ m.Nack()
+ }
+
// process the message
key = m.Attributes["key"]
message = &filer_pb.EventNotification{}
diff --git a/weed/replication/sub/notification_kafka.go b/weed/replication/sub/notification_kafka.go
index fa9cfad9b..622a759ea 100644
--- a/weed/replication/sub/notification_kafka.go
+++ b/weed/replication/sub/notification_kafka.go
@@ -97,7 +97,7 @@ func (k *KafkaInput) initialize(hosts []string, topic string, offsetFile string,
return nil
}
-func (k *KafkaInput) ReceiveMessage() (key string, message *filer_pb.EventNotification, err error) {
+func (k *KafkaInput) ReceiveMessage() (key string, message *filer_pb.EventNotification, onSuccessFn func(), onFailureFn func(), err error) {
msg := <-k.messageChan
diff --git a/weed/replication/sub/notifications.go b/weed/replication/sub/notifications.go
index 8a2668f98..d5a910db9 100644
--- a/weed/replication/sub/notifications.go
+++ b/weed/replication/sub/notifications.go
@@ -10,7 +10,7 @@ type NotificationInput interface {
GetName() string
// Initialize initializes the file store
Initialize(configuration util.Configuration, prefix string) error
- ReceiveMessage() (key string, message *filer_pb.EventNotification, err error)
+ ReceiveMessage() (key string, message *filer_pb.EventNotification, onSuccessFn func(), onFailureFn func(), err error)
}
var (
diff --git a/weed/shell/command_volume_fix_replication.go b/weed/shell/command_volume_fix_replication.go
index 9b9abd8eb..c737133f5 100644
--- a/weed/shell/command_volume_fix_replication.go
+++ b/weed/shell/command_volume_fix_replication.go
@@ -6,6 +6,7 @@ import (
"fmt"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
"io"
+ "path/filepath"
"sort"
"github.com/chrislusf/seaweedfs/weed/operation"
@@ -19,6 +20,7 @@ func init() {
}
type commandVolumeFixReplication struct {
+ collectionPattern *string
}
func (c *commandVolumeFixReplication) Name() string {
@@ -33,8 +35,9 @@ func (c *commandVolumeFixReplication) Help() string {
This command also finds all under-replicated volumes, and finds volume servers with free slots.
If the free slots satisfy the replication requirement, the volume content is copied over and mounted.
- volume.fix.replication -n # do not take action
- volume.fix.replication # actually deleting or copying the volume files and mount the volume
+ volume.fix.replication -n # do not take action
+ volume.fix.replication # actually deleting or copying the volume files and mount the volume
+ volume.fix.replication -collectionPattern=important* # fix any collections with prefix "important"
Note:
* each time this will only add back one replica for one volume id. If there are multiple replicas
@@ -52,6 +55,7 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv,
}
volFixReplicationCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
+ c.collectionPattern = volFixReplicationCommand.String("collectionPattern", "", "match with wildcard characters '*' and '?'")
skipChange := volFixReplicationCommand.Bool("n", false, "skip the changes")
if err = volFixReplicationCommand.Parse(args); err != nil {
return nil
@@ -127,6 +131,17 @@ func (c *commandVolumeFixReplication) fixOverReplicatedVolumes(commandEnv *Comma
replica := pickOneReplicaToDelete(replicas, replicaPlacement)
+ // check collection name pattern
+ if *c.collectionPattern != "" {
+ matched, err := filepath.Match(*c.collectionPattern, replica.info.Collection)
+ if err != nil {
+ return fmt.Errorf("match pattern %s with collection %s: %v", *c.collectionPattern, replica.info.Collection, err)
+ }
+ if !matched {
+ break
+ }
+ }
+
fmt.Fprintf(writer, "deleting volume %d from %s ...\n", replica.info.Id, replica.location.dataNode.Id)
if !takeAction {
@@ -150,6 +165,17 @@ func (c *commandVolumeFixReplication) fixUnderReplicatedVolumes(commandEnv *Comm
for _, dst := range allLocations {
// check whether data nodes satisfy the constraints
if dst.dataNode.FreeVolumeCount > 0 && satisfyReplicaPlacement(replicaPlacement, replicas, dst) {
+ // check collection name pattern
+ if *c.collectionPattern != "" {
+ matched, err := filepath.Match(*c.collectionPattern, replica.info.Collection)
+ if err != nil {
+ return fmt.Errorf("match pattern %s with collection %s: %v", *c.collectionPattern, replica.info.Collection, err)
+ }
+ if !matched {
+ break
+ }
+ }
+
// ask the volume server to replicate the volume
foundNewLocation = true
fmt.Fprintf(writer, "replicating volume %d %s from %s to dataNode %s ...\n", replica.info.Id, replicaPlacement, replica.location.dataNode.Id, dst.dataNode.Id)