aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2020-04-17 22:39:21 -0700
committerChris Lu <chris.lu@gmail.com>2020-04-17 22:39:21 -0700
commit5bea77010f2b290398c46a517d40fa7ad559dfed (patch)
tree661d106f73934262bc6ba587058e68460afce4e9
parent2e2537a9ea442476efb12e4caa2663b804cf4ee1 (diff)
downloadseaweedfs-5bea77010f2b290398c46a517d40fa7ad559dfed.tar.xz
seaweedfs-5bea77010f2b290398c46a517d40fa7ad559dfed.zip
refactor
-rw-r--r--weed/messaging/broker_grpc_server.go88
-rw-r--r--weed/messaging/broker_grpc_server_publish.go90
-rw-r--r--weed/messaging/broker_grpc_server_subscribe.go9
3 files changed, 99 insertions, 88 deletions
diff --git a/weed/messaging/broker_grpc_server.go b/weed/messaging/broker_grpc_server.go
index f4b8321e2..0d1eb72ac 100644
--- a/weed/messaging/broker_grpc_server.go
+++ b/weed/messaging/broker_grpc_server.go
@@ -2,98 +2,10 @@ package messaging
import (
"context"
- "fmt"
- "io"
- "time"
- "github.com/golang/protobuf/proto"
-
- "github.com/chrislusf/seaweedfs/weed/filer2"
- "github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
- "github.com/chrislusf/seaweedfs/weed/util/log_buffer"
)
-func (broker *MessageBroker) Subscribe(server messaging_pb.SeaweedMessaging_SubscribeServer) error {
- panic("implement me")
-}
-
-func (broker *MessageBroker) Publish(stream messaging_pb.SeaweedMessaging_PublishServer) error {
-
- // process initial request
- in, err := stream.Recv()
- if err == io.EOF {
- return nil
- }
- if err != nil {
- return err
- }
- namespace, topic, partition := in.Init.Namespace, in.Init.Topic, in.Init.Partition
-
- updatesChan := make(chan int32)
-
- // TODO look it up
- topicConfig := &messaging_pb.TopicConfiguration{
-
- }
-
- go func() {
- for update := range updatesChan {
- if err := stream.Send(&messaging_pb.PublishResponse{
- Config: &messaging_pb.PublishResponse_ConfigMessage{
- PartitionCount: update,
- },
- }); err != nil {
- glog.V(0).Infof("err sending publish response: %v", err)
- return
- }
- }
- }()
-
- logBuffer := log_buffer.NewLogBuffer(time.Minute, func(startTime, stopTime time.Time, buf []byte) {
-
- targetFile := fmt.Sprintf(
- "%s/%s/%s/%04d-%02d-%02d/%02d-%02d.part%02d",
- filer2.TopicsDir, namespace, topic,
- startTime.Year(), startTime.Month(), startTime.Day(), startTime.Hour(), startTime.Minute(),
- partition,
- )
-
- if err := broker.appendToFile(targetFile, topicConfig, buf); err != nil {
- glog.V(0).Infof("log write failed %s: %v", targetFile, err)
- }
-
- }, func() {
- // notify subscribers
- })
-
- for {
- in, err := stream.Recv()
- if err == io.EOF {
- return nil
- }
- if err != nil {
- return err
- }
-
- m := &messaging_pb.Message{
- Timestamp: time.Now().UnixNano(),
- Key: in.Data.Key,
- Value: in.Data.Value,
- Headers: in.Data.Headers,
- }
-
- data, err := proto.Marshal(m)
- if err != nil {
- glog.Errorf("marshall error: %v\n", err)
- continue
- }
-
- logBuffer.AddToBuffer(in.Data.Key, data)
-
- }
-}
-
func (broker *MessageBroker) ConfigureTopic(c context.Context, request *messaging_pb.ConfigureTopicRequest) (*messaging_pb.ConfigureTopicResponse, error) {
panic("implement me")
}
diff --git a/weed/messaging/broker_grpc_server_publish.go b/weed/messaging/broker_grpc_server_publish.go
new file mode 100644
index 000000000..db3bf0764
--- /dev/null
+++ b/weed/messaging/broker_grpc_server_publish.go
@@ -0,0 +1,90 @@
+package messaging
+
+import (
+ "fmt"
+ "io"
+ "time"
+
+ "github.com/golang/protobuf/proto"
+
+ "github.com/chrislusf/seaweedfs/weed/filer2"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
+ "github.com/chrislusf/seaweedfs/weed/util/log_buffer"
+)
+
+func (broker *MessageBroker) Publish(stream messaging_pb.SeaweedMessaging_PublishServer) error {
+
+ // process initial request
+ in, err := stream.Recv()
+ if err == io.EOF {
+ return nil
+ }
+ if err != nil {
+ return err
+ }
+ namespace, topic, partition := in.Init.Namespace, in.Init.Topic, in.Init.Partition
+
+ updatesChan := make(chan int32)
+
+ // TODO look it up
+ topicConfig := &messaging_pb.TopicConfiguration{
+
+ }
+
+ go func() {
+ for update := range updatesChan {
+ if err := stream.Send(&messaging_pb.PublishResponse{
+ Config: &messaging_pb.PublishResponse_ConfigMessage{
+ PartitionCount: update,
+ },
+ }); err != nil {
+ glog.V(0).Infof("err sending publish response: %v", err)
+ return
+ }
+ }
+ }()
+
+ logBuffer := log_buffer.NewLogBuffer(time.Minute, func(startTime, stopTime time.Time, buf []byte) {
+
+ targetFile := fmt.Sprintf(
+ "%s/%s/%s/%04d-%02d-%02d/%02d-%02d.part%02d",
+ filer2.TopicsDir, namespace, topic,
+ startTime.Year(), startTime.Month(), startTime.Day(), startTime.Hour(), startTime.Minute(),
+ partition,
+ )
+
+ if err := broker.appendToFile(targetFile, topicConfig, buf); err != nil {
+ glog.V(0).Infof("log write failed %s: %v", targetFile, err)
+ }
+
+ }, func() {
+ // notify subscribers
+ })
+
+ for {
+ in, err := stream.Recv()
+ if err == io.EOF {
+ return nil
+ }
+ if err != nil {
+ return err
+ }
+
+ m := &messaging_pb.Message{
+ Timestamp: time.Now().UnixNano(),
+ Key: in.Data.Key,
+ Value: in.Data.Value,
+ Headers: in.Data.Headers,
+ }
+
+ data, err := proto.Marshal(m)
+ if err != nil {
+ glog.Errorf("marshall error: %v\n", err)
+ continue
+ }
+
+ logBuffer.AddToBuffer(in.Data.Key, data)
+
+ }
+}
diff --git a/weed/messaging/broker_grpc_server_subscribe.go b/weed/messaging/broker_grpc_server_subscribe.go
new file mode 100644
index 000000000..137fcac8a
--- /dev/null
+++ b/weed/messaging/broker_grpc_server_subscribe.go
@@ -0,0 +1,9 @@
+package messaging
+
+import (
+ "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
+)
+
+func (broker *MessageBroker) Subscribe(server messaging_pb.SeaweedMessaging_SubscribeServer) error {
+ panic("implement me")
+}