aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2021-08-04 16:25:46 -0700
committerChris Lu <chris.lu@gmail.com>2021-08-04 16:25:46 -0700
commit6b743dbbf96f863e70ee80e4b32c0928f594891a (patch)
tree0e3551b5c411ad2131a1b5467318fd973bee71b7
parentb9ecf1e3a8685c62ccac80ed0fbc180ed34b48e2 (diff)
downloadseaweedfs-6b743dbbf96f863e70ee80e4b32c0928f594891a.tar.xz
seaweedfs-6b743dbbf96f863e70ee80e4b32c0928f594891a.zip
refactor client subscribe metadata
-rw-r--r--unmaintained/load_test/load_test_meta_tail/load_test_meta_tail.go35
-rw-r--r--weed/command/filer_backup.go51
-rw-r--r--weed/command/filer_meta_backup.go48
-rw-r--r--weed/command/filer_meta_tail.go36
-rw-r--r--weed/command/filer_sync.go49
-rw-r--r--weed/filer/filer_remote_storage.go41
-rw-r--r--weed/filesys/meta_cache/meta_cache_subscribe.go41
-rw-r--r--weed/pb/filer_pb_tail.go94
-rw-r--r--weed/remote_storage/mount_mapping.go46
-rw-r--r--weed/s3api/auth_credentials_subscribe.go41
-rw-r--r--weed/shell/command_remote_mount.go5
11 files changed, 174 insertions, 313 deletions
diff --git a/unmaintained/load_test/load_test_meta_tail/load_test_meta_tail.go b/unmaintained/load_test/load_test_meta_tail/load_test_meta_tail.go
index 53cb2f912..c521ce33e 100644
--- a/unmaintained/load_test/load_test_meta_tail/load_test_meta_tail.go
+++ b/unmaintained/load_test/load_test_meta_tail/load_test_meta_tail.go
@@ -1,14 +1,12 @@
package main
import (
- "context"
"flag"
"fmt"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"google.golang.org/grpc"
- "io"
"strconv"
"time"
)
@@ -74,38 +72,9 @@ func startGenerateMetadata() {
func startSubscribeMetadata(eachEntryFunc func(event *filer_pb.SubscribeMetadataResponse) error) {
- lastTsNs := int64(0)
+ tailErr := pb.FollowMetadata(*tailFiler, grpc.WithInsecure(), "tail",
+ *dir, 0, 0, eachEntryFunc, false)
- tailErr := pb.WithFilerClient(*tailFiler, grpc.WithInsecure(), func(client filer_pb.SeaweedFilerClient) error {
-
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
-
- stream, err := client.SubscribeMetadata(ctx, &filer_pb.SubscribeMetadataRequest{
- ClientName: "tail",
- PathPrefix: *dir,
- SinceNs: lastTsNs,
- })
- if err != nil {
- return fmt.Errorf("listen: %v", err)
- }
-
- for {
- resp, listenErr := stream.Recv()
- if listenErr == io.EOF {
- return nil
- }
- if listenErr != nil {
- return listenErr
- }
- if err = eachEntryFunc(resp); err != nil {
- glog.V(0).Infof("tail last record:%+v", time.Unix(0, lastTsNs))
- return err
- }
- lastTsNs = resp.TsNs
- }
-
- })
if tailErr != nil {
fmt.Printf("tail %s: %v\n", *tailFiler, tailErr)
}
diff --git a/weed/command/filer_backup.go b/weed/command/filer_backup.go
index fc4dd8298..2828ccb39 100644
--- a/weed/command/filer_backup.go
+++ b/weed/command/filer_backup.go
@@ -1,16 +1,13 @@
package command
import (
- "context"
"fmt"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb"
- "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/replication/source"
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/util"
"google.golang.org/grpc"
- "io"
"time"
)
@@ -110,48 +107,12 @@ func doFilerBackup(grpcDialOption grpc.DialOption, backupOption *FilerBackupOpti
processEventFn := genProcessFunction(sourcePath, targetPath, dataSink, debug)
- return pb.WithFilerClient(sourceFiler, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
-
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
-
- stream, err := client.SubscribeMetadata(ctx, &filer_pb.SubscribeMetadataRequest{
- ClientName: "backup_" + dataSink.GetName(),
- PathPrefix: sourcePath,
- SinceNs: startFrom.UnixNano(),
- })
- if err != nil {
- return fmt.Errorf("listen: %v", err)
- }
-
- var counter int64
- var lastWriteTime time.Time
- for {
- resp, listenErr := stream.Recv()
-
- if listenErr == io.EOF {
- return nil
- }
- if listenErr != nil {
- return listenErr
- }
-
- if err := processEventFn(resp); err != nil {
- return fmt.Errorf("processEventFn: %v", err)
- }
-
- counter++
- if lastWriteTime.Add(3 * time.Second).Before(time.Now()) {
- glog.V(0).Infof("backup %s progressed to %v %0.2f/sec", sourceFiler, time.Unix(0, resp.TsNs), float64(counter)/float64(3))
- counter = 0
- lastWriteTime = time.Now()
- if err := setOffset(grpcDialOption, sourceFiler, BackupKeyPrefix, int32(sinkId), resp.TsNs); err != nil {
- return fmt.Errorf("setOffset: %v", err)
- }
- }
-
- }
-
+ processEventFnWithOffset := pb.AddOffsetFunc(processEventFn, 3 * time.Second, func(counter int64, lastTsNs int64) error {
+ glog.V(0).Infof("backup %s progressed to %v %0.2f/sec", sourceFiler, time.Unix(0, lastTsNs), float64(counter)/float64(3))
+ return setOffset(grpcDialOption, sourceFiler, BackupKeyPrefix, int32(sinkId), lastTsNs)
})
+ return pb.FollowMetadata(sourceFiler, grpcDialOption, "backup_" + dataSink.GetName(),
+ sourcePath, startFrom.UnixNano(), 0, processEventFnWithOffset, false)
+
}
diff --git a/weed/command/filer_meta_backup.go b/weed/command/filer_meta_backup.go
index 28bd367e7..108e76566 100644
--- a/weed/command/filer_meta_backup.go
+++ b/weed/command/filer_meta_backup.go
@@ -7,7 +7,6 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/spf13/viper"
"google.golang.org/grpc"
- "io"
"reflect"
"time"
@@ -190,48 +189,15 @@ func (metaBackup *FilerMetaBackupOptions) streamMetadataBackup() error {
return nil
}
- tailErr := pb.WithFilerClient(*metaBackup.filerAddress, metaBackup.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
-
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
-
- stream, err := client.SubscribeMetadata(ctx, &filer_pb.SubscribeMetadataRequest{
- ClientName: "meta_backup",
- PathPrefix: *metaBackup.filerDirectory,
- SinceNs: startTime.UnixNano(),
- })
- if err != nil {
- return fmt.Errorf("listen: %v", err)
- }
-
- var counter int64
- var lastWriteTime time.Time
- for {
- resp, listenErr := stream.Recv()
- if listenErr == io.EOF {
- return nil
- }
- if listenErr != nil {
- return listenErr
- }
- if err = eachEntryFunc(resp); err != nil {
- return err
- }
-
- counter++
- if lastWriteTime.Add(3 * time.Second).Before(time.Now()) {
- glog.V(0).Infof("meta backup %s progressed to %v %0.2f/sec", *metaBackup.filerAddress, time.Unix(0, resp.TsNs), float64(counter)/float64(3))
- counter = 0
- lastWriteTime = time.Now()
- if err2 := metaBackup.setOffset(lastWriteTime); err2 != nil {
- return err2
- }
- }
+ processEventFnWithOffset := pb.AddOffsetFunc(eachEntryFunc, 3 * time.Second, func(counter int64, lastTsNs int64) error {
+ lastTime := time.Unix(0, lastTsNs)
+ glog.V(0).Infof("meta backup %s progressed to %v %0.2f/sec", *metaBackup.filerAddress, lastTime, float64(counter)/float64(3))
+ return metaBackup.setOffset(lastTime)
+ })
- }
+ return pb.FollowMetadata(*metaBackup.filerAddress, metaBackup.grpcDialOption, "meta_backup",
+ *metaBackup.filerDirectory, startTime.UnixNano(), 0, processEventFnWithOffset, false)
- })
- return tailErr
}
func (metaBackup *FilerMetaBackupOptions) getOffset() (lastWriteTime time.Time, err error) {
diff --git a/weed/command/filer_meta_tail.go b/weed/command/filer_meta_tail.go
index 76699bb5e..28c0db99b 100644
--- a/weed/command/filer_meta_tail.go
+++ b/weed/command/filer_meta_tail.go
@@ -3,16 +3,15 @@ package command
import (
"context"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/pb"
"github.com/golang/protobuf/jsonpb"
jsoniter "github.com/json-iterator/go"
"github.com/olivere/elastic/v7"
- "io"
"os"
"path/filepath"
"strings"
"time"
- "github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/util"
@@ -104,37 +103,18 @@ func runFilerMetaTail(cmd *Command, args []string) bool {
}
}
- tailErr := pb.WithFilerClient(*tailFiler, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
-
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
-
- stream, err := client.SubscribeMetadata(ctx, &filer_pb.SubscribeMetadataRequest{
- ClientName: "tail",
- PathPrefix: *tailTarget,
- SinceNs: time.Now().Add(-*tailStart).UnixNano(),
- })
- if err != nil {
- return fmt.Errorf("listen: %v", err)
- }
-
- for {
- resp, listenErr := stream.Recv()
- if listenErr == io.EOF {
- return nil
- }
- if listenErr != nil {
- return listenErr
- }
+ tailErr := pb.FollowMetadata(*tailFiler, grpcDialOption, "tail",
+ *tailTarget, time.Now().Add(-*tailStart).UnixNano(), 0,
+ func(resp *filer_pb.SubscribeMetadataResponse) error {
if !shouldPrint(resp) {
- continue
+ return nil
}
- if err = eachEntryFunc(resp); err != nil {
+ if err := eachEntryFunc(resp); err != nil {
return err
}
- }
+ return nil
+ }, false)
- })
if tailErr != nil {
fmt.Printf("tail %s: %v\n", *tailFiler, tailErr)
}
diff --git a/weed/command/filer_sync.go b/weed/command/filer_sync.go
index 7cfc8a7fe..a20f17201 100644
--- a/weed/command/filer_sync.go
+++ b/weed/command/filer_sync.go
@@ -15,7 +15,6 @@ import (
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/chrislusf/seaweedfs/weed/util/grace"
"google.golang.org/grpc"
- "io"
"strings"
"time"
)
@@ -166,50 +165,14 @@ func doSubscribeFilerMetaChanges(grpcDialOption grpc.DialOption, sourceFiler, so
return persistEventFn(resp)
}
- return pb.WithFilerClient(sourceFiler, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
-
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
-
- stream, err := client.SubscribeMetadata(ctx, &filer_pb.SubscribeMetadataRequest{
- ClientName: "syncTo_" + targetFiler,
- PathPrefix: sourcePath,
- SinceNs: sourceFilerOffsetTsNs,
- Signature: targetFilerSignature,
- })
- if err != nil {
- return fmt.Errorf("listen: %v", err)
- }
-
- var counter int64
- var lastWriteTime time.Time
- for {
- resp, listenErr := stream.Recv()
- if listenErr == io.EOF {
- return nil
- }
- if listenErr != nil {
- return listenErr
- }
-
- if err := processEventFn(resp); err != nil {
- return err
- }
-
- counter++
- if lastWriteTime.Add(3 * time.Second).Before(time.Now()) {
- glog.V(0).Infof("sync %s to %s progressed to %v %0.2f/sec", sourceFiler, targetFiler, time.Unix(0, resp.TsNs), float64(counter)/float64(3))
- counter = 0
- lastWriteTime = time.Now()
- if err := setOffset(grpcDialOption, targetFiler, SyncKeyPrefix, sourceFilerSignature, resp.TsNs); err != nil {
- return err
- }
- }
-
- }
-
+ processEventFnWithOffset := pb.AddOffsetFunc(processEventFn, 3 * time.Second, func(counter int64, lastTsNs int64) error {
+ glog.V(0).Infof("sync %s to %s progressed to %v %0.2f/sec", sourceFiler, targetFiler, time.Unix(0, lastTsNs), float64(counter)/float64(3))
+ return setOffset(grpcDialOption, targetFiler, SyncKeyPrefix, sourceFilerSignature, lastTsNs)
})
+ return pb.FollowMetadata(sourceFiler, grpcDialOption, "syncTo_" + targetFiler,
+ sourcePath, sourceFilerOffsetTsNs, targetFilerSignature, processEventFnWithOffset, false)
+
}
const (
diff --git a/weed/filer/filer_remote_storage.go b/weed/filer/filer_remote_storage.go
index a859ad34b..b1ee96a42 100644
--- a/weed/filer/filer_remote_storage.go
+++ b/weed/filer/filer_remote_storage.go
@@ -3,10 +3,12 @@ package filer
import (
"context"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/remote_storage"
_ "github.com/chrislusf/seaweedfs/weed/remote_storage/s3"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/golang/protobuf/proto"
+ "google.golang.org/grpc"
"math"
"strings"
@@ -141,4 +143,41 @@ func AddRemoteStorageMapping(oldContent []byte, dir string, storageLocation *fil
}
return
-} \ No newline at end of file
+}
+
+
+func ReadMountMappings(grpcDialOption grpc.DialOption, filerAddress string) (mappings *filer_pb.RemoteStorageMapping, readErr error) {
+ var oldContent []byte
+ if readErr = pb.WithFilerClient(filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ oldContent, readErr = ReadInsideFiler(client, DirectoryEtcRemote, REMOTE_STORAGE_MOUNT_FILE)
+ return readErr
+ }); readErr != nil {
+ return nil, readErr
+ }
+
+ mappings, readErr = UnmarshalRemoteStorageMappings(oldContent)
+ if readErr != nil {
+ return nil, fmt.Errorf("unmarshal mappings: %v", readErr)
+ }
+
+ return
+}
+
+func ReadRemoteStorageConf(grpcDialOption grpc.DialOption, filerAddress string, storageName string) (conf *filer_pb.RemoteConf, readErr error) {
+ var oldContent []byte
+ if readErr = pb.WithFilerClient(filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ oldContent, readErr = ReadInsideFiler(client, DirectoryEtcRemote, storageName+REMOTE_STORAGE_CONF_SUFFIX)
+ return readErr
+ }); readErr != nil {
+ return nil, readErr
+ }
+
+ // unmarshal storage configuration
+ conf = &filer_pb.RemoteConf{}
+ if unMarshalErr := proto.Unmarshal(oldContent, conf); unMarshalErr != nil {
+ readErr = fmt.Errorf("unmarshal %s/%s: %v", DirectoryEtcRemote, storageName+REMOTE_STORAGE_CONF_SUFFIX, unMarshalErr)
+ return
+ }
+
+ return
+}
diff --git a/weed/filesys/meta_cache/meta_cache_subscribe.go b/weed/filesys/meta_cache/meta_cache_subscribe.go
index 747ac3cb9..c650b8024 100644
--- a/weed/filesys/meta_cache/meta_cache_subscribe.go
+++ b/weed/filesys/meta_cache/meta_cache_subscribe.go
@@ -2,12 +2,9 @@ package meta_cache
import (
"context"
- "fmt"
- "io"
- "time"
-
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/util"
)
@@ -62,38 +59,8 @@ func SubscribeMetaEvents(mc *MetaCache, selfSignature int32, client filer_pb.Fil
}
- for {
- err := client.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
- stream, err := client.SubscribeMetadata(ctx, &filer_pb.SubscribeMetadataRequest{
- ClientName: "mount",
- PathPrefix: dir,
- SinceNs: lastTsNs,
- Signature: selfSignature,
- })
- if err != nil {
- return fmt.Errorf("subscribe: %v", err)
- }
-
- for {
- resp, listenErr := stream.Recv()
- if listenErr == io.EOF {
- return nil
- }
- if listenErr != nil {
- return listenErr
- }
+ return util.Retry("followMetaUpdates", func() error {
+ return pb.WithFilerClientFollowMetadata(client, "mount", dir, lastTsNs, selfSignature, processEventFn, true)
+ })
- if err := processEventFn(resp); err != nil {
- glog.Fatalf("process %v: %v", resp, err)
- }
- lastTsNs = resp.TsNs
- }
- })
- if err != nil {
- glog.Errorf("subscribing filer meta change: %v", err)
- }
- time.Sleep(time.Second)
- }
}
diff --git a/weed/pb/filer_pb_tail.go b/weed/pb/filer_pb_tail.go
new file mode 100644
index 000000000..31fb62fb3
--- /dev/null
+++ b/weed/pb/filer_pb_tail.go
@@ -0,0 +1,94 @@
+package pb
+
+import (
+ "context"
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "google.golang.org/grpc"
+ "io"
+ "time"
+)
+
+type ProcessMetadataFunc func(resp *filer_pb.SubscribeMetadataResponse) error
+
+func FollowMetadata(filerAddress string, grpcDialOption grpc.DialOption,
+ clientName string, pathPrefix string, lastTsNs int64, selfSignature int32,
+ processEventFn ProcessMetadataFunc, fatalOnError bool) error {
+
+ err := WithFilerClient(filerAddress, grpcDialOption, makeFunc(
+ clientName, pathPrefix, lastTsNs, selfSignature, processEventFn, fatalOnError))
+ if err != nil {
+ return fmt.Errorf("subscribing filer meta change: %v", err)
+ }
+ return err
+}
+
+func WithFilerClientFollowMetadata(filerClient filer_pb.FilerClient,
+ clientName string, pathPrefix string, lastTsNs int64, selfSignature int32,
+ processEventFn ProcessMetadataFunc, fatalOnError bool) error {
+
+ err := filerClient.WithFilerClient(makeFunc(
+ clientName, pathPrefix, lastTsNs, selfSignature, processEventFn, fatalOnError))
+ if err != nil {
+ return fmt.Errorf("subscribing filer meta change: %v", err)
+ }
+
+ return nil
+}
+
+func makeFunc(clientName string, pathPrefix string, lastTsNs int64, selfSignature int32,
+ processEventFn ProcessMetadataFunc, fatalOnError bool) func(client filer_pb.SeaweedFilerClient) error {
+ return func(client filer_pb.SeaweedFilerClient) error {
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+ stream, err := client.SubscribeMetadata(ctx, &filer_pb.SubscribeMetadataRequest{
+ ClientName: clientName,
+ PathPrefix: pathPrefix,
+ SinceNs: lastTsNs,
+ Signature: selfSignature,
+ })
+ if err != nil {
+ return fmt.Errorf("subscribe: %v", err)
+ }
+
+ for {
+ resp, listenErr := stream.Recv()
+ if listenErr == io.EOF {
+ return nil
+ }
+ if listenErr != nil {
+ return listenErr
+ }
+
+ if err := processEventFn(resp); err != nil {
+ if fatalOnError {
+ glog.Fatalf("process %v: %v", resp, err)
+ } else {
+ glog.Errorf("process %v: %v", resp, err)
+ }
+ }
+ lastTsNs = resp.TsNs
+ }
+ }
+}
+
+func AddOffsetFunc(processEventFn ProcessMetadataFunc, offsetInterval time.Duration, offsetFunc func(counter int64, offset int64) error) ProcessMetadataFunc {
+ var counter int64
+ var lastWriteTime time.Time
+ return func(resp *filer_pb.SubscribeMetadataResponse) error {
+ if err := processEventFn(resp); err != nil {
+ return err
+ }
+ counter++
+ if lastWriteTime.Add(offsetInterval).Before(time.Now()) {
+ counter = 0
+ lastWriteTime = time.Now()
+ if err := offsetFunc(counter, resp.TsNs); err != nil {
+ return err
+ }
+ }
+ return nil
+ }
+
+}
diff --git a/weed/remote_storage/mount_mapping.go b/weed/remote_storage/mount_mapping.go
deleted file mode 100644
index 767de5bed..000000000
--- a/weed/remote_storage/mount_mapping.go
+++ /dev/null
@@ -1,46 +0,0 @@
-package remote_storage
-
-import (
- "fmt"
- "github.com/chrislusf/seaweedfs/weed/filer"
- "github.com/chrislusf/seaweedfs/weed/pb"
- "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
- "github.com/golang/protobuf/proto"
- "google.golang.org/grpc"
-)
-
-func ReadMountMappings(grpcDialOption grpc.DialOption, filerAddress string) (mappings *filer_pb.RemoteStorageMapping, readErr error) {
- var oldContent []byte
- if readErr = pb.WithFilerClient(filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
- oldContent, readErr = filer.ReadInsideFiler(client, filer.DirectoryEtcRemote, filer.REMOTE_STORAGE_MOUNT_FILE)
- return readErr
- }); readErr != nil {
- return nil, readErr
- }
-
- mappings, readErr = filer.UnmarshalRemoteStorageMappings(oldContent)
- if readErr != nil {
- return nil, fmt.Errorf("unmarshal mappings: %v", readErr)
- }
-
- return
-}
-
-func ReadRemoteStorageConf(grpcDialOption grpc.DialOption, filerAddress string, storageName string) (conf *filer_pb.RemoteConf, readErr error) {
- var oldContent []byte
- if readErr = pb.WithFilerClient(filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
- oldContent, readErr = filer.ReadInsideFiler(client, filer.DirectoryEtcRemote, storageName+filer.REMOTE_STORAGE_CONF_SUFFIX)
- return readErr
- }); readErr != nil {
- return nil, readErr
- }
-
- // unmarshal storage configuration
- conf = &filer_pb.RemoteConf{}
- if unMarshalErr := proto.Unmarshal(oldContent, conf); unMarshalErr != nil {
- readErr = fmt.Errorf("unmarshal %s/%s: %v", filer.DirectoryEtcRemote, storageName+filer.REMOTE_STORAGE_CONF_SUFFIX, unMarshalErr)
- return
- }
-
- return
-}
diff --git a/weed/s3api/auth_credentials_subscribe.go b/weed/s3api/auth_credentials_subscribe.go
index ea4b69550..05cce632a 100644
--- a/weed/s3api/auth_credentials_subscribe.go
+++ b/weed/s3api/auth_credentials_subscribe.go
@@ -1,13 +1,11 @@
package s3api
import (
- "context"
- "fmt"
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
- "io"
- "time"
+ "github.com/chrislusf/seaweedfs/weed/util"
)
func (s3a *S3ApiServer) subscribeMetaEvents(clientName string, prefix string, lastTsNs int64) error {
@@ -34,37 +32,8 @@ func (s3a *S3ApiServer) subscribeMetaEvents(clientName string, prefix string, la
return nil
}
- for {
- err := s3a.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
- stream, err := client.SubscribeMetadata(ctx, &filer_pb.SubscribeMetadataRequest{
- ClientName: clientName,
- PathPrefix: prefix,
- SinceNs: lastTsNs,
- })
- if err != nil {
- return fmt.Errorf("subscribe: %v", err)
- }
-
- for {
- resp, listenErr := stream.Recv()
- if listenErr == io.EOF {
- return nil
- }
- if listenErr != nil {
- return listenErr
- }
+ return util.Retry("followIamChanges", func() error {
+ return pb.WithFilerClientFollowMetadata(s3a, clientName, prefix, lastTsNs, 0, processEventFn, true)
+ })
- if err := processEventFn(resp); err != nil {
- glog.Fatalf("process %v: %v", resp, err)
- }
- lastTsNs = resp.TsNs
- }
- })
- if err != nil {
- glog.Errorf("subscribing filer meta change: %v", err)
- }
- time.Sleep(time.Second)
- }
}
diff --git a/weed/shell/command_remote_mount.go b/weed/shell/command_remote_mount.go
index 35aad9498..73a5119d5 100644
--- a/weed/shell/command_remote_mount.go
+++ b/weed/shell/command_remote_mount.go
@@ -9,7 +9,6 @@ import (
"github.com/chrislusf/seaweedfs/weed/remote_storage"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/golang/protobuf/jsonpb"
- "github.com/golang/protobuf/proto"
"io"
)
@@ -79,7 +78,7 @@ func (c *commandRemoteMount) Do(args []string, commandEnv *CommandEnv, writer io
func (c *commandRemoteMount) listExistingRemoteStorageMounts(commandEnv *CommandEnv, writer io.Writer) (err error) {
// read current mapping
- mappings, readErr := remote_storage.ReadMountMappings(commandEnv.option.GrpcDialOption, commandEnv.option.FilerAddress)
+ mappings, readErr := filer.ReadMountMappings(commandEnv.option.GrpcDialOption, commandEnv.option.FilerAddress)
if readErr != nil {
return readErr
}
@@ -95,7 +94,7 @@ func (c *commandRemoteMount) listExistingRemoteStorageMounts(commandEnv *Command
func (c *commandRemoteMount) findRemoteStorageConfiguration(commandEnv *CommandEnv, writer io.Writer, remote *filer_pb.RemoteStorageLocation) (conf *filer_pb.RemoteConf, err error) {
- return remote_storage.ReadRemoteStorageConf(commandEnv.option.GrpcDialOption, commandEnv.option.FilerAddress, remote.Name)
+ return filer.ReadRemoteStorageConf(commandEnv.option.GrpcDialOption, commandEnv.option.FilerAddress, remote.Name)
}