Diffstat (limited to 'weed/query/engine/broker_client.go')
-rw-r--r--  weed/query/engine/broker_client.go  115
1 file changed, 49 insertions(+), 66 deletions(-)
diff --git a/weed/query/engine/broker_client.go b/weed/query/engine/broker_client.go
index 9b5f9819c..3e6517678 100644
--- a/weed/query/engine/broker_client.go
+++ b/weed/query/engine/broker_client.go
@@ -5,14 +5,15 @@ import (
"encoding/binary"
"fmt"
"io"
- "strconv"
"strings"
"time"
"github.com/seaweedfs/seaweedfs/weed/cluster"
"github.com/seaweedfs/seaweedfs/weed/filer"
+ "github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer"
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
+ "github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
@@ -39,8 +40,9 @@ type BrokerClient struct {
// NewBrokerClient creates a new MQ broker client
// Uses master HTTP address and converts it to gRPC address for service discovery
func NewBrokerClient(masterHTTPAddress string) *BrokerClient {
- // Convert HTTP address to gRPC address (typically HTTP port + 10000)
- masterGRPCAddress := convertHTTPToGRPC(masterHTTPAddress)
+ // Convert HTTP address to gRPC address using pb.ServerAddress method
+ httpAddr := pb.ServerAddress(masterHTTPAddress)
+ masterGRPCAddress := httpAddr.ToGrpcAddress()
return &BrokerClient{
masterAddress: masterGRPCAddress,
@@ -48,20 +50,7 @@ func NewBrokerClient(masterHTTPAddress string) *BrokerClient {
}
}
-// convertHTTPToGRPC converts HTTP address to gRPC address
-// Follows SeaweedFS convention: gRPC port = HTTP port + 10000
-func convertHTTPToGRPC(httpAddress string) string {
- if strings.Contains(httpAddress, ":") {
- parts := strings.Split(httpAddress, ":")
- if len(parts) == 2 {
- if port, err := strconv.Atoi(parts[1]); err == nil {
- return fmt.Sprintf("%s:%d", parts[0], port+10000)
- }
- }
- }
- // Fallback: return original address if conversion fails
- return httpAddress
-}
+// No need for convertHTTPToGRPC - pb.ServerAddress.ToGrpcAddress() already handles this
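For reference, a minimal standalone sketch of the conversion that pb.ServerAddress.ToGrpcAddress() performs here, assuming the gRPC port = HTTP port + 10000 convention noted in the removed helper (the sample address is illustrative):

package main

import (
	"fmt"

	"github.com/seaweedfs/seaweedfs/weed/pb"
)

func main() {
	// Illustrative master HTTP address; per the SeaweedFS convention the
	// corresponding gRPC endpoint is the HTTP port shifted by +10000.
	httpAddr := pb.ServerAddress("localhost:9333")
	fmt.Println(httpAddr.ToGrpcAddress()) // typically prints "localhost:19333"
}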
// discoverFiler finds a filer from the master server
func (c *BrokerClient) discoverFiler() error {
@@ -92,7 +81,8 @@ func (c *BrokerClient) discoverFiler() error {
// Use the first available filer and convert HTTP address to gRPC
filerHTTPAddress := resp.ClusterNodes[0].Address
- c.filerAddress = convertHTTPToGRPC(filerHTTPAddress)
+ httpAddr := pb.ServerAddress(filerHTTPAddress)
+ c.filerAddress = httpAddr.ToGrpcAddress()
return nil
}
@@ -175,7 +165,6 @@ func (f *filerClientImpl) GetDataCenter() string {
}
// ListNamespaces retrieves all MQ namespaces (databases) from the filer
-// RESOLVED: Now queries actual topic directories instead of hardcoded values
func (c *BrokerClient) ListNamespaces(ctx context.Context) ([]string, error) {
// Get filer client to list directories under /topics
filerClient, err := c.GetFilerClient()
@@ -204,8 +193,8 @@ func (c *BrokerClient) ListNamespaces(ctx context.Context) ([]string, error) {
return fmt.Errorf("failed to receive entry: %v", recvErr)
}
- // Only include directories (namespaces), skip files
- if resp.Entry != nil && resp.Entry.IsDirectory {
+ // Only include directories (namespaces), skip files and system directories (starting with .)
+ if resp.Entry != nil && resp.Entry.IsDirectory && !strings.HasPrefix(resp.Entry.Name, ".") {
namespaces = append(namespaces, resp.Entry.Name)
}
}
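The new filter is equivalent to a small predicate; a sketch, with a hypothetical helper name:

func isNamespaceDir(entry *filer_pb.Entry) bool {
	// Keep only visible directories: skip plain files and dot-prefixed
	// system entries under /topics.
	return entry != nil && entry.IsDirectory && !strings.HasPrefix(entry.Name, ".")
}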
@@ -222,7 +211,6 @@ func (c *BrokerClient) ListNamespaces(ctx context.Context) ([]string, error) {
}
// ListTopics retrieves all topics in a namespace from the filer
-// RESOLVED: Now queries actual topic directories instead of hardcoded values
func (c *BrokerClient) ListTopics(ctx context.Context, namespace string) ([]string, error) {
// Get filer client to list directories under /topics/{namespace}
filerClient, err := c.GetFilerClient()
@@ -271,16 +259,18 @@ func (c *BrokerClient) ListTopics(ctx context.Context, namespace string) ([]stri
return topics, nil
}
-// GetTopicSchema retrieves schema information for a specific topic
-// Reads the actual schema from topic configuration stored in filer
-func (c *BrokerClient) GetTopicSchema(ctx context.Context, namespace, topicName string) (*schema_pb.RecordType, error) {
+// GetTopicSchema retrieves the flat schema and key columns for a topic
+// Returns (flatSchema, keyColumns, schemaFormat, error)
+func (c *BrokerClient) GetTopicSchema(ctx context.Context, namespace, topicName string) (*schema_pb.RecordType, []string, string, error) {
// Get filer client to read topic configuration
filerClient, err := c.GetFilerClient()
if err != nil {
- return nil, fmt.Errorf("failed to get filer client: %v", err)
+ return nil, nil, "", fmt.Errorf("failed to get filer client: %v", err)
}
- var recordType *schema_pb.RecordType
+ var flatSchema *schema_pb.RecordType
+ var keyColumns []string
+ var schemaFormat string
err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
// Read topic.conf file from /topics/{namespace}/{topic}/topic.conf
topicDir := fmt.Sprintf("/topics/%s/%s", namespace, topicName)
@@ -306,30 +296,23 @@ func (c *BrokerClient) GetTopicSchema(ctx context.Context, namespace, topicName
return fmt.Errorf("failed to unmarshal topic %s.%s configuration: %v", namespace, topicName, err)
}
- // Extract the record type (schema)
- if conf.RecordType != nil {
- recordType = conf.RecordType
- } else {
- return fmt.Errorf("no schema found for topic %s.%s", namespace, topicName)
- }
+ // Extract flat schema, key columns, and schema format
+ flatSchema = conf.MessageRecordType
+ keyColumns = conf.KeyColumns
+ schemaFormat = conf.SchemaFormat
return nil
})
if err != nil {
- return nil, err
- }
-
- if recordType == nil {
- return nil, fmt.Errorf("no record type found for topic %s.%s", namespace, topicName)
+ return nil, nil, "", err
}
- return recordType, nil
+ return flatSchema, keyColumns, schemaFormat, nil
}
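A hedged usage sketch of the new three-value result (namespace, topic, and the caller function are illustrative; note that a missing schema now yields a nil record type rather than an error):

func describeTopic(ctx context.Context, c *BrokerClient) error {
	flatSchema, keyColumns, schemaFormat, err := c.GetTopicSchema(ctx, "default", "user_events")
	if err != nil {
		return err
	}
	if flatSchema == nil {
		// With the change above, a topic.conf without a message record type
		// no longer produces an error, so callers should check for nil.
		return fmt.Errorf("no schema stored for default.user_events")
	}
	fmt.Printf("format=%s keys=%v fields=%d\n", schemaFormat, keyColumns, len(flatSchema.Fields))
	return nil
}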
-// ConfigureTopic creates or modifies a topic configuration
-// Assumption: Uses existing ConfigureTopic gRPC method for topic management
-func (c *BrokerClient) ConfigureTopic(ctx context.Context, namespace, topicName string, partitionCount int32, recordType *schema_pb.RecordType) error {
+// ConfigureTopic creates or modifies a topic using flat schema format
+func (c *BrokerClient) ConfigureTopic(ctx context.Context, namespace, topicName string, partitionCount int32, flatSchema *schema_pb.RecordType, keyColumns []string) error {
if err := c.findBrokerBalancer(); err != nil {
return err
}
@@ -342,14 +325,15 @@ func (c *BrokerClient) ConfigureTopic(ctx context.Context, namespace, topicName
client := mq_pb.NewSeaweedMessagingClient(conn)
- // Create topic configuration
+ // Create topic configuration using flat schema format
_, err = client.ConfigureTopic(ctx, &mq_pb.ConfigureTopicRequest{
Topic: &schema_pb.Topic{
Namespace: namespace,
Name: topicName,
},
- PartitionCount: partitionCount,
- RecordType: recordType,
+ PartitionCount: partitionCount,
+ MessageRecordType: flatSchema,
+ KeyColumns: keyColumns,
})
if err != nil {
return fmt.Errorf("failed to configure topic %s.%s: %v", namespace, topicName, err)
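A matching sketch of a call with the flat-schema parameters (topic name, partition count, and key column are illustrative):

func ensureTopic(ctx context.Context, c *BrokerClient, flatSchema *schema_pb.RecordType) error {
	// keyColumns names the fields of the flat schema that form the message key.
	return c.ConfigureTopic(ctx, "default", "user_events", 6, flatSchema, []string{"user_id"})
}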
@@ -433,15 +417,21 @@ func (c *BrokerClient) ListTopicPartitions(ctx context.Context, namespace, topic
// Uses buffer_start metadata from disk files for precise deduplication
// This prevents double-counting when combining with disk-based data
func (c *BrokerClient) GetUnflushedMessages(ctx context.Context, namespace, topicName string, partition topic.Partition, startTimeNs int64) ([]*filer_pb.LogEntry, error) {
+ glog.V(2).Infof("GetUnflushedMessages called for %s/%s, partition: RangeStart=%d, RangeStop=%d",
+ namespace, topicName, partition.RangeStart, partition.RangeStop)
+
// Step 1: Find the broker that hosts this partition
if err := c.findBrokerBalancer(); err != nil {
+ glog.V(2).Infof("Failed to find broker balancer: %v", err)
// Return empty slice if we can't find broker - prevents double-counting
return []*filer_pb.LogEntry{}, nil
}
+ glog.V(2).Infof("Found broker at address: %s", c.brokerAddress)
// Step 2: Connect to broker
conn, err := grpc.Dial(c.brokerAddress, c.grpcDialOption)
if err != nil {
+ glog.V(2).Infof("Failed to connect to broker %s: %v", c.brokerAddress, err)
// Return empty slice if connection fails - prevents double-counting
return []*filer_pb.LogEntry{}, nil
}
@@ -449,16 +439,20 @@ func (c *BrokerClient) GetUnflushedMessages(ctx context.Context, namespace, topi
client := mq_pb.NewSeaweedMessagingClient(conn)
- // Step 3: Get earliest buffer_start from disk files for precise deduplication
+ // Step 3: For unflushed messages, always start from 0 to get all in-memory data
+ // The buffer_start metadata in log files uses timestamp-based indices for uniqueness,
+ // but the broker's LogBuffer uses sequential indices internally (0, 1, 2, 3...)
+ // For unflushed data queries, we want all messages in the buffer regardless of their
+ // timestamp-based buffer indices, so we always use 0.
topicObj := topic.Topic{Namespace: namespace, Name: topicName}
partitionPath := topic.PartitionDir(topicObj, partition)
- earliestBufferIndex, err := c.getEarliestBufferStart(ctx, partitionPath)
- if err != nil {
- // If we can't get buffer info, use 0 (get all unflushed data)
- earliestBufferIndex = 0
- }
+ glog.V(2).Infof("Getting buffer start from partition path: %s", partitionPath)
+
+ // Always use 0 for unflushed messages to ensure we get all in-memory data
+ earliestBufferOffset := int64(0)
+ glog.V(2).Infof("Using StartBufferOffset=0 for unflushed messages (buffer offsets are sequential internally)")
- // Step 4: Prepare request using buffer index filtering only
+ // Step 4: Prepare request using buffer offset filtering only
request := &mq_pb.GetUnflushedMessagesRequest{
Topic: &schema_pb.Topic{
Namespace: namespace,
@@ -470,12 +464,14 @@ func (c *BrokerClient) GetUnflushedMessages(ctx context.Context, namespace, topi
RangeStop: partition.RangeStop,
UnixTimeNs: partition.UnixTimeNs,
},
- StartBufferIndex: earliestBufferIndex,
+ StartBufferOffset: earliestBufferOffset,
}
// Step 5: Call the broker streaming API
+ glog.V(2).Infof("Calling GetUnflushedMessages gRPC with StartBufferOffset=%d", earliestBufferOffset)
stream, err := client.GetUnflushedMessages(ctx, request)
if err != nil {
+ glog.V(2).Infof("GetUnflushedMessages gRPC call failed: %v", err)
// Return empty slice if gRPC call fails - prevents double-counting
return []*filer_pb.LogEntry{}, nil
}
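A hedged caller sketch for the unflushed-message path (partition values and topic name are illustrative; the failure paths above deliberately return an empty slice rather than an error to avoid double-counting):

func readUnflushed(ctx context.Context, c *BrokerClient) error {
	// Illustrative partition range; real callers take it from the topic's
	// partition assignment rather than constructing it by hand.
	p := topic.Partition{RangeStart: 0, RangeStop: 1024, UnixTimeNs: time.Now().UnixNano()}
	entries, err := c.GetUnflushedMessages(ctx, "default", "user_events", p, 0)
	if err != nil {
		return err
	}
	glog.V(2).Infof("received %d unflushed log entries", len(entries))
	return nil
}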
@@ -558,19 +554,6 @@ func (c *BrokerClient) getEarliestBufferStart(ctx context.Context, partitionPath
return nil
})
- // Debug: Show buffer_start determination logic in EXPLAIN mode
- if isDebugMode(ctx) && len(bufferStartSources) > 0 {
- if logFileCount == 0 && parquetFileCount > 0 {
- fmt.Printf("Debug: Using Parquet buffer_start metadata (binary format, no log files) - sources: %v\n", bufferStartSources)
- } else if logFileCount > 0 && parquetFileCount > 0 {
- fmt.Printf("Debug: Using mixed sources for buffer_start (binary format) - log files: %d, Parquet files: %d, sources: %v\n",
- logFileCount, parquetFileCount, bufferStartSources)
- } else {
- fmt.Printf("Debug: Using log file buffer_start metadata (binary format) - sources: %v\n", bufferStartSources)
- }
- fmt.Printf("Debug: Earliest buffer_start index: %d\n", earliestBufferIndex)
- }
-
if err != nil {
return 0, fmt.Errorf("failed to scan partition directory: %v", err)
}