aboutsummaryrefslogtreecommitdiff
path: root/weed/mq
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2022-07-16 10:49:34 -0700
committerchrislu <chris.lu@gmail.com>2022-07-16 10:49:34 -0700
commit113a4546fd36b9a0eb78c4e462f38f7488cf1e88 (patch)
tree174077b2572802518dd2b1faa7f326d0e4dbffea /weed/mq
parentda8bab2b33f209a1b0c339e6250aa940d5c8572d (diff)
downloadseaweedfs-113a4546fd36b9a0eb78c4e462f38f7488cf1e88.tar.xz
seaweedfs-113a4546fd36b9a0eb78c4e462f38f7488cf1e88.zip
segment serde
Diffstat (limited to 'weed/mq')
-rw-r--r--weed/mq/broker/brokder_grpc_admin.go31
-rw-r--r--weed/mq/broker/broker_segment_serde.go89
-rw-r--r--weed/mq/broker/broker_server.go10
-rw-r--r--weed/mq/topic.go32
4 files changed, 139 insertions, 23 deletions
diff --git a/weed/mq/broker/brokder_grpc_admin.go b/weed/mq/broker/brokder_grpc_admin.go
index 31b5bb84e..0dfb69c50 100644
--- a/weed/mq/broker/brokder_grpc_admin.go
+++ b/weed/mq/broker/brokder_grpc_admin.go
@@ -53,16 +53,19 @@ func (broker *MessageQueueBroker) AssignSegmentBrokers(c context.Context, reques
if err != nil {
return ret, err
}
- // good if the segment is still on the brokers
- isActive, err := broker.checkSegmentsOnBrokers(segment, existingBrokers)
- if err != nil {
- return ret, err
- }
- if isActive {
- for _, broker := range existingBrokers {
- ret.Brokers = append(ret.Brokers, string(broker))
+
+ if len(existingBrokers) > 0 {
+ // good if the segment is still on the brokers
+ isActive, err := broker.checkSegmentsOnBrokers(segment, existingBrokers)
+ if err != nil {
+ return ret, err
+ }
+ if isActive {
+ for _, broker := range existingBrokers {
+ ret.Brokers = append(ret.Brokers, string(broker))
+ }
+ return ret, nil
}
- return ret, nil
}
// randomly pick up to 10 brokers, and find the ones with the lightest load
@@ -72,7 +75,7 @@ func (broker *MessageQueueBroker) AssignSegmentBrokers(c context.Context, reques
}
// save the allocated brokers info for this segment on the filer
- if err := broker.saveSegmentOnFiler(segment, selectedBrokers); err != nil {
+ if err := broker.saveSegmentBrokersOnFiler(segment, selectedBrokers); err != nil {
return ret, err
}
@@ -82,10 +85,6 @@ func (broker *MessageQueueBroker) AssignSegmentBrokers(c context.Context, reques
return ret, nil
}
-func (broker *MessageQueueBroker) checkSegmentOnFiler(segment *mq.Segment) (brokers []pb.ServerAddress, err error) {
- return
-}
-
func (broker *MessageQueueBroker) checkSegmentsOnBrokers(segment *mq.Segment, brokers []pb.ServerAddress) (active bool, err error) {
var wg sync.WaitGroup
@@ -206,7 +205,3 @@ func (broker *MessageQueueBroker) checkBrokerStatus(candidates []pb.ServerAddres
wg.Wait()
return
}
-
-func (broker *MessageQueueBroker) saveSegmentOnFiler(segment *mq.Segment, brokers []pb.ServerAddress) (err error) {
- return
-}
diff --git a/weed/mq/broker/broker_segment_serde.go b/weed/mq/broker/broker_segment_serde.go
new file mode 100644
index 000000000..25fca005b
--- /dev/null
+++ b/weed/mq/broker/broker_segment_serde.go
@@ -0,0 +1,89 @@
+package broker
+
+import (
+ "bytes"
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/mq"
+ "github.com/chrislusf/seaweedfs/weed/pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/mq_pb"
+ "github.com/golang/protobuf/jsonpb"
+ "time"
+)
+
+func (broker *MessageQueueBroker) checkSegmentOnFiler(segment *mq.Segment) (brokers []pb.ServerAddress, err error) {
+ info, found, err := broker.readSegmentOnFiler(segment)
+ if err != nil {
+ return
+ }
+ if !found {
+ return
+ }
+ for _, b := range info.Brokers {
+ brokers = append(brokers, pb.ServerAddress(b))
+ }
+
+ return
+}
+
+func (broker *MessageQueueBroker) saveSegmentBrokersOnFiler(segment *mq.Segment, brokers []pb.ServerAddress) (err error) {
+ var nodes []string
+ for _, b := range brokers {
+ nodes = append(nodes, string(b))
+ }
+ broker.saveSegmentToFiler(segment, &mq_pb.SegmentInfo{
+ Segment: segment.ToPbSegment(),
+ StartTsNs: time.Now().UnixNano(),
+ Brokers: nodes,
+ StopTsNs: 0,
+ PreviousSegments: nil,
+ NextSegments: nil,
+ })
+ return
+}
+
+func (broker *MessageQueueBroker) readSegmentOnFiler(segment *mq.Segment) (info *mq_pb.SegmentInfo, found bool, err error) {
+ dir, name := segment.DirAndName()
+
+ found, err = filer_pb.Exists(broker, dir, name, false)
+ if !found || err != nil {
+ return
+ }
+
+ err = pb.WithFilerClient(false, broker.GetFiler(), broker.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ // read filer conf first
+ data, err := filer.ReadInsideFiler(client, dir, name)
+ if err != nil {
+ return fmt.Errorf("ReadEntry: %v", err)
+ }
+
+ // parse into filer conf object
+ info = &mq_pb.SegmentInfo{}
+ if err = jsonpb.Unmarshal(bytes.NewReader(data), info); err != nil {
+ return err
+ }
+ found = true
+ return nil
+ })
+
+ return
+}
+
+func (broker *MessageQueueBroker) saveSegmentToFiler(segment *mq.Segment, info *mq_pb.SegmentInfo) (err error) {
+ dir, name := segment.DirAndName()
+
+ var buf bytes.Buffer
+ filer.ProtoToText(&buf, info)
+
+ err = pb.WithFilerClient(false, broker.GetFiler(), broker.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ // read filer conf first
+ err := filer.SaveInsideFiler(client, dir, name, buf.Bytes())
+ if err != nil {
+ return fmt.Errorf("save segment info: %v", err)
+ }
+ return nil
+ })
+
+ return
+}
diff --git a/weed/mq/broker/broker_server.go b/weed/mq/broker/broker_server.go
index f940b00c3..b95db0ecf 100644
--- a/weed/mq/broker/broker_server.go
+++ b/weed/mq/broker/broker_server.go
@@ -80,9 +80,15 @@ func (broker *MessageQueueBroker) GetFiler() pb.ServerAddress {
return broker.currentFiler
}
-func (broker *MessageQueueBroker) withFilerClient(streamingMode bool, filer pb.ServerAddress, fn func(filer_pb.SeaweedFilerClient) error) error {
+func (broker *MessageQueueBroker) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error {
- return pb.WithFilerClient(streamingMode, filer, broker.grpcDialOption, fn)
+ return pb.WithFilerClient(streamingMode, broker.GetFiler(), broker.grpcDialOption, fn)
+
+}
+
+func (broker *MessageQueueBroker) AdjustedUrl(location *filer_pb.Location) string {
+
+ return location.Url
}
diff --git a/weed/mq/topic.go b/weed/mq/topic.go
index 87621fca7..12a749133 100644
--- a/weed/mq/topic.go
+++ b/weed/mq/topic.go
@@ -1,6 +1,8 @@
package mq
import (
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/pb/mq_pb"
"time"
)
@@ -13,9 +15,9 @@ type Topic struct {
}
type Partition struct {
- RangeStart int
- RangeStop int // exclusive
- RingSize int
+ RangeStart int32
+ RangeStop int32 // exclusive
+ RingSize int32
}
type Segment struct {
@@ -32,5 +34,29 @@ func FromPbSegment(segment *mq_pb.Segment) *Segment {
Name: segment.Topic,
},
Id: segment.Id,
+ Partition: Partition{
+ RangeStart: segment.Partition.RangeStart,
+ RangeStop: segment.Partition.RangeStop,
+ RingSize: segment.Partition.RingSize,
+ },
}
}
+
+func (segment *Segment) ToPbSegment() *mq_pb.Segment {
+ return &mq_pb.Segment{
+ Namespace: string(segment.Topic.Namespace),
+ Topic: segment.Topic.Name,
+ Id: segment.Id,
+ Partition: &mq_pb.Partition{
+ RingSize: segment.Partition.RingSize,
+ RangeStart: segment.Partition.RangeStart,
+ RangeStop: segment.Partition.RangeStop,
+ },
+ }
+}
+
+func (segment *Segment) DirAndName() (dir string, name string) {
+ dir = fmt.Sprintf("%s/%s/%s", filer.TopicsDir, segment.Topic.Namespace, segment.Topic.Name)
+ name = fmt.Sprintf("%4d.segment", segment.Id)
+ return
+}