aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2020-04-16 02:55:09 -0700
committerChris Lu <chris.lu@gmail.com>2020-04-16 02:55:09 -0700
commitf5a748d33c52a2874dbde92746a03140ef379296 (patch)
tree21a9bdf9c329a94249ca026e2aa04647c414beb4
parentce4b369be25f61be0810e2bc119964b613cca121 (diff)
downloadseaweedfs-f5a748d33c52a2874dbde92746a03140ef379296.tar.xz
seaweedfs-f5a748d33c52a2874dbde92746a03140ef379296.zip
refactoring
-rw-r--r--weed/command/msg_broker.go7
-rw-r--r--weed/messaging/msg_broker_grpc_server.go98
-rw-r--r--weed/messaging/msg_broker_server.go (renamed from weed/server/msg_broker_server.go)2
-rw-r--r--weed/server/msg_broker_grpc_server.go23
-rw-r--r--weed/util/log_buffer/log_buffer.go4
5 files changed, 104 insertions, 30 deletions
diff --git a/weed/command/msg_broker.go b/weed/command/msg_broker.go
index 3cb424298..f77582f03 100644
--- a/weed/command/msg_broker.go
+++ b/weed/command/msg_broker.go
@@ -8,13 +8,12 @@ import (
"google.golang.org/grpc/reflection"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/messaging"
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
"github.com/chrislusf/seaweedfs/weed/security"
- weed_server "github.com/chrislusf/seaweedfs/weed/server"
-
- "github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/util"
)
@@ -79,7 +78,7 @@ func (msgBrokerOpt *QueueOptions) startQueueServer() bool {
}
}
- qs, err := weed_server.NewMessageBroker(&weed_server.MessageBrokerOption{
+ qs, err := messaging.NewMessageBroker(&messaging.MessageBrokerOption{
Filers: []string{*msgBrokerOpt.filer},
DefaultReplication: "",
MaxMB: 0,
diff --git a/weed/messaging/msg_broker_grpc_server.go b/weed/messaging/msg_broker_grpc_server.go
new file mode 100644
index 000000000..a29bc11b0
--- /dev/null
+++ b/weed/messaging/msg_broker_grpc_server.go
@@ -0,0 +1,98 @@
+package messaging
+
+import (
+ "context"
+ "fmt"
+ "io"
+ "time"
+
+ "github.com/golang/protobuf/proto"
+
+ "github.com/chrislusf/seaweedfs/weed/filer2"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
+ "github.com/chrislusf/seaweedfs/weed/util/log_buffer"
+)
+
+func (broker *MessageBroker) Subscribe(request *messaging_pb.SubscribeRequest, server messaging_pb.SeaweedMessaging_SubscribeServer) error {
+ panic("implement me")
+}
+
+func (broker *MessageBroker) Publish(stream messaging_pb.SeaweedMessaging_PublishServer) error {
+
+ // process initial request
+ in, err := stream.Recv()
+ if err == io.EOF {
+ return nil
+ }
+ if err != nil {
+ return err
+ }
+ namespace, topic, partition := in.Namespace, in.Topic, in.Partition
+
+ updatesChan := make(chan int32)
+
+ go func() {
+ for update := range updatesChan {
+ if err := stream.Send(&messaging_pb.PublishResponse{
+ PartitionCount: update,
+ }); err != nil {
+ glog.V(0).Infof("err sending publish response: %v", err)
+ return
+ }
+ }
+ }()
+
+ logBuffer := log_buffer.NewLogBuffer(time.Minute, func(startTime, stopTime time.Time, buf []byte) {
+
+ //targetFile :=
+ fmt.Sprintf("%s/%s/%s/%04d-%02d-%02d/%02d-%02d.part%02d",
+ filer2.TopicsDir, namespace, topic,
+ startTime.Year(), startTime.Month(), startTime.Day(), startTime.Hour(), startTime.Minute(),
+ partition,
+ )
+
+ /*
+ if err := f.appendToFile(targetFile, buf); err != nil {
+ glog.V(0).Infof("log write failed %s: %v", targetFile, err)
+ }
+ */
+
+ }, func() {
+ // notify subscribers
+ })
+
+ for {
+ in, err := stream.Recv()
+ if err == io.EOF {
+ return nil
+ }
+ if err != nil {
+ return err
+ }
+
+ m := &messaging_pb.Message{
+ Timestamp: time.Now().UnixNano(),
+ Key: in.Key,
+ Value: in.Value,
+ Headers: in.Headers,
+ }
+
+ data, err := proto.Marshal(m)
+ if err != nil {
+ glog.Errorf("marshall error: %v\n", err)
+ continue
+ }
+
+ logBuffer.AddToBuffer(in.Key, data)
+
+ }
+}
+
+func (broker *MessageBroker) ConfigureTopic(c context.Context, request *messaging_pb.ConfigureTopicRequest) (*messaging_pb.ConfigureTopicResponse, error) {
+ panic("implement me")
+}
+
+func (broker *MessageBroker) GetTopicConfiguration(c context.Context, request *messaging_pb.GetTopicConfigurationRequest) (*messaging_pb.GetTopicConfigurationResponse, error) {
+ panic("implement me")
+}
diff --git a/weed/server/msg_broker_server.go b/weed/messaging/msg_broker_server.go
index a9d908581..9174ca4cf 100644
--- a/weed/server/msg_broker_server.go
+++ b/weed/messaging/msg_broker_server.go
@@ -1,4 +1,4 @@
-package weed_server
+package messaging
import (
"context"
diff --git a/weed/server/msg_broker_grpc_server.go b/weed/server/msg_broker_grpc_server.go
deleted file mode 100644
index 6feaa0c63..000000000
--- a/weed/server/msg_broker_grpc_server.go
+++ /dev/null
@@ -1,23 +0,0 @@
-package weed_server
-
-import (
- "context"
-
- "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
-)
-
-func (broker *MessageBroker) Subscribe(request *messaging_pb.SubscribeRequest, server messaging_pb.SeaweedMessaging_SubscribeServer) error {
- panic("implement me")
-}
-
-func (broker *MessageBroker) Publish(server messaging_pb.SeaweedMessaging_PublishServer) error {
- panic("implement me")
-}
-
-func (broker *MessageBroker) ConfigureTopic(c context.Context, request *messaging_pb.ConfigureTopicRequest) (*messaging_pb.ConfigureTopicResponse, error) {
- panic("implement me")
-}
-
-func (broker *MessageBroker) GetTopicConfiguration(c context.Context, request *messaging_pb.GetTopicConfigurationRequest) (*messaging_pb.GetTopicConfigurationResponse, error) {
- panic("implement me")
-}
diff --git a/weed/util/log_buffer/log_buffer.go b/weed/util/log_buffer/log_buffer.go
index 04fabdf8c..c7cb90549 100644
--- a/weed/util/log_buffer/log_buffer.go
+++ b/weed/util/log_buffer/log_buffer.go
@@ -51,7 +51,7 @@ func NewLogBuffer(flushInterval time.Duration, flushFn func(startTime, stopTime
return lb
}
-func (m *LogBuffer) AddToBuffer(key, data []byte) {
+func (m *LogBuffer) AddToBuffer(partitionKey, data []byte) {
m.Lock()
defer func() {
@@ -65,7 +65,7 @@ func (m *LogBuffer) AddToBuffer(key, data []byte) {
ts := time.Now()
logEntry := &filer_pb.LogEntry{
TsNs: ts.UnixNano(),
- PartitionKeyHash: util.HashToInt32(key),
+ PartitionKeyHash: util.HashToInt32(partitionKey),
Data: data,
}