aboutsummaryrefslogtreecommitdiff
path: root/weed/filer
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2021-12-26 00:15:03 -0800
committerchrislu <chris.lu@gmail.com>2021-12-26 00:15:03 -0800
commit9f9ef1340c6441c10c15e2642b5074d34fe40332 (patch)
tree1e897171c804e63ba6edef4778ea8b243f2ad8d6 /weed/filer
parentc935b9669e6b18a07c28939b1bd839552e7d2cf5 (diff)
downloadseaweedfs-9f9ef1340c6441c10c15e2642b5074d34fe40332.tar.xz
seaweedfs-9f9ef1340c6441c10c15e2642b5074d34fe40332.zip
use streaming mode for long poll grpc calls
streaming mode would create separate grpc connections for each call. this is to ensure the long poll connections are properly closed.
Diffstat (limited to 'weed/filer')
-rw-r--r--weed/filer/filer.go4
-rw-r--r--weed/filer/filer_conf.go2
-rw-r--r--weed/filer/filer_delete_entry.go2
-rw-r--r--weed/filer/meta_aggregator.go6
-rw-r--r--weed/filer/read_remote.go2
-rw-r--r--weed/filer/reader_at.go2
-rw-r--r--weed/filer/remote_mapping.go10
-rw-r--r--weed/filer/remote_storage.go2
8 files changed, 15 insertions, 15 deletions
diff --git a/weed/filer/filer.go b/weed/filer/filer.go
index 9eabbc337..7ca198b38 100644
--- a/weed/filer/filer.go
+++ b/weed/filer/filer.go
@@ -79,9 +79,9 @@ func (f *Filer) AggregateFromPeers(self pb.ServerAddress) {
}
-func (f *Filer) ListExistingPeerUpdates() (existingNodes []*master_pb.ClusterNodeUpdate){
+func (f *Filer) ListExistingPeerUpdates() (existingNodes []*master_pb.ClusterNodeUpdate) {
- if grpcErr := pb.WithMasterClient(f.MasterClient.GetMaster(), f.GrpcDialOption, func(client master_pb.SeaweedClient) error {
+ if grpcErr := pb.WithMasterClient(false, f.MasterClient.GetMaster(), f.GrpcDialOption, func(client master_pb.SeaweedClient) error {
resp, err := client.ListClusterNodes(context.Background(), &master_pb.ListClusterNodesRequest{
ClientType: cluster.FilerType,
})
diff --git a/weed/filer/filer_conf.go b/weed/filer/filer_conf.go
index 1229f0139..45c368b9b 100644
--- a/weed/filer/filer_conf.go
+++ b/weed/filer/filer_conf.go
@@ -32,7 +32,7 @@ type FilerConf struct {
func ReadFilerConf(filerGrpcAddress pb.ServerAddress, grpcDialOption grpc.DialOption, masterClient *wdclient.MasterClient) (*FilerConf, error) {
var buf bytes.Buffer
- if err := pb.WithGrpcFilerClient(filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ if err := pb.WithGrpcFilerClient(false, filerGrpcAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
if masterClient != nil {
return ReadEntry(masterClient, client, DirectoryEtcSeaweedFS, FilerConfName, &buf)
} else {
diff --git a/weed/filer/filer_delete_entry.go b/weed/filer/filer_delete_entry.go
index cc33811aa..bda69b15f 100644
--- a/weed/filer/filer_delete_entry.go
+++ b/weed/filer/filer_delete_entry.go
@@ -139,7 +139,7 @@ func (f *Filer) doDeleteEntryMetaAndData(ctx context.Context, entry *Entry, shou
func (f *Filer) doDeleteCollection(collectionName string) (err error) {
- return f.MasterClient.WithClient(func(client master_pb.SeaweedClient) error {
+ return f.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
_, err := client.CollectionDelete(context.Background(), &master_pb.CollectionDeleteRequest{
Name: collectionName,
})
diff --git a/weed/filer/meta_aggregator.go b/weed/filer/meta_aggregator.go
index bb2c947e5..282668146 100644
--- a/weed/filer/meta_aggregator.go
+++ b/weed/filer/meta_aggregator.go
@@ -72,7 +72,7 @@ func (ma *MetaAggregator) setActive(address pb.ServerAddress, isActive bool) {
delete(ma.peerStatues, address)
}
}
-func (ma *MetaAggregator) isActive(address pb.ServerAddress)(isActive bool) {
+func (ma *MetaAggregator) isActive(address pb.ServerAddress) (isActive bool) {
ma.peerStatuesLock.Lock()
defer ma.peerStatuesLock.Unlock()
_, isActive = ma.peerStatues[address]
@@ -152,7 +152,7 @@ func (ma *MetaAggregator) subscribeToOneFiler(f *Filer, self pb.ServerAddress, p
for {
glog.V(4).Infof("subscribing remote %s meta change: %v", peer, time.Unix(0, lastTsNs))
- err := pb.WithFilerClient(peer, ma.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ err := pb.WithFilerClient(false, peer, ma.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
stream, err := client.SubscribeLocalMetadata(ctx, &filer_pb.SubscribeMetadataRequest{
@@ -194,7 +194,7 @@ func (ma *MetaAggregator) subscribeToOneFiler(f *Filer, self pb.ServerAddress, p
}
func (ma *MetaAggregator) readFilerStoreSignature(peer pb.ServerAddress) (sig int32, err error) {
- err = pb.WithFilerClient(peer, ma.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ err = pb.WithFilerClient(false, peer, ma.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
if err != nil {
return err
diff --git a/weed/filer/read_remote.go b/weed/filer/read_remote.go
index a3fb48ae0..6372dac72 100644
--- a/weed/filer/read_remote.go
+++ b/weed/filer/read_remote.go
@@ -26,7 +26,7 @@ func MapRemoteStorageLocationPathToFullPath(localMountedDir util.FullPath, remot
}
func CacheRemoteObjectToLocalCluster(filerClient filer_pb.FilerClient, remoteConf *remote_pb.RemoteConf, remoteLocation *remote_pb.RemoteStorageLocation, parent util.FullPath, entry *filer_pb.Entry) error {
- return filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ return filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
_, err := client.CacheRemoteObjectToLocalCluster(context.Background(), &filer_pb.CacheRemoteObjectToLocalClusterRequest{
Directory: string(parent),
Name: entry.Name,
diff --git a/weed/filer/reader_at.go b/weed/filer/reader_at.go
index 68594cb03..b73526761 100644
--- a/weed/filer/reader_at.go
+++ b/weed/filer/reader_at.go
@@ -44,7 +44,7 @@ func LookupFn(filerClient filer_pb.FilerClient) wdclient.LookupFileIdFunctionTyp
if !found {
util.Retry("lookup volume "+vid, func() error {
- err = filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
resp, err := client.LookupVolume(context.Background(), &filer_pb.LookupVolumeRequest{
VolumeIds: []string{vid},
})
diff --git a/weed/filer/remote_mapping.go b/weed/filer/remote_mapping.go
index c95c4e5bd..b0534e2ca 100644
--- a/weed/filer/remote_mapping.go
+++ b/weed/filer/remote_mapping.go
@@ -11,7 +11,7 @@ import (
func ReadMountMappings(grpcDialOption grpc.DialOption, filerAddress pb.ServerAddress) (mappings *remote_pb.RemoteStorageMapping, readErr error) {
var oldContent []byte
- if readErr = pb.WithFilerClient(filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ if readErr = pb.WithFilerClient(false, filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
oldContent, readErr = ReadInsideFiler(client, DirectoryEtcRemote, REMOTE_STORAGE_MOUNT_FILE)
return readErr
}); readErr != nil {
@@ -30,7 +30,7 @@ func InsertMountMapping(filerClient filer_pb.FilerClient, dir string, remoteStor
// read current mapping
var oldContent, newContent []byte
- err = filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
oldContent, err = ReadInsideFiler(client, DirectoryEtcRemote, REMOTE_STORAGE_MOUNT_FILE)
return err
})
@@ -47,7 +47,7 @@ func InsertMountMapping(filerClient filer_pb.FilerClient, dir string, remoteStor
}
// save back
- err = filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
return SaveInsideFiler(client, DirectoryEtcRemote, REMOTE_STORAGE_MOUNT_FILE, newContent)
})
if err != nil {
@@ -61,7 +61,7 @@ func DeleteMountMapping(filerClient filer_pb.FilerClient, dir string) (err error
// read current mapping
var oldContent, newContent []byte
- err = filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
oldContent, err = ReadInsideFiler(client, DirectoryEtcRemote, REMOTE_STORAGE_MOUNT_FILE)
return err
})
@@ -78,7 +78,7 @@ func DeleteMountMapping(filerClient filer_pb.FilerClient, dir string) (err error
}
// save back
- err = filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
return SaveInsideFiler(client, DirectoryEtcRemote, REMOTE_STORAGE_MOUNT_FILE, newContent)
})
if err != nil {
diff --git a/weed/filer/remote_storage.go b/weed/filer/remote_storage.go
index 9d682b698..5362ba738 100644
--- a/weed/filer/remote_storage.go
+++ b/weed/filer/remote_storage.go
@@ -133,7 +133,7 @@ func UnmarshalRemoteStorageMappings(oldContent []byte) (mappings *remote_pb.Remo
func ReadRemoteStorageConf(grpcDialOption grpc.DialOption, filerAddress pb.ServerAddress, storageName string) (conf *remote_pb.RemoteConf, readErr error) {
var oldContent []byte
- if readErr = pb.WithFilerClient(filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ if readErr = pb.WithFilerClient(false, filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
oldContent, readErr = ReadInsideFiler(client, DirectoryEtcRemote, storageName+REMOTE_STORAGE_CONF_SUFFIX)
return readErr
}); readErr != nil {