diff options
| author | chrislu <chris.lu@gmail.com> | 2022-07-16 10:49:34 -0700 |
|---|---|---|
| committer | chrislu <chris.lu@gmail.com> | 2022-07-16 10:49:34 -0700 |
| commit | 113a4546fd36b9a0eb78c4e462f38f7488cf1e88 (patch) | |
| tree | 174077b2572802518dd2b1faa7f326d0e4dbffea /weed/mq | |
| parent | da8bab2b33f209a1b0c339e6250aa940d5c8572d (diff) | |
| download | seaweedfs-113a4546fd36b9a0eb78c4e462f38f7488cf1e88.tar.xz seaweedfs-113a4546fd36b9a0eb78c4e462f38f7488cf1e88.zip | |
segment serde
Diffstat (limited to 'weed/mq')
| -rw-r--r-- | weed/mq/broker/brokder_grpc_admin.go | 31 | ||||
| -rw-r--r-- | weed/mq/broker/broker_segment_serde.go | 89 | ||||
| -rw-r--r-- | weed/mq/broker/broker_server.go | 10 | ||||
| -rw-r--r-- | weed/mq/topic.go | 32 |
4 files changed, 139 insertions, 23 deletions
diff --git a/weed/mq/broker/brokder_grpc_admin.go b/weed/mq/broker/brokder_grpc_admin.go index 31b5bb84e..0dfb69c50 100644 --- a/weed/mq/broker/brokder_grpc_admin.go +++ b/weed/mq/broker/brokder_grpc_admin.go @@ -53,16 +53,19 @@ func (broker *MessageQueueBroker) AssignSegmentBrokers(c context.Context, reques if err != nil { return ret, err } - // good if the segment is still on the brokers - isActive, err := broker.checkSegmentsOnBrokers(segment, existingBrokers) - if err != nil { - return ret, err - } - if isActive { - for _, broker := range existingBrokers { - ret.Brokers = append(ret.Brokers, string(broker)) + + if len(existingBrokers) > 0 { + // good if the segment is still on the brokers + isActive, err := broker.checkSegmentsOnBrokers(segment, existingBrokers) + if err != nil { + return ret, err + } + if isActive { + for _, broker := range existingBrokers { + ret.Brokers = append(ret.Brokers, string(broker)) + } + return ret, nil } - return ret, nil } // randomly pick up to 10 brokers, and find the ones with the lightest load @@ -72,7 +75,7 @@ func (broker *MessageQueueBroker) AssignSegmentBrokers(c context.Context, reques } // save the allocated brokers info for this segment on the filer - if err := broker.saveSegmentOnFiler(segment, selectedBrokers); err != nil { + if err := broker.saveSegmentBrokersOnFiler(segment, selectedBrokers); err != nil { return ret, err } @@ -82,10 +85,6 @@ func (broker *MessageQueueBroker) AssignSegmentBrokers(c context.Context, reques return ret, nil } -func (broker *MessageQueueBroker) checkSegmentOnFiler(segment *mq.Segment) (brokers []pb.ServerAddress, err error) { - return -} - func (broker *MessageQueueBroker) checkSegmentsOnBrokers(segment *mq.Segment, brokers []pb.ServerAddress) (active bool, err error) { var wg sync.WaitGroup @@ -206,7 +205,3 @@ func (broker *MessageQueueBroker) checkBrokerStatus(candidates []pb.ServerAddres wg.Wait() return } - -func (broker *MessageQueueBroker) saveSegmentOnFiler(segment *mq.Segment, brokers []pb.ServerAddress) (err error) { - return -} diff --git a/weed/mq/broker/broker_segment_serde.go b/weed/mq/broker/broker_segment_serde.go new file mode 100644 index 000000000..25fca005b --- /dev/null +++ b/weed/mq/broker/broker_segment_serde.go @@ -0,0 +1,89 @@ +package broker + +import ( + "bytes" + "fmt" + "github.com/chrislusf/seaweedfs/weed/filer" + "github.com/chrislusf/seaweedfs/weed/mq" + "github.com/chrislusf/seaweedfs/weed/pb" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/pb/mq_pb" + "github.com/golang/protobuf/jsonpb" + "time" +) + +func (broker *MessageQueueBroker) checkSegmentOnFiler(segment *mq.Segment) (brokers []pb.ServerAddress, err error) { + info, found, err := broker.readSegmentOnFiler(segment) + if err != nil { + return + } + if !found { + return + } + for _, b := range info.Brokers { + brokers = append(brokers, pb.ServerAddress(b)) + } + + return +} + +func (broker *MessageQueueBroker) saveSegmentBrokersOnFiler(segment *mq.Segment, brokers []pb.ServerAddress) (err error) { + var nodes []string + for _, b := range brokers { + nodes = append(nodes, string(b)) + } + broker.saveSegmentToFiler(segment, &mq_pb.SegmentInfo{ + Segment: segment.ToPbSegment(), + StartTsNs: time.Now().UnixNano(), + Brokers: nodes, + StopTsNs: 0, + PreviousSegments: nil, + NextSegments: nil, + }) + return +} + +func (broker *MessageQueueBroker) readSegmentOnFiler(segment *mq.Segment) (info *mq_pb.SegmentInfo, found bool, err error) { + dir, name := segment.DirAndName() + + found, err = filer_pb.Exists(broker, dir, name, false) + if !found || err != nil { + return + } + + err = pb.WithFilerClient(false, broker.GetFiler(), broker.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + // read filer conf first + data, err := filer.ReadInsideFiler(client, dir, name) + if err != nil { + return fmt.Errorf("ReadEntry: %v", err) + } + + // parse into filer conf object + info = &mq_pb.SegmentInfo{} + if err = jsonpb.Unmarshal(bytes.NewReader(data), info); err != nil { + return err + } + found = true + return nil + }) + + return +} + +func (broker *MessageQueueBroker) saveSegmentToFiler(segment *mq.Segment, info *mq_pb.SegmentInfo) (err error) { + dir, name := segment.DirAndName() + + var buf bytes.Buffer + filer.ProtoToText(&buf, info) + + err = pb.WithFilerClient(false, broker.GetFiler(), broker.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + // read filer conf first + err := filer.SaveInsideFiler(client, dir, name, buf.Bytes()) + if err != nil { + return fmt.Errorf("save segment info: %v", err) + } + return nil + }) + + return +} diff --git a/weed/mq/broker/broker_server.go b/weed/mq/broker/broker_server.go index f940b00c3..b95db0ecf 100644 --- a/weed/mq/broker/broker_server.go +++ b/weed/mq/broker/broker_server.go @@ -80,9 +80,15 @@ func (broker *MessageQueueBroker) GetFiler() pb.ServerAddress { return broker.currentFiler } -func (broker *MessageQueueBroker) withFilerClient(streamingMode bool, filer pb.ServerAddress, fn func(filer_pb.SeaweedFilerClient) error) error { +func (broker *MessageQueueBroker) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error { - return pb.WithFilerClient(streamingMode, filer, broker.grpcDialOption, fn) + return pb.WithFilerClient(streamingMode, broker.GetFiler(), broker.grpcDialOption, fn) + +} + +func (broker *MessageQueueBroker) AdjustedUrl(location *filer_pb.Location) string { + + return location.Url } diff --git a/weed/mq/topic.go b/weed/mq/topic.go index 87621fca7..12a749133 100644 --- a/weed/mq/topic.go +++ b/weed/mq/topic.go @@ -1,6 +1,8 @@ package mq import ( + "fmt" + "github.com/chrislusf/seaweedfs/weed/filer" "github.com/chrislusf/seaweedfs/weed/pb/mq_pb" "time" ) @@ -13,9 +15,9 @@ type Topic struct { } type Partition struct { - RangeStart int - RangeStop int // exclusive - RingSize int + RangeStart int32 + RangeStop int32 // exclusive + RingSize int32 } type Segment struct { @@ -32,5 +34,29 @@ func FromPbSegment(segment *mq_pb.Segment) *Segment { Name: segment.Topic, }, Id: segment.Id, + Partition: Partition{ + RangeStart: segment.Partition.RangeStart, + RangeStop: segment.Partition.RangeStop, + RingSize: segment.Partition.RingSize, + }, } } + +func (segment *Segment) ToPbSegment() *mq_pb.Segment { + return &mq_pb.Segment{ + Namespace: string(segment.Topic.Namespace), + Topic: segment.Topic.Name, + Id: segment.Id, + Partition: &mq_pb.Partition{ + RingSize: segment.Partition.RingSize, + RangeStart: segment.Partition.RangeStart, + RangeStop: segment.Partition.RangeStop, + }, + } +} + +func (segment *Segment) DirAndName() (dir string, name string) { + dir = fmt.Sprintf("%s/%s/%s", filer.TopicsDir, segment.Topic.Namespace, segment.Topic.Name) + name = fmt.Sprintf("%4d.segment", segment.Id) + return +} |
