diff options
| author | Chris Lu <chris.lu@gmail.com> | 2020-04-19 23:37:04 -0700 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2020-04-19 23:37:50 -0700 |
| commit | ce3cb25cfbf30a06348386210f72cc51c3fbd13a (patch) | |
| tree | 217836bc07221d97a180f7f2fba41dd51ddb3eae /weed/messaging | |
| parent | f37323222751c104723273293a0b15b209021f32 (diff) | |
| download | seaweedfs-ce3cb25cfbf30a06348386210f72cc51c3fbd13a.tar.xz seaweedfs-ce3cb25cfbf30a06348386210f72cc51c3fbd13a.zip | |
working for in memory single log buffer
Diffstat (limited to 'weed/messaging')
| -rw-r--r-- | weed/messaging/broker/broker_append.go | 4 | ||||
| -rw-r--r-- | weed/messaging/broker/broker_grpc_server_publish.go | 12 | ||||
| -rw-r--r-- | weed/messaging/broker/broker_grpc_server_subscribe.go | 69 | ||||
| -rw-r--r-- | weed/messaging/broker/topic_lock.go | 4 | ||||
| -rw-r--r-- | weed/messaging/client/publisher.go | 2 |
5 files changed, 38 insertions, 53 deletions
diff --git a/weed/messaging/broker/broker_append.go b/weed/messaging/broker/broker_append.go index c1ef063fb..7194dfcfc 100644 --- a/weed/messaging/broker/broker_append.go +++ b/weed/messaging/broker/broker_append.go @@ -16,10 +16,6 @@ import ( func (broker *MessageBroker) appendToFile(targetFile string, topicConfig *messaging_pb.TopicConfiguration, data []byte) error { - if topicConfig.IsTransient { - return nil - } - assignResult, uploadResult, err2 := broker.assignAndUpload(topicConfig, data) if err2 != nil { return err2 diff --git a/weed/messaging/broker/broker_grpc_server_publish.go b/weed/messaging/broker/broker_grpc_server_publish.go index 210127be3..985f708b5 100644 --- a/weed/messaging/broker/broker_grpc_server_publish.go +++ b/weed/messaging/broker/broker_grpc_server_publish.go @@ -2,7 +2,6 @@ package broker import ( "io" - "time" "github.com/golang/protobuf/proto" @@ -77,16 +76,9 @@ func (broker *MessageBroker) Publish(stream messaging_pb.SeaweedMessaging_Publis continue } - m := &messaging_pb.Message{ - Timestamp: time.Now().UnixNano(), - Key: in.Data.Key, - Value: in.Data.Value, - Headers: in.Data.Headers, - } - - // fmt.Printf("received: %d : %s\n", len(m.Value), string(m.Value)) + // fmt.Printf("received: %d : %s\n", len(in.Data.Value), string(in.Data.Value)) - data, err := proto.Marshal(m) + data, err := proto.Marshal(in.Data) if err != nil { glog.Errorf("marshall error: %v\n", err) continue diff --git a/weed/messaging/broker/broker_grpc_server_subscribe.go b/weed/messaging/broker/broker_grpc_server_subscribe.go index acf0330c6..c358eccf6 100644 --- a/weed/messaging/broker/broker_grpc_server_subscribe.go +++ b/weed/messaging/broker/broker_grpc_server_subscribe.go @@ -1,6 +1,7 @@ package broker import ( + "fmt" "io" "time" @@ -9,7 +10,6 @@ import ( "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb" - "github.com/chrislusf/seaweedfs/weed/util" ) func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_SubscribeServer) error { @@ -23,12 +23,22 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs return err } + var messageCount int64 subscriberId := in.Init.SubscriberId - println("+ subscriber:", subscriberId) - defer println("- subscriber:", subscriberId) + fmt.Printf("+ subscriber %s\n", subscriberId) + defer func() { + fmt.Printf("- subscriber %s: %d messages\n", subscriberId, messageCount) + }() // TODO look it up topicConfig := &messaging_pb.TopicConfiguration{ + IsTransient: true, + } + + if err = stream.Send(&messaging_pb.BrokerMessage{ + Redirect: nil, + }); err != nil { + return err } // get lock @@ -52,7 +62,7 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs // an error returned will end the subscription eachMessageFn := func(m *messaging_pb.Message) error { err := stream.Send(&messaging_pb.BrokerMessage{ - Data: m, + Data: m, }) if err != nil { glog.V(0).Infof("=> subscriber %v: %+v", subscriberId, err) @@ -60,42 +70,25 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs return err } - // loop through all messages - for { - - _, buf := lock.logBuffer.ReadFromBuffer(lastReadTime) - - for pos := 0; pos+4 < len(buf); { - - size := util.BytesToUint32(buf[pos : pos+4]) - entryData := buf[pos+4 : pos+4+int(size)] - - logEntry := &filer_pb.LogEntry{} - if err = proto.Unmarshal(entryData, logEntry); err != nil { - glog.Errorf("unexpected unmarshal messaging_pb.Message: %v", err) - pos += 4 + int(size) - continue - } - - m := &messaging_pb.Message{} - if err = proto.Unmarshal(logEntry.Data, m); err != nil { - glog.Errorf("unexpected unmarshal messaging_pb.Message: %v", err) - pos += 4 + int(size) - continue - } - - // fmt.Printf("sending : %d : %s\n", len(m.Value), string(m.Value)) - if err = eachMessageFn(m); err != nil { - return err - } - - lastReadTime = time.Unix(0, m.Timestamp) - pos += 4 + int(size) - } - + messageCount, err = lock.logBuffer.LoopProcessLogData(lastReadTime, func() bool { lock.Mutex.Lock() lock.cond.Wait() lock.Mutex.Unlock() - } + return true + }, func(logEntry *filer_pb.LogEntry) error { + m := &messaging_pb.Message{} + if err = proto.Unmarshal(logEntry.Data, m); err != nil { + glog.Errorf("unexpected unmarshal messaging_pb.Message: %v", err) + return err + } + // fmt.Printf("sending : %d bytes\n", len(m.Value)) + if err = eachMessageFn(m); err != nil { + glog.Errorf("sending %d bytes to %s: %s", len(m.Value), subscriberId, err) + return err + } + return nil + }) + + return err } diff --git a/weed/messaging/broker/topic_lock.go b/weed/messaging/broker/topic_lock.go index 652ff0545..74861a147 100644 --- a/weed/messaging/broker/topic_lock.go +++ b/weed/messaging/broker/topic_lock.go @@ -41,6 +41,10 @@ func (locks *TopicLocks) buildLogBuffer(tl *TopicLock, tp TopicPartition, topicC flushFn := func(startTime, stopTime time.Time, buf []byte) { + if topicConfig.IsTransient { + return + } + targetFile := fmt.Sprintf( "%s/%s/%s/%04d-%02d-%02d/%02d-%02d.part%02d", filer2.TopicsDir, tp.Namespace, tp.Topic, diff --git a/weed/messaging/client/publisher.go b/weed/messaging/client/publisher.go index 238b67783..3480ff55d 100644 --- a/weed/messaging/client/publisher.go +++ b/weed/messaging/client/publisher.go @@ -61,7 +61,7 @@ func (mc *MessagingClient) NewPublisher(namespace, topic string) (*Publisher, er }, nil } -func (p *Publisher) Publish(m *messaging_pb.RawData) error { +func (p *Publisher) Publish(m *messaging_pb.Message) error { return p.publishClient.Send(&messaging_pb.PublishRequest{ Data: m, |
