diff options
Diffstat (limited to 'weed/messaging')
| -rw-r--r-- | weed/messaging/broker/broker_append.go | 5 | ||||
| -rw-r--r-- | weed/messaging/broker/broker_grpc_server_discovery.go | 11 | ||||
| -rw-r--r-- | weed/messaging/broker/broker_server.go | 6 |
3 files changed, 13 insertions, 9 deletions
diff --git a/weed/messaging/broker/broker_append.go b/weed/messaging/broker/broker_append.go index 599efad98..9958a0752 100644 --- a/weed/messaging/broker/broker_append.go +++ b/weed/messaging/broker/broker_append.go @@ -71,8 +71,9 @@ func (broker *MessageBroker) assignAndUpload(topicConfig *messaging_pb.TopicConf assignResult.Auth = security.EncodedJwt(resp.Auth) assignResult.Fid = resp.FileId - assignResult.Url = resp.Url - assignResult.PublicUrl = resp.PublicUrl + assignResult.Url = resp.Location.Url + assignResult.PublicUrl = resp.Location.PublicUrl + assignResult.GrpcPort = int(resp.Location.GrpcPort) assignResult.Count = uint64(resp.Count) return nil diff --git a/weed/messaging/broker/broker_grpc_server_discovery.go b/weed/messaging/broker/broker_grpc_server_discovery.go index 3c14f3220..2b5e03236 100644 --- a/weed/messaging/broker/broker_grpc_server_discovery.go +++ b/weed/messaging/broker/broker_grpc_server_discovery.go @@ -3,6 +3,7 @@ package broker import ( "context" "fmt" + "github.com/chrislusf/seaweedfs/weed/pb" "time" "github.com/chrislusf/seaweedfs/weed/glog" @@ -62,7 +63,7 @@ func (broker *MessageBroker) FindBroker(c context.Context, request *messaging_pb func (broker *MessageBroker) checkFilers() { // contact a filer about masters - var masters []string + var masters []pb.ServerAddress found := false for !found { for _, filer := range broker.option.Filers { @@ -71,7 +72,9 @@ func (broker *MessageBroker) checkFilers() { if err != nil { return err } - masters = append(masters, resp.Masters...) + for _, m := range resp.Masters { + masters = append(masters, pb.ServerAddress(m)) + } return nil }) if err == nil { @@ -85,7 +88,7 @@ func (broker *MessageBroker) checkFilers() { glog.V(0).Infof("received master list: %s", masters) // contact each masters for filers - var filers []string + var filers []pb.ServerAddress found = false for !found { for _, master := range masters { @@ -97,7 +100,7 @@ func (broker *MessageBroker) checkFilers() { return err } - filers = append(filers, resp.GrpcAddresses...) + filers = append(filers, pb.FromAddressStrings(resp.GrpcAddresses)...) return nil }) diff --git a/weed/messaging/broker/broker_server.go b/weed/messaging/broker/broker_server.go index 06162471c..fd41dd441 100644 --- a/weed/messaging/broker/broker_server.go +++ b/weed/messaging/broker/broker_server.go @@ -13,7 +13,7 @@ import ( ) type MessageBrokerOption struct { - Filers []string + Filers []pb.ServerAddress DefaultReplication string MaxMB int Ip string @@ -99,13 +99,13 @@ func (broker *MessageBroker) keepConnectedToOneFiler() { } -func (broker *MessageBroker) withFilerClient(filer string, fn func(filer_pb.SeaweedFilerClient) error) error { +func (broker *MessageBroker) withFilerClient(filer pb.ServerAddress, fn func(filer_pb.SeaweedFilerClient) error) error { return pb.WithFilerClient(filer, broker.grpcDialOption, fn) } -func (broker *MessageBroker) withMasterClient(master string, fn func(client master_pb.SeaweedClient) error) error { +func (broker *MessageBroker) withMasterClient(master pb.ServerAddress, fn func(client master_pb.SeaweedClient) error) error { return pb.WithMasterClient(master, broker.grpcDialOption, func(client master_pb.SeaweedClient) error { return fn(client) |
