aboutsummaryrefslogtreecommitdiff
path: root/weed
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2022-07-01 22:44:28 -0700
committerchrislu <chris.lu@gmail.com>2022-07-28 23:22:07 -0700
commitac66227798208b80b79791b3dbb68aeac3449a1c (patch)
tree09360da2818a5fdb7680d35483fe29934f90cb39 /weed
parent21b6b07dd8d0379d835f9d9c1259155a12f1e61b (diff)
downloadseaweedfs-ac66227798208b80b79791b3dbb68aeac3449a1c.tar.xz
seaweedfs-ac66227798208b80b79791b3dbb68aeac3449a1c.zip
renaming
Diffstat (limited to 'weed')
-rw-r--r--weed/mq/broker/broker_append.go10
-rw-r--r--weed/mq/broker/broker_grpc_server_subscribe.go4
-rw-r--r--weed/mq/broker/broker_server.go16
-rw-r--r--weed/mq/broker/topic_manager.go4
4 files changed, 17 insertions, 17 deletions
diff --git a/weed/mq/broker/broker_append.go b/weed/mq/broker/broker_append.go
index 4f3af0ff8..c8e0da93c 100644
--- a/weed/mq/broker/broker_append.go
+++ b/weed/mq/broker/broker_append.go
@@ -14,7 +14,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/util"
)
-func (broker *MessageBroker) appendToFile(targetFile string, topicConfig *mq_pb.TopicConfiguration, data []byte) error {
+func (broker *MessageQueueBroker) appendToFile(targetFile string, topicConfig *mq_pb.TopicConfiguration, data []byte) error {
assignResult, uploadResult, err2 := broker.assignAndUpload(topicConfig, data)
if err2 != nil {
@@ -46,7 +46,7 @@ func (broker *MessageBroker) appendToFile(targetFile string, topicConfig *mq_pb.
return nil
}
-func (broker *MessageBroker) assignAndUpload(topicConfig *mq_pb.TopicConfiguration, data []byte) (*operation.AssignResult, *operation.UploadResult, error) {
+func (broker *MessageQueueBroker) assignAndUpload(topicConfig *mq_pb.TopicConfiguration, data []byte) (*operation.AssignResult, *operation.UploadResult, error) {
var assignResult = &operation.AssignResult{}
@@ -106,9 +106,9 @@ func (broker *MessageBroker) assignAndUpload(topicConfig *mq_pb.TopicConfigurati
return assignResult, uploadResult, nil
}
-var _ = filer_pb.FilerClient(&MessageBroker{})
+var _ = filer_pb.FilerClient(&MessageQueueBroker{})
-func (broker *MessageBroker) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) (err error) {
+func (broker *MessageQueueBroker) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) (err error) {
for _, filer := range broker.option.Filers {
if err = pb.WithFilerClient(streamingMode, filer, broker.grpcDialOption, fn); err != nil {
@@ -125,6 +125,6 @@ func (broker *MessageBroker) WithFilerClient(streamingMode bool, fn func(filer_p
}
-func (broker *MessageBroker) AdjustedUrl(location *filer_pb.Location) string {
+func (broker *MessageQueueBroker) AdjustedUrl(location *filer_pb.Location) string {
return location.Url
}
diff --git a/weed/mq/broker/broker_grpc_server_subscribe.go b/weed/mq/broker/broker_grpc_server_subscribe.go
index 1a9c62d75..3743218b1 100644
--- a/weed/mq/broker/broker_grpc_server_subscribe.go
+++ b/weed/mq/broker/broker_grpc_server_subscribe.go
@@ -16,7 +16,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/pb/mq_pb"
)
-func (broker *MessageBroker) Subscribe(stream mq_pb.SeaweedMessaging_SubscribeServer) error {
+func (broker *MessageQueueBroker) Subscribe(stream mq_pb.SeaweedMessaging_SubscribeServer) error {
// process initial request
in, err := stream.Recv()
@@ -138,7 +138,7 @@ func (broker *MessageBroker) Subscribe(stream mq_pb.SeaweedMessaging_SubscribeSe
}
-func (broker *MessageBroker) readPersistedLogBuffer(tp *TopicPartition, startTime time.Time, eachLogEntryFn func(logEntry *filer_pb.LogEntry) error) (err error) {
+func (broker *MessageQueueBroker) readPersistedLogBuffer(tp *TopicPartition, startTime time.Time, eachLogEntryFn func(logEntry *filer_pb.LogEntry) error) (err error) {
startTime = startTime.UTC()
startDate := fmt.Sprintf("%04d-%02d-%02d", startTime.Year(), startTime.Month(), startTime.Day())
startHourMinute := fmt.Sprintf("%02d-%02d", startTime.Hour(), startTime.Minute())
diff --git a/weed/mq/broker/broker_server.go b/weed/mq/broker/broker_server.go
index 5aa5285c9..3fd01fb53 100644
--- a/weed/mq/broker/broker_server.go
+++ b/weed/mq/broker/broker_server.go
@@ -13,7 +13,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
)
-type MessageBrokerOption struct {
+type MessageQueueBrokerOption struct {
Filers []pb.ServerAddress
DefaultReplication string
MaxMB int
@@ -22,16 +22,16 @@ type MessageBrokerOption struct {
Cipher bool
}
-type MessageBroker struct {
+type MessageQueueBroker struct {
mq_pb.UnimplementedSeaweedMessagingServer
- option *MessageBrokerOption
+ option *MessageQueueBrokerOption
grpcDialOption grpc.DialOption
topicManager *TopicManager
}
-func NewMessageBroker(option *MessageBrokerOption, grpcDialOption grpc.DialOption) (messageBroker *MessageBroker, err error) {
+func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.DialOption) (messageBroker *MessageQueueBroker, err error) {
- messageBroker = &MessageBroker{
+ messageBroker = &MessageQueueBroker{
option: option,
grpcDialOption: grpcDialOption,
}
@@ -45,7 +45,7 @@ func NewMessageBroker(option *MessageBrokerOption, grpcDialOption grpc.DialOptio
return messageBroker, nil
}
-func (broker *MessageBroker) keepConnectedToOneFiler() {
+func (broker *MessageQueueBroker) keepConnectedToOneFiler() {
for {
for _, filer := range broker.option.Filers {
@@ -101,13 +101,13 @@ func (broker *MessageBroker) keepConnectedToOneFiler() {
}
-func (broker *MessageBroker) withFilerClient(streamingMode bool, filer pb.ServerAddress, fn func(filer_pb.SeaweedFilerClient) error) error {
+func (broker *MessageQueueBroker) withFilerClient(streamingMode bool, filer pb.ServerAddress, fn func(filer_pb.SeaweedFilerClient) error) error {
return pb.WithFilerClient(streamingMode, filer, broker.grpcDialOption, fn)
}
-func (broker *MessageBroker) withMasterClient(streamingMode bool, master pb.ServerAddress, fn func(client master_pb.SeaweedClient) error) error {
+func (broker *MessageQueueBroker) withMasterClient(streamingMode bool, master pb.ServerAddress, fn func(client master_pb.SeaweedClient) error) error {
return pb.WithMasterClient(streamingMode, master, broker.grpcDialOption, func(client master_pb.SeaweedClient) error {
return fn(client)
diff --git a/weed/mq/broker/topic_manager.go b/weed/mq/broker/topic_manager.go
index 1acf085fa..34f063d9a 100644
--- a/weed/mq/broker/topic_manager.go
+++ b/weed/mq/broker/topic_manager.go
@@ -36,10 +36,10 @@ type TopicControl struct {
type TopicManager struct {
sync.Mutex
topicControls map[TopicPartition]*TopicControl
- broker *MessageBroker
+ broker *MessageQueueBroker
}
-func NewTopicManager(messageBroker *MessageBroker) *TopicManager {
+func NewTopicManager(messageBroker *MessageQueueBroker) *TopicManager {
return &TopicManager{
topicControls: make(map[TopicPartition]*TopicControl),
broker: messageBroker,