diff options
| -rw-r--r-- | .github/ISSUE_TEMPLATE/bug_report.md | 1 | ||||
| -rw-r--r-- | go.mod | 2 | ||||
| -rw-r--r-- | go.sum | 2 | ||||
| -rw-r--r-- | weed/command/filer_replication.go | 14 | ||||
| -rw-r--r-- | weed/command/mount_std.go | 4 | ||||
| -rw-r--r-- | weed/filesys/dir.go | 16 | ||||
| -rw-r--r-- | weed/filesys/file.go | 7 | ||||
| -rw-r--r-- | weed/filesys/file_darwin.go | 8 | ||||
| -rw-r--r-- | weed/filesys/file_other.go | 7 | ||||
| -rw-r--r-- | weed/filesys/wfs.go | 14 | ||||
| -rw-r--r-- | weed/replication/sub/notification_aws_sqs.go | 2 | ||||
| -rw-r--r-- | weed/replication/sub/notification_gocdk_pub_sub.go | 17 | ||||
| -rw-r--r-- | weed/replication/sub/notification_google_pub_sub.go | 10 | ||||
| -rw-r--r-- | weed/replication/sub/notification_kafka.go | 2 | ||||
| -rw-r--r-- | weed/replication/sub/notifications.go | 2 | ||||
| -rw-r--r-- | weed/shell/command_volume_fix_replication.go | 30 |
16 files changed, 96 insertions, 42 deletions
diff --git a/.github/ISSUE_TEMPLATE/bug_report.md b/.github/ISSUE_TEMPLATE/bug_report.md index 53423f4b7..edb95fac1 100644 --- a/.github/ISSUE_TEMPLATE/bug_report.md +++ b/.github/ISSUE_TEMPLATE/bug_report.md @@ -9,6 +9,7 @@ assignees: '' Sponsors SeaweedFS via Patreon https://www.patreon.com/seaweedfs Report issues here. Ask questions here https://stackoverflow.com/questions/tagged/seaweedfs +Please ask questions in https://github.com/chrislusf/seaweedfs/discussions example of a good issue report: https://github.com/chrislusf/seaweedfs/issues/1005 @@ -56,7 +56,7 @@ require ( github.com/prometheus/client_golang v1.3.0 github.com/rakyll/statik v0.1.7 github.com/rcrowley/go-metrics v0.0.0-20190826022208-cac0b30c2563 // indirect - github.com/seaweedfs/fuse v1.0.9 + github.com/seaweedfs/fuse v1.1.0 github.com/seaweedfs/goexif v1.0.2 github.com/skip2/go-qrcode v0.0.0-20200617195104-da1b6568686e github.com/spaolacci/murmur3 v1.1.0 // indirect @@ -577,6 +577,8 @@ github.com/seaweedfs/fuse v1.0.8 h1:HBPJTC77OlxwSd2JiTwvLPn8bWTElqQp3xs9vf3C15s= github.com/seaweedfs/fuse v1.0.8/go.mod h1:W7ubwr1l7KQsMeUpxFFOFOSxUL/ucTRMAlVYs4xdfQ8= github.com/seaweedfs/fuse v1.0.9 h1:3JZoGsW7cmmrd/U5KYcIGR2+EqyBvCYCoCpEdZAz/Dc= github.com/seaweedfs/fuse v1.0.9/go.mod h1:+PP6WlkrRUG6KPE+Th2EX5To/PjHaFsvqg/UgQ39aj8= +github.com/seaweedfs/fuse v1.1.0 h1:cL1qPHFNtFv0UuJTLjKKgWDzfJ4iZzTa4Y7ipc2acGw= +github.com/seaweedfs/fuse v1.1.0/go.mod h1:+PP6WlkrRUG6KPE+Th2EX5To/PjHaFsvqg/UgQ39aj8= github.com/seaweedfs/goexif v1.0.2 h1:p+rTXYdQ2mgxd+1JaTrQ9N8DvYuw9UH9xgYmJ+Bb29E= github.com/seaweedfs/goexif v1.0.2/go.mod h1:MrKs5LK0HXdffrdCZrW3OIMegL2xXpC6ThLyXMyjdrk= github.com/secsy/goftp v0.0.0-20190720192957-f31499d7c79a h1:C6IhVTxNkhlb0tlCB6JfHOUv1f0xHPK7V8X4HlJZEJw= diff --git a/weed/command/filer_replication.go b/weed/command/filer_replication.go index b6515e505..e8c06b208 100644 --- a/weed/command/filer_replication.go +++ b/weed/command/filer_replication.go @@ -98,13 +98,19 @@ func runFilerReplicate(cmd *Command, args []string) bool { replicator := replication.NewReplicator(config, "source.filer.", dataSink) for { - key, m, err := notificationInput.ReceiveMessage() + key, m, onSuccessFn, onFailureFn, err := notificationInput.ReceiveMessage() if err != nil { glog.Errorf("receive %s: %+v", key, err) + if onFailureFn != nil { + onFailureFn() + } continue } if key == "" { // long poll received no messages + if onSuccessFn != nil { + onSuccessFn() + } continue } if m.OldEntry != nil && m.NewEntry == nil { @@ -116,8 +122,14 @@ func runFilerReplicate(cmd *Command, args []string) bool { } if err = replicator.Replicate(context.Background(), key, m); err != nil { glog.Errorf("replicate %s: %+v", key, err) + if onFailureFn != nil { + onFailureFn() + } } else { glog.V(1).Infof("replicated %s", key) + if onSuccessFn != nil { + onSuccessFn() + } } } diff --git a/weed/command/mount_std.go b/weed/command/mount_std.go index 23f224cc2..eaa66519e 100644 --- a/weed/command/mount_std.go +++ b/weed/command/mount_std.go @@ -208,7 +208,9 @@ func RunMount(option *MountOptions, umask os.FileMode) bool { }) glog.V(0).Infof("mounted %s%s to %s", filer, mountRoot, dir) - err = fs.Serve(c, seaweedFileSystem) + server := fs.New(c, nil) + seaweedFileSystem.Server = server + err = server.Serve(seaweedFileSystem) // check if the mount process has an error to report <-c.Ready diff --git a/weed/filesys/dir.go b/weed/filesys/dir.go index 3d0a00a8b..e3d0057db 100644 --- a/weed/filesys/dir.go +++ b/weed/filesys/dir.go @@ -58,7 +58,7 @@ func (dir *Dir) Attr(ctx context.Context, attr *fuse.Attr) error { return err } - attr.Inode = util.FullPath(dir.FullPath()).AsInode() + // attr.Inode = util.FullPath(dir.FullPath()).AsInode() attr.Mode = os.FileMode(dir.entry.Attributes.FileMode) | os.ModeDir attr.Mtime = time.Unix(dir.entry.Attributes.Mtime, 0) attr.Crtime = time.Unix(dir.entry.Attributes.Crtime, 0) @@ -82,8 +82,8 @@ func (dir *Dir) Getxattr(ctx context.Context, req *fuse.GetxattrRequest, resp *f } func (dir *Dir) setRootDirAttributes(attr *fuse.Attr) { - attr.Inode = 1 // filer2.FullPath(dir.Path).AsInode() - attr.Valid = time.Hour + // attr.Inode = 1 // filer2.FullPath(dir.Path).AsInode() + attr.Valid = time.Second attr.Uid = dir.wfs.option.MountUid attr.Gid = dir.wfs.option.MountGid attr.Mode = dir.wfs.option.MountMode @@ -91,7 +91,7 @@ func (dir *Dir) setRootDirAttributes(attr *fuse.Attr) { attr.Ctime = dir.wfs.option.MountCtime attr.Mtime = dir.wfs.option.MountMtime attr.Atime = dir.wfs.option.MountMtime - attr.BlockSize = 1024 * 1024 + attr.BlockSize = blockSize } func (dir *Dir) Fsync(ctx context.Context, req *fuse.FsyncRequest) error { @@ -285,7 +285,7 @@ func (dir *Dir) Lookup(ctx context.Context, req *fuse.LookupRequest, resp *fuse. } // resp.EntryValid = time.Second - resp.Attr.Inode = fullFilePath.AsInode() + // resp.Attr.Inode = fullFilePath.AsInode() resp.Attr.Valid = time.Second resp.Attr.Mtime = time.Unix(entry.Attributes.Mtime, 0) resp.Attr.Crtime = time.Unix(entry.Attributes.Crtime, 0) @@ -308,13 +308,11 @@ func (dir *Dir) ReadDirAll(ctx context.Context) (ret []fuse.Dirent, err error) { glog.V(4).Infof("dir ReadDirAll %s", dir.FullPath()) processEachEntryFn := func(entry *filer_pb.Entry, isLast bool) error { - fullpath := util.NewFullPath(dir.FullPath(), entry.Name) - inode := fullpath.AsInode() if entry.IsDirectory { - dirent := fuse.Dirent{Inode: inode, Name: entry.Name, Type: fuse.DT_Dir} + dirent := fuse.Dirent{Name: entry.Name, Type: fuse.DT_Dir} ret = append(ret, dirent) } else { - dirent := fuse.Dirent{Inode: inode, Name: entry.Name, Type: findFileType(uint16(entry.Attributes.FileMode))} + dirent := fuse.Dirent{Name: entry.Name, Type: findFileType(uint16(entry.Attributes.FileMode))} ret = append(ret, dirent) } return nil diff --git a/weed/filesys/file.go b/weed/filesys/file.go index a8d6dac29..a210c5152 100644 --- a/weed/filesys/file.go +++ b/weed/filesys/file.go @@ -45,7 +45,7 @@ func (file *File) fullpath() util.FullPath { func (file *File) Attr(ctx context.Context, attr *fuse.Attr) (err error) { - glog.V(4).Infof("file Attr %s, open:%v, existing attr: %+v", file.fullpath(), file.isOpen, attr) + glog.V(4).Infof("file Attr %s, open:%v existing:%v", file.fullpath(), file.isOpen, attr) entry := file.entry if file.isOpen <= 0 || entry == nil { @@ -54,7 +54,7 @@ func (file *File) Attr(ctx context.Context, attr *fuse.Attr) (err error) { } } - attr.Inode = file.fullpath().AsInode() + // attr.Inode = file.fullpath().AsInode() attr.Valid = time.Second attr.Mode = os.FileMode(entry.Attributes.FileMode) attr.Size = filer.FileSize(entry) @@ -91,9 +91,6 @@ func (file *File) Getxattr(ctx context.Context, req *fuse.GetxattrRequest, resp func (file *File) Open(ctx context.Context, req *fuse.OpenRequest, resp *fuse.OpenResponse) (fs.Handle, error) { glog.V(4).Infof("file %v open %+v", file.fullpath(), req) - if USE_DIRECT_IO { - resp.Flags |= fuse.OpenDirectIO - } handle := file.wfs.AcquireHandle(file, req.Uid, req.Gid) diff --git a/weed/filesys/file_darwin.go b/weed/filesys/file_darwin.go deleted file mode 100644 index b8806cff7..000000000 --- a/weed/filesys/file_darwin.go +++ /dev/null @@ -1,8 +0,0 @@ -//+build darwin - -package filesys - -const ( - USE_DIRECT_IO = false -) - diff --git a/weed/filesys/file_other.go b/weed/filesys/file_other.go deleted file mode 100644 index 8a8693511..000000000 --- a/weed/filesys/file_other.go +++ /dev/null @@ -1,7 +0,0 @@ -//+build !darwin - -package filesys - -const ( - USE_DIRECT_IO = true -) diff --git a/weed/filesys/wfs.go b/weed/filesys/wfs.go index 6cfadcc18..108e23c85 100644 --- a/weed/filesys/wfs.go +++ b/weed/filesys/wfs.go @@ -76,6 +76,7 @@ type WFS struct { // throttle writers concurrentWriters *util.LimitedConcurrentExecutor + Server *fs.Server } type statsCache struct { filer_pb.StatisticsResponse @@ -104,9 +105,22 @@ func NewSeaweedFileSystem(option *Option) *WFS { fsNode := wfs.fsNodeCache.GetFsNode(filePath) if fsNode != nil { if file, ok := fsNode.(*File); ok { + if err := wfs.Server.InvalidateNodeData(file); err != nil { + glog.V(4).Infof("InvalidateNodeData %s : %v", filePath, err) + } file.clearEntry() } } + dir, name := filePath.DirAndName() + parent := wfs.root + if dir != "/" { + parent = wfs.fsNodeCache.GetFsNode(util.FullPath(dir)) + } + if parent != nil { + if err := wfs.Server.InvalidateEntry(parent, name); err != nil { + glog.V(4).Infof("InvalidateEntry %s : %v", filePath, err) + } + } }) startTime := time.Now() go meta_cache.SubscribeMetaEvents(wfs.metaCache, wfs.signature, wfs, wfs.option.FilerMountRootPath, startTime.UnixNano()) diff --git a/weed/replication/sub/notification_aws_sqs.go b/weed/replication/sub/notification_aws_sqs.go index 1dd386ba7..642834c72 100644 --- a/weed/replication/sub/notification_aws_sqs.go +++ b/weed/replication/sub/notification_aws_sqs.go @@ -68,7 +68,7 @@ func (k *AwsSqsInput) initialize(awsAccessKeyId, awsSecretAccessKey, region, que return nil } -func (k *AwsSqsInput) ReceiveMessage() (key string, message *filer_pb.EventNotification, err error) { +func (k *AwsSqsInput) ReceiveMessage() (key string, message *filer_pb.EventNotification, onSuccessFn func(), onFailureFn func(), err error) { // receive message result, err := k.svc.ReceiveMessage(&sqs.ReceiveMessageInput{ diff --git a/weed/replication/sub/notification_gocdk_pub_sub.go b/weed/replication/sub/notification_gocdk_pub_sub.go index 9726096e5..09a96d5d5 100644 --- a/weed/replication/sub/notification_gocdk_pub_sub.go +++ b/weed/replication/sub/notification_gocdk_pub_sub.go @@ -38,13 +38,24 @@ func (k *GoCDKPubSubInput) Initialize(configuration util.Configuration, prefix s return nil } -func (k *GoCDKPubSubInput) ReceiveMessage() (key string, message *filer_pb.EventNotification, err error) { +func (k *GoCDKPubSubInput) ReceiveMessage() (key string, message *filer_pb.EventNotification, onSuccessFn func(), onFailureFn func(), err error) { msg, err := k.sub.Receive(context.Background()) + if err != nil { + return + } + onSuccessFn = func() { + msg.Ack() + } + onFailureFn = func() { + if msg.Nackable() { + msg.Nack() + } + } key = msg.Metadata["key"] message = &filer_pb.EventNotification{} err = proto.Unmarshal(msg.Body, message) if err != nil { - return "", nil, err + return "", nil, onSuccessFn, onFailureFn, err } - return key, message, nil + return key, message, onSuccessFn, onFailureFn, nil } diff --git a/weed/replication/sub/notification_google_pub_sub.go b/weed/replication/sub/notification_google_pub_sub.go index a950bb42b..f7c767d4a 100644 --- a/weed/replication/sub/notification_google_pub_sub.go +++ b/weed/replication/sub/notification_google_pub_sub.go @@ -85,16 +85,22 @@ func (k *GooglePubSubInput) initialize(google_application_credentials, projectId go k.sub.Receive(ctx, func(ctx context.Context, m *pubsub.Message) { k.messageChan <- m - m.Ack() }) return err } -func (k *GooglePubSubInput) ReceiveMessage() (key string, message *filer_pb.EventNotification, err error) { +func (k *GooglePubSubInput) ReceiveMessage() (key string, message *filer_pb.EventNotification, onSuccessFn func(), onFailureFn func(), err error) { m := <-k.messageChan + onSuccessFn = func() { + m.Ack() + } + onFailureFn = func() { + m.Nack() + } + // process the message key = m.Attributes["key"] message = &filer_pb.EventNotification{} diff --git a/weed/replication/sub/notification_kafka.go b/weed/replication/sub/notification_kafka.go index fa9cfad9b..622a759ea 100644 --- a/weed/replication/sub/notification_kafka.go +++ b/weed/replication/sub/notification_kafka.go @@ -97,7 +97,7 @@ func (k *KafkaInput) initialize(hosts []string, topic string, offsetFile string, return nil } -func (k *KafkaInput) ReceiveMessage() (key string, message *filer_pb.EventNotification, err error) { +func (k *KafkaInput) ReceiveMessage() (key string, message *filer_pb.EventNotification, onSuccessFn func(), onFailureFn func(), err error) { msg := <-k.messageChan diff --git a/weed/replication/sub/notifications.go b/weed/replication/sub/notifications.go index 8a2668f98..d5a910db9 100644 --- a/weed/replication/sub/notifications.go +++ b/weed/replication/sub/notifications.go @@ -10,7 +10,7 @@ type NotificationInput interface { GetName() string // Initialize initializes the file store Initialize(configuration util.Configuration, prefix string) error - ReceiveMessage() (key string, message *filer_pb.EventNotification, err error) + ReceiveMessage() (key string, message *filer_pb.EventNotification, onSuccessFn func(), onFailureFn func(), err error) } var ( diff --git a/weed/shell/command_volume_fix_replication.go b/weed/shell/command_volume_fix_replication.go index 9b9abd8eb..c737133f5 100644 --- a/weed/shell/command_volume_fix_replication.go +++ b/weed/shell/command_volume_fix_replication.go @@ -6,6 +6,7 @@ import ( "fmt" "github.com/chrislusf/seaweedfs/weed/storage/needle" "io" + "path/filepath" "sort" "github.com/chrislusf/seaweedfs/weed/operation" @@ -19,6 +20,7 @@ func init() { } type commandVolumeFixReplication struct { + collectionPattern *string } func (c *commandVolumeFixReplication) Name() string { @@ -33,8 +35,9 @@ func (c *commandVolumeFixReplication) Help() string { This command also finds all under-replicated volumes, and finds volume servers with free slots. If the free slots satisfy the replication requirement, the volume content is copied over and mounted. - volume.fix.replication -n # do not take action - volume.fix.replication # actually deleting or copying the volume files and mount the volume + volume.fix.replication -n # do not take action + volume.fix.replication # actually deleting or copying the volume files and mount the volume + volume.fix.replication -collectionPattern=important* # fix any collections with prefix "important" Note: * each time this will only add back one replica for one volume id. If there are multiple replicas @@ -52,6 +55,7 @@ func (c *commandVolumeFixReplication) Do(args []string, commandEnv *CommandEnv, } volFixReplicationCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) + c.collectionPattern = volFixReplicationCommand.String("collectionPattern", "", "match with wildcard characters '*' and '?'") skipChange := volFixReplicationCommand.Bool("n", false, "skip the changes") if err = volFixReplicationCommand.Parse(args); err != nil { return nil @@ -127,6 +131,17 @@ func (c *commandVolumeFixReplication) fixOverReplicatedVolumes(commandEnv *Comma replica := pickOneReplicaToDelete(replicas, replicaPlacement) + // check collection name pattern + if *c.collectionPattern != "" { + matched, err := filepath.Match(*c.collectionPattern, replica.info.Collection) + if err != nil { + return fmt.Errorf("match pattern %s with collection %s: %v", *c.collectionPattern, replica.info.Collection, err) + } + if !matched { + break + } + } + fmt.Fprintf(writer, "deleting volume %d from %s ...\n", replica.info.Id, replica.location.dataNode.Id) if !takeAction { @@ -150,6 +165,17 @@ func (c *commandVolumeFixReplication) fixUnderReplicatedVolumes(commandEnv *Comm for _, dst := range allLocations { // check whether data nodes satisfy the constraints if dst.dataNode.FreeVolumeCount > 0 && satisfyReplicaPlacement(replicaPlacement, replicas, dst) { + // check collection name pattern + if *c.collectionPattern != "" { + matched, err := filepath.Match(*c.collectionPattern, replica.info.Collection) + if err != nil { + return fmt.Errorf("match pattern %s with collection %s: %v", *c.collectionPattern, replica.info.Collection, err) + } + if !matched { + break + } + } + // ask the volume server to replicate the volume foundNewLocation = true fmt.Fprintf(writer, "replicating volume %d %s from %s to dataNode %s ...\n", replica.info.Id, replicaPlacement, replica.location.dataNode.Id, dst.dataNode.Id) |
