diff options
Diffstat (limited to 'weed/messaging/broker/broker_grpc_server_subscribe.go')
| -rw-r--r-- | weed/messaging/broker/broker_grpc_server_subscribe.go | 69 |
1 files changed, 31 insertions, 38 deletions
diff --git a/weed/messaging/broker/broker_grpc_server_subscribe.go b/weed/messaging/broker/broker_grpc_server_subscribe.go index acf0330c6..c358eccf6 100644 --- a/weed/messaging/broker/broker_grpc_server_subscribe.go +++ b/weed/messaging/broker/broker_grpc_server_subscribe.go @@ -1,6 +1,7 @@ package broker import ( + "fmt" "io" "time" @@ -9,7 +10,6 @@ import ( "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/pb/messaging_pb" - "github.com/chrislusf/seaweedfs/weed/util" ) func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_SubscribeServer) error { @@ -23,12 +23,22 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs return err } + var messageCount int64 subscriberId := in.Init.SubscriberId - println("+ subscriber:", subscriberId) - defer println("- subscriber:", subscriberId) + fmt.Printf("+ subscriber %s\n", subscriberId) + defer func() { + fmt.Printf("- subscriber %s: %d messages\n", subscriberId, messageCount) + }() // TODO look it up topicConfig := &messaging_pb.TopicConfiguration{ + IsTransient: true, + } + + if err = stream.Send(&messaging_pb.BrokerMessage{ + Redirect: nil, + }); err != nil { + return err } // get lock @@ -52,7 +62,7 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs // an error returned will end the subscription eachMessageFn := func(m *messaging_pb.Message) error { err := stream.Send(&messaging_pb.BrokerMessage{ - Data: m, + Data: m, }) if err != nil { glog.V(0).Infof("=> subscriber %v: %+v", subscriberId, err) @@ -60,42 +70,25 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs return err } - // loop through all messages - for { - - _, buf := lock.logBuffer.ReadFromBuffer(lastReadTime) - - for pos := 0; pos+4 < len(buf); { - - size := util.BytesToUint32(buf[pos : pos+4]) - entryData := buf[pos+4 : pos+4+int(size)] - - logEntry := &filer_pb.LogEntry{} - if err = proto.Unmarshal(entryData, logEntry); err != nil { - glog.Errorf("unexpected unmarshal messaging_pb.Message: %v", err) - pos += 4 + int(size) - continue - } - - m := &messaging_pb.Message{} - if err = proto.Unmarshal(logEntry.Data, m); err != nil { - glog.Errorf("unexpected unmarshal messaging_pb.Message: %v", err) - pos += 4 + int(size) - continue - } - - // fmt.Printf("sending : %d : %s\n", len(m.Value), string(m.Value)) - if err = eachMessageFn(m); err != nil { - return err - } - - lastReadTime = time.Unix(0, m.Timestamp) - pos += 4 + int(size) - } - + messageCount, err = lock.logBuffer.LoopProcessLogData(lastReadTime, func() bool { lock.Mutex.Lock() lock.cond.Wait() lock.Mutex.Unlock() - } + return true + }, func(logEntry *filer_pb.LogEntry) error { + m := &messaging_pb.Message{} + if err = proto.Unmarshal(logEntry.Data, m); err != nil { + glog.Errorf("unexpected unmarshal messaging_pb.Message: %v", err) + return err + } + // fmt.Printf("sending : %d bytes\n", len(m.Value)) + if err = eachMessageFn(m); err != nil { + glog.Errorf("sending %d bytes to %s: %s", len(m.Value), subscriberId, err) + return err + } + return nil + }) + + return err } |
