aboutsummaryrefslogtreecommitdiff
path: root/weed/server/filer_grpc_server_admin.go
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2022-04-01 16:40:49 -0700
committerchrislu <chris.lu@gmail.com>2022-04-01 16:40:49 -0700
commit2305508b6503a9b1c9cf79e2d359e0c6f6b0d2ea (patch)
tree925a242359b2910b2fa9367da7f991bc0bfa9ed4 /weed/server/filer_grpc_server_admin.go
parent800cbc004c84a5113076fdbb70e93d53e8434073 (diff)
downloadseaweedfs-2305508b6503a9b1c9cf79e2d359e0c6f6b0d2ea.tar.xz
seaweedfs-2305508b6503a9b1c9cf79e2d359e0c6f6b0d2ea.zip
refactor: separate into two files
Diffstat (limited to 'weed/server/filer_grpc_server_admin.go')
-rw-r--r--weed/server/filer_grpc_server_admin.go135
1 files changed, 135 insertions, 0 deletions
diff --git a/weed/server/filer_grpc_server_admin.go b/weed/server/filer_grpc_server_admin.go
new file mode 100644
index 000000000..b1a936e81
--- /dev/null
+++ b/weed/server/filer_grpc_server_admin.go
@@ -0,0 +1,135 @@
+package weed_server
+
+import (
+ "context"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+func (fs *FilerServer) Statistics(ctx context.Context, req *filer_pb.StatisticsRequest) (resp *filer_pb.StatisticsResponse, err error) {
+
+ var output *master_pb.StatisticsResponse
+
+ err = fs.filer.MasterClient.WithClient(false, func(masterClient master_pb.SeaweedClient) error {
+ grpcResponse, grpcErr := masterClient.Statistics(context.Background(), &master_pb.StatisticsRequest{
+ Replication: req.Replication,
+ Collection: req.Collection,
+ Ttl: req.Ttl,
+ DiskType: req.DiskType,
+ })
+ if grpcErr != nil {
+ return grpcErr
+ }
+
+ output = grpcResponse
+ return nil
+ })
+
+ if err != nil {
+ return nil, err
+ }
+
+ return &filer_pb.StatisticsResponse{
+ TotalSize: output.TotalSize,
+ UsedSize: output.UsedSize,
+ FileCount: output.FileCount,
+ }, nil
+}
+
+func (fs *FilerServer) GetFilerConfiguration(ctx context.Context, req *filer_pb.GetFilerConfigurationRequest) (resp *filer_pb.GetFilerConfigurationResponse, err error) {
+
+ clusterId, _ := fs.filer.Store.KvGet(context.Background(), []byte("clusterId"))
+
+ t := &filer_pb.GetFilerConfigurationResponse{
+ Masters: pb.ToAddressStringsFromMap(fs.option.Masters),
+ Collection: fs.option.Collection,
+ Replication: fs.option.DefaultReplication,
+ MaxMb: uint32(fs.option.MaxMB),
+ DirBuckets: fs.filer.DirBucketsPath,
+ Cipher: fs.filer.Cipher,
+ Signature: fs.filer.Signature,
+ MetricsAddress: fs.metricsAddress,
+ MetricsIntervalSec: int32(fs.metricsIntervalSec),
+ Version: util.Version(),
+ ClusterId: string(clusterId),
+ }
+
+ glog.V(4).Infof("GetFilerConfiguration: %v", t)
+
+ return t, nil
+}
+
+func (fs *FilerServer) KeepConnected(stream filer_pb.SeaweedFiler_KeepConnectedServer) error {
+
+ req, err := stream.Recv()
+ if err != nil {
+ return err
+ }
+
+ clientName := util.JoinHostPort(req.Name, int(req.GrpcPort))
+ m := make(map[string]bool)
+ for _, tp := range req.Resources {
+ m[tp] = true
+ }
+ fs.brokersLock.Lock()
+ fs.brokers[clientName] = m
+ glog.V(0).Infof("+ broker %v", clientName)
+ fs.brokersLock.Unlock()
+
+ defer func() {
+ fs.brokersLock.Lock()
+ delete(fs.brokers, clientName)
+ glog.V(0).Infof("- broker %v: %v", clientName, err)
+ fs.brokersLock.Unlock()
+ }()
+
+ for {
+ if err := stream.Send(&filer_pb.KeepConnectedResponse{}); err != nil {
+ glog.V(0).Infof("send broker %v: %+v", clientName, err)
+ return err
+ }
+ // println("replied")
+
+ if _, err := stream.Recv(); err != nil {
+ glog.V(0).Infof("recv broker %v: %v", clientName, err)
+ return err
+ }
+ // println("received")
+ }
+
+}
+
+func (fs *FilerServer) LocateBroker(ctx context.Context, req *filer_pb.LocateBrokerRequest) (resp *filer_pb.LocateBrokerResponse, err error) {
+
+ resp = &filer_pb.LocateBrokerResponse{}
+
+ fs.brokersLock.Lock()
+ defer fs.brokersLock.Unlock()
+
+ var localBrokers []*filer_pb.LocateBrokerResponse_Resource
+
+ for b, m := range fs.brokers {
+ if _, found := m[req.Resource]; found {
+ resp.Found = true
+ resp.Resources = []*filer_pb.LocateBrokerResponse_Resource{
+ {
+ GrpcAddresses: b,
+ ResourceCount: int32(len(m)),
+ },
+ }
+ return
+ }
+ localBrokers = append(localBrokers, &filer_pb.LocateBrokerResponse_Resource{
+ GrpcAddresses: b,
+ ResourceCount: int32(len(m)),
+ })
+ }
+
+ resp.Resources = localBrokers
+
+ return resp, nil
+
+}