aboutsummaryrefslogtreecommitdiff
path: root/weed/messaging
diff options
context:
space:
mode:
Diffstat (limited to 'weed/messaging')
-rw-r--r--weed/messaging/broker/broker_append.go5
-rw-r--r--weed/messaging/broker/broker_grpc_server_discovery.go11
-rw-r--r--weed/messaging/broker/broker_server.go6
3 files changed, 13 insertions, 9 deletions
diff --git a/weed/messaging/broker/broker_append.go b/weed/messaging/broker/broker_append.go
index 599efad98..9958a0752 100644
--- a/weed/messaging/broker/broker_append.go
+++ b/weed/messaging/broker/broker_append.go
@@ -71,8 +71,9 @@ func (broker *MessageBroker) assignAndUpload(topicConfig *messaging_pb.TopicConf
assignResult.Auth = security.EncodedJwt(resp.Auth)
assignResult.Fid = resp.FileId
- assignResult.Url = resp.Url
- assignResult.PublicUrl = resp.PublicUrl
+ assignResult.Url = resp.Location.Url
+ assignResult.PublicUrl = resp.Location.PublicUrl
+ assignResult.GrpcPort = int(resp.Location.GrpcPort)
assignResult.Count = uint64(resp.Count)
return nil
diff --git a/weed/messaging/broker/broker_grpc_server_discovery.go b/weed/messaging/broker/broker_grpc_server_discovery.go
index 3c14f3220..2b5e03236 100644
--- a/weed/messaging/broker/broker_grpc_server_discovery.go
+++ b/weed/messaging/broker/broker_grpc_server_discovery.go
@@ -3,6 +3,7 @@ package broker
import (
"context"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/pb"
"time"
"github.com/chrislusf/seaweedfs/weed/glog"
@@ -62,7 +63,7 @@ func (broker *MessageBroker) FindBroker(c context.Context, request *messaging_pb
func (broker *MessageBroker) checkFilers() {
// contact a filer about masters
- var masters []string
+ var masters []pb.ServerAddress
found := false
for !found {
for _, filer := range broker.option.Filers {
@@ -71,7 +72,9 @@ func (broker *MessageBroker) checkFilers() {
if err != nil {
return err
}
- masters = append(masters, resp.Masters...)
+ for _, m := range resp.Masters {
+ masters = append(masters, pb.ServerAddress(m))
+ }
return nil
})
if err == nil {
@@ -85,7 +88,7 @@ func (broker *MessageBroker) checkFilers() {
glog.V(0).Infof("received master list: %s", masters)
// contact each masters for filers
- var filers []string
+ var filers []pb.ServerAddress
found = false
for !found {
for _, master := range masters {
@@ -97,7 +100,7 @@ func (broker *MessageBroker) checkFilers() {
return err
}
- filers = append(filers, resp.GrpcAddresses...)
+ filers = append(filers, pb.FromAddressStrings(resp.GrpcAddresses)...)
return nil
})
diff --git a/weed/messaging/broker/broker_server.go b/weed/messaging/broker/broker_server.go
index 06162471c..fd41dd441 100644
--- a/weed/messaging/broker/broker_server.go
+++ b/weed/messaging/broker/broker_server.go
@@ -13,7 +13,7 @@ import (
)
type MessageBrokerOption struct {
- Filers []string
+ Filers []pb.ServerAddress
DefaultReplication string
MaxMB int
Ip string
@@ -99,13 +99,13 @@ func (broker *MessageBroker) keepConnectedToOneFiler() {
}
-func (broker *MessageBroker) withFilerClient(filer string, fn func(filer_pb.SeaweedFilerClient) error) error {
+func (broker *MessageBroker) withFilerClient(filer pb.ServerAddress, fn func(filer_pb.SeaweedFilerClient) error) error {
return pb.WithFilerClient(filer, broker.grpcDialOption, fn)
}
-func (broker *MessageBroker) withMasterClient(master string, fn func(client master_pb.SeaweedClient) error) error {
+func (broker *MessageBroker) withMasterClient(master pb.ServerAddress, fn func(client master_pb.SeaweedClient) error) error {
return pb.WithMasterClient(master, broker.grpcDialOption, func(client master_pb.SeaweedClient) error {
return fn(client)