aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chrislusf@users.noreply.github.com>2020-05-17 17:39:16 -0700
committerGitHub <noreply@github.com>2020-05-17 17:39:16 -0700
commite0e31e67a809d00c99edaa299531c7ce4d4750dc (patch)
tree0f890277ef14c748faed4fecb7f8b8d4edeb9849
parentb4e02ec525a6ec87b26686202307896faf3296a7 (diff)
parent081ee6fe349b519da8ea54cf3cdc17d2b15c5a71 (diff)
downloadseaweedfs-e0e31e67a809d00c99edaa299531c7ce4d4750dc.tar.xz
seaweedfs-e0e31e67a809d00c99edaa299531c7ce4d4750dc.zip
Merge pull request #1318 from chrislusf/msg_channel
Add messaging, add channel
-rw-r--r--go.mod2
-rw-r--r--go.sum2
-rw-r--r--other/java/client/src/main/proto/filer.proto28
-rw-r--r--weed/command/benchmark.go2
-rw-r--r--weed/command/filer_copy.go2
-rw-r--r--weed/command/msg_broker.go8
-rw-r--r--weed/filer2/reader_at.go2
-rw-r--r--weed/filer2/stream.go2
-rw-r--r--weed/messaging/broker/broker_append.go4
-rw-r--r--weed/messaging/broker/broker_grpc_server.go22
-rw-r--r--weed/messaging/broker/broker_grpc_server_discovery.go116
-rw-r--r--weed/messaging/broker/broker_grpc_server_publish.go56
-rw-r--r--weed/messaging/broker/broker_grpc_server_subscribe.go55
-rw-r--r--weed/messaging/broker/broker_server.go96
-rw-r--r--weed/messaging/broker/consistent_distribution.go38
-rw-r--r--weed/messaging/broker/consistent_distribution_test.go32
-rw-r--r--weed/messaging/broker/topic_lock.go103
-rw-r--r--weed/messaging/broker/topic_manager.go123
-rw-r--r--weed/messaging/client/client.go30
-rw-r--r--weed/messaging/client/subscriber.go91
-rw-r--r--weed/messaging/msgclient/client.go55
-rw-r--r--weed/messaging/msgclient/config.go63
-rw-r--r--weed/messaging/msgclient/pub_chan.go76
-rw-r--r--weed/messaging/msgclient/publisher.go (renamed from weed/messaging/client/publisher.go)31
-rw-r--r--weed/messaging/msgclient/sub_chan.go85
-rw-r--r--weed/messaging/msgclient/subscriber.go120
-rw-r--r--weed/operation/submit.go2
-rw-r--r--weed/pb/filer.proto28
-rw-r--r--weed/pb/filer_pb/filer.pb.go474
-rw-r--r--weed/pb/filer_pb/filer_client.go4
-rw-r--r--weed/pb/messaging.proto30
-rw-r--r--weed/pb/messaging_pb/messaging.pb.go345
-rw-r--r--weed/pb/volume_server_pb/volume_server.pb.go26
-rw-r--r--weed/server/filer_grpc_server.go72
-rw-r--r--weed/server/filer_grpc_server_listen.go2
-rw-r--r--weed/server/filer_server.go7
-rw-r--r--weed/util/log_buffer/log_buffer.go21
-rw-r--r--weed/util/log_buffer/log_read.go8
-rw-r--r--weed/util/log_buffer/sealed_buffer.go2
39 files changed, 1683 insertions, 582 deletions
diff --git a/go.mod b/go.mod
index 96ea8fd3d..2ba8658d3 100644
--- a/go.mod
+++ b/go.mod
@@ -10,6 +10,8 @@ require (
github.com/OneOfOne/xxhash v1.2.2
github.com/Shopify/sarama v1.23.1
github.com/aws/aws-sdk-go v1.23.13
+ github.com/buraksezer/consistent v0.0.0-20191006190839-693edf70fd72
+ github.com/cespare/xxhash v1.1.0
github.com/chrislusf/raft v0.0.0-20190225081310-10d6e2182d92
github.com/coreos/bbolt v1.3.3 // indirect
github.com/coreos/etcd v3.3.15+incompatible // indirect
diff --git a/go.sum b/go.sum
index a382daf10..00329907e 100644
--- a/go.sum
+++ b/go.sum
@@ -58,6 +58,8 @@ github.com/bitly/go-hostpool v0.0.0-20171023180738-a3a6125de932 h1:mXoPYz/Ul5HYE
github.com/bitly/go-hostpool v0.0.0-20171023180738-a3a6125de932/go.mod h1:NOuUCSz6Q9T7+igc/hlvDOUdtWKryOrtFyIVABv/p7k=
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 h1:DDGfHa7BWjL4YnC6+E63dPcxHo2sUxDIu8g3QgEJdRY=
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4=
+github.com/buraksezer/consistent v0.0.0-20191006190839-693edf70fd72 h1:fUmDBbSvv1uOzo/t8WaxZMVb7BxJ8JECo5lGoR9c5bA=
+github.com/buraksezer/consistent v0.0.0-20191006190839-693edf70fd72/go.mod h1:OEE5igu/CDjGegM1Jn6ZMo7R6LlV/JChAkjfQQIRLpg=
github.com/census-instrumentation/opencensus-proto v0.2.0 h1:LzQXZOgg4CQfE6bFvXGM30YZL1WW/M337pXml+GrcZ4=
github.com/census-instrumentation/opencensus-proto v0.2.0/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/census-instrumentation/opencensus-proto v0.2.1 h1:glEXhBS5PSLLv4IXzLA5yPRVX4bilULVyxxbrfOtDAk=
diff --git a/other/java/client/src/main/proto/filer.proto b/other/java/client/src/main/proto/filer.proto
index 6d890861d..1fc8ef63d 100644
--- a/other/java/client/src/main/proto/filer.proto
+++ b/other/java/client/src/main/proto/filer.proto
@@ -48,6 +48,12 @@ service SeaweedFiler {
rpc SubscribeMetadata (SubscribeMetadataRequest) returns (stream SubscribeMetadataResponse) {
}
+ rpc KeepConnected (stream KeepConnectedRequest) returns (stream KeepConnectedResponse) {
+ }
+
+ rpc LocateBroker (LocateBrokerRequest) returns (LocateBrokerResponse) {
+ }
+
}
//////////////////////////////////////////////////
@@ -261,3 +267,25 @@ message LogEntry {
int32 partition_key_hash = 2;
bytes data = 3;
}
+
+message KeepConnectedRequest {
+ string name = 1;
+ uint32 grpc_port = 2;
+ repeated string resources = 3;
+}
+message KeepConnectedResponse {
+}
+
+message LocateBrokerRequest {
+ string resource = 1;
+}
+message LocateBrokerResponse {
+ bool found = 1;
+ // if found, send the exact address
+ // if not found, send the full list of existing brokers
+ message Resource {
+ string grpc_addresses = 1;
+ int32 resource_count = 2;
+ }
+ repeated Resource resources = 2;
+}
diff --git a/weed/command/benchmark.go b/weed/command/benchmark.go
index 5b2e31622..ee21914b3 100644
--- a/weed/command/benchmark.go
+++ b/weed/command/benchmark.go
@@ -232,7 +232,7 @@ func writeFiles(idChan chan int, fileIdLineChan chan string, s *stat) {
Reader: &FakeReader{id: uint64(id), size: fileSize, random: random},
FileSize: fileSize,
MimeType: "image/bench", // prevent gzip benchmark content
- Fsync: *b.fsync,
+ Fsync: *b.fsync,
}
ar := &operation.VolumeAssignRequest{
Count: 1,
diff --git a/weed/command/filer_copy.go b/weed/command/filer_copy.go
index 322ab20d5..2d6ba94d6 100644
--- a/weed/command/filer_copy.go
+++ b/weed/command/filer_copy.go
@@ -428,7 +428,7 @@ func (worker *FileCopyWorker) uploadFileInChunks(task FileCopyTask, f *os.File,
uploadError = fmt.Errorf("upload %v to %s result: %v\n", fileName, targetUrl, uploadResult.Error)
return
}
- chunksChan <- uploadResult.ToPbFileChunk(assignResult.FileId, i * chunkSize)
+ chunksChan <- uploadResult.ToPbFileChunk(assignResult.FileId, i*chunkSize)
fmt.Printf("uploaded %s-%d to %s [%d,%d)\n", fileName, i+1, targetUrl, i*chunkSize, i*chunkSize+int64(uploadResult.Size))
}(i)
diff --git a/weed/command/msg_broker.go b/weed/command/msg_broker.go
index 984497c2e..bab1083ab 100644
--- a/weed/command/msg_broker.go
+++ b/weed/command/msg_broker.go
@@ -6,9 +6,10 @@ import (
"strconv"
"time"
- "github.com/chrislusf/seaweedfs/weed/util/grace"
"google.golang.org/grpc/reflection"
+ "github.com/chrislusf/seaweedfs/weed/util/grace"
+
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/messaging/broker"
"github.com/chrislusf/seaweedfs/weed/pb"
@@ -24,6 +25,7 @@ var (
type QueueOptions struct {
filer *string
+ ip *string
port *int
cpuprofile *string
memprofile *string
@@ -32,7 +34,8 @@ type QueueOptions struct {
func init() {
cmdMsgBroker.Run = runMsgBroker // break init cycle
messageBrokerStandaloneOptions.filer = cmdMsgBroker.Flag.String("filer", "localhost:8888", "filer server address")
- messageBrokerStandaloneOptions.port = cmdMsgBroker.Flag.Int("port", 17777, "queue server gRPC listen port")
+ messageBrokerStandaloneOptions.ip = cmdMsgBroker.Flag.String("ip", util.DetectedHostAddress(), "broker host address")
+ messageBrokerStandaloneOptions.port = cmdMsgBroker.Flag.Int("port", 17777, "broker gRPC listen port")
messageBrokerStandaloneOptions.cpuprofile = cmdMsgBroker.Flag.String("cpuprofile", "", "cpu profile output file")
messageBrokerStandaloneOptions.memprofile = cmdMsgBroker.Flag.String("memprofile", "", "memory profile output file")
}
@@ -91,6 +94,7 @@ func (msgBrokerOpt *QueueOptions) startQueueServer() bool {
Filers: []string{*msgBrokerOpt.filer},
DefaultReplication: "",
MaxMB: 0,
+ Ip: *msgBrokerOpt.ip,
Port: *msgBrokerOpt.port,
Cipher: cipher,
}, grpcDialOption)
diff --git a/weed/filer2/reader_at.go b/weed/filer2/reader_at.go
index a9772da5b..53fff7672 100644
--- a/weed/filer2/reader_at.go
+++ b/weed/filer2/reader_at.go
@@ -59,7 +59,7 @@ func LookupFn(filerClient filer_pb.FilerClient) LookupFileIdFunctionType {
func NewChunkReaderAtFromClient(filerClient filer_pb.FilerClient, chunkViews []*ChunkView, chunkCache *chunk_cache.ChunkCache) *ChunkReadAt {
return &ChunkReadAt{
- chunkViews: chunkViews,
+ chunkViews: chunkViews,
lookupFileId: LookupFn(filerClient),
bufferOffset: -1,
chunkCache: chunkCache,
diff --git a/weed/filer2/stream.go b/weed/filer2/stream.go
index 4c8213b07..033a8dd13 100644
--- a/weed/filer2/stream.go
+++ b/weed/filer2/stream.go
@@ -103,7 +103,7 @@ func NewChunkStreamReader(filerClient filer_pb.FilerClient, chunks []*filer_pb.F
chunkViews := ViewFromChunks(chunks, 0, math.MaxInt32)
return &ChunkStreamReader{
- chunkViews: chunkViews,
+ chunkViews: chunkViews,
lookupFileId: LookupFn(filerClient),
}
}
diff --git a/weed/messaging/broker/broker_append.go b/weed/messaging/broker/broker_append.go
index e87e197b0..80f107e00 100644
--- a/weed/messaging/broker/broker_append.go
+++ b/weed/messaging/broker/broker_append.go
@@ -3,6 +3,7 @@ package broker
import (
"context"
"fmt"
+ "io"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/operation"
@@ -94,6 +95,9 @@ func (broker *MessageBroker) WithFilerClient(fn func(filer_pb.SeaweedFilerClient
for _, filer := range broker.option.Filers {
if err = pb.WithFilerClient(filer, broker.grpcDialOption, fn); err != nil {
+ if err == io.EOF {
+ return
+ }
glog.V(0).Infof("fail to connect to %s: %v", filer, err)
} else {
break
diff --git a/weed/messaging/broker/broker_grpc_server.go b/weed/messaging/broker/broker_grpc_server.go
index 447620a6b..6918a28a6 100644
--- a/weed/messaging/broker/broker_grpc_server.go
+++ b/weed/messaging/broker/broker_grpc_server.go
@@ -2,7 +2,10 @@ package broker
import (
"context"
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/filer2"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
)
@@ -10,6 +13,25 @@ func (broker *MessageBroker) ConfigureTopic(c context.Context, request *messagin
panic("implement me")
}
+func (broker *MessageBroker) DeleteTopic(c context.Context, request *messaging_pb.DeleteTopicRequest) (*messaging_pb.DeleteTopicResponse, error) {
+ resp := &messaging_pb.DeleteTopicResponse{}
+ dir, entry := genTopicDirEntry(request.Namespace, request.Topic)
+ if exists, err := filer_pb.Exists(broker, dir, entry, true); err != nil {
+ return nil, err
+ } else if exists {
+ err = filer_pb.Remove(broker, dir, entry, true, true, true)
+ }
+ return resp, nil
+}
+
func (broker *MessageBroker) GetTopicConfiguration(c context.Context, request *messaging_pb.GetTopicConfigurationRequest) (*messaging_pb.GetTopicConfigurationResponse, error) {
panic("implement me")
}
+
+func genTopicDir(namespace, topic string) string {
+ return fmt.Sprintf("%s/%s/%s", filer2.TopicsDir, namespace, topic)
+}
+
+func genTopicDirEntry(namespace, topic string) (dir, entry string) {
+ return fmt.Sprintf("%s/%s", filer2.TopicsDir, namespace), topic
+}
diff --git a/weed/messaging/broker/broker_grpc_server_discovery.go b/weed/messaging/broker/broker_grpc_server_discovery.go
new file mode 100644
index 000000000..3c14f3220
--- /dev/null
+++ b/weed/messaging/broker/broker_grpc_server_discovery.go
@@ -0,0 +1,116 @@
+package broker
+
+import (
+ "context"
+ "fmt"
+ "time"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
+)
+
+/*
+Topic discovery:
+
+When pub or sub connects, it ask for the whole broker list, and run consistent hashing to find the broker.
+
+The broker will check peers whether it is already hosted by some other broker, if that broker is alive and acknowledged alive, redirect to it.
+Otherwise, just host the topic.
+
+So, if the pub or sub connects around the same time, they would connect to the same broker. Everyone is happy.
+If one of the pub or sub connects very late, and the system topo changed quite a bit with new servers added or old servers died, checking peers will help.
+
+*/
+
+func (broker *MessageBroker) FindBroker(c context.Context, request *messaging_pb.FindBrokerRequest) (*messaging_pb.FindBrokerResponse, error) {
+
+ t := &messaging_pb.FindBrokerResponse{}
+ var peers []string
+
+ targetTopicPartition := fmt.Sprintf(TopicPartitionFmt, request.Namespace, request.Topic, request.Parition)
+
+ for _, filer := range broker.option.Filers {
+ err := broker.withFilerClient(filer, func(client filer_pb.SeaweedFilerClient) error {
+ resp, err := client.LocateBroker(context.Background(), &filer_pb.LocateBrokerRequest{
+ Resource: targetTopicPartition,
+ })
+ if err != nil {
+ return err
+ }
+ if resp.Found && len(resp.Resources) > 0 {
+ t.Broker = resp.Resources[0].GrpcAddresses
+ return nil
+ }
+ for _, b := range resp.Resources {
+ peers = append(peers, b.GrpcAddresses)
+ }
+ return nil
+ })
+ if err != nil {
+ return nil, err
+ }
+ }
+
+ t.Broker = PickMember(peers, []byte(targetTopicPartition))
+
+ return t, nil
+
+}
+
+func (broker *MessageBroker) checkFilers() {
+
+ // contact a filer about masters
+ var masters []string
+ found := false
+ for !found {
+ for _, filer := range broker.option.Filers {
+ err := broker.withFilerClient(filer, func(client filer_pb.SeaweedFilerClient) error {
+ resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
+ if err != nil {
+ return err
+ }
+ masters = append(masters, resp.Masters...)
+ return nil
+ })
+ if err == nil {
+ found = true
+ break
+ }
+ glog.V(0).Infof("failed to read masters from %+v: %v", broker.option.Filers, err)
+ time.Sleep(time.Second)
+ }
+ }
+ glog.V(0).Infof("received master list: %s", masters)
+
+ // contact each masters for filers
+ var filers []string
+ found = false
+ for !found {
+ for _, master := range masters {
+ err := broker.withMasterClient(master, func(client master_pb.SeaweedClient) error {
+ resp, err := client.ListMasterClients(context.Background(), &master_pb.ListMasterClientsRequest{
+ ClientType: "filer",
+ })
+ if err != nil {
+ return err
+ }
+
+ filers = append(filers, resp.GrpcAddresses...)
+
+ return nil
+ })
+ if err == nil {
+ found = true
+ break
+ }
+ glog.V(0).Infof("failed to list filers: %v", err)
+ time.Sleep(time.Second)
+ }
+ }
+ glog.V(0).Infof("received filer list: %s", filers)
+
+ broker.option.Filers = filers
+
+}
diff --git a/weed/messaging/broker/broker_grpc_server_publish.go b/weed/messaging/broker/broker_grpc_server_publish.go
index b3a909a6c..dc11061af 100644
--- a/weed/messaging/broker/broker_grpc_server_publish.go
+++ b/weed/messaging/broker/broker_grpc_server_publish.go
@@ -1,11 +1,15 @@
package broker
import (
+ "crypto/md5"
+ "fmt"
"io"
"github.com/golang/protobuf/proto"
+ "github.com/chrislusf/seaweedfs/weed/filer2"
"github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
)
@@ -44,27 +48,24 @@ func (broker *MessageBroker) Publish(stream messaging_pb.SeaweedMessaging_Publis
Topic: in.Init.Topic,
Partition: in.Init.Partition,
}
- tl := broker.topicLocks.RequestLock(tp, topicConfig, true)
- defer broker.topicLocks.ReleaseLock(tp, true)
-
- updatesChan := make(chan int32)
-
- go func() {
- for update := range updatesChan {
- if err := stream.Send(&messaging_pb.PublishResponse{
- Config: &messaging_pb.PublishResponse_ConfigMessage{
- PartitionCount: update,
- },
- }); err != nil {
- glog.V(0).Infof("err sending publish response: %v", err)
- return
- }
- }
- }()
+ tpDir := fmt.Sprintf("%s/%s/%s", filer2.TopicsDir, tp.Namespace, tp.Topic)
+ md5File := fmt.Sprintf("p%02d.md5", tp.Partition)
+ // println("chan data stored under", tpDir, "as", md5File)
+
+ if exists, err := filer_pb.Exists(broker, tpDir, md5File, false); err == nil && exists {
+ return fmt.Errorf("channel is already closed")
+ }
+
+ tl := broker.topicManager.RequestLock(tp, topicConfig, true)
+ defer broker.topicManager.ReleaseLock(tp, true)
+
+ md5hash := md5.New()
// process each message
for {
+ // println("recv")
in, err := stream.Recv()
+ // glog.V(0).Infof("recieved %v err: %v", in, err)
if err == io.EOF {
return nil
}
@@ -86,5 +87,26 @@ func (broker *MessageBroker) Publish(stream messaging_pb.SeaweedMessaging_Publis
tl.logBuffer.AddToBuffer(in.Data.Key, data)
+ if in.Data.IsClose {
+ // println("server received closing")
+ break
+ }
+
+ md5hash.Write(in.Data.Value)
+
+ }
+
+ if err := broker.appendToFile(tpDir+"/"+md5File, topicConfig, md5hash.Sum(nil)); err != nil {
+ glog.V(0).Infof("err writing %s: %v", md5File, err)
}
+
+ // fmt.Printf("received md5 %X\n", md5hash.Sum(nil))
+
+ // send the close ack
+ // println("server send ack closing")
+ if err := stream.Send(&messaging_pb.PublishResponse{IsClosed: true}); err != nil {
+ glog.V(0).Infof("err sending close response: %v", err)
+ }
+ return nil
+
}
diff --git a/weed/messaging/broker/broker_grpc_server_subscribe.go b/weed/messaging/broker/broker_grpc_server_subscribe.go
index 379063eed..9538d3063 100644
--- a/weed/messaging/broker/broker_grpc_server_subscribe.go
+++ b/weed/messaging/broker/broker_grpc_server_subscribe.go
@@ -25,32 +25,39 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs
return err
}
+ var processedTsNs int64
var messageCount int64
subscriberId := in.Init.SubscriberId
- fmt.Printf("+ subscriber %s\n", subscriberId)
- defer func() {
- fmt.Printf("- subscriber %s: %d messages\n", subscriberId, messageCount)
- }()
// TODO look it up
topicConfig := &messaging_pb.TopicConfiguration{
// IsTransient: true,
}
- if err = stream.Send(&messaging_pb.BrokerMessage{
- Redirect: nil,
- }); err != nil {
- return err
- }
-
// get lock
tp := TopicPartition{
Namespace: in.Init.Namespace,
Topic: in.Init.Topic,
Partition: in.Init.Partition,
}
- lock := broker.topicLocks.RequestLock(tp, topicConfig, false)
- defer broker.topicLocks.ReleaseLock(tp, false)
+ fmt.Printf("+ subscriber %s for %s\n", subscriberId, tp.String())
+ defer func() {
+ fmt.Printf("- subscriber %s for %s %d messages last %v\n", subscriberId, tp.String(), messageCount, time.Unix(0, processedTsNs))
+ }()
+
+ lock := broker.topicManager.RequestLock(tp, topicConfig, false)
+ defer broker.topicManager.ReleaseLock(tp, false)
+
+ isConnected := true
+ go func() {
+ for isConnected {
+ if _, err := stream.Recv(); err != nil {
+ // println("disconnecting connection to", subscriberId, tp.String())
+ isConnected = false
+ lock.cond.Signal()
+ }
+ }
+ }()
lastReadTime := time.Now()
switch in.Init.StartPosition {
@@ -58,8 +65,8 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs
lastReadTime = time.Unix(0, in.Init.TimestampNs)
case messaging_pb.SubscriberMessage_InitMessage_LATEST:
case messaging_pb.SubscriberMessage_InitMessage_EARLIEST:
+ lastReadTime = time.Unix(0, 0)
}
- var processedTsNs int64
// how to process each message
// an error returned will end the subscription
@@ -84,23 +91,33 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs
glog.Errorf("sending %d bytes to %s: %s", len(m.Value), subscriberId, err)
return err
}
+ if m.IsClose {
+ // println("processed EOF")
+ return io.EOF
+ }
processedTsNs = logEntry.TsNs
+ messageCount++
return nil
}
if err := broker.readPersistedLogBuffer(&tp, lastReadTime, eachLogEntryFn); err != nil {
- return err
+ if err != io.EOF {
+ // println("stopping from persisted logs", err.Error())
+ return err
+ }
}
if processedTsNs != 0 {
lastReadTime = time.Unix(0, processedTsNs)
}
- messageCount, err = lock.logBuffer.LoopProcessLogData(lastReadTime, func() bool {
+ // fmt.Printf("subscriber %s read %d on disk log %v\n", subscriberId, messageCount, lastReadTime)
+
+ err = lock.logBuffer.LoopProcessLogData(lastReadTime, func() bool {
lock.Mutex.Lock()
lock.cond.Wait()
lock.Mutex.Unlock()
- return true
+ return isConnected
}, eachLogEntryFn)
return err
@@ -114,7 +131,7 @@ func (broker *MessageBroker) readPersistedLogBuffer(tp *TopicPartition, startTim
sizeBuf := make([]byte, 4)
startTsNs := startTime.UnixNano()
- topicDir := fmt.Sprintf("/topics/%s/%s", tp.Namespace, tp.Topic)
+ topicDir := genTopicDir(tp.Namespace, tp.Topic)
partitionSuffix := fmt.Sprintf(".part%02d", tp.Partition)
return filer_pb.List(broker, topicDir, "", func(dayEntry *filer_pb.Entry, isLast bool) error {
@@ -125,7 +142,7 @@ func (broker *MessageBroker) readPersistedLogBuffer(tp *TopicPartition, startTim
return nil
}
}
- if !strings.HasSuffix(hourMinuteEntry.Name, partitionSuffix){
+ if !strings.HasSuffix(hourMinuteEntry.Name, partitionSuffix) {
return nil
}
// println("partition", tp.Partition, "processing", dayDir, "/", hourMinuteEntry.Name)
@@ -134,7 +151,7 @@ func (broker *MessageBroker) readPersistedLogBuffer(tp *TopicPartition, startTim
if err := filer2.ReadEachLogEntry(chunkedFileReader, sizeBuf, startTsNs, eachLogEntryFn); err != nil {
chunkedFileReader.Close()
if err == io.EOF {
- return nil
+ return err
}
return fmt.Errorf("reading %s/%s: %v", dayDir, hourMinuteEntry.Name, err)
}
diff --git a/weed/messaging/broker/broker_server.go b/weed/messaging/broker/broker_server.go
index 29c227274..0c04d2841 100644
--- a/weed/messaging/broker/broker_server.go
+++ b/weed/messaging/broker/broker_server.go
@@ -16,6 +16,7 @@ type MessageBrokerOption struct {
Filers []string
DefaultReplication string
MaxMB int
+ Ip string
Port int
Cipher bool
}
@@ -23,7 +24,7 @@ type MessageBrokerOption struct {
type MessageBroker struct {
option *MessageBrokerOption
grpcDialOption grpc.DialOption
- topicLocks *TopicLocks
+ topicManager *TopicManager
}
func NewMessageBroker(option *MessageBrokerOption, grpcDialOption grpc.DialOption) (messageBroker *MessageBroker, err error) {
@@ -33,77 +34,66 @@ func NewMessageBroker(option *MessageBrokerOption, grpcDialOption grpc.DialOptio
grpcDialOption: grpcDialOption,
}
- messageBroker.topicLocks = NewTopicLocks(messageBroker)
+ messageBroker.topicManager = NewTopicManager(messageBroker)
- messageBroker.checkPeers()
+ messageBroker.checkFilers()
- // go messageBroker.loopForEver()
+ go messageBroker.keepConnectedToOneFiler()
return messageBroker, nil
}
-func (broker *MessageBroker) loopForEver() {
+func (broker *MessageBroker) keepConnectedToOneFiler() {
for {
- broker.checkPeers()
- time.Sleep(3 * time.Second)
- }
-
-}
-
-func (broker *MessageBroker) checkPeers() {
-
- // contact a filer about masters
- var masters []string
- found := false
- for !found {
for _, filer := range broker.option.Filers {
- err := broker.withFilerClient(filer, func(client filer_pb.SeaweedFilerClient) error {
- resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
+ broker.withFilerClient(filer, func(client filer_pb.SeaweedFilerClient) error {
+ stream, err := client.KeepConnected(context.Background())
if err != nil {
+ glog.V(0).Infof("%s:%d failed to keep connected to %s: %v", broker.option.Ip, broker.option.Port, filer, err)
return err
}
- masters = append(masters, resp.Masters...)
- return nil
- })
- if err == nil {
- found = true
- break
- }
- glog.V(0).Infof("failed to read masters from %+v: %v", broker.option.Filers, err)
- time.Sleep(time.Second)
- }
- }
- glog.V(0).Infof("received master list: %s", masters)
-
- // contact each masters for filers
- var filers []string
- found = false
- for !found {
- for _, master := range masters {
- err := broker.withMasterClient(master, func(client master_pb.SeaweedClient) error {
- resp, err := client.ListMasterClients(context.Background(), &master_pb.ListMasterClientsRequest{
- ClientType: "filer",
- })
- if err != nil {
+
+ initRequest := &filer_pb.KeepConnectedRequest{
+ Name: broker.option.Ip,
+ GrpcPort: uint32(broker.option.Port),
+ }
+ for _, tp := range broker.topicManager.ListTopicPartitions() {
+ initRequest.Resources = append(initRequest.Resources, tp.String())
+ }
+ if err := stream.Send(&filer_pb.KeepConnectedRequest{
+ Name: broker.option.Ip,
+ GrpcPort: uint32(broker.option.Port),
+ }); err != nil {
+ glog.V(0).Infof("broker %s:%d failed to init at %s: %v", broker.option.Ip, broker.option.Port, filer, err)
return err
}
- filers = append(filers, resp.GrpcAddresses...)
-
+ // TODO send events of adding/removing topics
+
+ glog.V(0).Infof("conntected with filer: %v", filer)
+ for {
+ if err := stream.Send(&filer_pb.KeepConnectedRequest{
+ Name: broker.option.Ip,
+ GrpcPort: uint32(broker.option.Port),
+ }); err != nil {
+ glog.V(0).Infof("%s:%d failed to sendto %s: %v", broker.option.Ip, broker.option.Port, filer, err)
+ return err
+ }
+ // println("send heartbeat")
+ if _, err := stream.Recv(); err != nil {
+ glog.V(0).Infof("%s:%d failed to receive from %s: %v", broker.option.Ip, broker.option.Port, filer, err)
+ return err
+ }
+ // println("received reply")
+ time.Sleep(11 * time.Second)
+ // println("woke up")
+ }
return nil
})
- if err == nil {
- found = true
- break
- }
- glog.V(0).Infof("failed to list filers: %v", err)
- time.Sleep(time.Second)
+ time.Sleep(3 * time.Second)
}
}
- glog.V(0).Infof("received filer list: %s", filers)
-
- broker.option.Filers = filers
}
diff --git a/weed/messaging/broker/consistent_distribution.go b/weed/messaging/broker/consistent_distribution.go
new file mode 100644
index 000000000..465a2a8f2
--- /dev/null
+++ b/weed/messaging/broker/consistent_distribution.go
@@ -0,0 +1,38 @@
+package broker
+
+import (
+ "github.com/buraksezer/consistent"
+ "github.com/cespare/xxhash"
+)
+
+type Member string
+
+func (m Member) String() string {
+ return string(m)
+}
+
+type hasher struct{}
+
+func (h hasher) Sum64(data []byte) uint64 {
+ return xxhash.Sum64(data)
+}
+
+func PickMember(members []string, key []byte) string {
+ cfg := consistent.Config{
+ PartitionCount: 9791,
+ ReplicationFactor: 2,
+ Load: 1.25,
+ Hasher: hasher{},
+ }
+
+ cmembers := []consistent.Member{}
+ for _, m := range members {
+ cmembers = append(cmembers, Member(m))
+ }
+
+ c := consistent.New(cmembers, cfg)
+
+ m := c.LocateKey(key)
+
+ return m.String()
+}
diff --git a/weed/messaging/broker/consistent_distribution_test.go b/weed/messaging/broker/consistent_distribution_test.go
new file mode 100644
index 000000000..f58fe4e0e
--- /dev/null
+++ b/weed/messaging/broker/consistent_distribution_test.go
@@ -0,0 +1,32 @@
+package broker
+
+import (
+ "fmt"
+ "testing"
+)
+
+func TestPickMember(t *testing.T) {
+
+ servers := []string{
+ "s1:port",
+ "s2:port",
+ "s3:port",
+ "s5:port",
+ "s4:port",
+ }
+
+ total := 1000
+
+ distribution := make(map[string]int)
+ for i := 0; i < total; i++ {
+ tp := fmt.Sprintf("tp:%2d", i)
+ m := PickMember(servers, []byte(tp))
+ // println(tp, "=>", m)
+ distribution[m]++
+ }
+
+ for member, count := range distribution {
+ fmt.Printf("member: %s, key count: %d load=%.2f\n", member, count, float64(count*100)/float64(total/len(servers)))
+ }
+
+}
diff --git a/weed/messaging/broker/topic_lock.go b/weed/messaging/broker/topic_lock.go
deleted file mode 100644
index f8a5aa171..000000000
--- a/weed/messaging/broker/topic_lock.go
+++ /dev/null
@@ -1,103 +0,0 @@
-package broker
-
-import (
- "fmt"
- "sync"
- "time"
-
- "github.com/chrislusf/seaweedfs/weed/filer2"
- "github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
- "github.com/chrislusf/seaweedfs/weed/util/log_buffer"
-)
-
-type TopicPartition struct {
- Namespace string
- Topic string
- Partition int32
-}
-type TopicLock struct {
- sync.Mutex
- cond *sync.Cond
- subscriberCount int
- publisherCount int
- logBuffer *log_buffer.LogBuffer
-}
-
-type TopicLocks struct {
- sync.Mutex
- locks map[TopicPartition]*TopicLock
- broker *MessageBroker
-}
-
-func NewTopicLocks(messageBroker *MessageBroker) *TopicLocks {
- return &TopicLocks{
- locks: make(map[TopicPartition]*TopicLock),
- broker: messageBroker,
- }
-}
-
-func (locks *TopicLocks) buildLogBuffer(tl *TopicLock, tp TopicPartition, topicConfig *messaging_pb.TopicConfiguration) *log_buffer.LogBuffer {
-
- flushFn := func(startTime, stopTime time.Time, buf []byte) {
-
- if topicConfig.IsTransient {
- // return
- }
-
- // fmt.Printf("flushing with topic config %+v\n", topicConfig)
-
- targetFile := fmt.Sprintf(
- "%s/%s/%s/%04d-%02d-%02d/%02d-%02d.part%02d",
- filer2.TopicsDir, tp.Namespace, tp.Topic,
- startTime.Year(), startTime.Month(), startTime.Day(), startTime.Hour(), startTime.Minute(),
- tp.Partition,
- )
-
- if err := locks.broker.appendToFile(targetFile, topicConfig, buf); err != nil {
- glog.V(0).Infof("log write failed %s: %v", targetFile, err)
- }
- }
- logBuffer := log_buffer.NewLogBuffer(time.Minute, flushFn, func() {
- tl.cond.Broadcast()
- })
-
- return logBuffer
-}
-
-func (tl *TopicLocks) RequestLock(partition TopicPartition, topicConfig *messaging_pb.TopicConfiguration, isPublisher bool) *TopicLock {
- tl.Lock()
- defer tl.Unlock()
-
- lock, found := tl.locks[partition]
- if !found {
- lock = &TopicLock{}
- lock.cond = sync.NewCond(&lock.Mutex)
- tl.locks[partition] = lock
- lock.logBuffer = tl.buildLogBuffer(lock, partition, topicConfig)
- }
- if isPublisher {
- lock.publisherCount++
- } else {
- lock.subscriberCount++
- }
- return lock
-}
-
-func (tl *TopicLocks) ReleaseLock(partition TopicPartition, isPublisher bool) {
- tl.Lock()
- defer tl.Unlock()
-
- lock, found := tl.locks[partition]
- if !found {
- return
- }
- if isPublisher {
- lock.publisherCount--
- } else {
- lock.subscriberCount--
- }
- if lock.subscriberCount <= 0 && lock.publisherCount <= 0 {
- delete(tl.locks, partition)
- }
-}
diff --git a/weed/messaging/broker/topic_manager.go b/weed/messaging/broker/topic_manager.go
new file mode 100644
index 000000000..b563fffa1
--- /dev/null
+++ b/weed/messaging/broker/topic_manager.go
@@ -0,0 +1,123 @@
+package broker
+
+import (
+ "fmt"
+ "sync"
+ "time"
+
+ "github.com/chrislusf/seaweedfs/weed/filer2"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
+ "github.com/chrislusf/seaweedfs/weed/util/log_buffer"
+)
+
+type TopicPartition struct {
+ Namespace string
+ Topic string
+ Partition int32
+}
+
+const (
+ TopicPartitionFmt = "%s/%s_%02d"
+)
+
+func (tp *TopicPartition) String() string {
+ return fmt.Sprintf(TopicPartitionFmt, tp.Namespace, tp.Topic, tp.Partition)
+}
+
+type TopicControl struct {
+ sync.Mutex
+ cond *sync.Cond
+ subscriberCount int
+ publisherCount int
+ logBuffer *log_buffer.LogBuffer
+}
+
+type TopicManager struct {
+ sync.Mutex
+ topicControls map[TopicPartition]*TopicControl
+ broker *MessageBroker
+}
+
+func NewTopicManager(messageBroker *MessageBroker) *TopicManager {
+ return &TopicManager{
+ topicControls: make(map[TopicPartition]*TopicControl),
+ broker: messageBroker,
+ }
+}
+
+func (tm *TopicManager) buildLogBuffer(tl *TopicControl, tp TopicPartition, topicConfig *messaging_pb.TopicConfiguration) *log_buffer.LogBuffer {
+
+ flushFn := func(startTime, stopTime time.Time, buf []byte) {
+
+ if topicConfig.IsTransient {
+ // return
+ }
+
+ // fmt.Printf("flushing with topic config %+v\n", topicConfig)
+
+ targetFile := fmt.Sprintf(
+ "%s/%s/%s/%04d-%02d-%02d/%02d-%02d.part%02d",
+ filer2.TopicsDir, tp.Namespace, tp.Topic,
+ startTime.Year(), startTime.Month(), startTime.Day(), startTime.Hour(), startTime.Minute(),
+ tp.Partition,
+ )
+
+ if err := tm.broker.appendToFile(targetFile, topicConfig, buf); err != nil {
+ glog.V(0).Infof("log write failed %s: %v", targetFile, err)
+ }
+ }
+ logBuffer := log_buffer.NewLogBuffer(time.Minute, flushFn, func() {
+ tl.cond.Broadcast()
+ })
+
+ return logBuffer
+}
+
+func (tm *TopicManager) RequestLock(partition TopicPartition, topicConfig *messaging_pb.TopicConfiguration, isPublisher bool) *TopicControl {
+ tm.Lock()
+ defer tm.Unlock()
+
+ tc, found := tm.topicControls[partition]
+ if !found {
+ tc = &TopicControl{}
+ tc.cond = sync.NewCond(&tc.Mutex)
+ tm.topicControls[partition] = tc
+ tc.logBuffer = tm.buildLogBuffer(tc, partition, topicConfig)
+ }
+ if isPublisher {
+ tc.publisherCount++
+ } else {
+ tc.subscriberCount++
+ }
+ return tc
+}
+
+func (tm *TopicManager) ReleaseLock(partition TopicPartition, isPublisher bool) {
+ tm.Lock()
+ defer tm.Unlock()
+
+ lock, found := tm.topicControls[partition]
+ if !found {
+ return
+ }
+ if isPublisher {
+ lock.publisherCount--
+ } else {
+ lock.subscriberCount--
+ }
+ if lock.subscriberCount <= 0 && lock.publisherCount <= 0 {
+ delete(tm.topicControls, partition)
+ lock.logBuffer.Shutdown()
+ }
+}
+
+func (tm *TopicManager) ListTopicPartitions() (tps []TopicPartition) {
+ tm.Lock()
+ defer tm.Unlock()
+
+ for k := range tm.topicControls {
+ tps = append(tps, k)
+ }
+ return
+}
diff --git a/weed/messaging/client/client.go b/weed/messaging/client/client.go
deleted file mode 100644
index 4a674a9fc..000000000
--- a/weed/messaging/client/client.go
+++ /dev/null
@@ -1,30 +0,0 @@
-package client
-
-import (
- "context"
-
- "google.golang.org/grpc"
-
- "github.com/chrislusf/seaweedfs/weed/pb"
- "github.com/chrislusf/seaweedfs/weed/security"
- "github.com/chrislusf/seaweedfs/weed/util"
-)
-
-type MessagingClient struct {
- bootstrapBrokers []string
- grpcConnection *grpc.ClientConn
-}
-
-func NewMessagingClient(bootstrapBrokers []string) (*MessagingClient, error) {
- grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.msg_client")
-
- grpcConnection, err := pb.GrpcDial(context.Background(), "localhost:17777", grpcDialOption)
- if err != nil {
- return nil, err
- }
-
- return &MessagingClient{
- bootstrapBrokers: bootstrapBrokers,
- grpcConnection: grpcConnection,
- }, nil
-}
diff --git a/weed/messaging/client/subscriber.go b/weed/messaging/client/subscriber.go
deleted file mode 100644
index 53e7ffc7d..000000000
--- a/weed/messaging/client/subscriber.go
+++ /dev/null
@@ -1,91 +0,0 @@
-package client
-
-import (
- "context"
- "io"
- "time"
-
- "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
-)
-
-type Subscriber struct {
- subscriberClients []messaging_pb.SeaweedMessaging_SubscribeClient
- subscriberId string
-}
-
-func (mc *MessagingClient) NewSubscriber(subscriberId, namespace, topic string, startTime time.Time) (*Subscriber, error) {
- // read topic configuration
- topicConfiguration := &messaging_pb.TopicConfiguration{
- PartitionCount: 4,
- }
- subscriberClients := make([]messaging_pb.SeaweedMessaging_SubscribeClient, topicConfiguration.PartitionCount)
-
- for i := 0; i < int(topicConfiguration.PartitionCount); i++ {
- client, err := mc.setupSubscriberClient(subscriberId, namespace, topic, int32(i), startTime)
- if err != nil {
- return nil, err
- }
- subscriberClients[i] = client
- }
-
- return &Subscriber{
- subscriberClients: subscriberClients,
- subscriberId: subscriberId,
- }, nil
-}
-
-func (mc *MessagingClient) setupSubscriberClient(subscriberId, namespace, topic string, partition int32, startTime time.Time) (messaging_pb.SeaweedMessaging_SubscribeClient, error) {
-
- stream, err := messaging_pb.NewSeaweedMessagingClient(mc.grpcConnection).Subscribe(context.Background())
- if err != nil {
- return nil, err
- }
-
- // send init message
- err = stream.Send(&messaging_pb.SubscriberMessage{
- Init: &messaging_pb.SubscriberMessage_InitMessage{
- Namespace: namespace,
- Topic: topic,
- Partition: partition,
- StartPosition: messaging_pb.SubscriberMessage_InitMessage_TIMESTAMP,
- TimestampNs: startTime.UnixNano(),
- SubscriberId: subscriberId,
- },
- })
- if err != nil {
- return nil, err
- }
-
- // process init response
- initResponse, err := stream.Recv()
- if err != nil {
- return nil, err
- }
- if initResponse.Redirect != nil {
- // TODO follow redirection
- }
-
- return stream, nil
-
-}
-
-func (s *Subscriber) doSubscribe(partition int, processFn func(m *messaging_pb.Message)) error {
- for {
- resp, listenErr := s.subscriberClients[partition].Recv()
- if listenErr == io.EOF {
- return nil
- }
- if listenErr != nil {
- println(listenErr.Error())
- return listenErr
- }
- processFn(resp.Data)
- }
-}
-
-// Subscribe starts goroutines to process the messages
-func (s *Subscriber) Subscribe(processFn func(m *messaging_pb.Message)) {
- for i := 0; i < len(s.subscriberClients); i++ {
- go s.doSubscribe(i, processFn)
- }
-}
diff --git a/weed/messaging/msgclient/client.go b/weed/messaging/msgclient/client.go
new file mode 100644
index 000000000..4d7ef2b8e
--- /dev/null
+++ b/weed/messaging/msgclient/client.go
@@ -0,0 +1,55 @@
+package msgclient
+
+import (
+ "context"
+ "fmt"
+ "log"
+
+ "google.golang.org/grpc"
+
+ "github.com/chrislusf/seaweedfs/weed/messaging/broker"
+ "github.com/chrislusf/seaweedfs/weed/pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
+ "github.com/chrislusf/seaweedfs/weed/security"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+type MessagingClient struct {
+ bootstrapBrokers []string
+ grpcConnections map[broker.TopicPartition]*grpc.ClientConn
+ grpcDialOption grpc.DialOption
+}
+
+func NewMessagingClient(bootstrapBrokers ...string) *MessagingClient {
+ return &MessagingClient{
+ bootstrapBrokers: bootstrapBrokers,
+ grpcConnections: make(map[broker.TopicPartition]*grpc.ClientConn),
+ grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.msg_client"),
+ }
+}
+
+func (mc *MessagingClient) findBroker(tp broker.TopicPartition) (*grpc.ClientConn, error) {
+
+ for _, broker := range mc.bootstrapBrokers {
+ grpcConnection, err := pb.GrpcDial(context.Background(), broker, mc.grpcDialOption)
+ if err != nil {
+ log.Printf("dial broker %s: %v", broker, err)
+ continue
+ }
+ defer grpcConnection.Close()
+
+ resp, err := messaging_pb.NewSeaweedMessagingClient(grpcConnection).FindBroker(context.Background(),
+ &messaging_pb.FindBrokerRequest{
+ Namespace: tp.Namespace,
+ Topic: tp.Topic,
+ Parition: tp.Partition,
+ })
+ if err != nil {
+ return nil, err
+ }
+
+ targetBroker := resp.Broker
+ return pb.GrpcDial(context.Background(), targetBroker, mc.grpcDialOption)
+ }
+ return nil, fmt.Errorf("no broker found for %+v", tp)
+}
diff --git a/weed/messaging/msgclient/config.go b/weed/messaging/msgclient/config.go
new file mode 100644
index 000000000..2b9eba1a8
--- /dev/null
+++ b/weed/messaging/msgclient/config.go
@@ -0,0 +1,63 @@
+package msgclient
+
+import (
+ "context"
+ "log"
+
+ "github.com/chrislusf/seaweedfs/weed/messaging/broker"
+ "github.com/chrislusf/seaweedfs/weed/pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
+)
+
+func (mc *MessagingClient) configureTopic(tp broker.TopicPartition) error {
+
+ return mc.withAnyBroker(func(client messaging_pb.SeaweedMessagingClient) error {
+ _, err := client.ConfigureTopic(context.Background(),
+ &messaging_pb.ConfigureTopicRequest{
+ Namespace: tp.Namespace,
+ Topic: tp.Topic,
+ Configuration: &messaging_pb.TopicConfiguration{
+ PartitionCount: 0,
+ Collection: "",
+ Replication: "",
+ IsTransient: false,
+ Partitoning: 0,
+ },
+ })
+ return err
+ })
+
+}
+
+func (mc *MessagingClient) DeleteTopic(namespace, topic string) error {
+
+ return mc.withAnyBroker(func(client messaging_pb.SeaweedMessagingClient) error {
+ _, err := client.DeleteTopic(context.Background(),
+ &messaging_pb.DeleteTopicRequest{
+ Namespace: namespace,
+ Topic: topic,
+ })
+ return err
+ })
+}
+
+func (mc *MessagingClient) withAnyBroker(fn func(client messaging_pb.SeaweedMessagingClient) error) error {
+
+ var lastErr error
+ for _, broker := range mc.bootstrapBrokers {
+ grpcConnection, err := pb.GrpcDial(context.Background(), broker, mc.grpcDialOption)
+ if err != nil {
+ log.Printf("dial broker %s: %v", broker, err)
+ continue
+ }
+ defer grpcConnection.Close()
+
+ err = fn(messaging_pb.NewSeaweedMessagingClient(grpcConnection))
+ if err == nil {
+ return nil
+ }
+ lastErr = err
+ }
+
+ return lastErr
+}
diff --git a/weed/messaging/msgclient/pub_chan.go b/weed/messaging/msgclient/pub_chan.go
new file mode 100644
index 000000000..9bc88f7c0
--- /dev/null
+++ b/weed/messaging/msgclient/pub_chan.go
@@ -0,0 +1,76 @@
+package msgclient
+
+import (
+ "crypto/md5"
+ "hash"
+ "io"
+ "log"
+
+ "google.golang.org/grpc"
+
+ "github.com/chrislusf/seaweedfs/weed/messaging/broker"
+ "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
+)
+
+type PubChannel struct {
+ client messaging_pb.SeaweedMessaging_PublishClient
+ grpcConnection *grpc.ClientConn
+ md5hash hash.Hash
+}
+
+func (mc *MessagingClient) NewPubChannel(chanName string) (*PubChannel, error) {
+ tp := broker.TopicPartition{
+ Namespace: "chan",
+ Topic: chanName,
+ Partition: 0,
+ }
+ grpcConnection, err := mc.findBroker(tp)
+ if err != nil {
+ return nil, err
+ }
+ pc, err := setupPublisherClient(grpcConnection, tp)
+ if err != nil {
+ return nil, err
+ }
+ return &PubChannel{
+ client: pc,
+ grpcConnection: grpcConnection,
+ md5hash: md5.New(),
+ }, nil
+}
+
+func (pc *PubChannel) Publish(m []byte) error {
+ err := pc.client.Send(&messaging_pb.PublishRequest{
+ Data: &messaging_pb.Message{
+ Value: m,
+ },
+ })
+ if err == nil {
+ pc.md5hash.Write(m)
+ }
+ return err
+}
+func (pc *PubChannel) Close() error {
+
+ // println("send closing")
+ if err := pc.client.Send(&messaging_pb.PublishRequest{
+ Data: &messaging_pb.Message{
+ IsClose: true,
+ },
+ }); err != nil {
+ log.Printf("err send close: %v", err)
+ }
+ // println("receive closing")
+ if _, err := pc.client.Recv(); err != nil && err != io.EOF {
+ log.Printf("err receive close: %v", err)
+ }
+ // println("close connection")
+ if err := pc.grpcConnection.Close(); err != nil {
+ log.Printf("err connection close: %v", err)
+ }
+ return nil
+}
+
+func (pc *PubChannel) Md5() []byte {
+ return pc.md5hash.Sum(nil)
+}
diff --git a/weed/messaging/client/publisher.go b/weed/messaging/msgclient/publisher.go
index 68e5729c1..b0fc5afbf 100644
--- a/weed/messaging/client/publisher.go
+++ b/weed/messaging/msgclient/publisher.go
@@ -1,10 +1,12 @@
-package client
+package msgclient
import (
"context"
"github.com/OneOfOne/xxhash"
+ "google.golang.org/grpc"
+ "github.com/chrislusf/seaweedfs/weed/messaging/broker"
"github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
)
@@ -22,7 +24,16 @@ func (mc *MessagingClient) NewPublisher(publisherId, namespace, topic string) (*
}
publishClients := make([]messaging_pb.SeaweedMessaging_PublishClient, topicConfiguration.PartitionCount)
for i := 0; i < int(topicConfiguration.PartitionCount); i++ {
- client, err := mc.setupPublisherClient(namespace, topic, int32(i))
+ tp := broker.TopicPartition{
+ Namespace: namespace,
+ Topic: topic,
+ Partition: int32(i),
+ }
+ grpcClientConn, err := mc.findBroker(tp)
+ if err != nil {
+ return nil, err
+ }
+ client, err := setupPublisherClient(grpcClientConn, tp)
if err != nil {
return nil, err
}
@@ -34,9 +45,9 @@ func (mc *MessagingClient) NewPublisher(publisherId, namespace, topic string) (*
}, nil
}
-func (mc *MessagingClient) setupPublisherClient(namespace, topic string, partition int32) (messaging_pb.SeaweedMessaging_PublishClient, error) {
+func setupPublisherClient(grpcConnection *grpc.ClientConn, tp broker.TopicPartition) (messaging_pb.SeaweedMessaging_PublishClient, error) {
- stream, err := messaging_pb.NewSeaweedMessagingClient(mc.grpcConnection).Publish(context.Background())
+ stream, err := messaging_pb.NewSeaweedMessagingClient(grpcConnection).Publish(context.Background())
if err != nil {
return nil, err
}
@@ -44,9 +55,9 @@ func (mc *MessagingClient) setupPublisherClient(namespace, topic string, partiti
// send init message
err = stream.Send(&messaging_pb.PublishRequest{
Init: &messaging_pb.PublishRequest_InitMessage{
- Namespace: namespace,
- Topic: topic,
- Partition: partition,
+ Namespace: tp.Namespace,
+ Topic: tp.Topic,
+ Partition: tp.Partition,
},
})
if err != nil {
@@ -105,9 +116,3 @@ func (p *Publisher) Publish(m *messaging_pb.Message) error {
Data: m,
})
}
-
-func (p *Publisher) Shutdown() {
- for _, client := range p.publishClients {
- client.CloseSend()
- }
-}
diff --git a/weed/messaging/msgclient/sub_chan.go b/weed/messaging/msgclient/sub_chan.go
new file mode 100644
index 000000000..213ff4666
--- /dev/null
+++ b/weed/messaging/msgclient/sub_chan.go
@@ -0,0 +1,85 @@
+package msgclient
+
+import (
+ "context"
+ "crypto/md5"
+ "hash"
+ "io"
+ "log"
+ "time"
+
+ "github.com/chrislusf/seaweedfs/weed/messaging/broker"
+ "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
+)
+
+type SubChannel struct {
+ ch chan []byte
+ stream messaging_pb.SeaweedMessaging_SubscribeClient
+ md5hash hash.Hash
+ cancel context.CancelFunc
+}
+
+func (mc *MessagingClient) NewSubChannel(subscriberId, chanName string) (*SubChannel, error) {
+ tp := broker.TopicPartition{
+ Namespace: "chan",
+ Topic: chanName,
+ Partition: 0,
+ }
+ grpcConnection, err := mc.findBroker(tp)
+ if err != nil {
+ return nil, err
+ }
+ ctx, cancel := context.WithCancel(context.Background())
+ sc, err := setupSubscriberClient(ctx, grpcConnection, tp, subscriberId, time.Unix(0, 0))
+ if err != nil {
+ return nil, err
+ }
+
+ t := &SubChannel{
+ ch: make(chan []byte),
+ stream: sc,
+ md5hash: md5.New(),
+ cancel: cancel,
+ }
+
+ go func() {
+ for {
+ resp, subErr := t.stream.Recv()
+ if subErr == io.EOF {
+ return
+ }
+ if subErr != nil {
+ log.Printf("fail to receive from netchan %s: %v", chanName, subErr)
+ return
+ }
+ if resp.Data == nil {
+ // this could be heartbeat from broker
+ continue
+ }
+ if resp.Data.IsClose {
+ t.stream.Send(&messaging_pb.SubscriberMessage{
+ IsClose: true,
+ })
+ close(t.ch)
+ cancel()
+ return
+ }
+ t.ch <- resp.Data.Value
+ t.md5hash.Write(resp.Data.Value)
+ }
+ }()
+
+ return t, nil
+}
+
+func (sc *SubChannel) Channel() chan []byte {
+ return sc.ch
+}
+
+func (sc *SubChannel) Md5() []byte {
+ return sc.md5hash.Sum(nil)
+}
+
+func (sc *SubChannel) Cancel() {
+ sc.cancel()
+}
diff --git a/weed/messaging/msgclient/subscriber.go b/weed/messaging/msgclient/subscriber.go
new file mode 100644
index 000000000..caa795626
--- /dev/null
+++ b/weed/messaging/msgclient/subscriber.go
@@ -0,0 +1,120 @@
+package msgclient
+
+import (
+ "context"
+ "io"
+ "time"
+ "sync"
+
+ "github.com/chrislusf/seaweedfs/weed/messaging/broker"
+ "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
+ "google.golang.org/grpc"
+)
+
+type Subscriber struct {
+ subscriberClients []messaging_pb.SeaweedMessaging_SubscribeClient
+ subscriberCancels []context.CancelFunc
+ subscriberId string
+}
+
+func (mc *MessagingClient) NewSubscriber(subscriberId, namespace, topic string, partitionId int, startTime time.Time) (*Subscriber, error) {
+ // read topic configuration
+ topicConfiguration := &messaging_pb.TopicConfiguration{
+ PartitionCount: 4,
+ }
+ subscriberClients := make([]messaging_pb.SeaweedMessaging_SubscribeClient, topicConfiguration.PartitionCount)
+ subscriberCancels := make([]context.CancelFunc, topicConfiguration.PartitionCount)
+
+ for i := 0; i < int(topicConfiguration.PartitionCount); i++ {
+ if partitionId>=0 && i != partitionId {
+ continue
+ }
+ tp := broker.TopicPartition{
+ Namespace: namespace,
+ Topic: topic,
+ Partition: int32(i),
+ }
+ grpcClientConn, err := mc.findBroker(tp)
+ if err != nil {
+ return nil, err
+ }
+ ctx, cancel := context.WithCancel(context.Background())
+ client, err := setupSubscriberClient(ctx, grpcClientConn, tp, subscriberId, startTime)
+ if err != nil {
+ return nil, err
+ }
+ subscriberClients[i] = client
+ subscriberCancels[i] = cancel
+ }
+
+ return &Subscriber{
+ subscriberClients: subscriberClients,
+ subscriberCancels: subscriberCancels,
+ subscriberId: subscriberId,
+ }, nil
+}
+
+func setupSubscriberClient(ctx context.Context, grpcConnection *grpc.ClientConn, tp broker.TopicPartition, subscriberId string, startTime time.Time) (stream messaging_pb.SeaweedMessaging_SubscribeClient, err error) {
+ stream, err = messaging_pb.NewSeaweedMessagingClient(grpcConnection).Subscribe(ctx)
+ if err != nil {
+ return
+ }
+
+ // send init message
+ err = stream.Send(&messaging_pb.SubscriberMessage{
+ Init: &messaging_pb.SubscriberMessage_InitMessage{
+ Namespace: tp.Namespace,
+ Topic: tp.Topic,
+ Partition: tp.Partition,
+ StartPosition: messaging_pb.SubscriberMessage_InitMessage_TIMESTAMP,
+ TimestampNs: startTime.UnixNano(),
+ SubscriberId: subscriberId,
+ },
+ })
+ if err != nil {
+ return
+ }
+
+ return stream, nil
+}
+
+func doSubscribe(subscriberClient messaging_pb.SeaweedMessaging_SubscribeClient, processFn func(m *messaging_pb.Message)) error {
+ for {
+ resp, listenErr := subscriberClient.Recv()
+ if listenErr == io.EOF {
+ return nil
+ }
+ if listenErr != nil {
+ println(listenErr.Error())
+ return listenErr
+ }
+ if resp.Data == nil {
+ // this could be heartbeat from broker
+ continue
+ }
+ processFn(resp.Data)
+ }
+}
+
+// Subscribe starts goroutines to process the messages
+func (s *Subscriber) Subscribe(processFn func(m *messaging_pb.Message)) {
+ var wg sync.WaitGroup
+ for i := 0; i < len(s.subscriberClients); i++ {
+ if s.subscriberClients[i] != nil {
+ wg.Add(1)
+ go func(subscriberClient messaging_pb.SeaweedMessaging_SubscribeClient) {
+ defer wg.Done()
+ doSubscribe(subscriberClient, processFn)
+ }(s.subscriberClients[i])
+ }
+ }
+ wg.Wait()
+}
+
+func (s *Subscriber) Shutdown() {
+ for i := 0; i < len(s.subscriberClients); i++ {
+ if s.subscriberCancels[i] != nil {
+ s.subscriberCancels[i]()
+ }
+ }
+}
diff --git a/weed/operation/submit.go b/weed/operation/submit.go
index 7cf5b9645..e8bec382a 100644
--- a/weed/operation/submit.go
+++ b/weed/operation/submit.go
@@ -27,7 +27,7 @@ type FilePart struct {
Ttl string
Server string //this comes from assign result
Fid string //this comes from assign result, but customizable
- Fsync bool
+ Fsync bool
}
type SubmitResult struct {
diff --git a/weed/pb/filer.proto b/weed/pb/filer.proto
index 6d890861d..1fc8ef63d 100644
--- a/weed/pb/filer.proto
+++ b/weed/pb/filer.proto
@@ -48,6 +48,12 @@ service SeaweedFiler {
rpc SubscribeMetadata (SubscribeMetadataRequest) returns (stream SubscribeMetadataResponse) {
}
+ rpc KeepConnected (stream KeepConnectedRequest) returns (stream KeepConnectedResponse) {
+ }
+
+ rpc LocateBroker (LocateBrokerRequest) returns (LocateBrokerResponse) {
+ }
+
}
//////////////////////////////////////////////////
@@ -261,3 +267,25 @@ message LogEntry {
int32 partition_key_hash = 2;
bytes data = 3;
}
+
+message KeepConnectedRequest {
+ string name = 1;
+ uint32 grpc_port = 2;
+ repeated string resources = 3;
+}
+message KeepConnectedResponse {
+}
+
+message LocateBrokerRequest {
+ string resource = 1;
+}
+message LocateBrokerResponse {
+ bool found = 1;
+ // if found, send the exact address
+ // if not found, send the full list of existing brokers
+ message Resource {
+ string grpc_addresses = 1;
+ int32 resource_count = 2;
+ }
+ repeated Resource resources = 2;
+}
diff --git a/weed/pb/filer_pb/filer.pb.go b/weed/pb/filer_pb/filer.pb.go
index 83f0eae0d..f5b62e377 100644
--- a/weed/pb/filer_pb/filer.pb.go
+++ b/weed/pb/filer_pb/filer.pb.go
@@ -44,6 +44,10 @@ It has these top-level messages:
SubscribeMetadataRequest
SubscribeMetadataResponse
LogEntry
+ KeepConnectedRequest
+ KeepConnectedResponse
+ LocateBrokerRequest
+ LocateBrokerResponse
*/
package filer_pb
@@ -1228,6 +1232,114 @@ func (m *LogEntry) GetData() []byte {
return nil
}
+type KeepConnectedRequest struct {
+ Name string `protobuf:"bytes,1,opt,name=name" json:"name,omitempty"`
+ GrpcPort uint32 `protobuf:"varint,2,opt,name=grpc_port,json=grpcPort" json:"grpc_port,omitempty"`
+ Resources []string `protobuf:"bytes,3,rep,name=resources" json:"resources,omitempty"`
+}
+
+func (m *KeepConnectedRequest) Reset() { *m = KeepConnectedRequest{} }
+func (m *KeepConnectedRequest) String() string { return proto.CompactTextString(m) }
+func (*KeepConnectedRequest) ProtoMessage() {}
+func (*KeepConnectedRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{35} }
+
+func (m *KeepConnectedRequest) GetName() string {
+ if m != nil {
+ return m.Name
+ }
+ return ""
+}
+
+func (m *KeepConnectedRequest) GetGrpcPort() uint32 {
+ if m != nil {
+ return m.GrpcPort
+ }
+ return 0
+}
+
+func (m *KeepConnectedRequest) GetResources() []string {
+ if m != nil {
+ return m.Resources
+ }
+ return nil
+}
+
+type KeepConnectedResponse struct {
+}
+
+func (m *KeepConnectedResponse) Reset() { *m = KeepConnectedResponse{} }
+func (m *KeepConnectedResponse) String() string { return proto.CompactTextString(m) }
+func (*KeepConnectedResponse) ProtoMessage() {}
+func (*KeepConnectedResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{36} }
+
+type LocateBrokerRequest struct {
+ Resource string `protobuf:"bytes,1,opt,name=resource" json:"resource,omitempty"`
+}
+
+func (m *LocateBrokerRequest) Reset() { *m = LocateBrokerRequest{} }
+func (m *LocateBrokerRequest) String() string { return proto.CompactTextString(m) }
+func (*LocateBrokerRequest) ProtoMessage() {}
+func (*LocateBrokerRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{37} }
+
+func (m *LocateBrokerRequest) GetResource() string {
+ if m != nil {
+ return m.Resource
+ }
+ return ""
+}
+
+type LocateBrokerResponse struct {
+ Found bool `protobuf:"varint,1,opt,name=found" json:"found,omitempty"`
+ Resources []*LocateBrokerResponse_Resource `protobuf:"bytes,2,rep,name=resources" json:"resources,omitempty"`
+}
+
+func (m *LocateBrokerResponse) Reset() { *m = LocateBrokerResponse{} }
+func (m *LocateBrokerResponse) String() string { return proto.CompactTextString(m) }
+func (*LocateBrokerResponse) ProtoMessage() {}
+func (*LocateBrokerResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{38} }
+
+func (m *LocateBrokerResponse) GetFound() bool {
+ if m != nil {
+ return m.Found
+ }
+ return false
+}
+
+func (m *LocateBrokerResponse) GetResources() []*LocateBrokerResponse_Resource {
+ if m != nil {
+ return m.Resources
+ }
+ return nil
+}
+
+// if found, send the exact address
+// if not found, send the full list of existing brokers
+type LocateBrokerResponse_Resource struct {
+ GrpcAddresses string `protobuf:"bytes,1,opt,name=grpc_addresses,json=grpcAddresses" json:"grpc_addresses,omitempty"`
+ ResourceCount int32 `protobuf:"varint,2,opt,name=resource_count,json=resourceCount" json:"resource_count,omitempty"`
+}
+
+func (m *LocateBrokerResponse_Resource) Reset() { *m = LocateBrokerResponse_Resource{} }
+func (m *LocateBrokerResponse_Resource) String() string { return proto.CompactTextString(m) }
+func (*LocateBrokerResponse_Resource) ProtoMessage() {}
+func (*LocateBrokerResponse_Resource) Descriptor() ([]byte, []int) {
+ return fileDescriptor0, []int{38, 0}
+}
+
+func (m *LocateBrokerResponse_Resource) GetGrpcAddresses() string {
+ if m != nil {
+ return m.GrpcAddresses
+ }
+ return ""
+}
+
+func (m *LocateBrokerResponse_Resource) GetResourceCount() int32 {
+ if m != nil {
+ return m.ResourceCount
+ }
+ return 0
+}
+
func init() {
proto.RegisterType((*LookupDirectoryEntryRequest)(nil), "filer_pb.LookupDirectoryEntryRequest")
proto.RegisterType((*LookupDirectoryEntryResponse)(nil), "filer_pb.LookupDirectoryEntryResponse")
@@ -1264,6 +1376,11 @@ func init() {
proto.RegisterType((*SubscribeMetadataRequest)(nil), "filer_pb.SubscribeMetadataRequest")
proto.RegisterType((*SubscribeMetadataResponse)(nil), "filer_pb.SubscribeMetadataResponse")
proto.RegisterType((*LogEntry)(nil), "filer_pb.LogEntry")
+ proto.RegisterType((*KeepConnectedRequest)(nil), "filer_pb.KeepConnectedRequest")
+ proto.RegisterType((*KeepConnectedResponse)(nil), "filer_pb.KeepConnectedResponse")
+ proto.RegisterType((*LocateBrokerRequest)(nil), "filer_pb.LocateBrokerRequest")
+ proto.RegisterType((*LocateBrokerResponse)(nil), "filer_pb.LocateBrokerResponse")
+ proto.RegisterType((*LocateBrokerResponse_Resource)(nil), "filer_pb.LocateBrokerResponse.Resource")
}
// Reference imports to suppress errors if they are not otherwise used.
@@ -1290,6 +1407,8 @@ type SeaweedFilerClient interface {
Statistics(ctx context.Context, in *StatisticsRequest, opts ...grpc.CallOption) (*StatisticsResponse, error)
GetFilerConfiguration(ctx context.Context, in *GetFilerConfigurationRequest, opts ...grpc.CallOption) (*GetFilerConfigurationResponse, error)
SubscribeMetadata(ctx context.Context, in *SubscribeMetadataRequest, opts ...grpc.CallOption) (SeaweedFiler_SubscribeMetadataClient, error)
+ KeepConnected(ctx context.Context, opts ...grpc.CallOption) (SeaweedFiler_KeepConnectedClient, error)
+ LocateBroker(ctx context.Context, in *LocateBrokerRequest, opts ...grpc.CallOption) (*LocateBrokerResponse, error)
}
type seaweedFilerClient struct {
@@ -1463,6 +1582,46 @@ func (x *seaweedFilerSubscribeMetadataClient) Recv() (*SubscribeMetadataResponse
return m, nil
}
+func (c *seaweedFilerClient) KeepConnected(ctx context.Context, opts ...grpc.CallOption) (SeaweedFiler_KeepConnectedClient, error) {
+ stream, err := grpc.NewClientStream(ctx, &_SeaweedFiler_serviceDesc.Streams[2], c.cc, "/filer_pb.SeaweedFiler/KeepConnected", opts...)
+ if err != nil {
+ return nil, err
+ }
+ x := &seaweedFilerKeepConnectedClient{stream}
+ return x, nil
+}
+
+type SeaweedFiler_KeepConnectedClient interface {
+ Send(*KeepConnectedRequest) error
+ Recv() (*KeepConnectedResponse, error)
+ grpc.ClientStream
+}
+
+type seaweedFilerKeepConnectedClient struct {
+ grpc.ClientStream
+}
+
+func (x *seaweedFilerKeepConnectedClient) Send(m *KeepConnectedRequest) error {
+ return x.ClientStream.SendMsg(m)
+}
+
+func (x *seaweedFilerKeepConnectedClient) Recv() (*KeepConnectedResponse, error) {
+ m := new(KeepConnectedResponse)
+ if err := x.ClientStream.RecvMsg(m); err != nil {
+ return nil, err
+ }
+ return m, nil
+}
+
+func (c *seaweedFilerClient) LocateBroker(ctx context.Context, in *LocateBrokerRequest, opts ...grpc.CallOption) (*LocateBrokerResponse, error) {
+ out := new(LocateBrokerResponse)
+ err := grpc.Invoke(ctx, "/filer_pb.SeaweedFiler/LocateBroker", in, out, c.cc, opts...)
+ if err != nil {
+ return nil, err
+ }
+ return out, nil
+}
+
// Server API for SeaweedFiler service
type SeaweedFilerServer interface {
@@ -1479,6 +1638,8 @@ type SeaweedFilerServer interface {
Statistics(context.Context, *StatisticsRequest) (*StatisticsResponse, error)
GetFilerConfiguration(context.Context, *GetFilerConfigurationRequest) (*GetFilerConfigurationResponse, error)
SubscribeMetadata(*SubscribeMetadataRequest, SeaweedFiler_SubscribeMetadataServer) error
+ KeepConnected(SeaweedFiler_KeepConnectedServer) error
+ LocateBroker(context.Context, *LocateBrokerRequest) (*LocateBrokerResponse, error)
}
func RegisterSeaweedFilerServer(s *grpc.Server, srv SeaweedFilerServer) {
@@ -1725,6 +1886,50 @@ func (x *seaweedFilerSubscribeMetadataServer) Send(m *SubscribeMetadataResponse)
return x.ServerStream.SendMsg(m)
}
+func _SeaweedFiler_KeepConnected_Handler(srv interface{}, stream grpc.ServerStream) error {
+ return srv.(SeaweedFilerServer).KeepConnected(&seaweedFilerKeepConnectedServer{stream})
+}
+
+type SeaweedFiler_KeepConnectedServer interface {
+ Send(*KeepConnectedResponse) error
+ Recv() (*KeepConnectedRequest, error)
+ grpc.ServerStream
+}
+
+type seaweedFilerKeepConnectedServer struct {
+ grpc.ServerStream
+}
+
+func (x *seaweedFilerKeepConnectedServer) Send(m *KeepConnectedResponse) error {
+ return x.ServerStream.SendMsg(m)
+}
+
+func (x *seaweedFilerKeepConnectedServer) Recv() (*KeepConnectedRequest, error) {
+ m := new(KeepConnectedRequest)
+ if err := x.ServerStream.RecvMsg(m); err != nil {
+ return nil, err
+ }
+ return m, nil
+}
+
+func _SeaweedFiler_LocateBroker_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
+ in := new(LocateBrokerRequest)
+ if err := dec(in); err != nil {
+ return nil, err
+ }
+ if interceptor == nil {
+ return srv.(SeaweedFilerServer).LocateBroker(ctx, in)
+ }
+ info := &grpc.UnaryServerInfo{
+ Server: srv,
+ FullMethod: "/filer_pb.SeaweedFiler/LocateBroker",
+ }
+ handler := func(ctx context.Context, req interface{}) (interface{}, error) {
+ return srv.(SeaweedFilerServer).LocateBroker(ctx, req.(*LocateBrokerRequest))
+ }
+ return interceptor(ctx, in, info, handler)
+}
+
var _SeaweedFiler_serviceDesc = grpc.ServiceDesc{
ServiceName: "filer_pb.SeaweedFiler",
HandlerType: (*SeaweedFilerServer)(nil),
@@ -1773,6 +1978,10 @@ var _SeaweedFiler_serviceDesc = grpc.ServiceDesc{
MethodName: "GetFilerConfiguration",
Handler: _SeaweedFiler_GetFilerConfiguration_Handler,
},
+ {
+ MethodName: "LocateBroker",
+ Handler: _SeaweedFiler_LocateBroker_Handler,
+ },
},
Streams: []grpc.StreamDesc{
{
@@ -1785,6 +1994,12 @@ var _SeaweedFiler_serviceDesc = grpc.ServiceDesc{
Handler: _SeaweedFiler_SubscribeMetadata_Handler,
ServerStreams: true,
},
+ {
+ StreamName: "KeepConnected",
+ Handler: _SeaweedFiler_KeepConnected_Handler,
+ ServerStreams: true,
+ ClientStreams: true,
+ },
},
Metadata: "filer.proto",
}
@@ -1792,128 +2007,139 @@ var _SeaweedFiler_serviceDesc = grpc.ServiceDesc{
func init() { proto.RegisterFile("filer.proto", fileDescriptor0) }
var fileDescriptor0 = []byte{
- // 1957 bytes of a gzipped FileDescriptorProto
- 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xb4, 0x58, 0x5f, 0x6f, 0xdb, 0xc8,
- 0x11, 0x0f, 0x25, 0x4b, 0x16, 0x47, 0x52, 0xce, 0x5e, 0xdb, 0x89, 0xac, 0xd8, 0x8e, 0x8f, 0x69,
- 0xae, 0x29, 0x12, 0xb8, 0x81, 0x7b, 0x05, 0xee, 0x7a, 0xed, 0x43, 0xe2, 0x38, 0xd7, 0xf4, 0x12,
- 0x5f, 0x40, 0x27, 0x45, 0x8b, 0x02, 0x65, 0x69, 0x72, 0x2d, 0x6d, 0x4d, 0x91, 0xec, 0xee, 0xd2,
- 0x7f, 0xee, 0xe9, 0x3e, 0x47, 0x81, 0x7e, 0x8b, 0x3e, 0x16, 0x7d, 0x29, 0x0a, 0x14, 0xe8, 0xb7,
- 0xe8, 0xf7, 0x28, 0x50, 0xec, 0x2c, 0x49, 0x2d, 0x45, 0xc9, 0xbe, 0xa0, 0xb8, 0xb7, 0xdd, 0x99,
- 0xd9, 0xd9, 0xd9, 0xf9, 0xf3, 0x9b, 0x21, 0xa1, 0x7b, 0xca, 0x22, 0xca, 0xf7, 0x52, 0x9e, 0xc8,
- 0x84, 0x74, 0x70, 0xe3, 0xa5, 0x27, 0xce, 0xd7, 0x70, 0xef, 0x75, 0x92, 0x9c, 0x65, 0xe9, 0x0b,
- 0xc6, 0x69, 0x20, 0x13, 0x7e, 0x75, 0x18, 0x4b, 0x7e, 0xe5, 0xd2, 0x3f, 0x65, 0x54, 0x48, 0xb2,
- 0x05, 0x76, 0x58, 0x30, 0x06, 0xd6, 0xae, 0xf5, 0xc8, 0x76, 0xa7, 0x04, 0x42, 0x60, 0x29, 0xf6,
- 0x27, 0x74, 0xd0, 0x40, 0x06, 0xae, 0x9d, 0x43, 0xd8, 0x9a, 0xaf, 0x50, 0xa4, 0x49, 0x2c, 0x28,
- 0x79, 0x08, 0x2d, 0xaa, 0x08, 0xa8, 0xad, 0xbb, 0xff, 0xd1, 0x5e, 0x61, 0xca, 0x9e, 0x96, 0xd3,
- 0x5c, 0xe7, 0xef, 0x16, 0x90, 0xd7, 0x4c, 0x48, 0x45, 0x64, 0x54, 0x7c, 0x37, 0x7b, 0xee, 0x40,
- 0x3b, 0xe5, 0xf4, 0x94, 0x5d, 0xe6, 0x16, 0xe5, 0x3b, 0xf2, 0x04, 0x56, 0x85, 0xf4, 0xb9, 0x7c,
- 0xc9, 0x93, 0xc9, 0x4b, 0x16, 0xd1, 0x23, 0x65, 0x74, 0x13, 0x45, 0xea, 0x0c, 0xb2, 0x07, 0x84,
- 0xc5, 0x41, 0x94, 0x09, 0x76, 0x4e, 0x8f, 0x0b, 0xee, 0x60, 0x69, 0xd7, 0x7a, 0xd4, 0x71, 0xe7,
- 0x70, 0xc8, 0x3a, 0xb4, 0x22, 0x36, 0x61, 0x72, 0xd0, 0xda, 0xb5, 0x1e, 0xf5, 0x5d, 0xbd, 0x71,
- 0x7e, 0x0e, 0x6b, 0x15, 0xfb, 0x3f, 0xec, 0xf9, 0x7f, 0x69, 0x40, 0x0b, 0x09, 0xa5, 0x8f, 0xad,
- 0xa9, 0x8f, 0xc9, 0xc7, 0xd0, 0x63, 0xc2, 0x9b, 0x3a, 0xa2, 0x81, 0xb6, 0x75, 0x99, 0x28, 0x7d,
- 0x4e, 0x1e, 0x43, 0x3b, 0x18, 0x67, 0xf1, 0x99, 0x18, 0x34, 0x77, 0x9b, 0x8f, 0xba, 0xfb, 0x6b,
- 0xd3, 0x8b, 0xd4, 0x43, 0x0f, 0x14, 0xcf, 0xcd, 0x45, 0xc8, 0x67, 0x00, 0xbe, 0x94, 0x9c, 0x9d,
- 0x64, 0x92, 0x0a, 0x7c, 0x69, 0x77, 0x7f, 0x60, 0x1c, 0xc8, 0x04, 0x7d, 0x56, 0xf2, 0x5d, 0x43,
- 0x96, 0x7c, 0x0e, 0x1d, 0x7a, 0x29, 0x69, 0x1c, 0xd2, 0x70, 0xd0, 0xc2, 0x8b, 0xb6, 0x67, 0x5e,
- 0xb4, 0x77, 0x98, 0xf3, 0xf5, 0xfb, 0x4a, 0xf1, 0xe1, 0x17, 0xd0, 0xaf, 0xb0, 0xc8, 0x0a, 0x34,
- 0xcf, 0x68, 0x11, 0x55, 0xb5, 0x54, 0x9e, 0x3d, 0xf7, 0xa3, 0x4c, 0x27, 0x58, 0xcf, 0xd5, 0x9b,
- 0x9f, 0x35, 0x3e, 0xb3, 0x9c, 0x17, 0x60, 0xbf, 0xcc, 0xa2, 0xa8, 0x3c, 0x18, 0x32, 0x5e, 0x1c,
- 0x0c, 0x19, 0x9f, 0x7a, 0xb9, 0x71, 0xad, 0x97, 0xff, 0x66, 0xc1, 0xea, 0xe1, 0x39, 0x8d, 0xe5,
- 0x51, 0x22, 0xd9, 0x29, 0x0b, 0x7c, 0xc9, 0x92, 0x98, 0x3c, 0x01, 0x3b, 0x89, 0x42, 0xef, 0xda,
- 0x30, 0x75, 0x92, 0x28, 0xb7, 0xfa, 0x09, 0xd8, 0x31, 0xbd, 0xf0, 0xae, 0xbd, 0xae, 0x13, 0xd3,
- 0x0b, 0x2d, 0xfd, 0x00, 0xfa, 0x21, 0x8d, 0xa8, 0xa4, 0x5e, 0x19, 0x1d, 0x15, 0xba, 0x9e, 0x26,
- 0x1e, 0xe8, 0x70, 0x7c, 0x02, 0x1f, 0x29, 0x95, 0xa9, 0xcf, 0x69, 0x2c, 0xbd, 0xd4, 0x97, 0x63,
- 0x8c, 0x89, 0xed, 0xf6, 0x63, 0x7a, 0xf1, 0x16, 0xa9, 0x6f, 0x7d, 0x39, 0x76, 0xfe, 0xda, 0x00,
- 0xbb, 0x0c, 0x26, 0xb9, 0x0b, 0xcb, 0xea, 0x5a, 0x8f, 0x85, 0xb9, 0x27, 0xda, 0x6a, 0xfb, 0x2a,
- 0x54, 0x55, 0x91, 0x9c, 0x9e, 0x0a, 0x2a, 0xd1, 0xbc, 0xa6, 0x9b, 0xef, 0x54, 0x66, 0x09, 0xf6,
- 0x8d, 0x2e, 0x84, 0x25, 0x17, 0xd7, 0xca, 0xe3, 0x13, 0xc9, 0x26, 0x14, 0x2f, 0x6c, 0xba, 0x7a,
- 0x43, 0xd6, 0xa0, 0x45, 0x3d, 0xe9, 0x8f, 0x30, 0xc3, 0x6d, 0x77, 0x89, 0xbe, 0xf3, 0x47, 0xe4,
- 0x07, 0x70, 0x5b, 0x24, 0x19, 0x0f, 0xa8, 0x57, 0x5c, 0xdb, 0x46, 0x6e, 0x4f, 0x53, 0x5f, 0xea,
- 0xcb, 0x1d, 0x68, 0x9e, 0xb2, 0x70, 0xb0, 0x8c, 0x8e, 0x59, 0xa9, 0x26, 0xe1, 0xab, 0xd0, 0x55,
- 0x4c, 0xf2, 0x63, 0x80, 0x52, 0x53, 0x38, 0xe8, 0x2c, 0x10, 0xb5, 0x0b, 0xbd, 0x21, 0xd9, 0x06,
- 0x08, 0x58, 0x3a, 0xa6, 0xdc, 0x53, 0x09, 0x63, 0x63, 0x72, 0xd8, 0x9a, 0xf2, 0x15, 0xbd, 0x52,
- 0x6c, 0x26, 0xbc, 0xd1, 0x37, 0x2c, 0x4d, 0x69, 0x38, 0x00, 0xf4, 0xb0, 0xcd, 0xc4, 0x97, 0x9a,
- 0xe0, 0xfc, 0x06, 0xda, 0xb9, 0x71, 0xf7, 0xc0, 0x3e, 0x4f, 0xa2, 0x6c, 0x52, 0x3a, 0xad, 0xef,
- 0x76, 0x34, 0xe1, 0x55, 0x48, 0x36, 0x01, 0x51, 0x12, 0xaf, 0x68, 0xa0, 0x8b, 0xd0, 0xbf, 0xea,
- 0x82, 0x3b, 0xd0, 0x0e, 0x92, 0xe4, 0x8c, 0x69, 0xdf, 0x2d, 0xbb, 0xf9, 0xce, 0xf9, 0xb6, 0x09,
- 0xb7, 0xab, 0xc5, 0xa2, 0xae, 0x40, 0x2d, 0xe8, 0x69, 0x0b, 0xd5, 0xa0, 0xda, 0xe3, 0x8a, 0xb7,
- 0x1b, 0xa6, 0xb7, 0x8b, 0x23, 0x93, 0x24, 0xd4, 0x17, 0xf4, 0xf5, 0x91, 0x37, 0x49, 0x48, 0x55,
- 0xae, 0x67, 0x2c, 0xc4, 0xf0, 0xf4, 0x5d, 0xb5, 0x54, 0x94, 0x11, 0x0b, 0x73, 0xf0, 0x51, 0x4b,
- 0x34, 0x8f, 0xa3, 0xde, 0xb6, 0x0e, 0xb8, 0xde, 0xa9, 0x80, 0x4f, 0x14, 0x75, 0x59, 0x47, 0x51,
- 0xad, 0xc9, 0x2e, 0x74, 0x39, 0x4d, 0xa3, 0x3c, 0xf7, 0xd1, 0xf9, 0xb6, 0x6b, 0x92, 0xc8, 0x0e,
- 0x40, 0x90, 0x44, 0x11, 0x0d, 0x50, 0xc0, 0x46, 0x01, 0x83, 0xa2, 0xf2, 0x4e, 0xca, 0xc8, 0x13,
- 0x34, 0x40, 0x57, 0xb7, 0xdc, 0xb6, 0x94, 0xd1, 0x31, 0x0d, 0xd4, 0x3b, 0x32, 0x41, 0xb9, 0x87,
- 0xf0, 0xd5, 0xc5, 0x73, 0x1d, 0x45, 0x40, 0x90, 0xdd, 0x06, 0x18, 0xf1, 0x24, 0x4b, 0x35, 0xb7,
- 0xb7, 0xdb, 0x54, 0x48, 0x8e, 0x14, 0x64, 0x3f, 0x84, 0xdb, 0xe2, 0x6a, 0x12, 0xb1, 0xf8, 0xcc,
- 0x93, 0x3e, 0x1f, 0x51, 0x39, 0xe8, 0xeb, 0x0a, 0xc8, 0xa9, 0xef, 0x90, 0xa8, 0xde, 0x3e, 0x09,
- 0x7f, 0x3a, 0xb8, 0x8d, 0x19, 0xa0, 0x96, 0x4e, 0x0a, 0xe4, 0x80, 0x53, 0x5f, 0xd2, 0x0f, 0x68,
- 0x63, 0xdf, 0x0d, 0x2d, 0xc8, 0x06, 0xb4, 0x13, 0x8f, 0x5e, 0x06, 0x51, 0x5e, 0xb4, 0xad, 0xe4,
- 0xf0, 0x32, 0x88, 0x9c, 0xc7, 0xb0, 0x56, 0xb9, 0x31, 0x07, 0xfa, 0x75, 0x68, 0x51, 0xce, 0x93,
- 0x02, 0x96, 0xf4, 0xc6, 0xf9, 0x2d, 0x90, 0xf7, 0x69, 0xf8, 0x7d, 0x98, 0xe7, 0x6c, 0xc0, 0x5a,
- 0x45, 0xb5, 0xb6, 0xc3, 0xf9, 0xd6, 0x82, 0xf5, 0x67, 0x69, 0x4a, 0xe3, 0xf0, 0x5d, 0xf2, 0x01,
- 0x97, 0x6e, 0x03, 0xa0, 0x5a, 0xcf, 0x68, 0xf0, 0x36, 0x52, 0x30, 0x3e, 0x1f, 0xd2, 0x5e, 0x9c,
- 0xbb, 0xb0, 0x31, 0x63, 0x41, 0x6e, 0xdb, 0x3f, 0x2d, 0x20, 0x2f, 0x10, 0xf9, 0xfe, 0xbf, 0xa1,
- 0x43, 0x61, 0x91, 0x6a, 0x88, 0x1a, 0x59, 0x43, 0x5f, 0xfa, 0x79, 0xbb, 0xee, 0x31, 0xa1, 0xf5,
- 0xbf, 0xf0, 0xa5, 0x9f, 0xb7, 0x4d, 0x4e, 0x83, 0x8c, 0xab, 0x0e, 0x8e, 0x25, 0x83, 0x6d, 0xd3,
- 0x2d, 0x48, 0xe4, 0x53, 0xb8, 0xc3, 0x46, 0x71, 0xc2, 0xe9, 0x54, 0xcc, 0xd3, 0x61, 0x6c, 0xa3,
- 0xf0, 0xba, 0xe6, 0x96, 0x07, 0x0e, 0x31, 0xaa, 0x8f, 0x61, 0xad, 0xf2, 0x8c, 0x6b, 0x53, 0xe0,
- 0xcf, 0x16, 0x0c, 0x9e, 0xc9, 0x64, 0xc2, 0x02, 0x97, 0x2a, 0xe3, 0x2b, 0x4f, 0x7f, 0x00, 0x7d,
- 0xd5, 0x7b, 0x66, 0x9f, 0xdf, 0x4b, 0xa2, 0x70, 0xda, 0xdb, 0x37, 0x41, 0xb5, 0x1f, 0x33, 0x32,
- 0xcb, 0x49, 0x14, 0x62, 0x5c, 0x1e, 0x80, 0xea, 0x11, 0xc6, 0x79, 0x3d, 0xe5, 0xf4, 0x62, 0x7a,
- 0x51, 0x39, 0xaf, 0x84, 0xf0, 0xbc, 0x6e, 0x2c, 0xcb, 0x31, 0xbd, 0x50, 0xe7, 0x9d, 0x7b, 0xb0,
- 0x39, 0xc7, 0xb6, 0x3c, 0x5c, 0xff, 0xb2, 0x60, 0xed, 0x99, 0x10, 0x6c, 0x14, 0xff, 0x1a, 0x41,
- 0xb2, 0x30, 0x7a, 0x1d, 0x5a, 0x41, 0x92, 0xc5, 0x12, 0x8d, 0x6d, 0xb9, 0x7a, 0x33, 0x83, 0x1b,
- 0x8d, 0x1a, 0x6e, 0xcc, 0x20, 0x4f, 0xb3, 0x8e, 0x3c, 0x06, 0xb2, 0x2c, 0x55, 0x90, 0xe5, 0x3e,
- 0x74, 0x55, 0x90, 0xbd, 0x80, 0xc6, 0x92, 0xf2, 0xbc, 0x2b, 0x81, 0x22, 0x1d, 0x20, 0x45, 0x09,
- 0x98, 0xdd, 0x53, 0x37, 0x26, 0x48, 0xa7, 0xad, 0xf3, 0x3f, 0xaa, 0x2a, 0x2a, 0x4f, 0xc9, 0x63,
- 0xb6, 0xb0, 0x8b, 0x2a, 0xe0, 0xe5, 0x51, 0xfe, 0x0e, 0xb5, 0x54, 0x25, 0x92, 0x66, 0x27, 0x11,
- 0x0b, 0x3c, 0xc5, 0xd0, 0xf6, 0xdb, 0x9a, 0xf2, 0x9e, 0x47, 0x53, 0xaf, 0x2c, 0x99, 0x5e, 0x21,
- 0xb0, 0xe4, 0x67, 0x72, 0x5c, 0x74, 0x52, 0xb5, 0x9e, 0xf1, 0x54, 0xfb, 0x26, 0x4f, 0x2d, 0xd7,
- 0x3d, 0x55, 0x66, 0x5a, 0xc7, 0xcc, 0xb4, 0x4f, 0x61, 0x4d, 0x8f, 0xe2, 0xd5, 0x70, 0x6d, 0x03,
- 0x94, 0x5d, 0x4f, 0x0c, 0x2c, 0x0d, 0xbd, 0x45, 0xdb, 0x13, 0xce, 0x2f, 0xc0, 0x7e, 0x9d, 0x68,
- 0xbd, 0x82, 0x3c, 0x05, 0x3b, 0x2a, 0x36, 0x28, 0xda, 0xdd, 0x27, 0xd3, 0x52, 0x2f, 0xe4, 0xdc,
- 0xa9, 0x90, 0xf3, 0x05, 0x74, 0x0a, 0x72, 0xe1, 0x33, 0x6b, 0x91, 0xcf, 0x1a, 0x33, 0x3e, 0x73,
- 0xfe, 0x61, 0xc1, 0x7a, 0xd5, 0xe4, 0x3c, 0x2c, 0xef, 0xa1, 0x5f, 0x5e, 0xe1, 0x4d, 0xfc, 0x34,
- 0xb7, 0xe5, 0xa9, 0x69, 0x4b, 0xfd, 0x58, 0x69, 0xa0, 0x78, 0xe3, 0xa7, 0x3a, 0x97, 0x7b, 0x91,
- 0x41, 0x1a, 0xbe, 0x83, 0xd5, 0x9a, 0xc8, 0x9c, 0x39, 0xf4, 0x47, 0xe6, 0x1c, 0x5a, 0x01, 0xbb,
- 0xf2, 0xb4, 0x39, 0x9c, 0x7e, 0x0e, 0x77, 0x35, 0x1c, 0x1c, 0x94, 0x31, 0x2c, 0x7c, 0x5f, 0x0d,
- 0xb5, 0x35, 0x1b, 0x6a, 0x67, 0x08, 0x83, 0xfa, 0xd1, 0xbc, 0xfc, 0x46, 0xb0, 0x7a, 0x2c, 0x7d,
- 0xc9, 0x84, 0x64, 0x41, 0xf9, 0x41, 0x34, 0x93, 0x1b, 0xd6, 0x4d, 0xfd, 0xbb, 0x5e, 0x87, 0x2b,
- 0xd0, 0x94, 0xb2, 0xc8, 0x5f, 0xb5, 0x54, 0x51, 0x20, 0xe6, 0x4d, 0x79, 0x0c, 0xbe, 0x87, 0xab,
- 0x54, 0x3e, 0xc8, 0x44, 0xfa, 0x91, 0x9e, 0x8f, 0x96, 0x70, 0x3e, 0xb2, 0x91, 0x82, 0x03, 0x92,
- 0x1e, 0x21, 0x42, 0xcd, 0x6d, 0xe9, 0xe9, 0x49, 0x11, 0x90, 0xb9, 0x0d, 0x80, 0xa5, 0xaa, 0xab,
- 0xac, 0xad, 0xcf, 0x2a, 0xca, 0x81, 0x22, 0x38, 0x3b, 0xb0, 0xf5, 0x25, 0x95, 0xaa, 0x1b, 0xf1,
- 0x83, 0x24, 0x3e, 0x65, 0xa3, 0x8c, 0xfb, 0x46, 0x28, 0x9c, 0x7f, 0x5b, 0xb0, 0xbd, 0x40, 0x20,
- 0x7f, 0xf0, 0x00, 0x96, 0x27, 0xbe, 0x90, 0x94, 0x17, 0x55, 0x52, 0x6c, 0x67, 0x5d, 0xd1, 0xb8,
- 0xc9, 0x15, 0xcd, 0x9a, 0x2b, 0x36, 0xa0, 0x3d, 0xf1, 0x2f, 0xbd, 0xc9, 0x49, 0x3e, 0xca, 0xb5,
- 0x26, 0xfe, 0xe5, 0x9b, 0x13, 0x44, 0x36, 0xc6, 0xbd, 0x93, 0x2c, 0x38, 0xa3, 0x52, 0x94, 0xc8,
- 0xc6, 0xf8, 0x73, 0x4d, 0xc1, 0xd9, 0x0e, 0x07, 0x5d, 0x84, 0x81, 0x8e, 0x9b, 0xef, 0x9c, 0x0b,
- 0x18, 0x1c, 0x67, 0x27, 0x22, 0xe0, 0xec, 0x84, 0xbe, 0xa1, 0xd2, 0x57, 0x60, 0x58, 0xe4, 0xc8,
- 0x7d, 0xe8, 0x06, 0x11, 0x53, 0x68, 0x68, 0x7c, 0x49, 0x82, 0x26, 0x61, 0xd7, 0x40, 0xb8, 0x94,
- 0x63, 0xaf, 0xf2, 0xf1, 0x0c, 0x8a, 0xf4, 0x56, 0x7f, 0x40, 0x6f, 0x42, 0x47, 0xb0, 0x38, 0xa0,
- 0x5e, 0xac, 0xbf, 0x58, 0x9a, 0xee, 0x32, 0xee, 0x8f, 0x84, 0x6a, 0x67, 0x9b, 0x73, 0x6e, 0xce,
- 0x5d, 0x78, 0x7d, 0x2b, 0xff, 0x15, 0x10, 0x7a, 0x8e, 0x76, 0x19, 0xdf, 0x5f, 0x79, 0x91, 0xdd,
- 0x33, 0xc6, 0x9c, 0xd9, 0x4f, 0x34, 0x77, 0x95, 0xd6, 0xbe, 0xda, 0xd6, 0xa0, 0x25, 0xc5, 0xd4,
- 0xbe, 0x25, 0x29, 0x8e, 0x84, 0xe3, 0x2b, 0x30, 0x1a, 0xe9, 0xb2, 0x2e, 0x05, 0xac, 0xa9, 0x00,
- 0x79, 0x02, 0x24, 0xf5, 0xb9, 0x64, 0x4a, 0x85, 0x9a, 0xf4, 0xbd, 0xb1, 0x2f, 0xc6, 0x68, 0x41,
- 0xcb, 0x5d, 0x29, 0x39, 0x5f, 0xd1, 0xab, 0x5f, 0xfa, 0x62, 0xac, 0xc0, 0x1b, 0x87, 0x8b, 0x26,
- 0xce, 0x9b, 0xb8, 0xde, 0xff, 0x6f, 0x07, 0x7a, 0xc7, 0xd4, 0xbf, 0xa0, 0x34, 0xc4, 0x54, 0x22,
- 0xa3, 0x02, 0xc2, 0xaa, 0x3f, 0x40, 0xc8, 0xc3, 0x59, 0xac, 0x9a, 0xfb, 0xc7, 0x65, 0xf8, 0xc9,
- 0x4d, 0x62, 0x39, 0x1a, 0xdc, 0x22, 0x47, 0xd0, 0x35, 0xfe, 0x30, 0x90, 0x2d, 0xe3, 0x60, 0xed,
- 0xc7, 0xc9, 0x70, 0x7b, 0x01, 0xb7, 0xd0, 0xf6, 0xd4, 0x22, 0xaf, 0xa1, 0x6b, 0x0c, 0xb2, 0xa6,
- 0xbe, 0xfa, 0x44, 0x6d, 0xea, 0x9b, 0x33, 0xfd, 0x3a, 0xb7, 0x94, 0x36, 0x63, 0x1c, 0x35, 0xb5,
- 0xd5, 0x07, 0x60, 0x53, 0xdb, 0xbc, 0x19, 0xf6, 0x16, 0x71, 0xa1, 0x5f, 0x19, 0x21, 0xc9, 0xce,
- 0xf4, 0xc4, 0xbc, 0xe9, 0x76, 0x78, 0x7f, 0x21, 0xdf, 0xb4, 0xd0, 0x98, 0xda, 0x4c, 0x0b, 0xeb,
- 0x33, 0xa9, 0x69, 0xe1, 0x9c, 0x51, 0xcf, 0xb9, 0x45, 0x7e, 0x0f, 0xab, 0xb5, 0xc9, 0x89, 0x38,
- 0x86, 0x15, 0x0b, 0x46, 0xbe, 0xe1, 0x83, 0x6b, 0x65, 0x4a, 0xfd, 0x5f, 0x43, 0xcf, 0x1c, 0x58,
- 0x88, 0x61, 0xd0, 0x9c, 0x99, 0x6c, 0xb8, 0xb3, 0x88, 0x6d, 0x2a, 0x34, 0x7b, 0xa6, 0xa9, 0x70,
- 0xce, 0xd4, 0x60, 0x2a, 0x9c, 0xd7, 0x6a, 0x9d, 0x5b, 0xe4, 0x77, 0xb0, 0x32, 0xdb, 0xbb, 0xc8,
- 0xc7, 0xb3, 0x6e, 0xab, 0xb5, 0xc4, 0xa1, 0x73, 0x9d, 0x48, 0xa9, 0xfc, 0x15, 0xc0, 0xb4, 0x25,
- 0x11, 0x03, 0x1c, 0x6a, 0x2d, 0x71, 0xb8, 0x35, 0x9f, 0x59, 0xaa, 0xfa, 0x23, 0x6c, 0xcc, 0xc5,
- 0x7d, 0x62, 0x94, 0xde, 0x75, 0x9d, 0x63, 0xf8, 0xc3, 0x1b, 0xe5, 0xca, 0xbb, 0xfe, 0x00, 0xab,
- 0x35, 0x70, 0x34, 0xb3, 0x62, 0x11, 0x66, 0x9b, 0x59, 0xb1, 0x10, 0x5d, 0x55, 0xd5, 0x3e, 0xdf,
- 0x81, 0x15, 0xa1, 0xe1, 0xe7, 0x54, 0xec, 0x69, 0x4c, 0x7f, 0x0e, 0x68, 0xd3, 0x5b, 0x9e, 0xc8,
- 0xe4, 0xa4, 0x8d, 0x7f, 0x7c, 0x7f, 0xf2, 0xbf, 0x00, 0x00, 0x00, 0xff, 0xff, 0x39, 0x78, 0xfe,
- 0x97, 0x00, 0x16, 0x00, 0x00,
+ // 2142 bytes of a gzipped FileDescriptorProto
+ 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xb4, 0x59, 0x5f, 0x6f, 0xdb, 0xc8,
+ 0x11, 0x37, 0x25, 0x4b, 0x16, 0x47, 0x52, 0xce, 0x5e, 0x3b, 0x89, 0xa2, 0xc4, 0x89, 0x8f, 0x69,
+ 0xee, 0x5c, 0x24, 0x70, 0x53, 0xf7, 0x0a, 0xdc, 0xf5, 0xda, 0x07, 0xc7, 0x71, 0xae, 0x69, 0x12,
+ 0x9f, 0x41, 0x27, 0x87, 0x2b, 0x0a, 0x94, 0xa5, 0xc9, 0xb5, 0xbc, 0x35, 0x45, 0xb2, 0xbb, 0x4b,
+ 0xff, 0xb9, 0xa7, 0xfb, 0x1c, 0x05, 0xfa, 0xda, 0x4f, 0xd0, 0xc7, 0xa2, 0x2f, 0x45, 0x81, 0x02,
+ 0x45, 0xbf, 0x44, 0x3f, 0x49, 0xb1, 0xb3, 0x24, 0xb5, 0x14, 0x25, 0xfb, 0x82, 0xc3, 0xbd, 0x71,
+ 0x67, 0x66, 0x67, 0x67, 0xe7, 0xcf, 0x6f, 0x66, 0x25, 0xe8, 0x1e, 0xb3, 0x88, 0xf2, 0xad, 0x94,
+ 0x27, 0x32, 0x21, 0x1d, 0x5c, 0x78, 0xe9, 0x91, 0xf3, 0x25, 0xdc, 0x7d, 0x9d, 0x24, 0xa7, 0x59,
+ 0xfa, 0x9c, 0x71, 0x1a, 0xc8, 0x84, 0x5f, 0xee, 0xc5, 0x92, 0x5f, 0xba, 0xf4, 0x4f, 0x19, 0x15,
+ 0x92, 0xdc, 0x03, 0x3b, 0x2c, 0x18, 0x03, 0x6b, 0xc3, 0xda, 0xb4, 0xdd, 0x09, 0x81, 0x10, 0x58,
+ 0x8c, 0xfd, 0x31, 0x1d, 0x34, 0x90, 0x81, 0xdf, 0xce, 0x1e, 0xdc, 0x9b, 0xad, 0x50, 0xa4, 0x49,
+ 0x2c, 0x28, 0x79, 0x04, 0x2d, 0xaa, 0x08, 0xa8, 0xad, 0xbb, 0xfd, 0xc1, 0x56, 0x61, 0xca, 0x96,
+ 0x96, 0xd3, 0x5c, 0xe7, 0x1f, 0x16, 0x90, 0xd7, 0x4c, 0x48, 0x45, 0x64, 0x54, 0x7c, 0x37, 0x7b,
+ 0x6e, 0x41, 0x3b, 0xe5, 0xf4, 0x98, 0x5d, 0xe4, 0x16, 0xe5, 0x2b, 0xf2, 0x04, 0x56, 0x84, 0xf4,
+ 0xb9, 0x7c, 0xc1, 0x93, 0xf1, 0x0b, 0x16, 0xd1, 0x7d, 0x65, 0x74, 0x13, 0x45, 0xea, 0x0c, 0xb2,
+ 0x05, 0x84, 0xc5, 0x41, 0x94, 0x09, 0x76, 0x46, 0x0f, 0x0b, 0xee, 0x60, 0x71, 0xc3, 0xda, 0xec,
+ 0xb8, 0x33, 0x38, 0x64, 0x0d, 0x5a, 0x11, 0x1b, 0x33, 0x39, 0x68, 0x6d, 0x58, 0x9b, 0x7d, 0x57,
+ 0x2f, 0x9c, 0x5f, 0xc2, 0x6a, 0xc5, 0xfe, 0xf7, 0xbb, 0xfe, 0x5f, 0x1a, 0xd0, 0x42, 0x42, 0xe9,
+ 0x63, 0x6b, 0xe2, 0x63, 0xf2, 0x21, 0xf4, 0x98, 0xf0, 0x26, 0x8e, 0x68, 0xa0, 0x6d, 0x5d, 0x26,
+ 0x4a, 0x9f, 0x93, 0xc7, 0xd0, 0x0e, 0x4e, 0xb2, 0xf8, 0x54, 0x0c, 0x9a, 0x1b, 0xcd, 0xcd, 0xee,
+ 0xf6, 0xea, 0xe4, 0x20, 0x75, 0xd1, 0x5d, 0xc5, 0x73, 0x73, 0x11, 0xf2, 0x29, 0x80, 0x2f, 0x25,
+ 0x67, 0x47, 0x99, 0xa4, 0x02, 0x6f, 0xda, 0xdd, 0x1e, 0x18, 0x1b, 0x32, 0x41, 0x77, 0x4a, 0xbe,
+ 0x6b, 0xc8, 0x92, 0xcf, 0xa0, 0x43, 0x2f, 0x24, 0x8d, 0x43, 0x1a, 0x0e, 0x5a, 0x78, 0xd0, 0xfa,
+ 0xd4, 0x8d, 0xb6, 0xf6, 0x72, 0xbe, 0xbe, 0x5f, 0x29, 0x3e, 0xfc, 0x1c, 0xfa, 0x15, 0x16, 0x59,
+ 0x86, 0xe6, 0x29, 0x2d, 0xa2, 0xaa, 0x3e, 0x95, 0x67, 0xcf, 0xfc, 0x28, 0xd3, 0x09, 0xd6, 0x73,
+ 0xf5, 0xe2, 0x17, 0x8d, 0x4f, 0x2d, 0xe7, 0x39, 0xd8, 0x2f, 0xb2, 0x28, 0x2a, 0x37, 0x86, 0x8c,
+ 0x17, 0x1b, 0x43, 0xc6, 0x27, 0x5e, 0x6e, 0x5c, 0xe9, 0xe5, 0xbf, 0x5b, 0xb0, 0xb2, 0x77, 0x46,
+ 0x63, 0xb9, 0x9f, 0x48, 0x76, 0xcc, 0x02, 0x5f, 0xb2, 0x24, 0x26, 0x4f, 0xc0, 0x4e, 0xa2, 0xd0,
+ 0xbb, 0x32, 0x4c, 0x9d, 0x24, 0xca, 0xad, 0x7e, 0x02, 0x76, 0x4c, 0xcf, 0xbd, 0x2b, 0x8f, 0xeb,
+ 0xc4, 0xf4, 0x5c, 0x4b, 0x3f, 0x84, 0x7e, 0x48, 0x23, 0x2a, 0xa9, 0x57, 0x46, 0x47, 0x85, 0xae,
+ 0xa7, 0x89, 0xbb, 0x3a, 0x1c, 0x1f, 0xc1, 0x07, 0x4a, 0x65, 0xea, 0x73, 0x1a, 0x4b, 0x2f, 0xf5,
+ 0xe5, 0x09, 0xc6, 0xc4, 0x76, 0xfb, 0x31, 0x3d, 0x3f, 0x40, 0xea, 0x81, 0x2f, 0x4f, 0x9c, 0xbf,
+ 0x35, 0xc0, 0x2e, 0x83, 0x49, 0x6e, 0xc3, 0x92, 0x3a, 0xd6, 0x63, 0x61, 0xee, 0x89, 0xb6, 0x5a,
+ 0xbe, 0x0c, 0x55, 0x55, 0x24, 0xc7, 0xc7, 0x82, 0x4a, 0x34, 0xaf, 0xe9, 0xe6, 0x2b, 0x95, 0x59,
+ 0x82, 0x7d, 0xa3, 0x0b, 0x61, 0xd1, 0xc5, 0x6f, 0xe5, 0xf1, 0xb1, 0x64, 0x63, 0x8a, 0x07, 0x36,
+ 0x5d, 0xbd, 0x20, 0xab, 0xd0, 0xa2, 0x9e, 0xf4, 0x47, 0x98, 0xe1, 0xb6, 0xbb, 0x48, 0xdf, 0xfa,
+ 0x23, 0xf2, 0x23, 0xb8, 0x21, 0x92, 0x8c, 0x07, 0xd4, 0x2b, 0x8e, 0x6d, 0x23, 0xb7, 0xa7, 0xa9,
+ 0x2f, 0xf4, 0xe1, 0x0e, 0x34, 0x8f, 0x59, 0x38, 0x58, 0x42, 0xc7, 0x2c, 0x57, 0x93, 0xf0, 0x65,
+ 0xe8, 0x2a, 0x26, 0xf9, 0x09, 0x40, 0xa9, 0x29, 0x1c, 0x74, 0xe6, 0x88, 0xda, 0x85, 0xde, 0x90,
+ 0xac, 0x03, 0x04, 0x2c, 0x3d, 0xa1, 0xdc, 0x53, 0x09, 0x63, 0x63, 0x72, 0xd8, 0x9a, 0xf2, 0x8a,
+ 0x5e, 0x2a, 0x36, 0x13, 0xde, 0xe8, 0x1b, 0x96, 0xa6, 0x34, 0x1c, 0x00, 0x7a, 0xd8, 0x66, 0xe2,
+ 0x0b, 0x4d, 0x70, 0xbe, 0x86, 0x76, 0x6e, 0xdc, 0x5d, 0xb0, 0xcf, 0x92, 0x28, 0x1b, 0x97, 0x4e,
+ 0xeb, 0xbb, 0x1d, 0x4d, 0x78, 0x19, 0x92, 0x3b, 0x80, 0x28, 0x89, 0x47, 0x34, 0xd0, 0x45, 0xe8,
+ 0x5f, 0x75, 0xc0, 0x2d, 0x68, 0x07, 0x49, 0x72, 0xca, 0xb4, 0xef, 0x96, 0xdc, 0x7c, 0xe5, 0x7c,
+ 0xdb, 0x84, 0x1b, 0xd5, 0x62, 0x51, 0x47, 0xa0, 0x16, 0xf4, 0xb4, 0x85, 0x6a, 0x50, 0xed, 0x61,
+ 0xc5, 0xdb, 0x0d, 0xd3, 0xdb, 0xc5, 0x96, 0x71, 0x12, 0xea, 0x03, 0xfa, 0x7a, 0xcb, 0x9b, 0x24,
+ 0xa4, 0x2a, 0xd7, 0x33, 0x16, 0x62, 0x78, 0xfa, 0xae, 0xfa, 0x54, 0x94, 0x11, 0x0b, 0x73, 0xf0,
+ 0x51, 0x9f, 0x68, 0x1e, 0x47, 0xbd, 0x6d, 0x1d, 0x70, 0xbd, 0x52, 0x01, 0x1f, 0x2b, 0xea, 0x92,
+ 0x8e, 0xa2, 0xfa, 0x26, 0x1b, 0xd0, 0xe5, 0x34, 0x8d, 0xf2, 0xdc, 0x47, 0xe7, 0xdb, 0xae, 0x49,
+ 0x22, 0xf7, 0x01, 0x82, 0x24, 0x8a, 0x68, 0x80, 0x02, 0x36, 0x0a, 0x18, 0x14, 0x95, 0x77, 0x52,
+ 0x46, 0x9e, 0xa0, 0x01, 0xba, 0xba, 0xe5, 0xb6, 0xa5, 0x8c, 0x0e, 0x69, 0xa0, 0xee, 0x91, 0x09,
+ 0xca, 0x3d, 0x84, 0xaf, 0x2e, 0xee, 0xeb, 0x28, 0x02, 0x82, 0xec, 0x3a, 0xc0, 0x88, 0x27, 0x59,
+ 0xaa, 0xb9, 0xbd, 0x8d, 0xa6, 0x42, 0x72, 0xa4, 0x20, 0xfb, 0x11, 0xdc, 0x10, 0x97, 0xe3, 0x88,
+ 0xc5, 0xa7, 0x9e, 0xf4, 0xf9, 0x88, 0xca, 0x41, 0x5f, 0x57, 0x40, 0x4e, 0x7d, 0x8b, 0x44, 0x75,
+ 0xf7, 0x71, 0xf8, 0xf3, 0xc1, 0x0d, 0xcc, 0x00, 0xf5, 0xe9, 0xa4, 0x40, 0x76, 0x39, 0xf5, 0x25,
+ 0x7d, 0x8f, 0x36, 0xf6, 0xdd, 0xd0, 0x82, 0xdc, 0x84, 0x76, 0xe2, 0xd1, 0x8b, 0x20, 0xca, 0x8b,
+ 0xb6, 0x95, 0xec, 0x5d, 0x04, 0x91, 0xf3, 0x18, 0x56, 0x2b, 0x27, 0xe6, 0x40, 0xbf, 0x06, 0x2d,
+ 0xca, 0x79, 0x52, 0xc0, 0x92, 0x5e, 0x38, 0xbf, 0x05, 0xf2, 0x2e, 0x0d, 0x7f, 0x08, 0xf3, 0x9c,
+ 0x9b, 0xb0, 0x5a, 0x51, 0xad, 0xed, 0x70, 0xbe, 0xb5, 0x60, 0x6d, 0x27, 0x4d, 0x69, 0x1c, 0xbe,
+ 0x4d, 0xde, 0xe3, 0xd0, 0x75, 0x00, 0x54, 0xeb, 0x19, 0x0d, 0xde, 0x46, 0x0a, 0xc6, 0xe7, 0x7d,
+ 0xda, 0x8b, 0x73, 0x1b, 0x6e, 0x4e, 0x59, 0x90, 0xdb, 0xf6, 0x2f, 0x0b, 0xc8, 0x73, 0x44, 0xbe,
+ 0xef, 0x37, 0x74, 0x28, 0x2c, 0x52, 0x0d, 0x51, 0x23, 0x6b, 0xe8, 0x4b, 0x3f, 0x6f, 0xd7, 0x3d,
+ 0x26, 0xb4, 0xfe, 0xe7, 0xbe, 0xf4, 0xf3, 0xb6, 0xc9, 0x69, 0x90, 0x71, 0xd5, 0xc1, 0xb1, 0x64,
+ 0xb0, 0x6d, 0xba, 0x05, 0x89, 0x7c, 0x02, 0xb7, 0xd8, 0x28, 0x4e, 0x38, 0x9d, 0x88, 0x79, 0x3a,
+ 0x8c, 0x6d, 0x14, 0x5e, 0xd3, 0xdc, 0x72, 0xc3, 0x1e, 0x46, 0xf5, 0x31, 0xac, 0x56, 0xae, 0x71,
+ 0x65, 0x0a, 0xfc, 0xd9, 0x82, 0xc1, 0x8e, 0x4c, 0xc6, 0x2c, 0x70, 0xa9, 0x32, 0xbe, 0x72, 0xf5,
+ 0x87, 0xd0, 0x57, 0xbd, 0x67, 0xfa, 0xfa, 0xbd, 0x24, 0x0a, 0x27, 0xbd, 0xfd, 0x0e, 0xa8, 0xf6,
+ 0x63, 0x46, 0x66, 0x29, 0x89, 0x42, 0x8c, 0xcb, 0x43, 0x50, 0x3d, 0xc2, 0xd8, 0xaf, 0xa7, 0x9c,
+ 0x5e, 0x4c, 0xcf, 0x2b, 0xfb, 0x95, 0x10, 0xee, 0xd7, 0x8d, 0x65, 0x29, 0xa6, 0xe7, 0x6a, 0xbf,
+ 0x73, 0x17, 0xee, 0xcc, 0xb0, 0x2d, 0x0f, 0xd7, 0xbf, 0x2d, 0x58, 0xdd, 0x11, 0x82, 0x8d, 0xe2,
+ 0xaf, 0x10, 0x24, 0x0b, 0xa3, 0xd7, 0xa0, 0x15, 0x24, 0x59, 0x2c, 0xd1, 0xd8, 0x96, 0xab, 0x17,
+ 0x53, 0xb8, 0xd1, 0xa8, 0xe1, 0xc6, 0x14, 0xf2, 0x34, 0xeb, 0xc8, 0x63, 0x20, 0xcb, 0x62, 0x05,
+ 0x59, 0x1e, 0x40, 0x57, 0x05, 0xd9, 0x0b, 0x68, 0x2c, 0x29, 0xcf, 0xbb, 0x12, 0x28, 0xd2, 0x2e,
+ 0x52, 0x94, 0x80, 0xd9, 0x3d, 0x75, 0x63, 0x82, 0x74, 0xd2, 0x3a, 0xff, 0xa7, 0xaa, 0xa2, 0x72,
+ 0x95, 0x3c, 0x66, 0x73, 0xbb, 0xa8, 0x02, 0x5e, 0x1e, 0xe5, 0xf7, 0x50, 0x9f, 0xaa, 0x44, 0xd2,
+ 0xec, 0x28, 0x62, 0x81, 0xa7, 0x18, 0xda, 0x7e, 0x5b, 0x53, 0xde, 0xf1, 0x68, 0xe2, 0x95, 0x45,
+ 0xd3, 0x2b, 0x04, 0x16, 0xfd, 0x4c, 0x9e, 0x14, 0x9d, 0x54, 0x7d, 0x4f, 0x79, 0xaa, 0x7d, 0x9d,
+ 0xa7, 0x96, 0xea, 0x9e, 0x2a, 0x33, 0xad, 0x63, 0x66, 0xda, 0x27, 0xb0, 0xaa, 0x47, 0xf1, 0x6a,
+ 0xb8, 0xd6, 0x01, 0xca, 0xae, 0x27, 0x06, 0x96, 0x86, 0xde, 0xa2, 0xed, 0x09, 0xe7, 0x57, 0x60,
+ 0xbf, 0x4e, 0xb4, 0x5e, 0x41, 0x9e, 0x82, 0x1d, 0x15, 0x0b, 0x14, 0xed, 0x6e, 0x93, 0x49, 0xa9,
+ 0x17, 0x72, 0xee, 0x44, 0xc8, 0xf9, 0x1c, 0x3a, 0x05, 0xb9, 0xf0, 0x99, 0x35, 0xcf, 0x67, 0x8d,
+ 0x29, 0x9f, 0x39, 0xff, 0xb4, 0x60, 0xad, 0x6a, 0x72, 0x1e, 0x96, 0x77, 0xd0, 0x2f, 0x8f, 0xf0,
+ 0xc6, 0x7e, 0x9a, 0xdb, 0xf2, 0xd4, 0xb4, 0xa5, 0xbe, 0xad, 0x34, 0x50, 0xbc, 0xf1, 0x53, 0x9d,
+ 0xcb, 0xbd, 0xc8, 0x20, 0x0d, 0xdf, 0xc2, 0x4a, 0x4d, 0x64, 0xc6, 0x1c, 0xfa, 0x63, 0x73, 0x0e,
+ 0xad, 0x80, 0x5d, 0xb9, 0xdb, 0x1c, 0x4e, 0x3f, 0x83, 0xdb, 0x1a, 0x0e, 0x76, 0xcb, 0x18, 0x16,
+ 0xbe, 0xaf, 0x86, 0xda, 0x9a, 0x0e, 0xb5, 0x33, 0x84, 0x41, 0x7d, 0x6b, 0x5e, 0x7e, 0x23, 0x58,
+ 0x39, 0x94, 0xbe, 0x64, 0x42, 0xb2, 0xa0, 0x7c, 0x10, 0x4d, 0xe5, 0x86, 0x75, 0x5d, 0xff, 0xae,
+ 0xd7, 0xe1, 0x32, 0x34, 0xa5, 0x2c, 0xf2, 0x57, 0x7d, 0xaa, 0x28, 0x10, 0xf3, 0xa4, 0x3c, 0x06,
+ 0x3f, 0xc0, 0x51, 0x2a, 0x1f, 0x64, 0x22, 0xfd, 0x48, 0xcf, 0x47, 0x8b, 0x38, 0x1f, 0xd9, 0x48,
+ 0xc1, 0x01, 0x49, 0x8f, 0x10, 0xa1, 0xe6, 0xb6, 0xf4, 0xf4, 0xa4, 0x08, 0xc8, 0x5c, 0x07, 0xc0,
+ 0x52, 0xd5, 0x55, 0xd6, 0xd6, 0x7b, 0x15, 0x65, 0x57, 0x11, 0x9c, 0xfb, 0x70, 0xef, 0x0b, 0x2a,
+ 0x55, 0x37, 0xe2, 0xbb, 0x49, 0x7c, 0xcc, 0x46, 0x19, 0xf7, 0x8d, 0x50, 0x38, 0xff, 0xb1, 0x60,
+ 0x7d, 0x8e, 0x40, 0x7e, 0xe1, 0x01, 0x2c, 0x8d, 0x7d, 0x21, 0x29, 0x2f, 0xaa, 0xa4, 0x58, 0x4e,
+ 0xbb, 0xa2, 0x71, 0x9d, 0x2b, 0x9a, 0x35, 0x57, 0xdc, 0x84, 0xf6, 0xd8, 0xbf, 0xf0, 0xc6, 0x47,
+ 0xf9, 0x28, 0xd7, 0x1a, 0xfb, 0x17, 0x6f, 0x8e, 0x10, 0xd9, 0x18, 0xf7, 0x8e, 0xb2, 0xe0, 0x94,
+ 0x4a, 0x51, 0x22, 0x1b, 0xe3, 0xcf, 0x34, 0x05, 0x67, 0x3b, 0x1c, 0x74, 0x11, 0x06, 0x3a, 0x6e,
+ 0xbe, 0x72, 0xce, 0x61, 0x70, 0x98, 0x1d, 0x89, 0x80, 0xb3, 0x23, 0xfa, 0x86, 0x4a, 0x5f, 0x81,
+ 0x61, 0x91, 0x23, 0x0f, 0xa0, 0x1b, 0x44, 0x4c, 0xa1, 0xa1, 0xf1, 0x92, 0x04, 0x4d, 0xc2, 0xae,
+ 0x81, 0x70, 0x29, 0x4f, 0xbc, 0xca, 0xe3, 0x19, 0x14, 0xe9, 0x40, 0x3f, 0xa0, 0xef, 0x40, 0x47,
+ 0xb0, 0x38, 0xa0, 0x5e, 0xac, 0x5f, 0x2c, 0x4d, 0x77, 0x09, 0xd7, 0xfb, 0x42, 0xb5, 0xb3, 0x3b,
+ 0x33, 0x4e, 0xce, 0x5d, 0x78, 0x75, 0x2b, 0xff, 0x0d, 0x10, 0x7a, 0x86, 0x76, 0x19, 0xef, 0xaf,
+ 0xbc, 0xc8, 0xee, 0x1a, 0x63, 0xce, 0xf4, 0x13, 0xcd, 0x5d, 0xa1, 0xb5, 0x57, 0xdb, 0x2a, 0xb4,
+ 0xa4, 0x98, 0xd8, 0xb7, 0x28, 0xc5, 0xbe, 0x70, 0x7c, 0x05, 0x46, 0x23, 0x5d, 0xd6, 0xa5, 0x80,
+ 0x35, 0x11, 0x20, 0x4f, 0x80, 0xa4, 0x3e, 0x97, 0x4c, 0xa9, 0x50, 0x93, 0xbe, 0x77, 0xe2, 0x8b,
+ 0x13, 0xb4, 0xa0, 0xe5, 0x2e, 0x97, 0x9c, 0x57, 0xf4, 0xf2, 0xd7, 0xbe, 0x38, 0x51, 0xe0, 0x8d,
+ 0xc3, 0x45, 0x13, 0xe7, 0x4d, 0xfc, 0x76, 0x28, 0xac, 0xbd, 0xa2, 0x34, 0xdd, 0x4d, 0xe2, 0x98,
+ 0x06, 0x92, 0x86, 0x85, 0xd3, 0x67, 0xbd, 0xdb, 0xef, 0x82, 0x3d, 0xe2, 0x69, 0xe0, 0xa5, 0x09,
+ 0xd7, 0x8f, 0xb1, 0xbe, 0xdb, 0x51, 0x84, 0x83, 0x84, 0xe3, 0xd4, 0xc3, 0xa9, 0x7e, 0xe3, 0xe8,
+ 0xa9, 0xca, 0x76, 0x27, 0x04, 0x35, 0x43, 0x4d, 0x1d, 0x93, 0xa3, 0xc2, 0x4f, 0x15, 0xc8, 0x07,
+ 0xbe, 0xa4, 0xcf, 0x78, 0x72, 0x4a, 0x79, 0x71, 0xfc, 0x10, 0x3a, 0xc5, 0xe6, 0xdc, 0x84, 0x72,
+ 0xed, 0xfc, 0x17, 0x51, 0xd6, 0xdc, 0x33, 0x19, 0x58, 0x8e, 0x93, 0x2c, 0xd6, 0xad, 0xaf, 0xe3,
+ 0xea, 0x05, 0xd9, 0x33, 0x0d, 0x6b, 0x20, 0xee, 0x7e, 0x3c, 0x85, 0x80, 0x53, 0x8a, 0xb6, 0xdc,
+ 0x5c, 0xde, 0xb8, 0xc1, 0xf0, 0x6b, 0xe8, 0x14, 0x64, 0x35, 0xde, 0xa3, 0x23, 0xfc, 0x30, 0xe4,
+ 0x54, 0x08, 0x2a, 0x72, 0x1b, 0xfb, 0x8a, 0xba, 0x53, 0x10, 0x95, 0x58, 0xb1, 0x3f, 0xaf, 0x72,
+ 0x1d, 0x99, 0x7e, 0x41, 0xc5, 0x4a, 0xdf, 0xfe, 0x2b, 0x40, 0xef, 0x90, 0xfa, 0xe7, 0x94, 0x86,
+ 0x58, 0xcd, 0x64, 0x54, 0x74, 0x91, 0xea, 0x6f, 0x50, 0xe4, 0xd1, 0x74, 0xbb, 0x98, 0xf9, 0xa3,
+ 0xd7, 0xf0, 0xa3, 0xeb, 0xc4, 0x72, 0xd7, 0x2f, 0x90, 0x7d, 0xe8, 0x1a, 0x3f, 0xf2, 0x90, 0x7b,
+ 0xc6, 0xc6, 0xda, 0x6f, 0x57, 0xc3, 0xf5, 0x39, 0xdc, 0x42, 0xdb, 0x53, 0x8b, 0xbc, 0x86, 0xae,
+ 0xf1, 0x96, 0x30, 0xf5, 0xd5, 0x1f, 0x35, 0xa6, 0xbe, 0x19, 0x0f, 0x10, 0x67, 0x41, 0x69, 0x33,
+ 0x5e, 0x04, 0xa6, 0xb6, 0xfa, 0x1b, 0xc4, 0xd4, 0x36, 0xeb, 0x19, 0xb1, 0x40, 0x5c, 0xe8, 0x57,
+ 0xa6, 0x78, 0x72, 0x7f, 0xb2, 0x63, 0xd6, 0x03, 0x63, 0xf8, 0x60, 0x2e, 0xdf, 0xb4, 0xd0, 0x18,
+ 0x9c, 0x4d, 0x0b, 0xeb, 0xcf, 0x02, 0xd3, 0xc2, 0x19, 0xd3, 0xb6, 0xb3, 0x40, 0x7e, 0x0f, 0x2b,
+ 0xb5, 0xe1, 0x95, 0x38, 0x86, 0x15, 0x73, 0xa6, 0xee, 0xe1, 0xc3, 0x2b, 0x65, 0x4a, 0xfd, 0x5f,
+ 0x42, 0xcf, 0x9c, 0x19, 0x89, 0x61, 0xd0, 0x8c, 0xb1, 0x78, 0x78, 0x7f, 0x1e, 0xdb, 0x54, 0x68,
+ 0x8e, 0x2d, 0xa6, 0xc2, 0x19, 0x83, 0x9b, 0xa9, 0x70, 0xd6, 0xb4, 0xe3, 0x2c, 0x90, 0xdf, 0xc1,
+ 0xf2, 0xf4, 0xf8, 0x40, 0x3e, 0x9c, 0x76, 0x5b, 0x6d, 0x2a, 0x19, 0x3a, 0x57, 0x89, 0x94, 0xca,
+ 0x5f, 0x02, 0x4c, 0xa6, 0x02, 0x62, 0xe0, 0x73, 0x6d, 0x2a, 0x19, 0xde, 0x9b, 0xcd, 0x2c, 0x55,
+ 0xfd, 0x11, 0x6e, 0xce, 0x6c, 0xbd, 0xc4, 0x28, 0xbd, 0xab, 0x9a, 0xf7, 0xf0, 0xe3, 0x6b, 0xe5,
+ 0xca, 0xb3, 0xfe, 0x00, 0x2b, 0xb5, 0xfe, 0x64, 0x66, 0xc5, 0xbc, 0xb6, 0x69, 0x66, 0xc5, 0xdc,
+ 0x06, 0x87, 0x55, 0xfb, 0x15, 0xf4, 0x2b, 0xd8, 0x6c, 0x56, 0xc6, 0xac, 0xde, 0x60, 0x56, 0xc6,
+ 0x6c, 0x50, 0x5f, 0xd8, 0xb4, 0x9e, 0x5a, 0x3a, 0x3d, 0x26, 0xe8, 0x5a, 0x4d, 0x8f, 0x1a, 0xe4,
+ 0x57, 0xd3, 0xa3, 0x0e, 0xca, 0xce, 0xc2, 0xb3, 0xfb, 0xb0, 0x2c, 0x34, 0x4e, 0x1e, 0x8b, 0x2d,
+ 0xdd, 0xff, 0x9f, 0x01, 0x3a, 0xef, 0x80, 0x27, 0x32, 0x39, 0x6a, 0xe3, 0xbf, 0x03, 0x3f, 0xfb,
+ 0x7f, 0x00, 0x00, 0x00, 0xff, 0xff, 0x32, 0x17, 0xbf, 0x34, 0x2c, 0x18, 0x00, 0x00,
}
diff --git a/weed/pb/filer_pb/filer_client.go b/weed/pb/filer_pb/filer_client.go
index 73b66472d..d42e20b34 100644
--- a/weed/pb/filer_pb/filer_client.go
+++ b/weed/pb/filer_pb/filer_client.go
@@ -83,10 +83,12 @@ func doList(filerClient FilerClient, fullDirPath util.FullPath, prefix string, f
}
glog.V(3).Infof("read directory: %v", request)
- stream, err := client.ListEntries(context.Background(), request)
+ ctx, cancel := context.WithCancel(context.Background())
+ stream, err := client.ListEntries(ctx, request)
if err != nil {
return fmt.Errorf("list %s: %v", fullDirPath, err)
}
+ defer cancel()
var prevEntry *Entry
for {
diff --git a/weed/pb/messaging.proto b/weed/pb/messaging.proto
index 165542681..689c22d29 100644
--- a/weed/pb/messaging.proto
+++ b/weed/pb/messaging.proto
@@ -15,12 +15,18 @@ service SeaweedMessaging {
rpc Publish (stream PublishRequest) returns (stream PublishResponse) {
}
+ rpc DeleteTopic (DeleteTopicRequest) returns (DeleteTopicResponse) {
+ }
+
rpc ConfigureTopic (ConfigureTopicRequest) returns (ConfigureTopicResponse) {
}
rpc GetTopicConfiguration (GetTopicConfigurationRequest) returns (GetTopicConfigurationResponse) {
}
+ rpc FindBroker (FindBrokerRequest) returns (FindBrokerResponse) {
+ }
+
}
//////////////////////////////////////////////////
@@ -44,6 +50,7 @@ message SubscriberMessage {
int64 message_id = 1;
}
AckMessage ack = 2;
+ bool is_close = 3;
}
message Message {
@@ -51,14 +58,11 @@ message Message {
bytes key = 2; // Message key
bytes value = 3; // Message payload
map<string, bytes> headers = 4; // Message headers
+ bool is_close = 5;
}
message BrokerMessage {
Message data = 1;
- message RedirectMessage {
- string new_broker = 1;
- }
- RedirectMessage redirect = 2;
}
message PublishRequest {
@@ -80,6 +84,14 @@ message PublishResponse {
string new_broker = 1;
}
RedirectMessage redirect = 2;
+ bool is_closed = 3;
+}
+
+message DeleteTopicRequest {
+ string namespace = 1;
+ string topic = 2;
+}
+message DeleteTopicResponse {
}
message ConfigureTopicRequest {
@@ -98,6 +110,16 @@ message GetTopicConfigurationResponse {
TopicConfiguration configuration = 1;
}
+message FindBrokerRequest {
+ string namespace = 1;
+ string topic = 2;
+ int32 parition = 3;
+}
+
+message FindBrokerResponse {
+ string broker = 1;
+}
+
message TopicConfiguration {
int32 partition_count = 1;
string collection = 2;
diff --git a/weed/pb/messaging_pb/messaging.pb.go b/weed/pb/messaging_pb/messaging.pb.go
index 57f316708..f42e2c2db 100644
--- a/weed/pb/messaging_pb/messaging.pb.go
+++ b/weed/pb/messaging_pb/messaging.pb.go
@@ -14,10 +14,14 @@ It has these top-level messages:
BrokerMessage
PublishRequest
PublishResponse
+ DeleteTopicRequest
+ DeleteTopicResponse
ConfigureTopicRequest
ConfigureTopicResponse
GetTopicConfigurationRequest
GetTopicConfigurationResponse
+ FindBrokerRequest
+ FindBrokerResponse
TopicConfiguration
*/
package messaging_pb
@@ -91,12 +95,13 @@ func (x TopicConfiguration_Partitioning) String() string {
return proto.EnumName(TopicConfiguration_Partitioning_name, int32(x))
}
func (TopicConfiguration_Partitioning) EnumDescriptor() ([]byte, []int) {
- return fileDescriptor0, []int{9, 0}
+ return fileDescriptor0, []int{13, 0}
}
type SubscriberMessage struct {
- Init *SubscriberMessage_InitMessage `protobuf:"bytes,1,opt,name=init" json:"init,omitempty"`
- Ack *SubscriberMessage_AckMessage `protobuf:"bytes,2,opt,name=ack" json:"ack,omitempty"`
+ Init *SubscriberMessage_InitMessage `protobuf:"bytes,1,opt,name=init" json:"init,omitempty"`
+ Ack *SubscriberMessage_AckMessage `protobuf:"bytes,2,opt,name=ack" json:"ack,omitempty"`
+ IsClose bool `protobuf:"varint,3,opt,name=is_close,json=isClose" json:"is_close,omitempty"`
}
func (m *SubscriberMessage) Reset() { *m = SubscriberMessage{} }
@@ -118,6 +123,13 @@ func (m *SubscriberMessage) GetAck() *SubscriberMessage_AckMessage {
return nil
}
+func (m *SubscriberMessage) GetIsClose() bool {
+ if m != nil {
+ return m.IsClose
+ }
+ return false
+}
+
type SubscriberMessage_InitMessage struct {
Namespace string `protobuf:"bytes,1,opt,name=namespace" json:"namespace,omitempty"`
Topic string `protobuf:"bytes,2,opt,name=topic" json:"topic,omitempty"`
@@ -197,6 +209,7 @@ type Message struct {
Key []byte `protobuf:"bytes,2,opt,name=key,proto3" json:"key,omitempty"`
Value []byte `protobuf:"bytes,3,opt,name=value,proto3" json:"value,omitempty"`
Headers map[string][]byte `protobuf:"bytes,4,rep,name=headers" json:"headers,omitempty" protobuf_key:"bytes,1,opt,name=key" protobuf_val:"bytes,2,opt,name=value,proto3"`
+ IsClose bool `protobuf:"varint,5,opt,name=is_close,json=isClose" json:"is_close,omitempty"`
}
func (m *Message) Reset() { *m = Message{} }
@@ -232,9 +245,15 @@ func (m *Message) GetHeaders() map[string][]byte {
return nil
}
+func (m *Message) GetIsClose() bool {
+ if m != nil {
+ return m.IsClose
+ }
+ return false
+}
+
type BrokerMessage struct {
- Data *Message `protobuf:"bytes,1,opt,name=data" json:"data,omitempty"`
- Redirect *BrokerMessage_RedirectMessage `protobuf:"bytes,2,opt,name=redirect" json:"redirect,omitempty"`
+ Data *Message `protobuf:"bytes,1,opt,name=data" json:"data,omitempty"`
}
func (m *BrokerMessage) Reset() { *m = BrokerMessage{} }
@@ -249,31 +268,6 @@ func (m *BrokerMessage) GetData() *Message {
return nil
}
-func (m *BrokerMessage) GetRedirect() *BrokerMessage_RedirectMessage {
- if m != nil {
- return m.Redirect
- }
- return nil
-}
-
-type BrokerMessage_RedirectMessage struct {
- NewBroker string `protobuf:"bytes,1,opt,name=new_broker,json=newBroker" json:"new_broker,omitempty"`
-}
-
-func (m *BrokerMessage_RedirectMessage) Reset() { *m = BrokerMessage_RedirectMessage{} }
-func (m *BrokerMessage_RedirectMessage) String() string { return proto.CompactTextString(m) }
-func (*BrokerMessage_RedirectMessage) ProtoMessage() {}
-func (*BrokerMessage_RedirectMessage) Descriptor() ([]byte, []int) {
- return fileDescriptor0, []int{2, 0}
-}
-
-func (m *BrokerMessage_RedirectMessage) GetNewBroker() string {
- if m != nil {
- return m.NewBroker
- }
- return ""
-}
-
type PublishRequest struct {
Init *PublishRequest_InitMessage `protobuf:"bytes,1,opt,name=init" json:"init,omitempty"`
Data *Message `protobuf:"bytes,2,opt,name=data" json:"data,omitempty"`
@@ -333,6 +327,7 @@ func (m *PublishRequest_InitMessage) GetPartition() int32 {
type PublishResponse struct {
Config *PublishResponse_ConfigMessage `protobuf:"bytes,1,opt,name=config" json:"config,omitempty"`
Redirect *PublishResponse_RedirectMessage `protobuf:"bytes,2,opt,name=redirect" json:"redirect,omitempty"`
+ IsClosed bool `protobuf:"varint,3,opt,name=is_closed,json=isClosed" json:"is_closed,omitempty"`
}
func (m *PublishResponse) Reset() { *m = PublishResponse{} }
@@ -354,6 +349,13 @@ func (m *PublishResponse) GetRedirect() *PublishResponse_RedirectMessage {
return nil
}
+func (m *PublishResponse) GetIsClosed() bool {
+ if m != nil {
+ return m.IsClosed
+ }
+ return false
+}
+
type PublishResponse_ConfigMessage struct {
PartitionCount int32 `protobuf:"varint,1,opt,name=partition_count,json=partitionCount" json:"partition_count,omitempty"`
}
@@ -390,6 +392,38 @@ func (m *PublishResponse_RedirectMessage) GetNewBroker() string {
return ""
}
+type DeleteTopicRequest struct {
+ Namespace string `protobuf:"bytes,1,opt,name=namespace" json:"namespace,omitempty"`
+ Topic string `protobuf:"bytes,2,opt,name=topic" json:"topic,omitempty"`
+}
+
+func (m *DeleteTopicRequest) Reset() { *m = DeleteTopicRequest{} }
+func (m *DeleteTopicRequest) String() string { return proto.CompactTextString(m) }
+func (*DeleteTopicRequest) ProtoMessage() {}
+func (*DeleteTopicRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{5} }
+
+func (m *DeleteTopicRequest) GetNamespace() string {
+ if m != nil {
+ return m.Namespace
+ }
+ return ""
+}
+
+func (m *DeleteTopicRequest) GetTopic() string {
+ if m != nil {
+ return m.Topic
+ }
+ return ""
+}
+
+type DeleteTopicResponse struct {
+}
+
+func (m *DeleteTopicResponse) Reset() { *m = DeleteTopicResponse{} }
+func (m *DeleteTopicResponse) String() string { return proto.CompactTextString(m) }
+func (*DeleteTopicResponse) ProtoMessage() {}
+func (*DeleteTopicResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{6} }
+
type ConfigureTopicRequest struct {
Namespace string `protobuf:"bytes,1,opt,name=namespace" json:"namespace,omitempty"`
Topic string `protobuf:"bytes,2,opt,name=topic" json:"topic,omitempty"`
@@ -399,7 +433,7 @@ type ConfigureTopicRequest struct {
func (m *ConfigureTopicRequest) Reset() { *m = ConfigureTopicRequest{} }
func (m *ConfigureTopicRequest) String() string { return proto.CompactTextString(m) }
func (*ConfigureTopicRequest) ProtoMessage() {}
-func (*ConfigureTopicRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{5} }
+func (*ConfigureTopicRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{7} }
func (m *ConfigureTopicRequest) GetNamespace() string {
if m != nil {
@@ -428,7 +462,7 @@ type ConfigureTopicResponse struct {
func (m *ConfigureTopicResponse) Reset() { *m = ConfigureTopicResponse{} }
func (m *ConfigureTopicResponse) String() string { return proto.CompactTextString(m) }
func (*ConfigureTopicResponse) ProtoMessage() {}
-func (*ConfigureTopicResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{6} }
+func (*ConfigureTopicResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{8} }
type GetTopicConfigurationRequest struct {
Namespace string `protobuf:"bytes,1,opt,name=namespace" json:"namespace,omitempty"`
@@ -438,7 +472,7 @@ type GetTopicConfigurationRequest struct {
func (m *GetTopicConfigurationRequest) Reset() { *m = GetTopicConfigurationRequest{} }
func (m *GetTopicConfigurationRequest) String() string { return proto.CompactTextString(m) }
func (*GetTopicConfigurationRequest) ProtoMessage() {}
-func (*GetTopicConfigurationRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{7} }
+func (*GetTopicConfigurationRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{9} }
func (m *GetTopicConfigurationRequest) GetNamespace() string {
if m != nil {
@@ -461,7 +495,7 @@ type GetTopicConfigurationResponse struct {
func (m *GetTopicConfigurationResponse) Reset() { *m = GetTopicConfigurationResponse{} }
func (m *GetTopicConfigurationResponse) String() string { return proto.CompactTextString(m) }
func (*GetTopicConfigurationResponse) ProtoMessage() {}
-func (*GetTopicConfigurationResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{8} }
+func (*GetTopicConfigurationResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{10} }
func (m *GetTopicConfigurationResponse) GetConfiguration() *TopicConfiguration {
if m != nil {
@@ -470,6 +504,54 @@ func (m *GetTopicConfigurationResponse) GetConfiguration() *TopicConfiguration {
return nil
}
+type FindBrokerRequest struct {
+ Namespace string `protobuf:"bytes,1,opt,name=namespace" json:"namespace,omitempty"`
+ Topic string `protobuf:"bytes,2,opt,name=topic" json:"topic,omitempty"`
+ Parition int32 `protobuf:"varint,3,opt,name=parition" json:"parition,omitempty"`
+}
+
+func (m *FindBrokerRequest) Reset() { *m = FindBrokerRequest{} }
+func (m *FindBrokerRequest) String() string { return proto.CompactTextString(m) }
+func (*FindBrokerRequest) ProtoMessage() {}
+func (*FindBrokerRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{11} }
+
+func (m *FindBrokerRequest) GetNamespace() string {
+ if m != nil {
+ return m.Namespace
+ }
+ return ""
+}
+
+func (m *FindBrokerRequest) GetTopic() string {
+ if m != nil {
+ return m.Topic
+ }
+ return ""
+}
+
+func (m *FindBrokerRequest) GetParition() int32 {
+ if m != nil {
+ return m.Parition
+ }
+ return 0
+}
+
+type FindBrokerResponse struct {
+ Broker string `protobuf:"bytes,1,opt,name=broker" json:"broker,omitempty"`
+}
+
+func (m *FindBrokerResponse) Reset() { *m = FindBrokerResponse{} }
+func (m *FindBrokerResponse) String() string { return proto.CompactTextString(m) }
+func (*FindBrokerResponse) ProtoMessage() {}
+func (*FindBrokerResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{12} }
+
+func (m *FindBrokerResponse) GetBroker() string {
+ if m != nil {
+ return m.Broker
+ }
+ return ""
+}
+
type TopicConfiguration struct {
PartitionCount int32 `protobuf:"varint,1,opt,name=partition_count,json=partitionCount" json:"partition_count,omitempty"`
Collection string `protobuf:"bytes,2,opt,name=collection" json:"collection,omitempty"`
@@ -481,7 +563,7 @@ type TopicConfiguration struct {
func (m *TopicConfiguration) Reset() { *m = TopicConfiguration{} }
func (m *TopicConfiguration) String() string { return proto.CompactTextString(m) }
func (*TopicConfiguration) ProtoMessage() {}
-func (*TopicConfiguration) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{9} }
+func (*TopicConfiguration) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{13} }
func (m *TopicConfiguration) GetPartitionCount() int32 {
if m != nil {
@@ -524,16 +606,19 @@ func init() {
proto.RegisterType((*SubscriberMessage_AckMessage)(nil), "messaging_pb.SubscriberMessage.AckMessage")
proto.RegisterType((*Message)(nil), "messaging_pb.Message")
proto.RegisterType((*BrokerMessage)(nil), "messaging_pb.BrokerMessage")
- proto.RegisterType((*BrokerMessage_RedirectMessage)(nil), "messaging_pb.BrokerMessage.RedirectMessage")
proto.RegisterType((*PublishRequest)(nil), "messaging_pb.PublishRequest")
proto.RegisterType((*PublishRequest_InitMessage)(nil), "messaging_pb.PublishRequest.InitMessage")
proto.RegisterType((*PublishResponse)(nil), "messaging_pb.PublishResponse")
proto.RegisterType((*PublishResponse_ConfigMessage)(nil), "messaging_pb.PublishResponse.ConfigMessage")
proto.RegisterType((*PublishResponse_RedirectMessage)(nil), "messaging_pb.PublishResponse.RedirectMessage")
+ proto.RegisterType((*DeleteTopicRequest)(nil), "messaging_pb.DeleteTopicRequest")
+ proto.RegisterType((*DeleteTopicResponse)(nil), "messaging_pb.DeleteTopicResponse")
proto.RegisterType((*ConfigureTopicRequest)(nil), "messaging_pb.ConfigureTopicRequest")
proto.RegisterType((*ConfigureTopicResponse)(nil), "messaging_pb.ConfigureTopicResponse")
proto.RegisterType((*GetTopicConfigurationRequest)(nil), "messaging_pb.GetTopicConfigurationRequest")
proto.RegisterType((*GetTopicConfigurationResponse)(nil), "messaging_pb.GetTopicConfigurationResponse")
+ proto.RegisterType((*FindBrokerRequest)(nil), "messaging_pb.FindBrokerRequest")
+ proto.RegisterType((*FindBrokerResponse)(nil), "messaging_pb.FindBrokerResponse")
proto.RegisterType((*TopicConfiguration)(nil), "messaging_pb.TopicConfiguration")
proto.RegisterEnum("messaging_pb.SubscriberMessage_InitMessage_StartPosition", SubscriberMessage_InitMessage_StartPosition_name, SubscriberMessage_InitMessage_StartPosition_value)
proto.RegisterEnum("messaging_pb.TopicConfiguration_Partitioning", TopicConfiguration_Partitioning_name, TopicConfiguration_Partitioning_value)
@@ -552,8 +637,10 @@ const _ = grpc.SupportPackageIsVersion4
type SeaweedMessagingClient interface {
Subscribe(ctx context.Context, opts ...grpc.CallOption) (SeaweedMessaging_SubscribeClient, error)
Publish(ctx context.Context, opts ...grpc.CallOption) (SeaweedMessaging_PublishClient, error)
+ DeleteTopic(ctx context.Context, in *DeleteTopicRequest, opts ...grpc.CallOption) (*DeleteTopicResponse, error)
ConfigureTopic(ctx context.Context, in *ConfigureTopicRequest, opts ...grpc.CallOption) (*ConfigureTopicResponse, error)
GetTopicConfiguration(ctx context.Context, in *GetTopicConfigurationRequest, opts ...grpc.CallOption) (*GetTopicConfigurationResponse, error)
+ FindBroker(ctx context.Context, in *FindBrokerRequest, opts ...grpc.CallOption) (*FindBrokerResponse, error)
}
type seaweedMessagingClient struct {
@@ -626,6 +713,15 @@ func (x *seaweedMessagingPublishClient) Recv() (*PublishResponse, error) {
return m, nil
}
+func (c *seaweedMessagingClient) DeleteTopic(ctx context.Context, in *DeleteTopicRequest, opts ...grpc.CallOption) (*DeleteTopicResponse, error) {
+ out := new(DeleteTopicResponse)
+ err := grpc.Invoke(ctx, "/messaging_pb.SeaweedMessaging/DeleteTopic", in, out, c.cc, opts...)
+ if err != nil {
+ return nil, err
+ }
+ return out, nil
+}
+
func (c *seaweedMessagingClient) ConfigureTopic(ctx context.Context, in *ConfigureTopicRequest, opts ...grpc.CallOption) (*ConfigureTopicResponse, error) {
out := new(ConfigureTopicResponse)
err := grpc.Invoke(ctx, "/messaging_pb.SeaweedMessaging/ConfigureTopic", in, out, c.cc, opts...)
@@ -644,13 +740,24 @@ func (c *seaweedMessagingClient) GetTopicConfiguration(ctx context.Context, in *
return out, nil
}
+func (c *seaweedMessagingClient) FindBroker(ctx context.Context, in *FindBrokerRequest, opts ...grpc.CallOption) (*FindBrokerResponse, error) {
+ out := new(FindBrokerResponse)
+ err := grpc.Invoke(ctx, "/messaging_pb.SeaweedMessaging/FindBroker", in, out, c.cc, opts...)
+ if err != nil {
+ return nil, err
+ }
+ return out, nil
+}
+
// Server API for SeaweedMessaging service
type SeaweedMessagingServer interface {
Subscribe(SeaweedMessaging_SubscribeServer) error
Publish(SeaweedMessaging_PublishServer) error
+ DeleteTopic(context.Context, *DeleteTopicRequest) (*DeleteTopicResponse, error)
ConfigureTopic(context.Context, *ConfigureTopicRequest) (*ConfigureTopicResponse, error)
GetTopicConfiguration(context.Context, *GetTopicConfigurationRequest) (*GetTopicConfigurationResponse, error)
+ FindBroker(context.Context, *FindBrokerRequest) (*FindBrokerResponse, error)
}
func RegisterSeaweedMessagingServer(s *grpc.Server, srv SeaweedMessagingServer) {
@@ -709,6 +816,24 @@ func (x *seaweedMessagingPublishServer) Recv() (*PublishRequest, error) {
return m, nil
}
+func _SeaweedMessaging_DeleteTopic_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
+ in := new(DeleteTopicRequest)
+ if err := dec(in); err != nil {
+ return nil, err
+ }
+ if interceptor == nil {
+ return srv.(SeaweedMessagingServer).DeleteTopic(ctx, in)
+ }
+ info := &grpc.UnaryServerInfo{
+ Server: srv,
+ FullMethod: "/messaging_pb.SeaweedMessaging/DeleteTopic",
+ }
+ handler := func(ctx context.Context, req interface{}) (interface{}, error) {
+ return srv.(SeaweedMessagingServer).DeleteTopic(ctx, req.(*DeleteTopicRequest))
+ }
+ return interceptor(ctx, in, info, handler)
+}
+
func _SeaweedMessaging_ConfigureTopic_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(ConfigureTopicRequest)
if err := dec(in); err != nil {
@@ -745,11 +870,33 @@ func _SeaweedMessaging_GetTopicConfiguration_Handler(srv interface{}, ctx contex
return interceptor(ctx, in, info, handler)
}
+func _SeaweedMessaging_FindBroker_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
+ in := new(FindBrokerRequest)
+ if err := dec(in); err != nil {
+ return nil, err
+ }
+ if interceptor == nil {
+ return srv.(SeaweedMessagingServer).FindBroker(ctx, in)
+ }
+ info := &grpc.UnaryServerInfo{
+ Server: srv,
+ FullMethod: "/messaging_pb.SeaweedMessaging/FindBroker",
+ }
+ handler := func(ctx context.Context, req interface{}) (interface{}, error) {
+ return srv.(SeaweedMessagingServer).FindBroker(ctx, req.(*FindBrokerRequest))
+ }
+ return interceptor(ctx, in, info, handler)
+}
+
var _SeaweedMessaging_serviceDesc = grpc.ServiceDesc{
ServiceName: "messaging_pb.SeaweedMessaging",
HandlerType: (*SeaweedMessagingServer)(nil),
Methods: []grpc.MethodDesc{
{
+ MethodName: "DeleteTopic",
+ Handler: _SeaweedMessaging_DeleteTopic_Handler,
+ },
+ {
MethodName: "ConfigureTopic",
Handler: _SeaweedMessaging_ConfigureTopic_Handler,
},
@@ -757,6 +904,10 @@ var _SeaweedMessaging_serviceDesc = grpc.ServiceDesc{
MethodName: "GetTopicConfiguration",
Handler: _SeaweedMessaging_GetTopicConfiguration_Handler,
},
+ {
+ MethodName: "FindBroker",
+ Handler: _SeaweedMessaging_FindBroker_Handler,
+ },
},
Streams: []grpc.StreamDesc{
{
@@ -778,62 +929,68 @@ var _SeaweedMessaging_serviceDesc = grpc.ServiceDesc{
func init() { proto.RegisterFile("messaging.proto", fileDescriptor0) }
var fileDescriptor0 = []byte{
- // 898 bytes of a gzipped FileDescriptorProto
- 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xb4, 0x56, 0x5d, 0x6f, 0xe3, 0x44,
- 0x17, 0xee, 0x38, 0xe9, 0x47, 0x8e, 0x93, 0x34, 0xef, 0xd1, 0x5b, 0x14, 0x99, 0x16, 0x82, 0x17,
- 0x41, 0xa0, 0xc2, 0xaa, 0xc2, 0x4d, 0x59, 0xad, 0xb4, 0x6a, 0xab, 0xb2, 0x1b, 0xd1, 0x76, 0xa3,
- 0x49, 0x6e, 0x91, 0xe5, 0x38, 0xb3, 0xe9, 0xa8, 0xc9, 0x38, 0x78, 0x26, 0x5b, 0xf5, 0x1a, 0x6e,
- 0xb9, 0xe2, 0xaf, 0xc0, 0x0f, 0xe0, 0x37, 0x70, 0xc7, 0xaf, 0x41, 0x1e, 0x7f, 0xc4, 0x4e, 0xb2,
- 0xe9, 0x52, 0x89, 0x3b, 0xfb, 0xcc, 0x73, 0x9e, 0xf3, 0x9c, 0x2f, 0x8f, 0x61, 0x7f, 0xca, 0xa4,
- 0xf4, 0xc6, 0x5c, 0x8c, 0x9d, 0x59, 0x18, 0xa8, 0x00, 0xab, 0x99, 0xc1, 0x9d, 0x0d, 0xed, 0x9f,
- 0xcb, 0xf0, 0xbf, 0xfe, 0x7c, 0x28, 0xfd, 0x90, 0x0f, 0x59, 0x78, 0xad, 0x8f, 0x18, 0xbe, 0x84,
- 0x32, 0x17, 0x5c, 0x35, 0x49, 0x8b, 0xb4, 0xcd, 0xce, 0xb1, 0x93, 0x77, 0x71, 0x56, 0xe0, 0x4e,
- 0x57, 0x70, 0x95, 0x3c, 0x53, 0xed, 0x88, 0x2f, 0xa0, 0xe4, 0xf9, 0x77, 0x4d, 0x43, 0xfb, 0x7f,
- 0xfd, 0x98, 0xff, 0x99, 0x7f, 0x97, 0xba, 0x47, 0x6e, 0xd6, 0x9f, 0x06, 0x98, 0x39, 0x4e, 0x3c,
- 0x84, 0x8a, 0xf0, 0xa6, 0x4c, 0xce, 0x3c, 0x9f, 0x69, 0x4d, 0x15, 0xba, 0x30, 0xe0, 0xff, 0x61,
- 0x5b, 0x05, 0x33, 0xee, 0xeb, 0x68, 0x15, 0x1a, 0xbf, 0x44, 0x3e, 0x33, 0x2f, 0x54, 0x5c, 0xf1,
- 0x40, 0x34, 0x4b, 0x2d, 0xd2, 0xde, 0xa6, 0x0b, 0x03, 0xba, 0x50, 0x93, 0xca, 0x0b, 0x55, 0x2f,
- 0x90, 0x31, 0xa2, 0xdc, 0x22, 0xed, 0x7a, 0xe7, 0xbb, 0x7f, 0x91, 0xa9, 0xd3, 0xcf, 0x13, 0xd0,
- 0x22, 0x1f, 0xb6, 0xc0, 0x54, 0x7c, 0xca, 0xa4, 0xf2, 0xa6, 0xb3, 0x1b, 0xd9, 0xdc, 0x6e, 0x91,
- 0x76, 0x89, 0xe6, 0x4d, 0xf8, 0x0c, 0x6a, 0x32, 0xe3, 0x77, 0xf9, 0xa8, 0xb9, 0xa3, 0xe5, 0x57,
- 0x17, 0xc6, 0xee, 0xc8, 0x3e, 0x85, 0x5a, 0x21, 0x0c, 0x02, 0xec, 0x5c, 0x9d, 0x0d, 0x2e, 0xfb,
- 0x83, 0xc6, 0x16, 0x56, 0x61, 0xef, 0xf2, 0x8c, 0x5e, 0x75, 0xa3, 0x37, 0x82, 0x35, 0xa8, 0x0c,
- 0xba, 0xd7, 0x97, 0xfd, 0xc1, 0xd9, 0x75, 0xaf, 0x61, 0x58, 0xc7, 0x00, 0x8b, 0xb2, 0xe2, 0x11,
- 0x40, 0x9c, 0x19, 0x8b, 0x22, 0x11, 0xad, 0xa6, 0x92, 0x58, 0xba, 0x23, 0xfb, 0x2f, 0x02, 0xbb,
- 0x29, 0xf4, 0x0b, 0xa8, 0xb1, 0x77, 0x4c, 0x28, 0x37, 0x12, 0xeb, 0x0a, 0x19, 0xa3, 0xcf, 0x8d,
- 0x13, 0x42, 0x4d, 0x7d, 0x30, 0xe0, 0x53, 0x76, 0x23, 0xb1, 0x01, 0xa5, 0x3b, 0xf6, 0xa0, 0x8b,
- 0x5e, 0xa5, 0xd1, 0x63, 0xd4, 0x88, 0x77, 0xde, 0x64, 0xce, 0x74, 0xb9, 0xab, 0x34, 0x7e, 0xc1,
- 0x17, 0xb0, 0x7b, 0xcb, 0xbc, 0x11, 0x0b, 0x65, 0xb3, 0xdc, 0x2a, 0xb5, 0xcd, 0x8e, 0x5d, 0x2c,
- 0x72, 0x5a, 0xce, 0xd7, 0x31, 0xe8, 0x52, 0xa8, 0xf0, 0x81, 0xa6, 0x2e, 0xd6, 0x73, 0xa8, 0xe6,
- 0x0f, 0xd2, 0xa8, 0xf1, 0x10, 0x14, 0xa3, 0x1a, 0xb9, 0xa8, 0xcf, 0x8d, 0x53, 0x62, 0xff, 0x41,
- 0xa0, 0x76, 0x1e, 0x06, 0x77, 0x8b, 0xb9, 0xfe, 0x0a, 0xca, 0x23, 0x4f, 0x79, 0xc9, 0x5c, 0x1f,
- 0xac, 0x15, 0x42, 0x35, 0x04, 0x5f, 0xc1, 0x5e, 0xc8, 0x46, 0x3c, 0x64, 0xbe, 0x4a, 0xc6, 0x78,
- 0x69, 0x0d, 0x0a, 0xcc, 0x0e, 0x4d, 0xb0, 0x29, 0x49, 0xe6, 0x6c, 0x9d, 0xc0, 0xfe, 0xd2, 0x61,
- 0xd4, 0x0d, 0xc1, 0xee, 0xdd, 0xa1, 0x66, 0xc8, 0x06, 0x9a, 0xdd, 0xc7, 0x94, 0xf6, 0xdf, 0x04,
- 0xea, 0xbd, 0xf9, 0x70, 0xc2, 0xe5, 0x2d, 0x65, 0x3f, 0xcd, 0x99, 0x8c, 0xf6, 0x29, 0xbf, 0x90,
- 0xed, 0xa2, 0x92, 0x22, 0x76, 0xcd, 0x36, 0xa6, 0x69, 0x1b, 0x8f, 0xa6, 0x6d, 0xb9, 0xff, 0xf1,
- 0xe6, 0xd9, 0xbf, 0x1a, 0xb0, 0x9f, 0x09, 0x96, 0xb3, 0x40, 0x48, 0x86, 0x17, 0xb0, 0xe3, 0x07,
- 0xe2, 0x2d, 0x1f, 0xaf, 0xff, 0xe0, 0x2c, 0xc1, 0x9d, 0x0b, 0x8d, 0x4d, 0x75, 0x27, 0xae, 0xd8,
- 0x5d, 0x69, 0xd8, 0x37, 0x9b, 0x69, 0xde, 0xdf, 0xb2, 0x53, 0xa8, 0x15, 0x62, 0xe0, 0x97, 0xb0,
- 0x9f, 0x65, 0xe0, 0xfa, 0xc1, 0x5c, 0xc4, 0x9d, 0xd8, 0xa6, 0xf5, 0xcc, 0x7c, 0x11, 0x59, 0x9f,
- 0xd0, 0xec, 0xdf, 0x08, 0x1c, 0xc4, 0xc1, 0xe6, 0x21, 0x1b, 0x44, 0x05, 0x4c, 0x7b, 0xfe, 0x94,
- 0xda, 0x7f, 0x0f, 0x35, 0x3f, 0x21, 0xf3, 0xb2, 0xfa, 0x9b, 0x9d, 0x56, 0xb1, 0x12, 0x3a, 0xcc,
- 0x45, 0x1e, 0x47, 0x8b, 0x6e, 0x76, 0x13, 0x3e, 0x5a, 0x16, 0x15, 0x57, 0xcd, 0xa6, 0x70, 0xf8,
- 0x8a, 0xa9, 0x35, 0x0c, 0x4f, 0x57, 0x6d, 0x8f, 0xe1, 0xe8, 0x3d, 0x9c, 0xc9, 0x80, 0xac, 0xa4,
- 0x45, 0x9e, 0x96, 0xd6, 0xef, 0x06, 0xe0, 0x2a, 0xea, 0x83, 0xdb, 0x8b, 0x9f, 0x00, 0xf8, 0xc1,
- 0x64, 0xc2, 0x7c, 0x2d, 0x22, 0xce, 0x21, 0x67, 0x89, 0xbe, 0xfa, 0x21, 0x9b, 0x4d, 0xb8, 0xbf,
- 0x28, 0x7e, 0x85, 0xe6, 0x4d, 0xf8, 0x19, 0x54, 0xb9, 0x74, 0x55, 0xe8, 0x09, 0xc9, 0x99, 0x50,
- 0xfa, 0xde, 0xd9, 0xa3, 0x26, 0x97, 0x83, 0xd4, 0x84, 0x6f, 0xc0, 0x8c, 0xc3, 0x06, 0x82, 0x8b,
- 0xb1, 0xbe, 0x3a, 0xea, 0xcb, 0xb3, 0xbc, 0x9a, 0x84, 0xd3, 0x4b, 0xa5, 0x72, 0x31, 0xa6, 0x79,
- 0x06, 0xfb, 0x25, 0x54, 0xf3, 0x87, 0x88, 0x50, 0xbf, 0x09, 0xc4, 0xcd, 0x7c, 0x32, 0xf9, 0x81,
- 0x3d, 0xbc, 0xf6, 0xe4, 0x6d, 0x63, 0x0b, 0x4d, 0xd8, 0x4d, 0x5f, 0x08, 0xd6, 0x01, 0x68, 0x30,
- 0x17, 0x23, 0x1a, 0x0c, 0xb9, 0x68, 0x18, 0x9d, 0x5f, 0x4a, 0xd0, 0xe8, 0x33, 0xef, 0x9e, 0xb1,
- 0xd1, 0x75, 0xaa, 0x02, 0xdf, 0x40, 0x25, 0xbb, 0x1f, 0xf1, 0xd3, 0x47, 0x2e, 0x4e, 0xeb, 0xe3,
- 0x0d, 0x1f, 0x4f, 0x7b, 0xab, 0x4d, 0x4e, 0x08, 0x5e, 0xc1, 0x6e, 0xb2, 0xa2, 0x78, 0xb8, 0xe9,
- 0x03, 0x67, 0x1d, 0x6d, 0xdc, 0xeb, 0x84, 0xed, 0x47, 0xa8, 0x17, 0x27, 0x18, 0x9f, 0x15, 0xdd,
- 0xd6, 0x2e, 0x9d, 0xf5, 0xf9, 0x66, 0x50, 0x1a, 0x02, 0x43, 0x38, 0x58, 0x3b, 0xb2, 0xb8, 0xf4,
- 0xb3, 0xb3, 0x69, 0x57, 0xac, 0xe3, 0x0f, 0xc2, 0xa6, 0x31, 0xcf, 0x6d, 0x68, 0xc8, 0xb8, 0x0b,
- 0x6f, 0xa5, 0xe3, 0x4f, 0xa2, 0x61, 0x39, 0xaf, 0x67, 0x0d, 0xe9, 0x45, 0x7f, 0x77, 0xc3, 0x1d,
- 0xfd, 0x93, 0xf7, 0xed, 0x3f, 0x01, 0x00, 0x00, 0xff, 0xff, 0x3b, 0xcc, 0x19, 0xa5, 0xf7, 0x09,
- 0x00, 0x00,
+ // 1002 bytes of a gzipped FileDescriptorProto
+ 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xb4, 0x56, 0xdd, 0x6e, 0xe3, 0x54,
+ 0x10, 0xae, 0xdd, 0xfc, 0x8e, 0x93, 0x34, 0x3b, 0xd0, 0x55, 0xf0, 0xb6, 0x90, 0xf5, 0x22, 0x08,
+ 0x14, 0xa2, 0x2a, 0xdc, 0x94, 0x6a, 0xa5, 0x55, 0x1b, 0xba, 0x34, 0xa2, 0xed, 0x86, 0x93, 0xdc,
+ 0x22, 0xcb, 0xb1, 0xcf, 0xa6, 0x47, 0x75, 0x8e, 0x8d, 0x8f, 0xb3, 0x55, 0x9f, 0x83, 0x7b, 0x1e,
+ 0x00, 0x89, 0x3b, 0x5e, 0x80, 0xd7, 0xe0, 0x21, 0x78, 0x06, 0xe4, 0xdf, 0xd8, 0x49, 0x36, 0x5d,
+ 0xb6, 0xda, 0xbb, 0x9c, 0xc9, 0x37, 0x33, 0xdf, 0x99, 0xf9, 0x66, 0x8e, 0x61, 0x67, 0x46, 0x85,
+ 0x30, 0xa6, 0x8c, 0x4f, 0xbb, 0xae, 0xe7, 0xf8, 0x0e, 0xd6, 0x52, 0x83, 0xee, 0x4e, 0xb4, 0xdf,
+ 0x0b, 0xf0, 0x68, 0x34, 0x9f, 0x08, 0xd3, 0x63, 0x13, 0xea, 0x5d, 0x86, 0x7f, 0x51, 0x7c, 0x01,
+ 0x05, 0xc6, 0x99, 0xdf, 0x92, 0xda, 0x52, 0x47, 0xe9, 0x1d, 0x74, 0xb3, 0x2e, 0xdd, 0x15, 0x78,
+ 0x77, 0xc0, 0x99, 0x1f, 0xff, 0x26, 0xa1, 0x23, 0x3e, 0x87, 0x6d, 0xc3, 0xbc, 0x69, 0xc9, 0xa1,
+ 0xff, 0xd7, 0xf7, 0xf9, 0x9f, 0x98, 0x37, 0x89, 0x7b, 0xe0, 0x86, 0x9f, 0x40, 0x85, 0x09, 0xdd,
+ 0xb4, 0x1d, 0x41, 0x5b, 0xdb, 0x6d, 0xa9, 0x53, 0x21, 0x65, 0x26, 0xfa, 0xc1, 0x51, 0xfd, 0x5b,
+ 0x06, 0x25, 0x93, 0x0e, 0xf7, 0xa0, 0xca, 0x8d, 0x19, 0x15, 0xae, 0x61, 0xd2, 0x90, 0x6e, 0x95,
+ 0x2c, 0x0c, 0xf8, 0x31, 0x14, 0x7d, 0xc7, 0x65, 0x66, 0x48, 0xa4, 0x4a, 0xa2, 0x43, 0xe0, 0xe3,
+ 0x1a, 0x9e, 0xcf, 0x7c, 0xe6, 0xf0, 0x30, 0x7e, 0x91, 0x2c, 0x0c, 0xa8, 0x43, 0x5d, 0xf8, 0x86,
+ 0xe7, 0x0f, 0x1d, 0x11, 0x21, 0x0a, 0x6d, 0xa9, 0xd3, 0xe8, 0x7d, 0xff, 0x3f, 0x8a, 0xd0, 0x1d,
+ 0x65, 0x03, 0x90, 0x7c, 0x3c, 0x6c, 0x83, 0xe2, 0xb3, 0x19, 0x15, 0xbe, 0x31, 0x73, 0xaf, 0x44,
+ 0xab, 0xd8, 0x96, 0x3a, 0xdb, 0x24, 0x6b, 0xc2, 0x67, 0x50, 0x17, 0x69, 0x7c, 0x9d, 0x59, 0xad,
+ 0x52, 0x48, 0xbf, 0xb6, 0x30, 0x0e, 0x2c, 0xed, 0x08, 0xea, 0xb9, 0x34, 0x08, 0x50, 0xba, 0x38,
+ 0x19, 0x9f, 0x8d, 0xc6, 0xcd, 0x2d, 0xac, 0x41, 0xe5, 0xec, 0x84, 0x5c, 0x0c, 0x82, 0x93, 0x84,
+ 0x75, 0xa8, 0x8e, 0x07, 0x97, 0x67, 0xa3, 0xf1, 0xc9, 0xe5, 0xb0, 0x29, 0xab, 0x07, 0x00, 0x8b,
+ 0x8a, 0xe3, 0x3e, 0x40, 0x74, 0x33, 0x1a, 0x64, 0x92, 0x42, 0x36, 0xd5, 0xd8, 0x32, 0xb0, 0xb4,
+ 0x7f, 0x25, 0x28, 0x27, 0xd0, 0x2f, 0xa0, 0x4e, 0xdf, 0x50, 0xee, 0xeb, 0x01, 0x59, 0x9d, 0x8b,
+ 0x08, 0x7d, 0x2a, 0x1f, 0x4a, 0x44, 0x09, 0xff, 0x18, 0xb3, 0x19, 0xbd, 0x12, 0xd8, 0x84, 0xed,
+ 0x1b, 0x7a, 0x17, 0x16, 0xbd, 0x46, 0x82, 0x9f, 0x41, 0x23, 0xde, 0x18, 0xf6, 0x3c, 0x6a, 0x67,
+ 0x8d, 0x44, 0x07, 0x7c, 0x0e, 0xe5, 0x6b, 0x6a, 0x58, 0xd4, 0x13, 0xad, 0x42, 0x7b, 0xbb, 0xa3,
+ 0xf4, 0xb4, 0x7c, 0x91, 0x93, 0x72, 0x9e, 0x47, 0xa0, 0x33, 0xee, 0x7b, 0x77, 0x24, 0x71, 0xc9,
+ 0xa9, 0xa4, 0x98, 0x57, 0xc9, 0x31, 0xd4, 0xb2, 0x3e, 0x09, 0xa1, 0x48, 0x1f, 0x79, 0x42, 0x72,
+ 0x86, 0xd0, 0xb1, 0x7c, 0x24, 0x69, 0xc7, 0x50, 0x3f, 0xf5, 0x9c, 0x9b, 0xc5, 0x30, 0x7c, 0x05,
+ 0x05, 0xcb, 0xf0, 0x8d, 0x78, 0x18, 0x76, 0xd7, 0x52, 0x24, 0x21, 0x44, 0xfb, 0x47, 0x82, 0xc6,
+ 0x70, 0x3e, 0xb1, 0x99, 0xb8, 0x26, 0xf4, 0xd7, 0x39, 0x15, 0xc1, 0x24, 0x64, 0x47, 0xa9, 0x93,
+ 0xf7, 0xce, 0x63, 0xd7, 0xcc, 0x51, 0x92, 0x5b, 0xbe, 0x37, 0xb7, 0xaa, 0x7f, 0xe0, 0xc1, 0xd0,
+ 0xfe, 0x90, 0x61, 0x27, 0x25, 0x2c, 0x5c, 0x87, 0x0b, 0x8a, 0x7d, 0x28, 0x99, 0x0e, 0x7f, 0xcd,
+ 0xa6, 0xeb, 0x57, 0xc5, 0x12, 0xbc, 0xdb, 0x0f, 0xb1, 0x09, 0xef, 0xd8, 0x15, 0x07, 0x50, 0xf1,
+ 0xa8, 0xc5, 0x3c, 0x6a, 0xfa, 0xf1, 0x45, 0xbf, 0xdd, 0x1c, 0x86, 0xc4, 0xe8, 0x24, 0x50, 0xea,
+ 0x8e, 0x4f, 0xa0, 0x9a, 0x68, 0xc2, 0x8a, 0x57, 0x47, 0x25, 0x16, 0x85, 0xa5, 0x1e, 0x41, 0x3d,
+ 0x47, 0x00, 0xbf, 0x84, 0x9d, 0xf4, 0x7a, 0xba, 0xe9, 0xcc, 0x79, 0xd4, 0xa6, 0x22, 0x69, 0xa4,
+ 0xe6, 0x7e, 0x60, 0x55, 0x0f, 0x61, 0x67, 0x29, 0x67, 0x30, 0x36, 0x9c, 0xde, 0xea, 0x93, 0x50,
+ 0x2a, 0x69, 0x81, 0xe9, 0x6d, 0xa4, 0x1d, 0xed, 0x1c, 0xf0, 0x07, 0x6a, 0x53, 0x9f, 0x8e, 0x83,
+ 0xca, 0x26, 0x62, 0x78, 0x8f, 0xa6, 0x68, 0xbb, 0xf0, 0x51, 0x2e, 0x52, 0x54, 0x03, 0xed, 0x37,
+ 0x09, 0x76, 0xa3, 0xdb, 0xcc, 0xbd, 0x07, 0x27, 0xc1, 0x97, 0x50, 0x37, 0xe3, 0x60, 0x46, 0xda,
+ 0x7d, 0xa5, 0xd7, 0xce, 0xf7, 0x21, 0x4c, 0xd3, 0xcf, 0xe2, 0x48, 0xde, 0x4d, 0x6b, 0xc1, 0xe3,
+ 0x65, 0x52, 0x31, 0x5f, 0x02, 0x7b, 0x3f, 0x52, 0x7f, 0x4d, 0x84, 0x07, 0x94, 0x66, 0x0a, 0xfb,
+ 0x6f, 0x89, 0x19, 0xcb, 0x73, 0xe5, 0x5a, 0xd2, 0xfb, 0x5d, 0xcb, 0x84, 0x47, 0x2f, 0x19, 0xb7,
+ 0xa2, 0xde, 0x3e, 0xa4, 0xce, 0x2a, 0x54, 0x5c, 0xc3, 0xcb, 0x0e, 0x58, 0x7a, 0xd6, 0xbe, 0x01,
+ 0xcc, 0x26, 0x89, 0xaf, 0xf0, 0x18, 0x4a, 0x39, 0x8d, 0xc5, 0x27, 0xed, 0x2f, 0x19, 0x70, 0x95,
+ 0xf8, 0x3b, 0x4b, 0x1a, 0x3f, 0x05, 0x30, 0x1d, 0xdb, 0xa6, 0x66, 0xc8, 0x25, 0x22, 0x99, 0xb1,
+ 0x04, 0xaf, 0x94, 0x47, 0x5d, 0x9b, 0x99, 0x0b, 0x3d, 0x54, 0x49, 0xd6, 0x84, 0x4f, 0xa1, 0xc6,
+ 0x84, 0xee, 0x7b, 0x06, 0x17, 0x8c, 0x72, 0x3f, 0x7c, 0x27, 0x2b, 0x44, 0x61, 0x62, 0x9c, 0x98,
+ 0xf0, 0x15, 0x28, 0x51, 0x5a, 0x87, 0x33, 0x3e, 0x0d, 0xb7, 0x74, 0x63, 0x79, 0xb8, 0x57, 0x2f,
+ 0xd1, 0x1d, 0x26, 0x54, 0x19, 0x9f, 0x92, 0x6c, 0x04, 0xed, 0x05, 0xd4, 0xb2, 0x7f, 0x22, 0x42,
+ 0xe3, 0xca, 0xe1, 0x57, 0x73, 0xdb, 0xfe, 0x89, 0xde, 0x9d, 0x1b, 0xe2, 0xba, 0xb9, 0x85, 0x0a,
+ 0x94, 0x93, 0x83, 0x84, 0x0d, 0x00, 0xe2, 0xcc, 0xb9, 0x45, 0x9c, 0x09, 0xe3, 0x4d, 0xb9, 0xf7,
+ 0x67, 0x01, 0x9a, 0x23, 0x6a, 0xdc, 0x52, 0x6a, 0x5d, 0x26, 0x2c, 0xf0, 0x15, 0x54, 0xd3, 0xf7,
+ 0x1c, 0x3f, 0xbb, 0xe7, 0xa1, 0x57, 0x9f, 0xe4, 0x01, 0xb9, 0xc7, 0x42, 0xdb, 0xea, 0x48, 0x87,
+ 0x12, 0x5e, 0x40, 0x39, 0xde, 0x59, 0xb8, 0xb7, 0x69, 0xe3, 0xab, 0xfb, 0x1b, 0x17, 0x5d, 0x1c,
+ 0x6d, 0x0c, 0x4a, 0x66, 0x03, 0xe0, 0x92, 0x7a, 0x57, 0xd7, 0x8c, 0xfa, 0x74, 0x03, 0x22, 0x89,
+ 0x8c, 0xbf, 0x40, 0x23, 0x3f, 0xaa, 0xf8, 0x2c, 0xef, 0xb6, 0x76, 0xbb, 0xa8, 0x9f, 0x6f, 0x06,
+ 0xa5, 0xe1, 0x3d, 0xd8, 0x5d, 0x3b, 0x9b, 0xb8, 0xf4, 0x35, 0xb8, 0x69, 0x29, 0xa8, 0x07, 0xef,
+ 0x84, 0x4d, 0x73, 0xfe, 0x0c, 0xb0, 0x98, 0xa0, 0xe5, 0x46, 0xae, 0x0c, 0xb0, 0xda, 0x7e, 0x3b,
+ 0x20, 0x09, 0x79, 0xaa, 0x41, 0x53, 0x44, 0x72, 0x79, 0x2d, 0xba, 0xa6, 0x1d, 0xa8, 0xfa, 0xb4,
+ 0x91, 0x2a, 0x67, 0x18, 0x7c, 0x51, 0x4f, 0x4a, 0xe1, 0x87, 0xf5, 0x77, 0xff, 0x05, 0x00, 0x00,
+ 0xff, 0xff, 0x7f, 0x62, 0xba, 0x48, 0x6b, 0x0b, 0x00, 0x00,
}
diff --git a/weed/pb/volume_server_pb/volume_server.pb.go b/weed/pb/volume_server_pb/volume_server.pb.go
index 8b02d2bba..ca96b8d20 100644
--- a/weed/pb/volume_server_pb/volume_server.pb.go
+++ b/weed/pb/volume_server_pb/volume_server.pb.go
@@ -1051,12 +1051,10 @@ func (m *VolumeEcShardsGenerateRequest) GetCollection() string {
type VolumeEcShardsGenerateResponse struct {
}
-func (m *VolumeEcShardsGenerateResponse) Reset() { *m = VolumeEcShardsGenerateResponse{} }
-func (m *VolumeEcShardsGenerateResponse) String() string { return proto.CompactTextString(m) }
-func (*VolumeEcShardsGenerateResponse) ProtoMessage() {}
-func (*VolumeEcShardsGenerateResponse) Descriptor() ([]byte, []int) {
- return fileDescriptor0, []int{41}
-}
+func (m *VolumeEcShardsGenerateResponse) Reset() { *m = VolumeEcShardsGenerateResponse{} }
+func (m *VolumeEcShardsGenerateResponse) String() string { return proto.CompactTextString(m) }
+func (*VolumeEcShardsGenerateResponse) ProtoMessage() {}
+func (*VolumeEcShardsGenerateResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{41} }
type VolumeEcShardsRebuildRequest struct {
VolumeId uint32 `protobuf:"varint,1,opt,name=volume_id,json=volumeId" json:"volume_id,omitempty"`
@@ -1429,12 +1427,10 @@ func (m *VolumeEcShardsToVolumeRequest) GetCollection() string {
type VolumeEcShardsToVolumeResponse struct {
}
-func (m *VolumeEcShardsToVolumeResponse) Reset() { *m = VolumeEcShardsToVolumeResponse{} }
-func (m *VolumeEcShardsToVolumeResponse) String() string { return proto.CompactTextString(m) }
-func (*VolumeEcShardsToVolumeResponse) ProtoMessage() {}
-func (*VolumeEcShardsToVolumeResponse) Descriptor() ([]byte, []int) {
- return fileDescriptor0, []int{57}
-}
+func (m *VolumeEcShardsToVolumeResponse) Reset() { *m = VolumeEcShardsToVolumeResponse{} }
+func (m *VolumeEcShardsToVolumeResponse) String() string { return proto.CompactTextString(m) }
+func (*VolumeEcShardsToVolumeResponse) ProtoMessage() {}
+func (*VolumeEcShardsToVolumeResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{57} }
type ReadVolumeFileStatusRequest struct {
VolumeId uint32 `protobuf:"varint,1,opt,name=volume_id,json=volumeId" json:"volume_id,omitempty"`
@@ -2101,10 +2097,8 @@ type QueryRequest_InputSerialization_JSONInput struct {
func (m *QueryRequest_InputSerialization_JSONInput) Reset() {
*m = QueryRequest_InputSerialization_JSONInput{}
}
-func (m *QueryRequest_InputSerialization_JSONInput) String() string {
- return proto.CompactTextString(m)
-}
-func (*QueryRequest_InputSerialization_JSONInput) ProtoMessage() {}
+func (m *QueryRequest_InputSerialization_JSONInput) String() string { return proto.CompactTextString(m) }
+func (*QueryRequest_InputSerialization_JSONInput) ProtoMessage() {}
func (*QueryRequest_InputSerialization_JSONInput) Descriptor() ([]byte, []int) {
return fileDescriptor0, []int{70, 1, 1}
}
diff --git a/weed/server/filer_grpc_server.go b/weed/server/filer_grpc_server.go
index a790e4601..901f798f0 100644
--- a/weed/server/filer_grpc_server.go
+++ b/weed/server/filer_grpc_server.go
@@ -381,3 +381,75 @@ func (fs *FilerServer) GetFilerConfiguration(ctx context.Context, req *filer_pb.
return t, nil
}
+
+func (fs *FilerServer) KeepConnected(stream filer_pb.SeaweedFiler_KeepConnectedServer) error {
+
+ req, err := stream.Recv()
+ if err != nil {
+ return err
+ }
+
+ clientName := fmt.Sprintf("%s:%d", req.Name, req.GrpcPort)
+ m := make(map[string]bool)
+ for _, tp := range req.Resources {
+ m[tp] = true
+ }
+ fs.brokersLock.Lock()
+ fs.brokers[clientName] = m
+ glog.V(0).Infof("+ broker %v", clientName)
+ fs.brokersLock.Unlock()
+
+ defer func() {
+ fs.brokersLock.Lock()
+ delete(fs.brokers, clientName)
+ glog.V(0).Infof("- broker %v: %v", clientName, err)
+ fs.brokersLock.Unlock()
+ }()
+
+ for {
+ if err := stream.Send(&filer_pb.KeepConnectedResponse{}); err != nil {
+ glog.V(0).Infof("send broker %v: %+v", clientName, err)
+ return err
+ }
+ // println("replied")
+
+ if _, err := stream.Recv(); err != nil {
+ glog.V(0).Infof("recv broker %v: %v", clientName, err)
+ return err
+ }
+ // println("received")
+ }
+
+}
+
+func (fs *FilerServer) LocateBroker(ctx context.Context, req *filer_pb.LocateBrokerRequest) (resp *filer_pb.LocateBrokerResponse, err error) {
+
+ resp = &filer_pb.LocateBrokerResponse{}
+
+ fs.brokersLock.Lock()
+ defer fs.brokersLock.Unlock()
+
+ var localBrokers []*filer_pb.LocateBrokerResponse_Resource
+
+ for b, m := range fs.brokers {
+ if _, found := m[req.Resource]; found {
+ resp.Found = true
+ resp.Resources = []*filer_pb.LocateBrokerResponse_Resource{
+ {
+ GrpcAddresses: b,
+ ResourceCount: int32(len(m)),
+ },
+ }
+ return
+ }
+ localBrokers = append(localBrokers, &filer_pb.LocateBrokerResponse_Resource{
+ GrpcAddresses: b,
+ ResourceCount: int32(len(m)),
+ })
+ }
+
+ resp.Resources = localBrokers
+
+ return resp, nil
+
+}
diff --git a/weed/server/filer_grpc_server_listen.go b/weed/server/filer_grpc_server_listen.go
index e45068f39..848a1fc3a 100644
--- a/weed/server/filer_grpc_server_listen.go
+++ b/weed/server/filer_grpc_server_listen.go
@@ -82,7 +82,7 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest,
lastReadTime = time.Unix(0, processedTsNs)
}
- _, err := fs.filer.MetaLogBuffer.LoopProcessLogData(lastReadTime, func() bool {
+ err := fs.filer.MetaLogBuffer.LoopProcessLogData(lastReadTime, func() bool {
fs.listenersLock.Lock()
fs.listenersCond.Wait()
fs.listenersLock.Unlock()
diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go
index 956684d46..10b607dfe 100644
--- a/weed/server/filer_server.go
+++ b/weed/server/filer_server.go
@@ -8,9 +8,10 @@ import (
"sync"
"time"
- "github.com/chrislusf/seaweedfs/weed/util/grace"
"google.golang.org/grpc"
+ "github.com/chrislusf/seaweedfs/weed/util/grace"
+
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
@@ -62,6 +63,9 @@ type FilerServer struct {
// notifying clients
listenersLock sync.Mutex
listenersCond *sync.Cond
+
+ brokers map[string]map[string]bool
+ brokersLock sync.Mutex
}
func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) (fs *FilerServer, err error) {
@@ -69,6 +73,7 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption)
fs = &FilerServer{
option: option,
grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.filer"),
+ brokers: make(map[string]map[string]bool),
}
fs.listenersCond = sync.NewCond(&fs.listenersLock)
diff --git a/weed/util/log_buffer/log_buffer.go b/weed/util/log_buffer/log_buffer.go
index 6ba7f3737..b02c45b52 100644
--- a/weed/util/log_buffer/log_buffer.go
+++ b/weed/util/log_buffer/log_buffer.go
@@ -34,6 +34,7 @@ type LogBuffer struct {
notifyFn func()
isStopping bool
flushChan chan *dataToFlush
+ lastTsNs int64
sync.RWMutex
}
@@ -64,8 +65,15 @@ func (m *LogBuffer) AddToBuffer(partitionKey, data []byte) {
// need to put the timestamp inside the lock
ts := time.Now()
+ tsNs := ts.UnixNano()
+ if m.lastTsNs >= tsNs {
+ // this is unlikely to happen, but just in case
+ tsNs = m.lastTsNs + 1
+ ts = time.Unix(0, tsNs)
+ }
+ m.lastTsNs = tsNs
logEntry := &filer_pb.LogEntry{
- TsNs: ts.UnixNano(),
+ TsNs: tsNs,
PartitionKeyHash: util.HashToInt32(partitionKey),
Data: data,
}
@@ -98,13 +106,14 @@ func (m *LogBuffer) AddToBuffer(partitionKey, data []byte) {
}
func (m *LogBuffer) Shutdown() {
+ m.Lock()
+ defer m.Unlock()
+
if m.isStopping {
return
}
m.isStopping = true
- m.Lock()
toFlush := m.copyToFlush()
- m.Unlock()
m.flushChan <- toFlush
close(m.flushChan)
}
@@ -123,10 +132,14 @@ func (m *LogBuffer) loopInterval() {
for !m.isStopping {
time.Sleep(m.flushInterval)
m.Lock()
+ if m.isStopping {
+ m.Unlock()
+ return
+ }
// println("loop interval")
toFlush := m.copyToFlush()
- m.Unlock()
m.flushChan <- toFlush
+ m.Unlock()
}
}
diff --git a/weed/util/log_buffer/log_read.go b/weed/util/log_buffer/log_read.go
index 6339d9d77..2b73a8064 100644
--- a/weed/util/log_buffer/log_read.go
+++ b/weed/util/log_buffer/log_read.go
@@ -12,8 +12,9 @@ import (
)
func (logBuffer *LogBuffer) LoopProcessLogData(
- startTreadTime time.Time, waitForDataFn func() bool,
- eachLogDataFn func(logEntry *filer_pb.LogEntry) error) (processed int64, err error) {
+ startTreadTime time.Time,
+ waitForDataFn func() bool,
+ eachLogDataFn func(logEntry *filer_pb.LogEntry) error) (err error) {
// loop through all messages
var bytesBuf *bytes.Buffer
lastReadTime := startTreadTime
@@ -29,6 +30,7 @@ func (logBuffer *LogBuffer) LoopProcessLogData(
logBuffer.ReleaseMeory(bytesBuf)
}
bytesBuf = logBuffer.ReadFromBuffer(lastReadTime)
+ // fmt.Printf("ReadFromBuffer by %v\n", lastReadTime)
if bytesBuf == nil {
if waitForDataFn() {
continue
@@ -38,6 +40,7 @@ func (logBuffer *LogBuffer) LoopProcessLogData(
}
buf := bytesBuf.Bytes()
+ // fmt.Printf("ReadFromBuffer by %v size %d\n", lastReadTime, len(buf))
batchSize := 0
var startReadTime time.Time
@@ -66,7 +69,6 @@ func (logBuffer *LogBuffer) LoopProcessLogData(
pos += 4 + int(size)
batchSize++
- processed++
}
// fmt.Printf("sent message ts[%d,%d] size %d\n", startReadTime.UnixNano(), lastReadTime.UnixNano(), batchSize)
diff --git a/weed/util/log_buffer/sealed_buffer.go b/weed/util/log_buffer/sealed_buffer.go
index cade45dd1..d133cf8d3 100644
--- a/weed/util/log_buffer/sealed_buffer.go
+++ b/weed/util/log_buffer/sealed_buffer.go
@@ -59,4 +59,4 @@ func (mb *MemBuffer) locateByTs(lastReadTime time.Time) (pos int) {
func (mb *MemBuffer) String() string {
return fmt.Sprintf("[%v,%v] bytes:%d", mb.startTime, mb.stopTime, mb.size)
-} \ No newline at end of file
+}