aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/broker/broker_segment_serde.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/broker/broker_segment_serde.go')
-rw-r--r--weed/mq/broker/broker_segment_serde.go14
1 files changed, 7 insertions, 7 deletions
diff --git a/weed/mq/broker/broker_segment_serde.go b/weed/mq/broker/broker_segment_serde.go
index e36867da0..bb9aecc0b 100644
--- a/weed/mq/broker/broker_segment_serde.go
+++ b/weed/mq/broker/broker_segment_serde.go
@@ -4,7 +4,7 @@ import (
"bytes"
"fmt"
"github.com/seaweedfs/seaweedfs/weed/filer"
- "github.com/seaweedfs/seaweedfs/weed/mq"
+ "github.com/seaweedfs/seaweedfs/weed/mq/topic"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
@@ -12,8 +12,8 @@ import (
"time"
)
-func (broker *MessageQueueBroker) checkSegmentOnFiler(segment *mq.Segment) (brokers []pb.ServerAddress, err error) {
- info, found, err := broker.readSegmentOnFiler(segment)
+func (broker *MessageQueueBroker) checkSegmentOnFiler(segment *topic.Segment) (brokers []pb.ServerAddress, err error) {
+ info, found, err := broker.readSegmentInfoOnFiler(segment)
if err != nil {
return
}
@@ -27,12 +27,12 @@ func (broker *MessageQueueBroker) checkSegmentOnFiler(segment *mq.Segment) (brok
return
}
-func (broker *MessageQueueBroker) saveSegmentBrokersOnFiler(segment *mq.Segment, brokers []pb.ServerAddress) (err error) {
+func (broker *MessageQueueBroker) saveSegmentBrokersOnFiler(segment *topic.Segment, brokers []pb.ServerAddress) (err error) {
var nodes []string
for _, b := range brokers {
nodes = append(nodes, string(b))
}
- broker.saveSegmentToFiler(segment, &mq_pb.SegmentInfo{
+ broker.saveSegmentInfoToFiler(segment, &mq_pb.SegmentInfo{
Segment: segment.ToPbSegment(),
StartTsNs: time.Now().UnixNano(),
Brokers: nodes,
@@ -43,7 +43,7 @@ func (broker *MessageQueueBroker) saveSegmentBrokersOnFiler(segment *mq.Segment,
return
}
-func (broker *MessageQueueBroker) readSegmentOnFiler(segment *mq.Segment) (info *mq_pb.SegmentInfo, found bool, err error) {
+func (broker *MessageQueueBroker) readSegmentInfoOnFiler(segment *topic.Segment) (info *mq_pb.SegmentInfo, found bool, err error) {
dir, name := segment.DirAndName()
found, err = filer_pb.Exists(broker, dir, name, false)
@@ -70,7 +70,7 @@ func (broker *MessageQueueBroker) readSegmentOnFiler(segment *mq.Segment) (info
return
}
-func (broker *MessageQueueBroker) saveSegmentToFiler(segment *mq.Segment, info *mq_pb.SegmentInfo) (err error) {
+func (broker *MessageQueueBroker) saveSegmentInfoToFiler(segment *topic.Segment, info *mq_pb.SegmentInfo) (err error) {
dir, name := segment.DirAndName()
var buf bytes.Buffer