diff options
| author | chrislu <chris.lu@gmail.com> | 2022-04-01 16:40:49 -0700 |
|---|---|---|
| committer | chrislu <chris.lu@gmail.com> | 2022-04-01 16:40:49 -0700 |
| commit | 2305508b6503a9b1c9cf79e2d359e0c6f6b0d2ea (patch) | |
| tree | 925a242359b2910b2fa9367da7f991bc0bfa9ed4 /weed/server/filer_grpc_server_admin.go | |
| parent | 800cbc004c84a5113076fdbb70e93d53e8434073 (diff) | |
| download | seaweedfs-2305508b6503a9b1c9cf79e2d359e0c6f6b0d2ea.tar.xz seaweedfs-2305508b6503a9b1c9cf79e2d359e0c6f6b0d2ea.zip | |
refactor: separate into two files
Diffstat (limited to 'weed/server/filer_grpc_server_admin.go')
| -rw-r--r-- | weed/server/filer_grpc_server_admin.go | 135 |
1 files changed, 135 insertions, 0 deletions
diff --git a/weed/server/filer_grpc_server_admin.go b/weed/server/filer_grpc_server_admin.go new file mode 100644 index 000000000..b1a936e81 --- /dev/null +++ b/weed/server/filer_grpc_server_admin.go @@ -0,0 +1,135 @@ +package weed_server + +import ( + "context" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/pb/master_pb" + "github.com/chrislusf/seaweedfs/weed/util" +) + +func (fs *FilerServer) Statistics(ctx context.Context, req *filer_pb.StatisticsRequest) (resp *filer_pb.StatisticsResponse, err error) { + + var output *master_pb.StatisticsResponse + + err = fs.filer.MasterClient.WithClient(false, func(masterClient master_pb.SeaweedClient) error { + grpcResponse, grpcErr := masterClient.Statistics(context.Background(), &master_pb.StatisticsRequest{ + Replication: req.Replication, + Collection: req.Collection, + Ttl: req.Ttl, + DiskType: req.DiskType, + }) + if grpcErr != nil { + return grpcErr + } + + output = grpcResponse + return nil + }) + + if err != nil { + return nil, err + } + + return &filer_pb.StatisticsResponse{ + TotalSize: output.TotalSize, + UsedSize: output.UsedSize, + FileCount: output.FileCount, + }, nil +} + +func (fs *FilerServer) GetFilerConfiguration(ctx context.Context, req *filer_pb.GetFilerConfigurationRequest) (resp *filer_pb.GetFilerConfigurationResponse, err error) { + + clusterId, _ := fs.filer.Store.KvGet(context.Background(), []byte("clusterId")) + + t := &filer_pb.GetFilerConfigurationResponse{ + Masters: pb.ToAddressStringsFromMap(fs.option.Masters), + Collection: fs.option.Collection, + Replication: fs.option.DefaultReplication, + MaxMb: uint32(fs.option.MaxMB), + DirBuckets: fs.filer.DirBucketsPath, + Cipher: fs.filer.Cipher, + Signature: fs.filer.Signature, + MetricsAddress: fs.metricsAddress, + MetricsIntervalSec: int32(fs.metricsIntervalSec), + Version: util.Version(), + ClusterId: string(clusterId), + } + + glog.V(4).Infof("GetFilerConfiguration: %v", t) + + return t, nil +} + +func (fs *FilerServer) KeepConnected(stream filer_pb.SeaweedFiler_KeepConnectedServer) error { + + req, err := stream.Recv() + if err != nil { + return err + } + + clientName := util.JoinHostPort(req.Name, int(req.GrpcPort)) + m := make(map[string]bool) + for _, tp := range req.Resources { + m[tp] = true + } + fs.brokersLock.Lock() + fs.brokers[clientName] = m + glog.V(0).Infof("+ broker %v", clientName) + fs.brokersLock.Unlock() + + defer func() { + fs.brokersLock.Lock() + delete(fs.brokers, clientName) + glog.V(0).Infof("- broker %v: %v", clientName, err) + fs.brokersLock.Unlock() + }() + + for { + if err := stream.Send(&filer_pb.KeepConnectedResponse{}); err != nil { + glog.V(0).Infof("send broker %v: %+v", clientName, err) + return err + } + // println("replied") + + if _, err := stream.Recv(); err != nil { + glog.V(0).Infof("recv broker %v: %v", clientName, err) + return err + } + // println("received") + } + +} + +func (fs *FilerServer) LocateBroker(ctx context.Context, req *filer_pb.LocateBrokerRequest) (resp *filer_pb.LocateBrokerResponse, err error) { + + resp = &filer_pb.LocateBrokerResponse{} + + fs.brokersLock.Lock() + defer fs.brokersLock.Unlock() + + var localBrokers []*filer_pb.LocateBrokerResponse_Resource + + for b, m := range fs.brokers { + if _, found := m[req.Resource]; found { + resp.Found = true + resp.Resources = []*filer_pb.LocateBrokerResponse_Resource{ + { + GrpcAddresses: b, + ResourceCount: int32(len(m)), + }, + } + return + } + localBrokers = append(localBrokers, &filer_pb.LocateBrokerResponse_Resource{ + GrpcAddresses: b, + ResourceCount: int32(len(m)), + }) + } + + resp.Resources = localBrokers + + return resp, nil + +} |
