diff options
Diffstat (limited to 'weed/mq/agent/agent_grpc_subscribe.go')
| -rw-r--r-- | weed/mq/agent/agent_grpc_subscribe.go | 75 |
1 files changed, 75 insertions, 0 deletions
diff --git a/weed/mq/agent/agent_grpc_subscribe.go b/weed/mq/agent/agent_grpc_subscribe.go new file mode 100644 index 000000000..feb5bd47c --- /dev/null +++ b/weed/mq/agent/agent_grpc_subscribe.go @@ -0,0 +1,75 @@ +package agent + +import ( + "fmt" + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/mq/client/sub_client" + "github.com/seaweedfs/seaweedfs/weed/pb/mq_agent_pb" + "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" + "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" + "google.golang.org/protobuf/proto" + "time" +) + +func (a *MessageQueueAgent) SubscribeRecordRequest(stream mq_agent_pb.SeaweedMessagingAgent_SubscribeRecordServer) error { + // the first message is the subscribe request + // it should only contain the session id + m, err := stream.Recv() + if err != nil { + return err + } + a.subscribersLock.RLock() + subscriberEntry, found := a.subscribers[SessionId(m.SessionId)] + a.subscribersLock.RUnlock() + if !found { + return fmt.Errorf("subscribe session id %d not found", m.SessionId) + } + defer func() { + subscriberEntry.lastActiveTsNs = time.Now().UnixNano() + }() + subscriberEntry.lastActiveTsNs = 0 + + var lastErr error + subscriberEntry.entry.SetOnDataMessageFn(func(m *mq_pb.SubscribeMessageResponse_Data) { + record := &schema_pb.RecordValue{} + err := proto.Unmarshal(m.Data.Value, record) + if err != nil { + if lastErr == nil { + lastErr = err + } + return + } + if sendErr := stream.Send(&mq_agent_pb.SubscribeRecordResponse{ + Key: m.Data.Key, + Value: record, + TsNs: m.Data.TsNs, + }); sendErr != nil { + if lastErr == nil { + lastErr = sendErr + } + } + }) + + go func() { + subErr := subscriberEntry.entry.Subscribe() + if subErr != nil { + glog.V(0).Infof("subscriber %d subscribe: %v", m.SessionId, subErr) + if lastErr == nil { + lastErr = subErr + } + } + }() + + for { + m, err := stream.Recv() + if err != nil { + return err + } + if m != nil { + subscriberEntry.entry.PartitionOffsetChan <- sub_client.KeyedOffset{ + Key: m.AckKey, + Offset: m.AckSequence, + } + } + } +} |
