aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/server/filer_grpc_server.go126
-rw-r--r--weed/server/filer_grpc_server_admin.go135
2 files changed, 135 insertions, 126 deletions
diff --git a/weed/server/filer_grpc_server.go b/weed/server/filer_grpc_server.go
index 5a5714156..67d4aaaaf 100644
--- a/weed/server/filer_grpc_server.go
+++ b/weed/server/filer_grpc_server.go
@@ -3,7 +3,6 @@ package weed_server
import (
"context"
"fmt"
- "github.com/chrislusf/seaweedfs/weed/pb"
"os"
"path/filepath"
"strconv"
@@ -357,128 +356,3 @@ func (fs *FilerServer) DeleteCollection(ctx context.Context, req *filer_pb.Delet
return &filer_pb.DeleteCollectionResponse{}, err
}
-
-func (fs *FilerServer) Statistics(ctx context.Context, req *filer_pb.StatisticsRequest) (resp *filer_pb.StatisticsResponse, err error) {
-
- var output *master_pb.StatisticsResponse
-
- err = fs.filer.MasterClient.WithClient(false, func(masterClient master_pb.SeaweedClient) error {
- grpcResponse, grpcErr := masterClient.Statistics(context.Background(), &master_pb.StatisticsRequest{
- Replication: req.Replication,
- Collection: req.Collection,
- Ttl: req.Ttl,
- DiskType: req.DiskType,
- })
- if grpcErr != nil {
- return grpcErr
- }
-
- output = grpcResponse
- return nil
- })
-
- if err != nil {
- return nil, err
- }
-
- return &filer_pb.StatisticsResponse{
- TotalSize: output.TotalSize,
- UsedSize: output.UsedSize,
- FileCount: output.FileCount,
- }, nil
-}
-
-func (fs *FilerServer) GetFilerConfiguration(ctx context.Context, req *filer_pb.GetFilerConfigurationRequest) (resp *filer_pb.GetFilerConfigurationResponse, err error) {
-
- clusterId, _ := fs.filer.Store.KvGet(context.Background(), []byte("clusterId"))
-
- t := &filer_pb.GetFilerConfigurationResponse{
- Masters: pb.ToAddressStringsFromMap(fs.option.Masters),
- Collection: fs.option.Collection,
- Replication: fs.option.DefaultReplication,
- MaxMb: uint32(fs.option.MaxMB),
- DirBuckets: fs.filer.DirBucketsPath,
- Cipher: fs.filer.Cipher,
- Signature: fs.filer.Signature,
- MetricsAddress: fs.metricsAddress,
- MetricsIntervalSec: int32(fs.metricsIntervalSec),
- Version: util.Version(),
- ClusterId: string(clusterId),
- }
-
- glog.V(4).Infof("GetFilerConfiguration: %v", t)
-
- return t, nil
-}
-
-func (fs *FilerServer) KeepConnected(stream filer_pb.SeaweedFiler_KeepConnectedServer) error {
-
- req, err := stream.Recv()
- if err != nil {
- return err
- }
-
- clientName := util.JoinHostPort(req.Name, int(req.GrpcPort))
- m := make(map[string]bool)
- for _, tp := range req.Resources {
- m[tp] = true
- }
- fs.brokersLock.Lock()
- fs.brokers[clientName] = m
- glog.V(0).Infof("+ broker %v", clientName)
- fs.brokersLock.Unlock()
-
- defer func() {
- fs.brokersLock.Lock()
- delete(fs.brokers, clientName)
- glog.V(0).Infof("- broker %v: %v", clientName, err)
- fs.brokersLock.Unlock()
- }()
-
- for {
- if err := stream.Send(&filer_pb.KeepConnectedResponse{}); err != nil {
- glog.V(0).Infof("send broker %v: %+v", clientName, err)
- return err
- }
- // println("replied")
-
- if _, err := stream.Recv(); err != nil {
- glog.V(0).Infof("recv broker %v: %v", clientName, err)
- return err
- }
- // println("received")
- }
-
-}
-
-func (fs *FilerServer) LocateBroker(ctx context.Context, req *filer_pb.LocateBrokerRequest) (resp *filer_pb.LocateBrokerResponse, err error) {
-
- resp = &filer_pb.LocateBrokerResponse{}
-
- fs.brokersLock.Lock()
- defer fs.brokersLock.Unlock()
-
- var localBrokers []*filer_pb.LocateBrokerResponse_Resource
-
- for b, m := range fs.brokers {
- if _, found := m[req.Resource]; found {
- resp.Found = true
- resp.Resources = []*filer_pb.LocateBrokerResponse_Resource{
- {
- GrpcAddresses: b,
- ResourceCount: int32(len(m)),
- },
- }
- return
- }
- localBrokers = append(localBrokers, &filer_pb.LocateBrokerResponse_Resource{
- GrpcAddresses: b,
- ResourceCount: int32(len(m)),
- })
- }
-
- resp.Resources = localBrokers
-
- return resp, nil
-
-}
diff --git a/weed/server/filer_grpc_server_admin.go b/weed/server/filer_grpc_server_admin.go
new file mode 100644
index 000000000..b1a936e81
--- /dev/null
+++ b/weed/server/filer_grpc_server_admin.go
@@ -0,0 +1,135 @@
+package weed_server
+
+import (
+ "context"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+func (fs *FilerServer) Statistics(ctx context.Context, req *filer_pb.StatisticsRequest) (resp *filer_pb.StatisticsResponse, err error) {
+
+ var output *master_pb.StatisticsResponse
+
+ err = fs.filer.MasterClient.WithClient(false, func(masterClient master_pb.SeaweedClient) error {
+ grpcResponse, grpcErr := masterClient.Statistics(context.Background(), &master_pb.StatisticsRequest{
+ Replication: req.Replication,
+ Collection: req.Collection,
+ Ttl: req.Ttl,
+ DiskType: req.DiskType,
+ })
+ if grpcErr != nil {
+ return grpcErr
+ }
+
+ output = grpcResponse
+ return nil
+ })
+
+ if err != nil {
+ return nil, err
+ }
+
+ return &filer_pb.StatisticsResponse{
+ TotalSize: output.TotalSize,
+ UsedSize: output.UsedSize,
+ FileCount: output.FileCount,
+ }, nil
+}
+
+func (fs *FilerServer) GetFilerConfiguration(ctx context.Context, req *filer_pb.GetFilerConfigurationRequest) (resp *filer_pb.GetFilerConfigurationResponse, err error) {
+
+ clusterId, _ := fs.filer.Store.KvGet(context.Background(), []byte("clusterId"))
+
+ t := &filer_pb.GetFilerConfigurationResponse{
+ Masters: pb.ToAddressStringsFromMap(fs.option.Masters),
+ Collection: fs.option.Collection,
+ Replication: fs.option.DefaultReplication,
+ MaxMb: uint32(fs.option.MaxMB),
+ DirBuckets: fs.filer.DirBucketsPath,
+ Cipher: fs.filer.Cipher,
+ Signature: fs.filer.Signature,
+ MetricsAddress: fs.metricsAddress,
+ MetricsIntervalSec: int32(fs.metricsIntervalSec),
+ Version: util.Version(),
+ ClusterId: string(clusterId),
+ }
+
+ glog.V(4).Infof("GetFilerConfiguration: %v", t)
+
+ return t, nil
+}
+
+func (fs *FilerServer) KeepConnected(stream filer_pb.SeaweedFiler_KeepConnectedServer) error {
+
+ req, err := stream.Recv()
+ if err != nil {
+ return err
+ }
+
+ clientName := util.JoinHostPort(req.Name, int(req.GrpcPort))
+ m := make(map[string]bool)
+ for _, tp := range req.Resources {
+ m[tp] = true
+ }
+ fs.brokersLock.Lock()
+ fs.brokers[clientName] = m
+ glog.V(0).Infof("+ broker %v", clientName)
+ fs.brokersLock.Unlock()
+
+ defer func() {
+ fs.brokersLock.Lock()
+ delete(fs.brokers, clientName)
+ glog.V(0).Infof("- broker %v: %v", clientName, err)
+ fs.brokersLock.Unlock()
+ }()
+
+ for {
+ if err := stream.Send(&filer_pb.KeepConnectedResponse{}); err != nil {
+ glog.V(0).Infof("send broker %v: %+v", clientName, err)
+ return err
+ }
+ // println("replied")
+
+ if _, err := stream.Recv(); err != nil {
+ glog.V(0).Infof("recv broker %v: %v", clientName, err)
+ return err
+ }
+ // println("received")
+ }
+
+}
+
+func (fs *FilerServer) LocateBroker(ctx context.Context, req *filer_pb.LocateBrokerRequest) (resp *filer_pb.LocateBrokerResponse, err error) {
+
+ resp = &filer_pb.LocateBrokerResponse{}
+
+ fs.brokersLock.Lock()
+ defer fs.brokersLock.Unlock()
+
+ var localBrokers []*filer_pb.LocateBrokerResponse_Resource
+
+ for b, m := range fs.brokers {
+ if _, found := m[req.Resource]; found {
+ resp.Found = true
+ resp.Resources = []*filer_pb.LocateBrokerResponse_Resource{
+ {
+ GrpcAddresses: b,
+ ResourceCount: int32(len(m)),
+ },
+ }
+ return
+ }
+ localBrokers = append(localBrokers, &filer_pb.LocateBrokerResponse_Resource{
+ GrpcAddresses: b,
+ ResourceCount: int32(len(m)),
+ })
+ }
+
+ resp.Resources = localBrokers
+
+ return resp, nil
+
+}