aboutsummaryrefslogtreecommitdiff
path: root/weed/messaging
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2020-04-19 03:03:40 -0700
committerChris Lu <chris.lu@gmail.com>2020-04-19 03:03:40 -0700
commit71ffb98475d05c7e76e8c46a9525f29ef97f6228 (patch)
tree133b4b4d5bbe685ac33334ec2e6ef759a619b71a /weed/messaging
parentd278b4c228905694a24cca2c9d08d8b8faa905bb (diff)
downloadseaweedfs-71ffb98475d05c7e76e8c46a9525f29ef97f6228.tar.xz
seaweedfs-71ffb98475d05c7e76e8c46a9525f29ef97f6228.zip
broker: add profiling
Diffstat (limited to 'weed/messaging')
-rw-r--r--weed/messaging/broker/broker_append.go4
-rw-r--r--weed/messaging/broker/broker_grpc_server_publish.go2
-rw-r--r--weed/messaging/client/client.go8
-rw-r--r--weed/messaging/client/publisher.go5
-rw-r--r--weed/messaging/client/subscriber.go4
5 files changed, 11 insertions, 12 deletions
diff --git a/weed/messaging/broker/broker_append.go b/weed/messaging/broker/broker_append.go
index 7194dfcfc..c1ef063fb 100644
--- a/weed/messaging/broker/broker_append.go
+++ b/weed/messaging/broker/broker_append.go
@@ -16,6 +16,10 @@ import (
func (broker *MessageBroker) appendToFile(targetFile string, topicConfig *messaging_pb.TopicConfiguration, data []byte) error {
+ if topicConfig.IsTransient {
+ return nil
+ }
+
assignResult, uploadResult, err2 := broker.assignAndUpload(topicConfig, data)
if err2 != nil {
return err2
diff --git a/weed/messaging/broker/broker_grpc_server_publish.go b/weed/messaging/broker/broker_grpc_server_publish.go
index 89c568b0d..210127be3 100644
--- a/weed/messaging/broker/broker_grpc_server_publish.go
+++ b/weed/messaging/broker/broker_grpc_server_publish.go
@@ -23,7 +23,7 @@ func (broker *MessageBroker) Publish(stream messaging_pb.SeaweedMessaging_Publis
// TODO look it up
topicConfig := &messaging_pb.TopicConfiguration{
-
+ IsTransient: true,
}
// send init response
diff --git a/weed/messaging/client/client.go b/weed/messaging/client/client.go
index 3f6d1ca53..910fd02e9 100644
--- a/weed/messaging/client/client.go
+++ b/weed/messaging/client/client.go
@@ -23,12 +23,12 @@ func NewMessagingClient(bootstrapBrokers []string) (*MessagingClient, error) {
return nil, err
}
+ util.OnInterrupt(func() {
+ grpcConnection.Close()
+ })
+
return &MessagingClient{
bootstrapBrokers: bootstrapBrokers,
grpcConnection: grpcConnection,
}, nil
}
-
-func (mc *MessagingClient) Shutdown() {
- mc.grpcConnection.Close()
-}
diff --git a/weed/messaging/client/publisher.go b/weed/messaging/client/publisher.go
index d4c0f798a..238b67783 100644
--- a/weed/messaging/client/publisher.go
+++ b/weed/messaging/client/publisher.go
@@ -69,8 +69,7 @@ func (p *Publisher) Publish(m *messaging_pb.RawData) error {
}
-func (p *Publisher) Shutdown() {
-
- p.publishClient.CloseSend()
+func (p *Publisher) Close() error {
+ return p.publishClient.CloseSend()
}
diff --git a/weed/messaging/client/subscriber.go b/weed/messaging/client/subscriber.go
index 0b0cf58f9..407cd4ac6 100644
--- a/weed/messaging/client/subscriber.go
+++ b/weed/messaging/client/subscriber.go
@@ -59,7 +59,3 @@ func (s *Subscriber) Subscribe(processFn func(m *messaging_pb.Message)) error {
processFn(resp.Data)
}
}
-
-func (s *Subscriber) Shutdown() {
- s.subscriberClient.CloseSend()
-}