aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/broker/broker_grpc_sub_follow.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/broker/broker_grpc_sub_follow.go')
-rw-r--r--weed/mq/broker/broker_grpc_sub_follow.go37
1 files changed, 7 insertions, 30 deletions
diff --git a/weed/mq/broker/broker_grpc_sub_follow.go b/weed/mq/broker/broker_grpc_sub_follow.go
index bed906c30..0a74274d7 100644
--- a/weed/mq/broker/broker_grpc_sub_follow.go
+++ b/weed/mq/broker/broker_grpc_sub_follow.go
@@ -2,13 +2,11 @@ package broker
import (
"fmt"
- "github.com/seaweedfs/seaweedfs/weed/filer"
+ "io"
+
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
- "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
- "github.com/seaweedfs/seaweedfs/weed/util"
- "io"
)
func (b *MessageQueueBroker) SubscribeFollowMe(stream mq_pb.SeaweedMessaging_SubscribeFollowMeServer) (err error) {
@@ -64,33 +62,12 @@ func (b *MessageQueueBroker) SubscribeFollowMe(stream mq_pb.SeaweedMessaging_Sub
func (b *MessageQueueBroker) readConsumerGroupOffset(initMessage *mq_pb.SubscribeMessageRequest_InitMessage) (offset int64, err error) {
t, p := topic.FromPbTopic(initMessage.Topic), topic.FromPbPartition(initMessage.PartitionOffset.Partition)
- partitionDir := topic.PartitionDir(t, p)
- offsetFileName := fmt.Sprintf("%s.offset", initMessage.ConsumerGroup)
-
- err = b.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
- data, err := filer.ReadInsideFiler(client, partitionDir, offsetFileName)
- if err != nil {
- return err
- }
- if len(data) != 8 {
- return fmt.Errorf("no offset found")
- }
- offset = int64(util.BytesToUint64(data))
- return nil
- })
- return offset, err
+ // Use the offset manager's consumer group storage
+ return b.offsetManager.LoadConsumerGroupOffset(t, p, initMessage.ConsumerGroup)
}
func (b *MessageQueueBroker) saveConsumerGroupOffset(t topic.Topic, p topic.Partition, consumerGroup string, offset int64) error {
-
- partitionDir := topic.PartitionDir(t, p)
- offsetFileName := fmt.Sprintf("%s.offset", consumerGroup)
-
- offsetBytes := make([]byte, 8)
- util.Uint64toBytes(offsetBytes, uint64(offset))
-
- return b.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
- glog.V(0).Infof("saving topic %s partition %v consumer group %s offset %d", t, p, consumerGroup, offset)
- return filer.SaveInsideFiler(client, partitionDir, offsetFileName, offsetBytes)
- })
+ // Use the offset manager's consumer group storage
+ glog.V(0).Infof("saving topic %s partition %v consumer group %s offset %d", t, p, consumerGroup, offset)
+ return b.offsetManager.SaveConsumerGroupOffset(t, p, consumerGroup, offset)
}