diff options
Diffstat (limited to 'weed/server')
| -rw-r--r-- | weed/server/msg_broker_grpc_server.go | 23 | ||||
| -rw-r--r-- | weed/server/msg_broker_server.go | 121 | ||||
| -rw-r--r-- | weed/server/queue_server.go | 49 | ||||
| -rw-r--r-- | weed/server/raft_server.go | 11 | ||||
| -rw-r--r-- | weed/server/volume_grpc_client_to_master.go | 5 | ||||
| -rw-r--r-- | weed/server/webdav_server.go | 3 |
6 files changed, 156 insertions, 56 deletions
diff --git a/weed/server/msg_broker_grpc_server.go b/weed/server/msg_broker_grpc_server.go new file mode 100644 index 000000000..8b13aac76 --- /dev/null +++ b/weed/server/msg_broker_grpc_server.go @@ -0,0 +1,23 @@ +package weed_server + +import ( + "context" + + "github.com/chrislusf/seaweedfs/weed/pb/queue_pb" +) + +func (broker *MessageBroker) ConfigureTopic(context.Context, *queue_pb.ConfigureTopicRequest) (*queue_pb.ConfigureTopicResponse, error) { + panic("implement me") +} + +func (broker *MessageBroker) DeleteTopic(context.Context, *queue_pb.DeleteTopicRequest) (*queue_pb.DeleteTopicResponse, error) { + panic("implement me") +} + +func (broker *MessageBroker) StreamWrite(queue_pb.SeaweedQueue_StreamWriteServer) error { + panic("implement me") +} + +func (broker *MessageBroker) StreamRead(*queue_pb.ReadMessageRequest, queue_pb.SeaweedQueue_StreamReadServer) error { + panic("implement me") +} diff --git a/weed/server/msg_broker_server.go b/weed/server/msg_broker_server.go new file mode 100644 index 000000000..a9d908581 --- /dev/null +++ b/weed/server/msg_broker_server.go @@ -0,0 +1,121 @@ +package weed_server + +import ( + "context" + "fmt" + "time" + + "google.golang.org/grpc" + + "github.com/chrislusf/seaweedfs/weed/pb" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/pb/master_pb" + "github.com/chrislusf/seaweedfs/weed/security" + "github.com/chrislusf/seaweedfs/weed/util" +) + +type MessageBrokerOption struct { + Filers []string + DefaultReplication string + MaxMB int + Port int +} + +type MessageBroker struct { + option *MessageBrokerOption + grpcDialOption grpc.DialOption +} + +func NewMessageBroker(option *MessageBrokerOption) (messageBroker *MessageBroker, err error) { + + messageBroker = &MessageBroker{ + option: option, + grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.msg_broker"), + } + + go messageBroker.loopForEver() + + return messageBroker, nil +} + +func (broker *MessageBroker) loopForEver() { + + for { + broker.checkPeers() + time.Sleep(3 * time.Second) + } + +} + +func (broker *MessageBroker) checkPeers() { + + // contact a filer about masters + var masters []string + for _, filer := range broker.option.Filers { + err := broker.withFilerClient(filer, func(client filer_pb.SeaweedFilerClient) error { + resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) + if err != nil { + return err + } + masters = append(masters, resp.Masters...) + return nil + }) + if err != nil { + fmt.Printf("failed to read masters from %+v: %v\n", broker.option.Filers, err) + return + } + } + + // contact each masters for filers + var filers []string + for _, master := range masters { + err := broker.withMasterClient(master, func(client master_pb.SeaweedClient) error { + resp, err := client.ListMasterClients(context.Background(), &master_pb.ListMasterClientsRequest{ + ClientType: "filer", + }) + if err != nil { + return err + } + + fmt.Printf("filers: %+v\n", resp.GrpcAddresses) + filers = append(filers, resp.GrpcAddresses...) + + return nil + }) + if err != nil { + fmt.Printf("failed to list filers: %v\n", err) + return + } + } + + // contact each filer about brokers + for _, filer := range filers { + err := broker.withFilerClient(filer, func(client filer_pb.SeaweedFilerClient) error { + resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) + if err != nil { + return err + } + masters = append(masters, resp.Masters...) + return nil + }) + if err != nil { + fmt.Printf("failed to read masters from %+v: %v\n", broker.option.Filers, err) + return + } + } + +} + +func (broker *MessageBroker) withFilerClient(filer string, fn func(filer_pb.SeaweedFilerClient) error) error { + + return pb.WithFilerClient(filer, broker.grpcDialOption, fn) + +} + +func (broker *MessageBroker) withMasterClient(master string, fn func(client master_pb.SeaweedClient) error) error { + + return pb.WithMasterClient(master, broker.grpcDialOption, func(client master_pb.SeaweedClient) error { + return fn(client) + }) + +} diff --git a/weed/server/queue_server.go b/weed/server/queue_server.go deleted file mode 100644 index 078c76a30..000000000 --- a/weed/server/queue_server.go +++ /dev/null @@ -1,49 +0,0 @@ -package weed_server - -import ( - "context" - - "google.golang.org/grpc" - - "github.com/chrislusf/seaweedfs/weed/pb/queue_pb" - "github.com/chrislusf/seaweedfs/weed/security" - "github.com/chrislusf/seaweedfs/weed/util" -) - -type QueueServerOption struct { - Filers []string - DefaultReplication string - MaxMB int - Port int -} - -type QueueServer struct { - option *QueueServerOption - grpcDialOption grpc.DialOption -} - -func (q *QueueServer) ConfigureTopic(context.Context, *queue_pb.ConfigureTopicRequest) (*queue_pb.ConfigureTopicResponse, error) { - panic("implement me") -} - -func (q *QueueServer) DeleteTopic(context.Context, *queue_pb.DeleteTopicRequest) (*queue_pb.DeleteTopicResponse, error) { - panic("implement me") -} - -func (q *QueueServer) StreamWrite(queue_pb.SeaweedQueue_StreamWriteServer) error { - panic("implement me") -} - -func (q *QueueServer) StreamRead(*queue_pb.ReadMessageRequest, queue_pb.SeaweedQueue_StreamReadServer) error { - panic("implement me") -} - -func NewQueueServer(option *QueueServerOption) (qs *QueueServer, err error) { - - qs = &QueueServer{ - option: option, - grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.queue"), - } - - return qs, nil -} diff --git a/weed/server/raft_server.go b/weed/server/raft_server.go index 53289f1c1..0381c7feb 100644 --- a/weed/server/raft_server.go +++ b/weed/server/raft_server.go @@ -2,8 +2,6 @@ package weed_server import ( "encoding/json" - "github.com/chrislusf/seaweedfs/weed/util" - "google.golang.org/grpc" "io/ioutil" "os" "path" @@ -11,7 +9,12 @@ import ( "sort" "time" + "google.golang.org/grpc" + + "github.com/chrislusf/seaweedfs/weed/pb" + "github.com/chrislusf/raft" + "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/topology" ) @@ -61,7 +64,7 @@ func NewRaftServer(grpcDialOption grpc.DialOption, peers []string, serverAddr, d s.raftServer.Start() for _, peer := range s.peers { - s.raftServer.AddPeer(peer, util.ServerToGrpcAddress(peer)) + s.raftServer.AddPeer(peer, pb.ServerToGrpcAddress(peer)) } s.GrpcServer = raft.NewGrpcServer(s.raftServer) @@ -72,7 +75,7 @@ func NewRaftServer(grpcDialOption grpc.DialOption, peers []string, serverAddr, d _, err := s.raftServer.Do(&raft.DefaultJoinCommand{ Name: s.raftServer.Name(), - ConnectionString: util.ServerToGrpcAddress(s.serverAddr), + ConnectionString: pb.ServerToGrpcAddress(s.serverAddr), }) if err != nil { diff --git a/weed/server/volume_grpc_client_to_master.go b/weed/server/volume_grpc_client_to_master.go index 2168afee7..1f4d9df10 100644 --- a/weed/server/volume_grpc_client_to_master.go +++ b/weed/server/volume_grpc_client_to_master.go @@ -7,6 +7,7 @@ import ( "google.golang.org/grpc" + "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/storage/backend" "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding" @@ -36,7 +37,7 @@ func (vs *VolumeServer) heartbeat() { if newLeader != "" { master = newLeader } - masterGrpcAddress, parseErr := util.ParseServerToGrpcAddress(master) + masterGrpcAddress, parseErr := pb.ParseServerToGrpcAddress(master) if parseErr != nil { glog.V(0).Infof("failed to parse master grpc %v: %v", masterGrpcAddress, parseErr) continue @@ -55,7 +56,7 @@ func (vs *VolumeServer) heartbeat() { func (vs *VolumeServer) doHeartbeat(masterNode, masterGrpcAddress string, grpcDialOption grpc.DialOption, sleepInterval time.Duration) (newLeader string, err error) { - grpcConection, err := util.GrpcDial(context.Background(), masterGrpcAddress, grpcDialOption) + grpcConection, err := pb.GrpcDial(context.Background(), masterGrpcAddress, grpcDialOption) if err != nil { return "", fmt.Errorf("fail to dial %s : %v", masterNode, err) } diff --git a/weed/server/webdav_server.go b/weed/server/webdav_server.go index ddd611724..a07f6be01 100644 --- a/weed/server/webdav_server.go +++ b/weed/server/webdav_server.go @@ -14,6 +14,7 @@ import ( "google.golang.org/grpc" "github.com/chrislusf/seaweedfs/weed/operation" + "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/util" @@ -98,7 +99,7 @@ func NewWebDavFileSystem(option *WebDavOption) (webdav.FileSystem, error) { func (fs *WebDavFileSystem) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error { - return util.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error { + return pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error { client := filer_pb.NewSeaweedFilerClient(grpcConnection) return fn(client) }, fs.option.FilerGrpcAddress, fs.option.GrpcDialOption) |
