diff options
| author | Chris Lu <chris.lu@gmail.com> | 2020-02-25 21:50:12 -0800 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2020-02-25 21:50:12 -0800 |
| commit | 892e726eb9c2427634c46f8ae9b7bcf0b6d1b082 (patch) | |
| tree | 3bf821356579902219633c6f6d42739deb1edd2d /weed/operation | |
| parent | bd3254b53f78b8f42e31ea50cbf2e0d7e87b2bbc (diff) | |
| download | seaweedfs-892e726eb9c2427634c46f8ae9b7bcf0b6d1b082.tar.xz seaweedfs-892e726eb9c2427634c46f8ae9b7bcf0b6d1b082.zip | |
avoid reusing context object
fix https://github.com/chrislusf/seaweedfs/issues/1182
Diffstat (limited to 'weed/operation')
| -rw-r--r-- | weed/operation/assign_file_id.go | 2 | ||||
| -rw-r--r-- | weed/operation/delete_content.go | 5 | ||||
| -rw-r--r-- | weed/operation/grpc_client.go | 25 | ||||
| -rw-r--r-- | weed/operation/lookup.go | 4 | ||||
| -rw-r--r-- | weed/operation/stats.go | 4 | ||||
| -rw-r--r-- | weed/operation/sync_volume.go | 4 | ||||
| -rw-r--r-- | weed/operation/tail_volume.go | 7 |
7 files changed, 23 insertions, 28 deletions
diff --git a/weed/operation/assign_file_id.go b/weed/operation/assign_file_id.go index 6da30605b..893bf516c 100644 --- a/weed/operation/assign_file_id.go +++ b/weed/operation/assign_file_id.go @@ -46,7 +46,7 @@ func Assign(server string, grpcDialOption grpc.DialOption, primaryRequest *Volum continue } - lastError = WithMasterServerClient(server, grpcDialOption, func(ctx context.Context, masterClient master_pb.SeaweedClient) error { + lastError = WithMasterServerClient(server, grpcDialOption, func(masterClient master_pb.SeaweedClient) error { req := &master_pb.AssignRequest{ Count: primaryRequest.Count, diff --git a/weed/operation/delete_content.go b/weed/operation/delete_content.go index 95bbde9f9..361c09e7e 100644 --- a/weed/operation/delete_content.go +++ b/weed/operation/delete_content.go @@ -10,7 +10,6 @@ import ( "google.golang.org/grpc" - "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" ) @@ -109,15 +108,13 @@ func DeleteFilesWithLookupVolumeId(grpcDialOption grpc.DialOption, fileIds []str ret = append(ret, result...) } - glog.V(1).Infof("deleted %d items", len(ret)) - return ret, err } // DeleteFilesAtOneVolumeServer deletes a list of files that is on one volume server via gRpc func DeleteFilesAtOneVolumeServer(volumeServer string, grpcDialOption grpc.DialOption, fileIds []string) (ret []*volume_server_pb.DeleteResult, err error) { - err = WithVolumeServerClient(volumeServer, grpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error { + err = WithVolumeServerClient(volumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { req := &volume_server_pb.BatchDeleteRequest{ FileIds: fileIds, diff --git a/weed/operation/grpc_client.go b/weed/operation/grpc_client.go index e7ee2d2ba..7eed66503 100644 --- a/weed/operation/grpc_client.go +++ b/weed/operation/grpc_client.go @@ -1,29 +1,28 @@ package operation import ( - "context" "fmt" + "strconv" + "strings" + + "google.golang.org/grpc" + "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/master_pb" "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" "github.com/chrislusf/seaweedfs/weed/util" - "google.golang.org/grpc" - "strconv" - "strings" ) -func WithVolumeServerClient(volumeServer string, grpcDialOption grpc.DialOption, fn func(context.Context, volume_server_pb.VolumeServerClient) error) error { - - ctx := context.Background() +func WithVolumeServerClient(volumeServer string, grpcDialOption grpc.DialOption, fn func(volume_server_pb.VolumeServerClient) error) error { grpcAddress, err := toVolumeServerGrpcAddress(volumeServer) if err != nil { return err } - return util.WithCachedGrpcClient(ctx, func(ctx2 context.Context, grpcConnection *grpc.ClientConn) error { + return util.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error { client := volume_server_pb.NewVolumeServerClient(grpcConnection) - return fn(ctx2, client) + return fn(client) }, grpcAddress, grpcDialOption) } @@ -38,18 +37,16 @@ func toVolumeServerGrpcAddress(volumeServer string) (grpcAddress string, err err return fmt.Sprintf("%s:%d", volumeServer[0:sepIndex], port+10000), nil } -func WithMasterServerClient(masterServer string, grpcDialOption grpc.DialOption, fn func(ctx2 context.Context, masterClient master_pb.SeaweedClient) error) error { - - ctx := context.Background() +func WithMasterServerClient(masterServer string, grpcDialOption grpc.DialOption, fn func(masterClient master_pb.SeaweedClient) error) error { masterGrpcAddress, parseErr := util.ParseServerToGrpcAddress(masterServer) if parseErr != nil { return fmt.Errorf("failed to parse master grpc %v: %v", masterServer, parseErr) } - return util.WithCachedGrpcClient(ctx, func(ctx2 context.Context, grpcConnection *grpc.ClientConn) error { + return util.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error { client := master_pb.NewSeaweedClient(grpcConnection) - return fn(ctx2, client) + return fn(client) }, masterGrpcAddress, grpcDialOption) } diff --git a/weed/operation/lookup.go b/weed/operation/lookup.go index 78769ac5a..d0773e7fd 100644 --- a/weed/operation/lookup.go +++ b/weed/operation/lookup.go @@ -99,12 +99,12 @@ func LookupVolumeIds(server string, grpcDialOption grpc.DialOption, vids []strin //only query unknown_vids - err := WithMasterServerClient(server, grpcDialOption, func(ctx context.Context, masterClient master_pb.SeaweedClient) error { + err := WithMasterServerClient(server, grpcDialOption, func(masterClient master_pb.SeaweedClient) error { req := &master_pb.LookupVolumeRequest{ VolumeIds: unknown_vids, } - resp, grpcErr := masterClient.LookupVolume(ctx, req) + resp, grpcErr := masterClient.LookupVolume(context.Background(), req) if grpcErr != nil { return grpcErr } diff --git a/weed/operation/stats.go b/weed/operation/stats.go index 3e6327f19..b69a33750 100644 --- a/weed/operation/stats.go +++ b/weed/operation/stats.go @@ -9,9 +9,9 @@ import ( func Statistics(server string, grpcDialOption grpc.DialOption, req *master_pb.StatisticsRequest) (resp *master_pb.StatisticsResponse, err error) { - err = WithMasterServerClient(server, grpcDialOption, func(ctx context.Context, masterClient master_pb.SeaweedClient) error { + err = WithMasterServerClient(server, grpcDialOption, func(masterClient master_pb.SeaweedClient) error { - grpcResponse, grpcErr := masterClient.Statistics(ctx, req) + grpcResponse, grpcErr := masterClient.Statistics(context.Background(), req) if grpcErr != nil { return grpcErr } diff --git a/weed/operation/sync_volume.go b/weed/operation/sync_volume.go index 4b39ad544..5562f12ab 100644 --- a/weed/operation/sync_volume.go +++ b/weed/operation/sync_volume.go @@ -8,9 +8,9 @@ import ( func GetVolumeSyncStatus(server string, grpcDialOption grpc.DialOption, vid uint32) (resp *volume_server_pb.VolumeSyncStatusResponse, err error) { - WithVolumeServerClient(server, grpcDialOption, func(ctx context.Context, client volume_server_pb.VolumeServerClient) error { + WithVolumeServerClient(server, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { - resp, err = client.VolumeSyncStatus(ctx, &volume_server_pb.VolumeSyncStatusRequest{ + resp, err = client.VolumeSyncStatus(context.Background(), &volume_server_pb.VolumeSyncStatusRequest{ VolumeId: vid, }) return nil diff --git a/weed/operation/tail_volume.go b/weed/operation/tail_volume.go index 1e8b0a16e..3cd66b5da 100644 --- a/weed/operation/tail_volume.go +++ b/weed/operation/tail_volume.go @@ -5,9 +5,10 @@ import ( "fmt" "io" + "google.golang.org/grpc" + "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" "github.com/chrislusf/seaweedfs/weed/storage/needle" - "google.golang.org/grpc" ) func TailVolume(master string, grpcDialOption grpc.DialOption, vid needle.VolumeId, sinceNs uint64, timeoutSeconds int, fn func(n *needle.Needle) error) error { @@ -26,9 +27,9 @@ func TailVolume(master string, grpcDialOption grpc.DialOption, vid needle.Volume } func TailVolumeFromSource(volumeServer string, grpcDialOption grpc.DialOption, vid needle.VolumeId, sinceNs uint64, idleTimeoutSeconds int, fn func(n *needle.Needle) error) error { - return WithVolumeServerClient(volumeServer, grpcDialOption, func(ctx context.Context, client volume_server_pb.VolumeServerClient) error { + return WithVolumeServerClient(volumeServer, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { - stream, err := client.VolumeTailSender(ctx, &volume_server_pb.VolumeTailSenderRequest{ + stream, err := client.VolumeTailSender(context.Background(), &volume_server_pb.VolumeTailSenderRequest{ VolumeId: uint32(vid), SinceNs: sinceNs, IdleTimeoutSeconds: uint32(idleTimeoutSeconds), |
