diff options
Diffstat (limited to 'weed/mq/broker/broker_grpc_sub_follow.go')
| -rw-r--r-- | weed/mq/broker/broker_grpc_sub_follow.go | 37 |
1 files changed, 7 insertions, 30 deletions
diff --git a/weed/mq/broker/broker_grpc_sub_follow.go b/weed/mq/broker/broker_grpc_sub_follow.go index bed906c30..0a74274d7 100644 --- a/weed/mq/broker/broker_grpc_sub_follow.go +++ b/weed/mq/broker/broker_grpc_sub_follow.go @@ -2,13 +2,11 @@ package broker import ( "fmt" - "github.com/seaweedfs/seaweedfs/weed/filer" + "io" + "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/mq/topic" - "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" - "github.com/seaweedfs/seaweedfs/weed/util" - "io" ) func (b *MessageQueueBroker) SubscribeFollowMe(stream mq_pb.SeaweedMessaging_SubscribeFollowMeServer) (err error) { @@ -64,33 +62,12 @@ func (b *MessageQueueBroker) SubscribeFollowMe(stream mq_pb.SeaweedMessaging_Sub func (b *MessageQueueBroker) readConsumerGroupOffset(initMessage *mq_pb.SubscribeMessageRequest_InitMessage) (offset int64, err error) { t, p := topic.FromPbTopic(initMessage.Topic), topic.FromPbPartition(initMessage.PartitionOffset.Partition) - partitionDir := topic.PartitionDir(t, p) - offsetFileName := fmt.Sprintf("%s.offset", initMessage.ConsumerGroup) - - err = b.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { - data, err := filer.ReadInsideFiler(client, partitionDir, offsetFileName) - if err != nil { - return err - } - if len(data) != 8 { - return fmt.Errorf("no offset found") - } - offset = int64(util.BytesToUint64(data)) - return nil - }) - return offset, err + // Use the offset manager's consumer group storage + return b.offsetManager.LoadConsumerGroupOffset(t, p, initMessage.ConsumerGroup) } func (b *MessageQueueBroker) saveConsumerGroupOffset(t topic.Topic, p topic.Partition, consumerGroup string, offset int64) error { - - partitionDir := topic.PartitionDir(t, p) - offsetFileName := fmt.Sprintf("%s.offset", consumerGroup) - - offsetBytes := make([]byte, 8) - util.Uint64toBytes(offsetBytes, uint64(offset)) - - return b.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { - glog.V(0).Infof("saving topic %s partition %v consumer group %s offset %d", t, p, consumerGroup, offset) - return filer.SaveInsideFiler(client, partitionDir, offsetFileName, offsetBytes) - }) + // Use the offset manager's consumer group storage + glog.V(0).Infof("saving topic %s partition %v consumer group %s offset %d", t, p, consumerGroup, offset) + return b.offsetManager.SaveConsumerGroupOffset(t, p, consumerGroup, offset) } |
