1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
|
package dash
import (
"context"
"fmt"
"io"
"path/filepath"
"strings"
"time"
"github.com/seaweedfs/seaweedfs/weed/cluster"
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
)
// GetTopics retrieves message queue topics data
func (s *AdminServer) GetTopics() (*TopicsData, error) {
var topics []TopicInfo
// Find broker leader and get topics
brokerLeader, err := s.findBrokerLeader()
if err != nil {
// If no broker leader found, return empty data
return &TopicsData{
Topics: topics,
TotalTopics: len(topics),
LastUpdated: time.Now(),
}, nil
}
// Connect to broker leader and list topics
err = s.withBrokerClient(brokerLeader, func(client mq_pb.SeaweedMessagingClient) error {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
resp, err := client.ListTopics(ctx, &mq_pb.ListTopicsRequest{})
if err != nil {
return err
}
// Convert protobuf topics to TopicInfo - only include available data
for _, pbTopic := range resp.Topics {
topicInfo := TopicInfo{
Name: fmt.Sprintf("%s.%s", pbTopic.Namespace, pbTopic.Name),
Partitions: 0, // Will be populated by LookupTopicBrokers call
Retention: TopicRetentionInfo{
Enabled: false,
DisplayValue: 0,
DisplayUnit: "days",
},
}
// Get topic configuration to get partition count and retention info
lookupResp, err := client.LookupTopicBrokers(ctx, &mq_pb.LookupTopicBrokersRequest{
Topic: pbTopic,
})
if err == nil {
topicInfo.Partitions = len(lookupResp.BrokerPartitionAssignments)
}
// Get topic configuration for retention information
configResp, err := client.GetTopicConfiguration(ctx, &mq_pb.GetTopicConfigurationRequest{
Topic: pbTopic,
})
if err == nil && configResp.Retention != nil {
topicInfo.Retention = convertTopicRetention(configResp.Retention)
}
topics = append(topics, topicInfo)
}
return nil
})
if err != nil {
// If connection fails, return empty data
return &TopicsData{
Topics: topics,
TotalTopics: len(topics),
LastUpdated: time.Now(),
}, nil
}
return &TopicsData{
Topics: topics,
TotalTopics: len(topics),
LastUpdated: time.Now(),
// Don't include TotalMessages and TotalSize as they're not available
}, nil
}
// GetSubscribers retrieves message queue subscribers data
func (s *AdminServer) GetSubscribers() (*SubscribersData, error) {
var subscribers []SubscriberInfo
// Find broker leader and get subscriber info from broker stats
brokerLeader, err := s.findBrokerLeader()
if err != nil {
// If no broker leader found, return empty data
return &SubscribersData{
Subscribers: subscribers,
TotalSubscribers: len(subscribers),
ActiveSubscribers: 0,
LastUpdated: time.Now(),
}, nil
}
// Connect to broker leader and get subscriber information
// Note: SeaweedMQ doesn't have a direct API to list all subscribers
// We would need to collect this information from broker statistics
// For now, return empty data structure as subscriber info is not
// directly available through the current MQ API
err = s.withBrokerClient(brokerLeader, func(client mq_pb.SeaweedMessagingClient) error {
// TODO: Implement subscriber data collection from broker statistics
// This would require access to broker internal statistics about
// active subscribers, consumer groups, etc.
return nil
})
if err != nil {
// If connection fails, return empty data
return &SubscribersData{
Subscribers: subscribers,
TotalSubscribers: len(subscribers),
ActiveSubscribers: 0,
LastUpdated: time.Now(),
}, nil
}
activeCount := 0
for _, sub := range subscribers {
if sub.Status == "active" {
activeCount++
}
}
return &SubscribersData{
Subscribers: subscribers,
TotalSubscribers: len(subscribers),
ActiveSubscribers: activeCount,
LastUpdated: time.Now(),
}, nil
}
// GetTopicDetails retrieves detailed information about a specific topic
func (s *AdminServer) GetTopicDetails(namespace, topicName string) (*TopicDetailsData, error) {
// Find broker leader
brokerLeader, err := s.findBrokerLeader()
if err != nil {
return nil, fmt.Errorf("failed to find broker leader: %w", err)
}
var topicDetails *TopicDetailsData
// Connect to broker leader and get topic configuration
err = s.withBrokerClient(brokerLeader, func(client mq_pb.SeaweedMessagingClient) error {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
// Get topic configuration using the new API
configResp, err := client.GetTopicConfiguration(ctx, &mq_pb.GetTopicConfigurationRequest{
Topic: &schema_pb.Topic{
Namespace: namespace,
Name: topicName,
},
})
if err != nil {
return fmt.Errorf("failed to get topic configuration: %w", err)
}
// Initialize topic details
topicDetails = &TopicDetailsData{
TopicName: fmt.Sprintf("%s.%s", namespace, topicName),
Namespace: namespace,
Name: topicName,
Partitions: []PartitionInfo{},
Schema: []SchemaFieldInfo{},
Publishers: []PublisherInfo{},
Subscribers: []TopicSubscriberInfo{},
ConsumerGroupOffsets: []ConsumerGroupOffsetInfo{},
Retention: convertTopicRetention(configResp.Retention),
CreatedAt: time.Unix(0, configResp.CreatedAtNs),
LastUpdated: time.Unix(0, configResp.LastUpdatedNs),
}
// Set current time if timestamps are not available
if configResp.CreatedAtNs == 0 {
topicDetails.CreatedAt = time.Now()
}
if configResp.LastUpdatedNs == 0 {
topicDetails.LastUpdated = time.Now()
}
// Process partitions
for _, assignment := range configResp.BrokerPartitionAssignments {
if assignment.Partition != nil {
partitionInfo := PartitionInfo{
ID: assignment.Partition.RangeStart,
LeaderBroker: assignment.LeaderBroker,
FollowerBroker: assignment.FollowerBroker,
MessageCount: 0, // Will be enhanced later with actual stats
TotalSize: 0, // Will be enhanced later with actual stats
LastDataTime: time.Time{}, // Will be enhanced later
CreatedAt: time.Now(),
}
topicDetails.Partitions = append(topicDetails.Partitions, partitionInfo)
}
}
// Process schema from RecordType
if configResp.RecordType != nil {
topicDetails.Schema = convertRecordTypeToSchemaFields(configResp.RecordType)
}
// Get publishers information
publishersResp, err := client.GetTopicPublishers(ctx, &mq_pb.GetTopicPublishersRequest{
Topic: &schema_pb.Topic{
Namespace: namespace,
Name: topicName,
},
})
if err != nil {
// Log error but don't fail the entire request
glog.V(0).Infof("failed to get topic publishers for %s.%s: %v", namespace, topicName, err)
} else {
glog.V(1).Infof("got %d publishers for topic %s.%s", len(publishersResp.Publishers), namespace, topicName)
topicDetails.Publishers = convertTopicPublishers(publishersResp.Publishers)
}
// Get subscribers information
subscribersResp, err := client.GetTopicSubscribers(ctx, &mq_pb.GetTopicSubscribersRequest{
Topic: &schema_pb.Topic{
Namespace: namespace,
Name: topicName,
},
})
if err != nil {
// Log error but don't fail the entire request
glog.V(0).Infof("failed to get topic subscribers for %s.%s: %v", namespace, topicName, err)
} else {
glog.V(1).Infof("got %d subscribers for topic %s.%s", len(subscribersResp.Subscribers), namespace, topicName)
topicDetails.Subscribers = convertTopicSubscribers(subscribersResp.Subscribers)
}
return nil
})
if err != nil {
return nil, err
}
// Get consumer group offsets from the filer
offsets, err := s.GetConsumerGroupOffsets(namespace, topicName)
if err != nil {
// Log error but don't fail the entire request
glog.V(0).Infof("failed to get consumer group offsets for %s.%s: %v", namespace, topicName, err)
} else {
glog.V(1).Infof("got %d consumer group offsets for topic %s.%s", len(offsets), namespace, topicName)
topicDetails.ConsumerGroupOffsets = offsets
}
return topicDetails, nil
}
// GetConsumerGroupOffsets retrieves consumer group offsets for a topic from the filer
func (s *AdminServer) GetConsumerGroupOffsets(namespace, topicName string) ([]ConsumerGroupOffsetInfo, error) {
var offsets []ConsumerGroupOffsetInfo
err := s.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
// Get the topic directory: /topics/namespace/topicName
topicObj := topic.NewTopic(namespace, topicName)
topicDir := topicObj.Dir()
// List all version directories under the topic directory (e.g., v2025-07-10-05-44-34)
versionStream, err := client.ListEntries(context.Background(), &filer_pb.ListEntriesRequest{
Directory: topicDir,
Prefix: "",
StartFromFileName: "",
InclusiveStartFrom: false,
Limit: 1000,
})
if err != nil {
return fmt.Errorf("failed to list topic directory %s: %v", topicDir, err)
}
// Process each version directory
for {
versionResp, err := versionStream.Recv()
if err != nil {
if err == io.EOF {
break
}
return fmt.Errorf("failed to receive version entries: %w", err)
}
// Only process directories that are versions (start with "v")
if versionResp.Entry.IsDirectory && strings.HasPrefix(versionResp.Entry.Name, "v") {
versionDir := filepath.Join(topicDir, versionResp.Entry.Name)
// List all partition directories under the version directory (e.g., 0315-0630)
partitionStream, err := client.ListEntries(context.Background(), &filer_pb.ListEntriesRequest{
Directory: versionDir,
Prefix: "",
StartFromFileName: "",
InclusiveStartFrom: false,
Limit: 1000,
})
if err != nil {
glog.Warningf("Failed to list version directory %s: %v", versionDir, err)
continue
}
// Process each partition directory
for {
partitionResp, err := partitionStream.Recv()
if err != nil {
if err == io.EOF {
break
}
glog.Warningf("Failed to receive partition entries: %v", err)
break
}
// Only process directories that are partitions (format: NNNN-NNNN)
if partitionResp.Entry.IsDirectory {
// Parse partition range to get partition start ID (e.g., "0315-0630" -> 315)
var partitionStart, partitionStop int32
if n, err := fmt.Sscanf(partitionResp.Entry.Name, "%04d-%04d", &partitionStart, &partitionStop); n != 2 || err != nil {
// Skip directories that don't match the partition format
continue
}
partitionDir := filepath.Join(versionDir, partitionResp.Entry.Name)
// List all .offset files in this partition directory
offsetStream, err := client.ListEntries(context.Background(), &filer_pb.ListEntriesRequest{
Directory: partitionDir,
Prefix: "",
StartFromFileName: "",
InclusiveStartFrom: false,
Limit: 1000,
})
if err != nil {
glog.Warningf("Failed to list partition directory %s: %v", partitionDir, err)
continue
}
// Process each offset file
for {
offsetResp, err := offsetStream.Recv()
if err != nil {
if err == io.EOF {
break
}
glog.Warningf("Failed to receive offset entries: %v", err)
break
}
// Only process .offset files
if !offsetResp.Entry.IsDirectory && strings.HasSuffix(offsetResp.Entry.Name, ".offset") {
consumerGroup := strings.TrimSuffix(offsetResp.Entry.Name, ".offset")
// Read the offset value from the file
offsetData, err := filer.ReadInsideFiler(client, partitionDir, offsetResp.Entry.Name)
if err != nil {
glog.Warningf("Failed to read offset file %s: %v", offsetResp.Entry.Name, err)
continue
}
if len(offsetData) == 8 {
offset := int64(util.BytesToUint64(offsetData))
// Get the file modification time
lastUpdated := time.Unix(offsetResp.Entry.Attributes.Mtime, 0)
offsets = append(offsets, ConsumerGroupOffsetInfo{
ConsumerGroup: consumerGroup,
PartitionID: partitionStart, // Use partition start as the ID
Offset: offset,
LastUpdated: lastUpdated,
})
}
}
}
}
}
}
}
return nil
})
if err != nil {
return nil, fmt.Errorf("failed to get consumer group offsets: %w", err)
}
return offsets, nil
}
// convertRecordTypeToSchemaFields converts a protobuf RecordType to SchemaFieldInfo slice
func convertRecordTypeToSchemaFields(recordType *schema_pb.RecordType) []SchemaFieldInfo {
var schemaFields []SchemaFieldInfo
if recordType == nil || recordType.Fields == nil {
return schemaFields
}
for _, field := range recordType.Fields {
schemaField := SchemaFieldInfo{
Name: field.Name,
Type: getFieldTypeString(field.Type),
Required: field.IsRequired,
}
schemaFields = append(schemaFields, schemaField)
}
return schemaFields
}
// getFieldTypeString converts a protobuf Type to a human-readable string
func getFieldTypeString(fieldType *schema_pb.Type) string {
if fieldType == nil {
return "unknown"
}
switch kind := fieldType.Kind.(type) {
case *schema_pb.Type_ScalarType:
return getScalarTypeString(kind.ScalarType)
case *schema_pb.Type_RecordType:
return "record"
case *schema_pb.Type_ListType:
elementType := getFieldTypeString(kind.ListType.ElementType)
return fmt.Sprintf("list<%s>", elementType)
default:
return "unknown"
}
}
// getScalarTypeString converts a protobuf ScalarType to a string
func getScalarTypeString(scalarType schema_pb.ScalarType) string {
switch scalarType {
case schema_pb.ScalarType_BOOL:
return "bool"
case schema_pb.ScalarType_INT32:
return "int32"
case schema_pb.ScalarType_INT64:
return "int64"
case schema_pb.ScalarType_FLOAT:
return "float"
case schema_pb.ScalarType_DOUBLE:
return "double"
case schema_pb.ScalarType_BYTES:
return "bytes"
case schema_pb.ScalarType_STRING:
return "string"
default:
return "unknown"
}
}
// convertTopicPublishers converts protobuf TopicPublisher slice to PublisherInfo slice
func convertTopicPublishers(publishers []*mq_pb.TopicPublisher) []PublisherInfo {
publisherInfos := make([]PublisherInfo, 0, len(publishers))
for _, publisher := range publishers {
publisherInfo := PublisherInfo{
PublisherName: publisher.PublisherName,
ClientID: publisher.ClientId,
PartitionID: publisher.Partition.RangeStart,
Broker: publisher.Broker,
IsActive: publisher.IsActive,
LastPublishedOffset: publisher.LastPublishedOffset,
LastAckedOffset: publisher.LastAckedOffset,
}
// Convert timestamps
if publisher.ConnectTimeNs > 0 {
publisherInfo.ConnectTime = time.Unix(0, publisher.ConnectTimeNs)
}
if publisher.LastSeenTimeNs > 0 {
publisherInfo.LastSeenTime = time.Unix(0, publisher.LastSeenTimeNs)
}
publisherInfos = append(publisherInfos, publisherInfo)
}
return publisherInfos
}
// convertTopicSubscribers converts protobuf TopicSubscriber slice to TopicSubscriberInfo slice
func convertTopicSubscribers(subscribers []*mq_pb.TopicSubscriber) []TopicSubscriberInfo {
subscriberInfos := make([]TopicSubscriberInfo, 0, len(subscribers))
for _, subscriber := range subscribers {
subscriberInfo := TopicSubscriberInfo{
ConsumerGroup: subscriber.ConsumerGroup,
ConsumerID: subscriber.ConsumerId,
ClientID: subscriber.ClientId,
PartitionID: subscriber.Partition.RangeStart,
Broker: subscriber.Broker,
IsActive: subscriber.IsActive,
CurrentOffset: subscriber.CurrentOffset,
LastReceivedOffset: subscriber.LastReceivedOffset,
}
// Convert timestamps
if subscriber.ConnectTimeNs > 0 {
subscriberInfo.ConnectTime = time.Unix(0, subscriber.ConnectTimeNs)
}
if subscriber.LastSeenTimeNs > 0 {
subscriberInfo.LastSeenTime = time.Unix(0, subscriber.LastSeenTimeNs)
}
subscriberInfos = append(subscriberInfos, subscriberInfo)
}
return subscriberInfos
}
// findBrokerLeader finds the current broker leader
func (s *AdminServer) findBrokerLeader() (string, error) {
// First, try to find any broker from the cluster
var brokers []string
err := s.WithMasterClient(func(client master_pb.SeaweedClient) error {
resp, err := client.ListClusterNodes(context.Background(), &master_pb.ListClusterNodesRequest{
ClientType: cluster.BrokerType,
})
if err != nil {
return err
}
for _, node := range resp.ClusterNodes {
brokers = append(brokers, node.Address)
}
return nil
})
if err != nil {
return "", fmt.Errorf("failed to list brokers: %w", err)
}
if len(brokers) == 0 {
return "", fmt.Errorf("no brokers found in cluster")
}
// Try each broker to find the leader
for _, brokerAddr := range brokers {
err := s.withBrokerClient(brokerAddr, func(client mq_pb.SeaweedMessagingClient) error {
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
// Try to find broker leader
_, err := client.FindBrokerLeader(ctx, &mq_pb.FindBrokerLeaderRequest{
FilerGroup: "",
})
if err == nil {
return nil // This broker is the leader
}
return err
})
if err == nil {
return brokerAddr, nil
}
}
return "", fmt.Errorf("no broker leader found")
}
// withBrokerClient connects to a message queue broker and executes a function
func (s *AdminServer) withBrokerClient(brokerAddress string, fn func(client mq_pb.SeaweedMessagingClient) error) error {
return pb.WithBrokerGrpcClient(false, brokerAddress, s.grpcDialOption, fn)
}
// convertTopicRetention converts protobuf retention to TopicRetentionInfo
func convertTopicRetention(retention *mq_pb.TopicRetention) TopicRetentionInfo {
if retention == nil || !retention.Enabled {
return TopicRetentionInfo{
Enabled: false,
RetentionSeconds: 0,
DisplayValue: 0,
DisplayUnit: "days",
}
}
// Convert seconds to human-readable format
seconds := retention.RetentionSeconds
var displayValue int32
var displayUnit string
if seconds >= 86400 { // >= 1 day
displayValue = int32(seconds / 86400)
displayUnit = "days"
} else if seconds >= 3600 { // >= 1 hour
displayValue = int32(seconds / 3600)
displayUnit = "hours"
} else {
displayValue = int32(seconds)
displayUnit = "seconds"
}
return TopicRetentionInfo{
Enabled: retention.Enabled,
RetentionSeconds: seconds,
DisplayValue: displayValue,
DisplayUnit: displayUnit,
}
}
|