aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/command/mq_broker.go6
-rw-r--r--weed/mq/broker/broker_append.go (renamed from weed/messaging/broker/broker_append.go)6
-rw-r--r--weed/mq/broker/broker_grpc_server.go (renamed from weed/messaging/broker/broker_grpc_server.go)10
-rw-r--r--weed/mq/broker/broker_grpc_server_discovery.go (renamed from weed/messaging/broker/broker_grpc_server_discovery.go)6
-rw-r--r--weed/mq/broker/broker_grpc_server_publish.go (renamed from weed/messaging/broker/broker_grpc_server_publish.go)10
-rw-r--r--weed/mq/broker/broker_grpc_server_subscribe.go (renamed from weed/messaging/broker/broker_grpc_server_subscribe.go)20
-rw-r--r--weed/mq/broker/broker_server.go (renamed from weed/messaging/broker/broker_server.go)4
-rw-r--r--weed/mq/broker/consistent_distribution.go (renamed from weed/messaging/broker/consistent_distribution.go)0
-rw-r--r--weed/mq/broker/consistent_distribution_test.go (renamed from weed/messaging/broker/consistent_distribution_test.go)0
-rw-r--r--weed/mq/broker/topic_manager.go (renamed from weed/messaging/broker/topic_manager.go)6
-rw-r--r--weed/mq/msgclient/chan_config.go (renamed from weed/messaging/msgclient/chan_config.go)0
-rw-r--r--weed/mq/msgclient/chan_pub.go (renamed from weed/messaging/msgclient/chan_pub.go)14
-rw-r--r--weed/mq/msgclient/chan_sub.go (renamed from weed/messaging/msgclient/chan_sub.go)8
-rw-r--r--weed/mq/msgclient/client.go (renamed from weed/messaging/msgclient/client.go)8
-rw-r--r--weed/mq/msgclient/config.go (renamed from weed/messaging/msgclient/config.go)18
-rw-r--r--weed/mq/msgclient/publisher.go (renamed from weed/messaging/msgclient/publisher.go)28
-rw-r--r--weed/mq/msgclient/subscriber.go (renamed from weed/messaging/msgclient/subscriber.go)26
-rw-r--r--weed/pb/Makefile2
-rw-r--r--weed/pb/grpc_client_server.go6
-rw-r--r--weed/pb/messaging.proto2
-rw-r--r--weed/pb/mount_pb/mount.pb.go8
-rw-r--r--weed/pb/mount_pb/mount_grpc.pb.go6
-rw-r--r--weed/pb/mq_pb/messaging.pb.go (renamed from weed/pb/messaging_pb/messaging.pb.go)100
-rw-r--r--weed/pb/mq_pb/messaging_grpc.pb.go (renamed from weed/pb/messaging_pb/messaging_grpc.pb.go)26
-rw-r--r--weed/pb/s3_pb/s3.pb.go24
-rw-r--r--weed/pb/s3_pb/s3_grpc.pb.go6
-rw-r--r--weed/util/log_buffer/log_read.go2
27 files changed, 176 insertions, 176 deletions
diff --git a/weed/command/mq_broker.go b/weed/command/mq_broker.go
index a5a6e3566..da7f59596 100644
--- a/weed/command/mq_broker.go
+++ b/weed/command/mq_broker.go
@@ -10,10 +10,10 @@ import (
"github.com/chrislusf/seaweedfs/weed/util/grace"
"github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/messaging/broker"
+ "github.com/chrislusf/seaweedfs/weed/mq/broker"
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
- "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/mq_pb"
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/util"
)
@@ -100,7 +100,7 @@ func (mqBrokerOpt *MessageQueueBrokerOptions) startQueueServer() bool {
glog.Fatalf("failed to listen on grpc port %d: %v", *mqBrokerOpt.port, err)
}
grpcS := pb.NewGrpcServer(security.LoadServerTLS(util.GetViper(), "grpc.msg_broker"))
- messaging_pb.RegisterSeaweedMessagingServer(grpcS, qs)
+ mq_pb.RegisterSeaweedMessagingServer(grpcS, qs)
reflection.Register(grpcS)
grpcS.Serve(grpcL)
diff --git a/weed/messaging/broker/broker_append.go b/weed/mq/broker/broker_append.go
index 9a31a8ac0..4f3af0ff8 100644
--- a/weed/messaging/broker/broker_append.go
+++ b/weed/mq/broker/broker_append.go
@@ -10,11 +10,11 @@ import (
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
- "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/mq_pb"
"github.com/chrislusf/seaweedfs/weed/util"
)
-func (broker *MessageBroker) appendToFile(targetFile string, topicConfig *messaging_pb.TopicConfiguration, data []byte) error {
+func (broker *MessageBroker) appendToFile(targetFile string, topicConfig *mq_pb.TopicConfiguration, data []byte) error {
assignResult, uploadResult, err2 := broker.assignAndUpload(topicConfig, data)
if err2 != nil {
@@ -46,7 +46,7 @@ func (broker *MessageBroker) appendToFile(targetFile string, topicConfig *messag
return nil
}
-func (broker *MessageBroker) assignAndUpload(topicConfig *messaging_pb.TopicConfiguration, data []byte) (*operation.AssignResult, *operation.UploadResult, error) {
+func (broker *MessageBroker) assignAndUpload(topicConfig *mq_pb.TopicConfiguration, data []byte) (*operation.AssignResult, *operation.UploadResult, error) {
var assignResult = &operation.AssignResult{}
diff --git a/weed/messaging/broker/broker_grpc_server.go b/weed/mq/broker/broker_grpc_server.go
index ba141fdd0..9aa9b1908 100644
--- a/weed/messaging/broker/broker_grpc_server.go
+++ b/weed/mq/broker/broker_grpc_server.go
@@ -6,15 +6,15 @@ import (
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
- "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/mq_pb"
)
-func (broker *MessageBroker) ConfigureTopic(c context.Context, request *messaging_pb.ConfigureTopicRequest) (*messaging_pb.ConfigureTopicResponse, error) {
+func (broker *MessageBroker) ConfigureTopic(c context.Context, request *mq_pb.ConfigureTopicRequest) (*mq_pb.ConfigureTopicResponse, error) {
panic("implement me")
}
-func (broker *MessageBroker) DeleteTopic(c context.Context, request *messaging_pb.DeleteTopicRequest) (*messaging_pb.DeleteTopicResponse, error) {
- resp := &messaging_pb.DeleteTopicResponse{}
+func (broker *MessageBroker) DeleteTopic(c context.Context, request *mq_pb.DeleteTopicRequest) (*mq_pb.DeleteTopicResponse, error) {
+ resp := &mq_pb.DeleteTopicResponse{}
dir, entry := genTopicDirEntry(request.Namespace, request.Topic)
if exists, err := filer_pb.Exists(broker, dir, entry, true); err != nil {
return nil, err
@@ -24,7 +24,7 @@ func (broker *MessageBroker) DeleteTopic(c context.Context, request *messaging_p
return resp, nil
}
-func (broker *MessageBroker) GetTopicConfiguration(c context.Context, request *messaging_pb.GetTopicConfigurationRequest) (*messaging_pb.GetTopicConfigurationResponse, error) {
+func (broker *MessageBroker) GetTopicConfiguration(c context.Context, request *mq_pb.GetTopicConfigurationRequest) (*mq_pb.GetTopicConfigurationResponse, error) {
panic("implement me")
}
diff --git a/weed/messaging/broker/broker_grpc_server_discovery.go b/weed/mq/broker/broker_grpc_server_discovery.go
index 5cd8edd33..0c8d70e68 100644
--- a/weed/messaging/broker/broker_grpc_server_discovery.go
+++ b/weed/mq/broker/broker_grpc_server_discovery.go
@@ -10,7 +10,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
- "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/mq_pb"
)
/*
@@ -26,9 +26,9 @@ If one of the pub or sub connects very late, and the system topo changed quite a
*/
-func (broker *MessageBroker) FindBroker(c context.Context, request *messaging_pb.FindBrokerRequest) (*messaging_pb.FindBrokerResponse, error) {
+func (broker *MessageBroker) FindBroker(c context.Context, request *mq_pb.FindBrokerRequest) (*mq_pb.FindBrokerResponse, error) {
- t := &messaging_pb.FindBrokerResponse{}
+ t := &mq_pb.FindBrokerResponse{}
var peers []string
targetTopicPartition := fmt.Sprintf(TopicPartitionFmt, request.Namespace, request.Topic, request.Parition)
diff --git a/weed/messaging/broker/broker_grpc_server_publish.go b/weed/mq/broker/broker_grpc_server_publish.go
index 6e6b723d1..4ff9ad809 100644
--- a/weed/messaging/broker/broker_grpc_server_publish.go
+++ b/weed/mq/broker/broker_grpc_server_publish.go
@@ -10,10 +10,10 @@ import (
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
- "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/mq_pb"
)
-func (broker *MessageBroker) Publish(stream messaging_pb.SeaweedMessaging_PublishServer) error {
+func (broker *MessageBroker) Publish(stream mq_pb.SeaweedMessaging_PublishServer) error {
// process initial request
in, err := stream.Recv()
@@ -25,12 +25,12 @@ func (broker *MessageBroker) Publish(stream messaging_pb.SeaweedMessaging_Publis
}
// TODO look it up
- topicConfig := &messaging_pb.TopicConfiguration{
+ topicConfig := &mq_pb.TopicConfiguration{
// IsTransient: true,
}
// send init response
- initResponse := &messaging_pb.PublishResponse{
+ initResponse := &mq_pb.PublishResponse{
Config: nil,
Redirect: nil,
}
@@ -104,7 +104,7 @@ func (broker *MessageBroker) Publish(stream messaging_pb.SeaweedMessaging_Publis
// send the close ack
// println("server send ack closing")
- if err := stream.Send(&messaging_pb.PublishResponse{IsClosed: true}); err != nil {
+ if err := stream.Send(&mq_pb.PublishResponse{IsClosed: true}); err != nil {
glog.V(0).Infof("err sending close response: %v", err)
}
return nil
diff --git a/weed/messaging/broker/broker_grpc_server_subscribe.go b/weed/mq/broker/broker_grpc_server_subscribe.go
index 20d529239..1a9c62d75 100644
--- a/weed/messaging/broker/broker_grpc_server_subscribe.go
+++ b/weed/mq/broker/broker_grpc_server_subscribe.go
@@ -13,10 +13,10 @@ import (
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
- "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/mq_pb"
)
-func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_SubscribeServer) error {
+func (broker *MessageBroker) Subscribe(stream mq_pb.SeaweedMessaging_SubscribeServer) error {
// process initial request
in, err := stream.Recv()
@@ -32,7 +32,7 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs
subscriberId := in.Init.SubscriberId
// TODO look it up
- topicConfig := &messaging_pb.TopicConfiguration{
+ topicConfig := &mq_pb.TopicConfiguration{
// IsTransient: true,
}
@@ -63,17 +63,17 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs
lastReadTime := time.Now()
switch in.Init.StartPosition {
- case messaging_pb.SubscriberMessage_InitMessage_TIMESTAMP:
+ case mq_pb.SubscriberMessage_InitMessage_TIMESTAMP:
lastReadTime = time.Unix(0, in.Init.TimestampNs)
- case messaging_pb.SubscriberMessage_InitMessage_LATEST:
- case messaging_pb.SubscriberMessage_InitMessage_EARLIEST:
+ case mq_pb.SubscriberMessage_InitMessage_LATEST:
+ case mq_pb.SubscriberMessage_InitMessage_EARLIEST:
lastReadTime = time.Unix(0, 0)
}
// how to process each message
// an error returned will end the subscription
- eachMessageFn := func(m *messaging_pb.Message) error {
- err := stream.Send(&messaging_pb.BrokerMessage{
+ eachMessageFn := func(m *mq_pb.Message) error {
+ err := stream.Send(&mq_pb.BrokerMessage{
Data: m,
})
if err != nil {
@@ -83,9 +83,9 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs
}
eachLogEntryFn := func(logEntry *filer_pb.LogEntry) error {
- m := &messaging_pb.Message{}
+ m := &mq_pb.Message{}
if err = proto.Unmarshal(logEntry.Data, m); err != nil {
- glog.Errorf("unexpected unmarshal messaging_pb.Message: %v", err)
+ glog.Errorf("unexpected unmarshal mq_pb.Message: %v", err)
return err
}
// fmt.Printf("sending : %d bytes ts %d\n", len(m.Value), logEntry.TsNs)
diff --git a/weed/messaging/broker/broker_server.go b/weed/mq/broker/broker_server.go
index acf2d6d34..5aa5285c9 100644
--- a/weed/messaging/broker/broker_server.go
+++ b/weed/mq/broker/broker_server.go
@@ -2,7 +2,7 @@ package broker
import (
"context"
- "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/mq_pb"
"time"
"google.golang.org/grpc"
@@ -23,7 +23,7 @@ type MessageBrokerOption struct {
}
type MessageBroker struct {
- messaging_pb.UnimplementedSeaweedMessagingServer
+ mq_pb.UnimplementedSeaweedMessagingServer
option *MessageBrokerOption
grpcDialOption grpc.DialOption
topicManager *TopicManager
diff --git a/weed/messaging/broker/consistent_distribution.go b/weed/mq/broker/consistent_distribution.go
index 465a2a8f2..465a2a8f2 100644
--- a/weed/messaging/broker/consistent_distribution.go
+++ b/weed/mq/broker/consistent_distribution.go
diff --git a/weed/messaging/broker/consistent_distribution_test.go b/weed/mq/broker/consistent_distribution_test.go
index f58fe4e0e..f58fe4e0e 100644
--- a/weed/messaging/broker/consistent_distribution_test.go
+++ b/weed/mq/broker/consistent_distribution_test.go
diff --git a/weed/messaging/broker/topic_manager.go b/weed/mq/broker/topic_manager.go
index c303c29b3..1acf085fa 100644
--- a/weed/messaging/broker/topic_manager.go
+++ b/weed/mq/broker/topic_manager.go
@@ -7,7 +7,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/mq_pb"
"github.com/chrislusf/seaweedfs/weed/util/log_buffer"
)
@@ -46,7 +46,7 @@ func NewTopicManager(messageBroker *MessageBroker) *TopicManager {
}
}
-func (tm *TopicManager) buildLogBuffer(tl *TopicControl, tp TopicPartition, topicConfig *messaging_pb.TopicConfiguration) *log_buffer.LogBuffer {
+func (tm *TopicManager) buildLogBuffer(tl *TopicControl, tp TopicPartition, topicConfig *mq_pb.TopicConfiguration) *log_buffer.LogBuffer {
flushFn := func(startTime, stopTime time.Time, buf []byte) {
@@ -75,7 +75,7 @@ func (tm *TopicManager) buildLogBuffer(tl *TopicControl, tp TopicPartition, topi
return logBuffer
}
-func (tm *TopicManager) RequestLock(partition TopicPartition, topicConfig *messaging_pb.TopicConfiguration, isPublisher bool) *TopicControl {
+func (tm *TopicManager) RequestLock(partition TopicPartition, topicConfig *mq_pb.TopicConfiguration, isPublisher bool) *TopicControl {
tm.Lock()
defer tm.Unlock()
diff --git a/weed/messaging/msgclient/chan_config.go b/weed/mq/msgclient/chan_config.go
index a75678815..a75678815 100644
--- a/weed/messaging/msgclient/chan_config.go
+++ b/weed/mq/msgclient/chan_config.go
diff --git a/weed/messaging/msgclient/chan_pub.go b/weed/mq/msgclient/chan_pub.go
index 9bc88f7c0..f4ffe832a 100644
--- a/weed/messaging/msgclient/chan_pub.go
+++ b/weed/mq/msgclient/chan_pub.go
@@ -8,12 +8,12 @@ import (
"google.golang.org/grpc"
- "github.com/chrislusf/seaweedfs/weed/messaging/broker"
- "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
+ "github.com/chrislusf/seaweedfs/weed/mq/broker"
+ "github.com/chrislusf/seaweedfs/weed/pb/mq_pb"
)
type PubChannel struct {
- client messaging_pb.SeaweedMessaging_PublishClient
+ client mq_pb.SeaweedMessaging_PublishClient
grpcConnection *grpc.ClientConn
md5hash hash.Hash
}
@@ -40,8 +40,8 @@ func (mc *MessagingClient) NewPubChannel(chanName string) (*PubChannel, error) {
}
func (pc *PubChannel) Publish(m []byte) error {
- err := pc.client.Send(&messaging_pb.PublishRequest{
- Data: &messaging_pb.Message{
+ err := pc.client.Send(&mq_pb.PublishRequest{
+ Data: &mq_pb.Message{
Value: m,
},
})
@@ -53,8 +53,8 @@ func (pc *PubChannel) Publish(m []byte) error {
func (pc *PubChannel) Close() error {
// println("send closing")
- if err := pc.client.Send(&messaging_pb.PublishRequest{
- Data: &messaging_pb.Message{
+ if err := pc.client.Send(&mq_pb.PublishRequest{
+ Data: &mq_pb.Message{
IsClose: true,
},
}); err != nil {
diff --git a/weed/messaging/msgclient/chan_sub.go b/weed/mq/msgclient/chan_sub.go
index 213ff4666..859b482ef 100644
--- a/weed/messaging/msgclient/chan_sub.go
+++ b/weed/mq/msgclient/chan_sub.go
@@ -8,13 +8,13 @@ import (
"log"
"time"
- "github.com/chrislusf/seaweedfs/weed/messaging/broker"
- "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
+ "github.com/chrislusf/seaweedfs/weed/mq/broker"
+ "github.com/chrislusf/seaweedfs/weed/pb/mq_pb"
)
type SubChannel struct {
ch chan []byte
- stream messaging_pb.SeaweedMessaging_SubscribeClient
+ stream mq_pb.SeaweedMessaging_SubscribeClient
md5hash hash.Hash
cancel context.CancelFunc
}
@@ -57,7 +57,7 @@ func (mc *MessagingClient) NewSubChannel(subscriberId, chanName string) (*SubCha
continue
}
if resp.Data.IsClose {
- t.stream.Send(&messaging_pb.SubscriberMessage{
+ t.stream.Send(&mq_pb.SubscriberMessage{
IsClose: true,
})
close(t.ch)
diff --git a/weed/messaging/msgclient/client.go b/weed/mq/msgclient/client.go
index 4d7ef2b8e..cc64f1acb 100644
--- a/weed/messaging/msgclient/client.go
+++ b/weed/mq/msgclient/client.go
@@ -7,9 +7,9 @@ import (
"google.golang.org/grpc"
- "github.com/chrislusf/seaweedfs/weed/messaging/broker"
+ "github.com/chrislusf/seaweedfs/weed/mq/broker"
"github.com/chrislusf/seaweedfs/weed/pb"
- "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/mq_pb"
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/util"
)
@@ -38,8 +38,8 @@ func (mc *MessagingClient) findBroker(tp broker.TopicPartition) (*grpc.ClientCon
}
defer grpcConnection.Close()
- resp, err := messaging_pb.NewSeaweedMessagingClient(grpcConnection).FindBroker(context.Background(),
- &messaging_pb.FindBrokerRequest{
+ resp, err := mq_pb.NewSeaweedMessagingClient(grpcConnection).FindBroker(context.Background(),
+ &mq_pb.FindBrokerRequest{
Namespace: tp.Namespace,
Topic: tp.Topic,
Parition: tp.Partition,
diff --git a/weed/messaging/msgclient/config.go b/weed/mq/msgclient/config.go
index 2b9eba1a8..263ee856e 100644
--- a/weed/messaging/msgclient/config.go
+++ b/weed/mq/msgclient/config.go
@@ -4,19 +4,19 @@ import (
"context"
"log"
- "github.com/chrislusf/seaweedfs/weed/messaging/broker"
+ "github.com/chrislusf/seaweedfs/weed/mq/broker"
"github.com/chrislusf/seaweedfs/weed/pb"
- "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/mq_pb"
)
func (mc *MessagingClient) configureTopic(tp broker.TopicPartition) error {
- return mc.withAnyBroker(func(client messaging_pb.SeaweedMessagingClient) error {
+ return mc.withAnyBroker(func(client mq_pb.SeaweedMessagingClient) error {
_, err := client.ConfigureTopic(context.Background(),
- &messaging_pb.ConfigureTopicRequest{
+ &mq_pb.ConfigureTopicRequest{
Namespace: tp.Namespace,
Topic: tp.Topic,
- Configuration: &messaging_pb.TopicConfiguration{
+ Configuration: &mq_pb.TopicConfiguration{
PartitionCount: 0,
Collection: "",
Replication: "",
@@ -31,9 +31,9 @@ func (mc *MessagingClient) configureTopic(tp broker.TopicPartition) error {
func (mc *MessagingClient) DeleteTopic(namespace, topic string) error {
- return mc.withAnyBroker(func(client messaging_pb.SeaweedMessagingClient) error {
+ return mc.withAnyBroker(func(client mq_pb.SeaweedMessagingClient) error {
_, err := client.DeleteTopic(context.Background(),
- &messaging_pb.DeleteTopicRequest{
+ &mq_pb.DeleteTopicRequest{
Namespace: namespace,
Topic: topic,
})
@@ -41,7 +41,7 @@ func (mc *MessagingClient) DeleteTopic(namespace, topic string) error {
})
}
-func (mc *MessagingClient) withAnyBroker(fn func(client messaging_pb.SeaweedMessagingClient) error) error {
+func (mc *MessagingClient) withAnyBroker(fn func(client mq_pb.SeaweedMessagingClient) error) error {
var lastErr error
for _, broker := range mc.bootstrapBrokers {
@@ -52,7 +52,7 @@ func (mc *MessagingClient) withAnyBroker(fn func(client messaging_pb.SeaweedMess
}
defer grpcConnection.Close()
- err = fn(messaging_pb.NewSeaweedMessagingClient(grpcConnection))
+ err = fn(mq_pb.NewSeaweedMessagingClient(grpcConnection))
if err == nil {
return nil
}
diff --git a/weed/messaging/msgclient/publisher.go b/weed/mq/msgclient/publisher.go
index 1aa483ff8..823791d10 100644
--- a/weed/messaging/msgclient/publisher.go
+++ b/weed/mq/msgclient/publisher.go
@@ -6,23 +6,23 @@ import (
"github.com/OneOfOne/xxhash"
"google.golang.org/grpc"
- "github.com/chrislusf/seaweedfs/weed/messaging/broker"
- "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
+ "github.com/chrislusf/seaweedfs/weed/mq/broker"
+ "github.com/chrislusf/seaweedfs/weed/pb/mq_pb"
)
type Publisher struct {
- publishClients []messaging_pb.SeaweedMessaging_PublishClient
- topicConfiguration *messaging_pb.TopicConfiguration
+ publishClients []mq_pb.SeaweedMessaging_PublishClient
+ topicConfiguration *mq_pb.TopicConfiguration
messageCount uint64
publisherId string
}
func (mc *MessagingClient) NewPublisher(publisherId, namespace, topic string) (*Publisher, error) {
// read topic configuration
- topicConfiguration := &messaging_pb.TopicConfiguration{
+ topicConfiguration := &mq_pb.TopicConfiguration{
PartitionCount: 4,
}
- publishClients := make([]messaging_pb.SeaweedMessaging_PublishClient, topicConfiguration.PartitionCount)
+ publishClients := make([]mq_pb.SeaweedMessaging_PublishClient, topicConfiguration.PartitionCount)
for i := 0; i < int(topicConfiguration.PartitionCount); i++ {
tp := broker.TopicPartition{
Namespace: namespace,
@@ -45,16 +45,16 @@ func (mc *MessagingClient) NewPublisher(publisherId, namespace, topic string) (*
}, nil
}
-func setupPublisherClient(grpcConnection *grpc.ClientConn, tp broker.TopicPartition) (messaging_pb.SeaweedMessaging_PublishClient, error) {
+func setupPublisherClient(grpcConnection *grpc.ClientConn, tp broker.TopicPartition) (mq_pb.SeaweedMessaging_PublishClient, error) {
- stream, err := messaging_pb.NewSeaweedMessagingClient(grpcConnection).Publish(context.Background())
+ stream, err := mq_pb.NewSeaweedMessagingClient(grpcConnection).Publish(context.Background())
if err != nil {
return nil, err
}
// send init message
- err = stream.Send(&messaging_pb.PublishRequest{
- Init: &messaging_pb.PublishRequest_InitMessage{
+ err = stream.Send(&mq_pb.PublishRequest{
+ Init: &mq_pb.PublishRequest_InitMessage{
Namespace: tp.Namespace,
Topic: tp.Topic,
Partition: tp.Partition,
@@ -95,14 +95,14 @@ func setupPublisherClient(grpcConnection *grpc.ClientConn, tp broker.TopicPartit
}
-func (p *Publisher) Publish(m *messaging_pb.Message) error {
+func (p *Publisher) Publish(m *mq_pb.Message) error {
hashValue := p.messageCount
p.messageCount++
- if p.topicConfiguration.Partitoning == messaging_pb.TopicConfiguration_NonNullKeyHash {
+ if p.topicConfiguration.Partitoning == mq_pb.TopicConfiguration_NonNullKeyHash {
if m.Key != nil {
hashValue = xxhash.Checksum64(m.Key)
}
- } else if p.topicConfiguration.Partitoning == messaging_pb.TopicConfiguration_KeyHash {
+ } else if p.topicConfiguration.Partitoning == mq_pb.TopicConfiguration_KeyHash {
hashValue = xxhash.Checksum64(m.Key)
} else {
// round robin
@@ -112,7 +112,7 @@ func (p *Publisher) Publish(m *messaging_pb.Message) error {
if idx < 0 {
idx += len(p.publishClients)
}
- return p.publishClients[idx].Send(&messaging_pb.PublishRequest{
+ return p.publishClients[idx].Send(&mq_pb.PublishRequest{
Data: m,
})
}
diff --git a/weed/messaging/msgclient/subscriber.go b/weed/mq/msgclient/subscriber.go
index 6c7dc1ab7..f3da40fb3 100644
--- a/weed/messaging/msgclient/subscriber.go
+++ b/weed/mq/msgclient/subscriber.go
@@ -6,23 +6,23 @@ import (
"sync"
"time"
- "github.com/chrislusf/seaweedfs/weed/messaging/broker"
- "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
+ "github.com/chrislusf/seaweedfs/weed/mq/broker"
+ "github.com/chrislusf/seaweedfs/weed/pb/mq_pb"
"google.golang.org/grpc"
)
type Subscriber struct {
- subscriberClients []messaging_pb.SeaweedMessaging_SubscribeClient
+ subscriberClients []mq_pb.SeaweedMessaging_SubscribeClient
subscriberCancels []context.CancelFunc
subscriberId string
}
func (mc *MessagingClient) NewSubscriber(subscriberId, namespace, topic string, partitionId int, startTime time.Time) (*Subscriber, error) {
// read topic configuration
- topicConfiguration := &messaging_pb.TopicConfiguration{
+ topicConfiguration := &mq_pb.TopicConfiguration{
PartitionCount: 4,
}
- subscriberClients := make([]messaging_pb.SeaweedMessaging_SubscribeClient, topicConfiguration.PartitionCount)
+ subscriberClients := make([]mq_pb.SeaweedMessaging_SubscribeClient, topicConfiguration.PartitionCount)
subscriberCancels := make([]context.CancelFunc, topicConfiguration.PartitionCount)
for i := 0; i < int(topicConfiguration.PartitionCount); i++ {
@@ -54,19 +54,19 @@ func (mc *MessagingClient) NewSubscriber(subscriberId, namespace, topic string,
}, nil
}
-func setupSubscriberClient(ctx context.Context, grpcConnection *grpc.ClientConn, tp broker.TopicPartition, subscriberId string, startTime time.Time) (stream messaging_pb.SeaweedMessaging_SubscribeClient, err error) {
- stream, err = messaging_pb.NewSeaweedMessagingClient(grpcConnection).Subscribe(ctx)
+func setupSubscriberClient(ctx context.Context, grpcConnection *grpc.ClientConn, tp broker.TopicPartition, subscriberId string, startTime time.Time) (stream mq_pb.SeaweedMessaging_SubscribeClient, err error) {
+ stream, err = mq_pb.NewSeaweedMessagingClient(grpcConnection).Subscribe(ctx)
if err != nil {
return
}
// send init message
- err = stream.Send(&messaging_pb.SubscriberMessage{
- Init: &messaging_pb.SubscriberMessage_InitMessage{
+ err = stream.Send(&mq_pb.SubscriberMessage{
+ Init: &mq_pb.SubscriberMessage_InitMessage{
Namespace: tp.Namespace,
Topic: tp.Topic,
Partition: tp.Partition,
- StartPosition: messaging_pb.SubscriberMessage_InitMessage_TIMESTAMP,
+ StartPosition: mq_pb.SubscriberMessage_InitMessage_TIMESTAMP,
TimestampNs: startTime.UnixNano(),
SubscriberId: subscriberId,
},
@@ -78,7 +78,7 @@ func setupSubscriberClient(ctx context.Context, grpcConnection *grpc.ClientConn,
return stream, nil
}
-func doSubscribe(subscriberClient messaging_pb.SeaweedMessaging_SubscribeClient, processFn func(m *messaging_pb.Message)) error {
+func doSubscribe(subscriberClient mq_pb.SeaweedMessaging_SubscribeClient, processFn func(m *mq_pb.Message)) error {
for {
resp, listenErr := subscriberClient.Recv()
if listenErr == io.EOF {
@@ -97,12 +97,12 @@ func doSubscribe(subscriberClient messaging_pb.SeaweedMessaging_SubscribeClient,
}
// Subscribe starts goroutines to process the messages
-func (s *Subscriber) Subscribe(processFn func(m *messaging_pb.Message)) {
+func (s *Subscriber) Subscribe(processFn func(m *mq_pb.Message)) {
var wg sync.WaitGroup
for i := 0; i < len(s.subscriberClients); i++ {
if s.subscriberClients[i] != nil {
wg.Add(1)
- go func(subscriberClient messaging_pb.SeaweedMessaging_SubscribeClient) {
+ go func(subscriberClient mq_pb.SeaweedMessaging_SubscribeClient) {
defer wg.Done()
doSubscribe(subscriberClient, processFn)
}(s.subscriberClients[i])
diff --git a/weed/pb/Makefile b/weed/pb/Makefile
index a8992bde2..01322ffda 100644
--- a/weed/pb/Makefile
+++ b/weed/pb/Makefile
@@ -10,6 +10,6 @@ gen:
protoc iam.proto --go_out=./iam_pb --go-grpc_out=./iam_pb --go_opt=paths=source_relative --go-grpc_opt=paths=source_relative
protoc mount.proto --go_out=./mount_pb --go-grpc_out=./mount_pb --go_opt=paths=source_relative --go-grpc_opt=paths=source_relative
protoc s3.proto --go_out=./s3_pb --go-grpc_out=./s3_pb --go_opt=paths=source_relative --go-grpc_opt=paths=source_relative
- protoc messaging.proto --go_out=./messaging_pb --go-grpc_out=./messaging_pb --go_opt=paths=source_relative --go-grpc_opt=paths=source_relative
+ protoc mq.proto --go_out=./mq_pb --go-grpc_out=./mq_pb --go_opt=paths=source_relative --go-grpc_opt=paths=source_relative
# protoc filer.proto --java_out=../../other/java/client/src/main/java
cp filer.proto ../../other/java/client/src/main/proto
diff --git a/weed/pb/grpc_client_server.go b/weed/pb/grpc_client_server.go
index c7cb82a22..0a0d65a87 100644
--- a/weed/pb/grpc_client_server.go
+++ b/weed/pb/grpc_client_server.go
@@ -18,7 +18,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
- "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/mq_pb"
)
const (
@@ -231,10 +231,10 @@ func WithOneOfGrpcMasterClients(streamingMode bool, masterGrpcAddresses map[stri
return err
}
-func WithBrokerGrpcClient(streamingMode bool, brokerGrpcAddress string, grpcDialOption grpc.DialOption, fn func(client messaging_pb.SeaweedMessagingClient) error) error {
+func WithBrokerGrpcClient(streamingMode bool, brokerGrpcAddress string, grpcDialOption grpc.DialOption, fn func(client mq_pb.SeaweedMessagingClient) error) error {
return WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error {
- client := messaging_pb.NewSeaweedMessagingClient(grpcConnection)
+ client := mq_pb.NewSeaweedMessagingClient(grpcConnection)
return fn(client)
}, brokerGrpcAddress, grpcDialOption)
diff --git a/weed/pb/messaging.proto b/weed/pb/messaging.proto
index 04446ad16..a73bc99a6 100644
--- a/weed/pb/messaging.proto
+++ b/weed/pb/messaging.proto
@@ -2,7 +2,7 @@ syntax = "proto3";
package messaging_pb;
-option go_package = "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb";
+option go_package = "github.com/chrislusf/seaweedfs/weed/pb/mq_pb";
option java_package = "seaweedfs.client";
option java_outer_classname = "MessagingProto";
diff --git a/weed/pb/mount_pb/mount.pb.go b/weed/pb/mount_pb/mount.pb.go
index cbaf533fe..642ffe59c 100644
--- a/weed/pb/mount_pb/mount.pb.go
+++ b/weed/pb/mount_pb/mount.pb.go
@@ -143,12 +143,12 @@ func file_mount_proto_rawDescGZIP() []byte {
var file_mount_proto_msgTypes = make([]protoimpl.MessageInfo, 2)
var file_mount_proto_goTypes = []interface{}{
- (*ConfigureRequest)(nil), // 0: messaging_pb.ConfigureRequest
- (*ConfigureResponse)(nil), // 1: messaging_pb.ConfigureResponse
+ (*ConfigureRequest)(nil), // 0: mq_pb.ConfigureRequest
+ (*ConfigureResponse)(nil), // 1: mq_pb.ConfigureResponse
}
var file_mount_proto_depIdxs = []int32{
- 0, // 0: messaging_pb.SeaweedMount.Configure:input_type -> messaging_pb.ConfigureRequest
- 1, // 1: messaging_pb.SeaweedMount.Configure:output_type -> messaging_pb.ConfigureResponse
+ 0, // 0: mq_pb.SeaweedMount.Configure:input_type -> mq_pb.ConfigureRequest
+ 1, // 1: mq_pb.SeaweedMount.Configure:output_type -> mq_pb.ConfigureResponse
1, // [1:2] is the sub-list for method output_type
0, // [0:1] is the sub-list for method input_type
0, // [0:0] is the sub-list for extension type_name
diff --git a/weed/pb/mount_pb/mount_grpc.pb.go b/weed/pb/mount_pb/mount_grpc.pb.go
index 41737aa21..1da6542dc 100644
--- a/weed/pb/mount_pb/mount_grpc.pb.go
+++ b/weed/pb/mount_pb/mount_grpc.pb.go
@@ -31,7 +31,7 @@ func NewSeaweedMountClient(cc grpc.ClientConnInterface) SeaweedMountClient {
func (c *seaweedMountClient) Configure(ctx context.Context, in *ConfigureRequest, opts ...grpc.CallOption) (*ConfigureResponse, error) {
out := new(ConfigureResponse)
- err := c.cc.Invoke(ctx, "/messaging_pb.SeaweedMount/Configure", in, out, opts...)
+ err := c.cc.Invoke(ctx, "/mq_pb.SeaweedMount/Configure", in, out, opts...)
if err != nil {
return nil, err
}
@@ -76,7 +76,7 @@ func _SeaweedMount_Configure_Handler(srv interface{}, ctx context.Context, dec f
}
info := &grpc.UnaryServerInfo{
Server: srv,
- FullMethod: "/messaging_pb.SeaweedMount/Configure",
+ FullMethod: "/mq_pb.SeaweedMount/Configure",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(SeaweedMountServer).Configure(ctx, req.(*ConfigureRequest))
@@ -88,7 +88,7 @@ func _SeaweedMount_Configure_Handler(srv interface{}, ctx context.Context, dec f
// It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy)
var SeaweedMount_ServiceDesc = grpc.ServiceDesc{
- ServiceName: "messaging_pb.SeaweedMount",
+ ServiceName: "mq_pb.SeaweedMount",
HandlerType: (*SeaweedMountServer)(nil),
Methods: []grpc.MethodDesc{
{
diff --git a/weed/pb/messaging_pb/messaging.pb.go b/weed/pb/mq_pb/messaging.pb.go
index 5b9ca1ee3..8de152f1b 100644
--- a/weed/pb/messaging_pb/messaging.pb.go
+++ b/weed/pb/mq_pb/messaging.pb.go
@@ -2,9 +2,9 @@
// versions:
// protoc-gen-go v1.26.0
// protoc v3.17.3
-// source: messaging.proto
+// source: mq.proto
-package messaging_pb
+package mq_pb
import (
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
@@ -840,7 +840,7 @@ type TopicConfiguration struct {
Collection string `protobuf:"bytes,2,opt,name=collection,proto3" json:"collection,omitempty"`
Replication string `protobuf:"bytes,3,opt,name=replication,proto3" json:"replication,omitempty"`
IsTransient bool `protobuf:"varint,4,opt,name=is_transient,json=isTransient,proto3" json:"is_transient,omitempty"`
- Partitoning TopicConfiguration_Partitioning `protobuf:"varint,5,opt,name=partitoning,proto3,enum=messaging_pb.TopicConfiguration_Partitioning" json:"partitoning,omitempty"`
+ Partitoning TopicConfiguration_Partitioning `protobuf:"varint,5,opt,name=partitoning,proto3,enum=mq_pb.TopicConfiguration_Partitioning" json:"partitoning,omitempty"`
}
func (x *TopicConfiguration) Reset() {
@@ -918,7 +918,7 @@ type SubscriberMessage_InitMessage struct {
Namespace string `protobuf:"bytes,1,opt,name=namespace,proto3" json:"namespace,omitempty"`
Topic string `protobuf:"bytes,2,opt,name=topic,proto3" json:"topic,omitempty"`
Partition int32 `protobuf:"varint,3,opt,name=partition,proto3" json:"partition,omitempty"`
- StartPosition SubscriberMessage_InitMessage_StartPosition `protobuf:"varint,4,opt,name=startPosition,proto3,enum=messaging_pb.SubscriberMessage_InitMessage_StartPosition" json:"startPosition,omitempty"` // Where to begin consuming from
+ StartPosition SubscriberMessage_InitMessage_StartPosition `protobuf:"varint,4,opt,name=startPosition,proto3,enum=mq_pb.SubscriberMessage_InitMessage_StartPosition" json:"startPosition,omitempty"` // Where to begin consuming from
TimestampNs int64 `protobuf:"varint,5,opt,name=timestampNs,proto3" json:"timestampNs,omitempty"` // timestamp in nano seconds
SubscriberId string `protobuf:"bytes,6,opt,name=subscriber_id,json=subscriberId,proto3" json:"subscriber_id,omitempty"` // uniquely identify a subscriber to track consumption
}
@@ -1407,54 +1407,54 @@ func file_messaging_proto_rawDescGZIP() []byte {
var file_messaging_proto_enumTypes = make([]protoimpl.EnumInfo, 2)
var file_messaging_proto_msgTypes = make([]protoimpl.MessageInfo, 20)
var file_messaging_proto_goTypes = []interface{}{
- (SubscriberMessage_InitMessage_StartPosition)(0), // 0: messaging_pb.SubscriberMessage.InitMessage.StartPosition
- (TopicConfiguration_Partitioning)(0), // 1: messaging_pb.TopicConfiguration.Partitioning
- (*SubscriberMessage)(nil), // 2: messaging_pb.SubscriberMessage
- (*Message)(nil), // 3: messaging_pb.Message
- (*BrokerMessage)(nil), // 4: messaging_pb.BrokerMessage
- (*PublishRequest)(nil), // 5: messaging_pb.PublishRequest
- (*PublishResponse)(nil), // 6: messaging_pb.PublishResponse
- (*DeleteTopicRequest)(nil), // 7: messaging_pb.DeleteTopicRequest
- (*DeleteTopicResponse)(nil), // 8: messaging_pb.DeleteTopicResponse
- (*ConfigureTopicRequest)(nil), // 9: messaging_pb.ConfigureTopicRequest
- (*ConfigureTopicResponse)(nil), // 10: messaging_pb.ConfigureTopicResponse
- (*GetTopicConfigurationRequest)(nil), // 11: messaging_pb.GetTopicConfigurationRequest
- (*GetTopicConfigurationResponse)(nil), // 12: messaging_pb.GetTopicConfigurationResponse
- (*FindBrokerRequest)(nil), // 13: messaging_pb.FindBrokerRequest
- (*FindBrokerResponse)(nil), // 14: messaging_pb.FindBrokerResponse
- (*TopicConfiguration)(nil), // 15: messaging_pb.TopicConfiguration
- (*SubscriberMessage_InitMessage)(nil), // 16: messaging_pb.SubscriberMessage.InitMessage
- (*SubscriberMessage_AckMessage)(nil), // 17: messaging_pb.SubscriberMessage.AckMessage
- nil, // 18: messaging_pb.Message.HeadersEntry
- (*PublishRequest_InitMessage)(nil), // 19: messaging_pb.PublishRequest.InitMessage
- (*PublishResponse_ConfigMessage)(nil), // 20: messaging_pb.PublishResponse.ConfigMessage
- (*PublishResponse_RedirectMessage)(nil), // 21: messaging_pb.PublishResponse.RedirectMessage
+ (SubscriberMessage_InitMessage_StartPosition)(0), // 0: mq_pb.SubscriberMessage.InitMessage.StartPosition
+ (TopicConfiguration_Partitioning)(0), // 1: mq_pb.TopicConfiguration.Partitioning
+ (*SubscriberMessage)(nil), // 2: mq_pb.SubscriberMessage
+ (*Message)(nil), // 3: mq_pb.Message
+ (*BrokerMessage)(nil), // 4: mq_pb.BrokerMessage
+ (*PublishRequest)(nil), // 5: mq_pb.PublishRequest
+ (*PublishResponse)(nil), // 6: mq_pb.PublishResponse
+ (*DeleteTopicRequest)(nil), // 7: mq_pb.DeleteTopicRequest
+ (*DeleteTopicResponse)(nil), // 8: mq_pb.DeleteTopicResponse
+ (*ConfigureTopicRequest)(nil), // 9: mq_pb.ConfigureTopicRequest
+ (*ConfigureTopicResponse)(nil), // 10: mq_pb.ConfigureTopicResponse
+ (*GetTopicConfigurationRequest)(nil), // 11: mq_pb.GetTopicConfigurationRequest
+ (*GetTopicConfigurationResponse)(nil), // 12: mq_pb.GetTopicConfigurationResponse
+ (*FindBrokerRequest)(nil), // 13: mq_pb.FindBrokerRequest
+ (*FindBrokerResponse)(nil), // 14: mq_pb.FindBrokerResponse
+ (*TopicConfiguration)(nil), // 15: mq_pb.TopicConfiguration
+ (*SubscriberMessage_InitMessage)(nil), // 16: mq_pb.SubscriberMessage.InitMessage
+ (*SubscriberMessage_AckMessage)(nil), // 17: mq_pb.SubscriberMessage.AckMessage
+ nil, // 18: mq_pb.Message.HeadersEntry
+ (*PublishRequest_InitMessage)(nil), // 19: mq_pb.PublishRequest.InitMessage
+ (*PublishResponse_ConfigMessage)(nil), // 20: mq_pb.PublishResponse.ConfigMessage
+ (*PublishResponse_RedirectMessage)(nil), // 21: mq_pb.PublishResponse.RedirectMessage
}
var file_messaging_proto_depIdxs = []int32{
- 16, // 0: messaging_pb.SubscriberMessage.init:type_name -> messaging_pb.SubscriberMessage.InitMessage
- 17, // 1: messaging_pb.SubscriberMessage.ack:type_name -> messaging_pb.SubscriberMessage.AckMessage
- 18, // 2: messaging_pb.Message.headers:type_name -> messaging_pb.Message.HeadersEntry
- 3, // 3: messaging_pb.BrokerMessage.data:type_name -> messaging_pb.Message
- 19, // 4: messaging_pb.PublishRequest.init:type_name -> messaging_pb.PublishRequest.InitMessage
- 3, // 5: messaging_pb.PublishRequest.data:type_name -> messaging_pb.Message
- 20, // 6: messaging_pb.PublishResponse.config:type_name -> messaging_pb.PublishResponse.ConfigMessage
- 21, // 7: messaging_pb.PublishResponse.redirect:type_name -> messaging_pb.PublishResponse.RedirectMessage
- 15, // 8: messaging_pb.ConfigureTopicRequest.configuration:type_name -> messaging_pb.TopicConfiguration
- 15, // 9: messaging_pb.GetTopicConfigurationResponse.configuration:type_name -> messaging_pb.TopicConfiguration
- 1, // 10: messaging_pb.TopicConfiguration.partitoning:type_name -> messaging_pb.TopicConfiguration.Partitioning
- 0, // 11: messaging_pb.SubscriberMessage.InitMessage.startPosition:type_name -> messaging_pb.SubscriberMessage.InitMessage.StartPosition
- 2, // 12: messaging_pb.SeaweedMessaging.Subscribe:input_type -> messaging_pb.SubscriberMessage
- 5, // 13: messaging_pb.SeaweedMessaging.Publish:input_type -> messaging_pb.PublishRequest
- 7, // 14: messaging_pb.SeaweedMessaging.DeleteTopic:input_type -> messaging_pb.DeleteTopicRequest
- 9, // 15: messaging_pb.SeaweedMessaging.ConfigureTopic:input_type -> messaging_pb.ConfigureTopicRequest
- 11, // 16: messaging_pb.SeaweedMessaging.GetTopicConfiguration:input_type -> messaging_pb.GetTopicConfigurationRequest
- 13, // 17: messaging_pb.SeaweedMessaging.FindBroker:input_type -> messaging_pb.FindBrokerRequest
- 4, // 18: messaging_pb.SeaweedMessaging.Subscribe:output_type -> messaging_pb.BrokerMessage
- 6, // 19: messaging_pb.SeaweedMessaging.Publish:output_type -> messaging_pb.PublishResponse
- 8, // 20: messaging_pb.SeaweedMessaging.DeleteTopic:output_type -> messaging_pb.DeleteTopicResponse
- 10, // 21: messaging_pb.SeaweedMessaging.ConfigureTopic:output_type -> messaging_pb.ConfigureTopicResponse
- 12, // 22: messaging_pb.SeaweedMessaging.GetTopicConfiguration:output_type -> messaging_pb.GetTopicConfigurationResponse
- 14, // 23: messaging_pb.SeaweedMessaging.FindBroker:output_type -> messaging_pb.FindBrokerResponse
+ 16, // 0: mq_pb.SubscriberMessage.init:type_name -> mq_pb.SubscriberMessage.InitMessage
+ 17, // 1: mq_pb.SubscriberMessage.ack:type_name -> mq_pb.SubscriberMessage.AckMessage
+ 18, // 2: mq_pb.Message.headers:type_name -> mq_pb.Message.HeadersEntry
+ 3, // 3: mq_pb.BrokerMessage.data:type_name -> mq_pb.Message
+ 19, // 4: mq_pb.PublishRequest.init:type_name -> mq_pb.PublishRequest.InitMessage
+ 3, // 5: mq_pb.PublishRequest.data:type_name -> mq_pb.Message
+ 20, // 6: mq_pb.PublishResponse.config:type_name -> mq_pb.PublishResponse.ConfigMessage
+ 21, // 7: mq_pb.PublishResponse.redirect:type_name -> mq_pb.PublishResponse.RedirectMessage
+ 15, // 8: mq_pb.ConfigureTopicRequest.configuration:type_name -> mq_pb.TopicConfiguration
+ 15, // 9: mq_pb.GetTopicConfigurationResponse.configuration:type_name -> mq_pb.TopicConfiguration
+ 1, // 10: mq_pb.TopicConfiguration.partitoning:type_name -> mq_pb.TopicConfiguration.Partitioning
+ 0, // 11: mq_pb.SubscriberMessage.InitMessage.startPosition:type_name -> mq_pb.SubscriberMessage.InitMessage.StartPosition
+ 2, // 12: mq_pb.SeaweedMessaging.Subscribe:input_type -> mq_pb.SubscriberMessage
+ 5, // 13: mq_pb.SeaweedMessaging.Publish:input_type -> mq_pb.PublishRequest
+ 7, // 14: mq_pb.SeaweedMessaging.DeleteTopic:input_type -> mq_pb.DeleteTopicRequest
+ 9, // 15: mq_pb.SeaweedMessaging.ConfigureTopic:input_type -> mq_pb.ConfigureTopicRequest
+ 11, // 16: mq_pb.SeaweedMessaging.GetTopicConfiguration:input_type -> mq_pb.GetTopicConfigurationRequest
+ 13, // 17: mq_pb.SeaweedMessaging.FindBroker:input_type -> mq_pb.FindBrokerRequest
+ 4, // 18: mq_pb.SeaweedMessaging.Subscribe:output_type -> mq_pb.BrokerMessage
+ 6, // 19: mq_pb.SeaweedMessaging.Publish:output_type -> mq_pb.PublishResponse
+ 8, // 20: mq_pb.SeaweedMessaging.DeleteTopic:output_type -> mq_pb.DeleteTopicResponse
+ 10, // 21: mq_pb.SeaweedMessaging.ConfigureTopic:output_type -> mq_pb.ConfigureTopicResponse
+ 12, // 22: mq_pb.SeaweedMessaging.GetTopicConfiguration:output_type -> mq_pb.GetTopicConfigurationResponse
+ 14, // 23: mq_pb.SeaweedMessaging.FindBroker:output_type -> mq_pb.FindBrokerResponse
18, // [18:24] is the sub-list for method output_type
12, // [12:18] is the sub-list for method input_type
12, // [12:12] is the sub-list for extension type_name
diff --git a/weed/pb/messaging_pb/messaging_grpc.pb.go b/weed/pb/mq_pb/messaging_grpc.pb.go
index 234cffa95..0249a0b9c 100644
--- a/weed/pb/messaging_pb/messaging_grpc.pb.go
+++ b/weed/pb/mq_pb/messaging_grpc.pb.go
@@ -1,6 +1,6 @@
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
-package messaging_pb
+package mq_pb
import (
context "context"
@@ -35,7 +35,7 @@ func NewSeaweedMessagingClient(cc grpc.ClientConnInterface) SeaweedMessagingClie
}
func (c *seaweedMessagingClient) Subscribe(ctx context.Context, opts ...grpc.CallOption) (SeaweedMessaging_SubscribeClient, error) {
- stream, err := c.cc.NewStream(ctx, &SeaweedMessaging_ServiceDesc.Streams[0], "/messaging_pb.SeaweedMessaging/Subscribe", opts...)
+ stream, err := c.cc.NewStream(ctx, &SeaweedMessaging_ServiceDesc.Streams[0], "/mq_pb.SeaweedMessaging/Subscribe", opts...)
if err != nil {
return nil, err
}
@@ -66,7 +66,7 @@ func (x *seaweedMessagingSubscribeClient) Recv() (*BrokerMessage, error) {
}
func (c *seaweedMessagingClient) Publish(ctx context.Context, opts ...grpc.CallOption) (SeaweedMessaging_PublishClient, error) {
- stream, err := c.cc.NewStream(ctx, &SeaweedMessaging_ServiceDesc.Streams[1], "/messaging_pb.SeaweedMessaging/Publish", opts...)
+ stream, err := c.cc.NewStream(ctx, &SeaweedMessaging_ServiceDesc.Streams[1], "/mq_pb.SeaweedMessaging/Publish", opts...)
if err != nil {
return nil, err
}
@@ -98,7 +98,7 @@ func (x *seaweedMessagingPublishClient) Recv() (*PublishResponse, error) {
func (c *seaweedMessagingClient) DeleteTopic(ctx context.Context, in *DeleteTopicRequest, opts ...grpc.CallOption) (*DeleteTopicResponse, error) {
out := new(DeleteTopicResponse)
- err := c.cc.Invoke(ctx, "/messaging_pb.SeaweedMessaging/DeleteTopic", in, out, opts...)
+ err := c.cc.Invoke(ctx, "/mq_pb.SeaweedMessaging/DeleteTopic", in, out, opts...)
if err != nil {
return nil, err
}
@@ -107,7 +107,7 @@ func (c *seaweedMessagingClient) DeleteTopic(ctx context.Context, in *DeleteTopi
func (c *seaweedMessagingClient) ConfigureTopic(ctx context.Context, in *ConfigureTopicRequest, opts ...grpc.CallOption) (*ConfigureTopicResponse, error) {
out := new(ConfigureTopicResponse)
- err := c.cc.Invoke(ctx, "/messaging_pb.SeaweedMessaging/ConfigureTopic", in, out, opts...)
+ err := c.cc.Invoke(ctx, "/mq_pb.SeaweedMessaging/ConfigureTopic", in, out, opts...)
if err != nil {
return nil, err
}
@@ -116,7 +116,7 @@ func (c *seaweedMessagingClient) ConfigureTopic(ctx context.Context, in *Configu
func (c *seaweedMessagingClient) GetTopicConfiguration(ctx context.Context, in *GetTopicConfigurationRequest, opts ...grpc.CallOption) (*GetTopicConfigurationResponse, error) {
out := new(GetTopicConfigurationResponse)
- err := c.cc.Invoke(ctx, "/messaging_pb.SeaweedMessaging/GetTopicConfiguration", in, out, opts...)
+ err := c.cc.Invoke(ctx, "/mq_pb.SeaweedMessaging/GetTopicConfiguration", in, out, opts...)
if err != nil {
return nil, err
}
@@ -125,7 +125,7 @@ func (c *seaweedMessagingClient) GetTopicConfiguration(ctx context.Context, in *
func (c *seaweedMessagingClient) FindBroker(ctx context.Context, in *FindBrokerRequest, opts ...grpc.CallOption) (*FindBrokerResponse, error) {
out := new(FindBrokerResponse)
- err := c.cc.Invoke(ctx, "/messaging_pb.SeaweedMessaging/FindBroker", in, out, opts...)
+ err := c.cc.Invoke(ctx, "/mq_pb.SeaweedMessaging/FindBroker", in, out, opts...)
if err != nil {
return nil, err
}
@@ -242,7 +242,7 @@ func _SeaweedMessaging_DeleteTopic_Handler(srv interface{}, ctx context.Context,
}
info := &grpc.UnaryServerInfo{
Server: srv,
- FullMethod: "/messaging_pb.SeaweedMessaging/DeleteTopic",
+ FullMethod: "/mq_pb.SeaweedMessaging/DeleteTopic",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(SeaweedMessagingServer).DeleteTopic(ctx, req.(*DeleteTopicRequest))
@@ -260,7 +260,7 @@ func _SeaweedMessaging_ConfigureTopic_Handler(srv interface{}, ctx context.Conte
}
info := &grpc.UnaryServerInfo{
Server: srv,
- FullMethod: "/messaging_pb.SeaweedMessaging/ConfigureTopic",
+ FullMethod: "/mq_pb.SeaweedMessaging/ConfigureTopic",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(SeaweedMessagingServer).ConfigureTopic(ctx, req.(*ConfigureTopicRequest))
@@ -278,7 +278,7 @@ func _SeaweedMessaging_GetTopicConfiguration_Handler(srv interface{}, ctx contex
}
info := &grpc.UnaryServerInfo{
Server: srv,
- FullMethod: "/messaging_pb.SeaweedMessaging/GetTopicConfiguration",
+ FullMethod: "/mq_pb.SeaweedMessaging/GetTopicConfiguration",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(SeaweedMessagingServer).GetTopicConfiguration(ctx, req.(*GetTopicConfigurationRequest))
@@ -296,7 +296,7 @@ func _SeaweedMessaging_FindBroker_Handler(srv interface{}, ctx context.Context,
}
info := &grpc.UnaryServerInfo{
Server: srv,
- FullMethod: "/messaging_pb.SeaweedMessaging/FindBroker",
+ FullMethod: "/mq_pb.SeaweedMessaging/FindBroker",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(SeaweedMessagingServer).FindBroker(ctx, req.(*FindBrokerRequest))
@@ -308,7 +308,7 @@ func _SeaweedMessaging_FindBroker_Handler(srv interface{}, ctx context.Context,
// It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy)
var SeaweedMessaging_ServiceDesc = grpc.ServiceDesc{
- ServiceName: "messaging_pb.SeaweedMessaging",
+ ServiceName: "mq_pb.SeaweedMessaging",
HandlerType: (*SeaweedMessagingServer)(nil),
Methods: []grpc.MethodDesc{
{
@@ -342,5 +342,5 @@ var SeaweedMessaging_ServiceDesc = grpc.ServiceDesc{
ClientStreams: true,
},
},
- Metadata: "messaging.proto",
+ Metadata: "mq.proto",
}
diff --git a/weed/pb/s3_pb/s3.pb.go b/weed/pb/s3_pb/s3.pb.go
index c1bd23556..9923961a8 100644
--- a/weed/pb/s3_pb/s3.pb.go
+++ b/weed/pb/s3_pb/s3.pb.go
@@ -283,20 +283,20 @@ func file_s3_proto_rawDescGZIP() []byte {
var file_s3_proto_msgTypes = make([]protoimpl.MessageInfo, 6)
var file_s3_proto_goTypes = []interface{}{
- (*S3ConfigureRequest)(nil), // 0: messaging_pb.S3ConfigureRequest
- (*S3ConfigureResponse)(nil), // 1: messaging_pb.S3ConfigureResponse
- (*S3CircuitBreakerConfig)(nil), // 2: messaging_pb.S3CircuitBreakerConfig
- (*S3CircuitBreakerOptions)(nil), // 3: messaging_pb.S3CircuitBreakerOptions
- nil, // 4: messaging_pb.S3CircuitBreakerConfig.BucketsEntry
- nil, // 5: messaging_pb.S3CircuitBreakerOptions.ActionsEntry
+ (*S3ConfigureRequest)(nil), // 0: mq_pb.S3ConfigureRequest
+ (*S3ConfigureResponse)(nil), // 1: mq_pb.S3ConfigureResponse
+ (*S3CircuitBreakerConfig)(nil), // 2: mq_pb.S3CircuitBreakerConfig
+ (*S3CircuitBreakerOptions)(nil), // 3: mq_pb.S3CircuitBreakerOptions
+ nil, // 4: mq_pb.S3CircuitBreakerConfig.BucketsEntry
+ nil, // 5: mq_pb.S3CircuitBreakerOptions.ActionsEntry
}
var file_s3_proto_depIdxs = []int32{
- 3, // 0: messaging_pb.S3CircuitBreakerConfig.global:type_name -> messaging_pb.S3CircuitBreakerOptions
- 4, // 1: messaging_pb.S3CircuitBreakerConfig.buckets:type_name -> messaging_pb.S3CircuitBreakerConfig.BucketsEntry
- 5, // 2: messaging_pb.S3CircuitBreakerOptions.actions:type_name -> messaging_pb.S3CircuitBreakerOptions.ActionsEntry
- 3, // 3: messaging_pb.S3CircuitBreakerConfig.BucketsEntry.value:type_name -> messaging_pb.S3CircuitBreakerOptions
- 0, // 4: messaging_pb.SeaweedS3.Configure:input_type -> messaging_pb.S3ConfigureRequest
- 1, // 5: messaging_pb.SeaweedS3.Configure:output_type -> messaging_pb.S3ConfigureResponse
+ 3, // 0: mq_pb.S3CircuitBreakerConfig.global:type_name -> mq_pb.S3CircuitBreakerOptions
+ 4, // 1: mq_pb.S3CircuitBreakerConfig.buckets:type_name -> mq_pb.S3CircuitBreakerConfig.BucketsEntry
+ 5, // 2: mq_pb.S3CircuitBreakerOptions.actions:type_name -> mq_pb.S3CircuitBreakerOptions.ActionsEntry
+ 3, // 3: mq_pb.S3CircuitBreakerConfig.BucketsEntry.value:type_name -> mq_pb.S3CircuitBreakerOptions
+ 0, // 4: mq_pb.SeaweedS3.Configure:input_type -> mq_pb.S3ConfigureRequest
+ 1, // 5: mq_pb.SeaweedS3.Configure:output_type -> mq_pb.S3ConfigureResponse
5, // [5:6] is the sub-list for method output_type
4, // [4:5] is the sub-list for method input_type
4, // [4:4] is the sub-list for extension type_name
diff --git a/weed/pb/s3_pb/s3_grpc.pb.go b/weed/pb/s3_pb/s3_grpc.pb.go
index 1bc956be6..c00268bad 100644
--- a/weed/pb/s3_pb/s3_grpc.pb.go
+++ b/weed/pb/s3_pb/s3_grpc.pb.go
@@ -31,7 +31,7 @@ func NewSeaweedS3Client(cc grpc.ClientConnInterface) SeaweedS3Client {
func (c *seaweedS3Client) Configure(ctx context.Context, in *S3ConfigureRequest, opts ...grpc.CallOption) (*S3ConfigureResponse, error) {
out := new(S3ConfigureResponse)
- err := c.cc.Invoke(ctx, "/messaging_pb.SeaweedS3/Configure", in, out, opts...)
+ err := c.cc.Invoke(ctx, "/mq_pb.SeaweedS3/Configure", in, out, opts...)
if err != nil {
return nil, err
}
@@ -76,7 +76,7 @@ func _SeaweedS3_Configure_Handler(srv interface{}, ctx context.Context, dec func
}
info := &grpc.UnaryServerInfo{
Server: srv,
- FullMethod: "/messaging_pb.SeaweedS3/Configure",
+ FullMethod: "/mq_pb.SeaweedS3/Configure",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(SeaweedS3Server).Configure(ctx, req.(*S3ConfigureRequest))
@@ -88,7 +88,7 @@ func _SeaweedS3_Configure_Handler(srv interface{}, ctx context.Context, dec func
// It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy)
var SeaweedS3_ServiceDesc = grpc.ServiceDesc{
- ServiceName: "messaging_pb.SeaweedS3",
+ ServiceName: "mq_pb.SeaweedS3",
HandlerType: (*SeaweedS3Server)(nil),
Methods: []grpc.MethodDesc{
{
diff --git a/weed/util/log_buffer/log_read.go b/weed/util/log_buffer/log_read.go
index 99532b47b..35bc8ffd5 100644
--- a/weed/util/log_buffer/log_read.go
+++ b/weed/util/log_buffer/log_read.go
@@ -68,7 +68,7 @@ func (logBuffer *LogBuffer) LoopProcessLogData(readerName string, startReadTime
logEntry := &filer_pb.LogEntry{}
if err = proto.Unmarshal(entryData, logEntry); err != nil {
- glog.Errorf("unexpected unmarshal messaging_pb.Message: %v", err)
+ glog.Errorf("unexpected unmarshal mq_pb.Message: %v", err)
pos += 4 + int(size)
continue
}