Diffstat (limited to 'weed/query/engine/broker_client.go')
-rw-r--r--    weed/query/engine/broker_client.go    115
1 file changed, 49 insertions, 66 deletions
diff --git a/weed/query/engine/broker_client.go b/weed/query/engine/broker_client.go
index 9b5f9819c..3e6517678 100644
--- a/weed/query/engine/broker_client.go
+++ b/weed/query/engine/broker_client.go
@@ -5,14 +5,15 @@ import (
     "encoding/binary"
     "fmt"
     "io"
-    "strconv"
     "strings"
     "time"
 
     "github.com/seaweedfs/seaweedfs/weed/cluster"
     "github.com/seaweedfs/seaweedfs/weed/filer"
+    "github.com/seaweedfs/seaweedfs/weed/glog"
     "github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer"
     "github.com/seaweedfs/seaweedfs/weed/mq/topic"
+    "github.com/seaweedfs/seaweedfs/weed/pb"
     "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
     "github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
     "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
@@ -39,8 +40,9 @@ type BrokerClient struct {
 // NewBrokerClient creates a new MQ broker client
 // Uses master HTTP address and converts it to gRPC address for service discovery
 func NewBrokerClient(masterHTTPAddress string) *BrokerClient {
-    // Convert HTTP address to gRPC address (typically HTTP port + 10000)
-    masterGRPCAddress := convertHTTPToGRPC(masterHTTPAddress)
+    // Convert HTTP address to gRPC address using pb.ServerAddress method
+    httpAddr := pb.ServerAddress(masterHTTPAddress)
+    masterGRPCAddress := httpAddr.ToGrpcAddress()
 
     return &BrokerClient{
         masterAddress: masterGRPCAddress,
@@ -48,20 +50,7 @@ func NewBrokerClient(masterHTTPAddress string) *BrokerClient {
     }
 }
 
-// convertHTTPToGRPC converts HTTP address to gRPC address
-// Follows SeaweedFS convention: gRPC port = HTTP port + 10000
-func convertHTTPToGRPC(httpAddress string) string {
-    if strings.Contains(httpAddress, ":") {
-        parts := strings.Split(httpAddress, ":")
-        if len(parts) == 2 {
-            if port, err := strconv.Atoi(parts[1]); err == nil {
-                return fmt.Sprintf("%s:%d", parts[0], port+10000)
-            }
-        }
-    }
-    // Fallback: return original address if conversion fails
-    return httpAddress
-}
+// No need for convertHTTPToGRPC - pb.ServerAddress.ToGrpcAddress() already handles this
 
 // discoverFiler finds a filer from the master server
 func (c *BrokerClient) discoverFiler() error {
@@ -92,7 +81,8 @@ func (c *BrokerClient) discoverFiler() error {
 
     // Use the first available filer and convert HTTP address to gRPC
     filerHTTPAddress := resp.ClusterNodes[0].Address
-    c.filerAddress = convertHTTPToGRPC(filerHTTPAddress)
+    httpAddr := pb.ServerAddress(filerHTTPAddress)
+    c.filerAddress = httpAddr.ToGrpcAddress()
 
     return nil
 }
@@ -175,7 +165,6 @@ func (f *filerClientImpl) GetDataCenter() string {
 }
 
 // ListNamespaces retrieves all MQ namespaces (databases) from the filer
-// RESOLVED: Now queries actual topic directories instead of hardcoded values
 func (c *BrokerClient) ListNamespaces(ctx context.Context) ([]string, error) {
     // Get filer client to list directories under /topics
     filerClient, err := c.GetFilerClient()
@@ -204,8 +193,8 @@ func (c *BrokerClient) ListNamespaces(ctx context.Context) ([]string, error) {
             return fmt.Errorf("failed to receive entry: %v", recvErr)
         }
 
-        // Only include directories (namespaces), skip files
-        if resp.Entry != nil && resp.Entry.IsDirectory {
+        // Only include directories (namespaces), skip files and system directories (starting with .)
+        if resp.Entry != nil && resp.Entry.IsDirectory && !strings.HasPrefix(resp.Entry.Name, ".") {
             namespaces = append(namespaces, resp.Entry.Name)
         }
     }
@@ -222,7 +211,6 @@ func (c *BrokerClient) ListNamespaces(ctx context.Context) ([]string, error) {
 }
 
 // ListTopics retrieves all topics in a namespace from the filer
-// RESOLVED: Now queries actual topic directories instead of hardcoded values
 func (c *BrokerClient) ListTopics(ctx context.Context, namespace string) ([]string, error) {
     // Get filer client to list directories under /topics/{namespace}
     filerClient, err := c.GetFilerClient()
@@ -271,16 +259,18 @@ func (c *BrokerClient) ListTopics(ctx context.Context, namespace string) ([]stri
     return topics, nil
 }
 
-// GetTopicSchema retrieves schema information for a specific topic
-// Reads the actual schema from topic configuration stored in filer
-func (c *BrokerClient) GetTopicSchema(ctx context.Context, namespace, topicName string) (*schema_pb.RecordType, error) {
+// GetTopicSchema retrieves the flat schema and key columns for a topic
+// Returns (flatSchema, keyColumns, schemaFormat, error)
+func (c *BrokerClient) GetTopicSchema(ctx context.Context, namespace, topicName string) (*schema_pb.RecordType, []string, string, error) {
     // Get filer client to read topic configuration
     filerClient, err := c.GetFilerClient()
     if err != nil {
-        return nil, fmt.Errorf("failed to get filer client: %v", err)
+        return nil, nil, "", fmt.Errorf("failed to get filer client: %v", err)
     }
 
-    var recordType *schema_pb.RecordType
+    var flatSchema *schema_pb.RecordType
+    var keyColumns []string
+    var schemaFormat string
     err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
         // Read topic.conf file from /topics/{namespace}/{topic}/topic.conf
         topicDir := fmt.Sprintf("/topics/%s/%s", namespace, topicName)
@@ -306,30 +296,23 @@ func (c *BrokerClient) GetTopicSchema(ctx context.Context, namespace, topicName
             return fmt.Errorf("failed to unmarshal topic %s.%s configuration: %v", namespace, topicName, err)
         }
 
-        // Extract the record type (schema)
-        if conf.RecordType != nil {
-            recordType = conf.RecordType
-        } else {
-            return fmt.Errorf("no schema found for topic %s.%s", namespace, topicName)
-        }
+        // Extract flat schema, key columns, and schema format
+        flatSchema = conf.MessageRecordType
+        keyColumns = conf.KeyColumns
+        schemaFormat = conf.SchemaFormat
 
         return nil
     })
 
     if err != nil {
-        return nil, err
-    }
-
-    if recordType == nil {
-        return nil, fmt.Errorf("no record type found for topic %s.%s", namespace, topicName)
+        return nil, nil, "", err
     }
 
-    return recordType, nil
+    return flatSchema, keyColumns, schemaFormat, nil
 }
 
-// ConfigureTopic creates or modifies a topic configuration
-// Assumption: Uses existing ConfigureTopic gRPC method for topic management
-func (c *BrokerClient) ConfigureTopic(ctx context.Context, namespace, topicName string, partitionCount int32, recordType *schema_pb.RecordType) error {
+// ConfigureTopic creates or modifies a topic using flat schema format
+func (c *BrokerClient) ConfigureTopic(ctx context.Context, namespace, topicName string, partitionCount int32, flatSchema *schema_pb.RecordType, keyColumns []string) error {
     if err := c.findBrokerBalancer(); err != nil {
         return err
     }
@@ -342,14 +325,15 @@ func (c *BrokerClient) ConfigureTopic(ctx context.Context, namespace, topicName
     client := mq_pb.NewSeaweedMessagingClient(conn)
 
-    // Create topic configuration
+    // Create topic configuration using flat schema format
     _, err = client.ConfigureTopic(ctx, &mq_pb.ConfigureTopicRequest{
         Topic: &schema_pb.Topic{
             Namespace: namespace,
             Name:      topicName,
         },
-        PartitionCount: partitionCount,
-        RecordType:     recordType,
+        PartitionCount:    partitionCount,
+        MessageRecordType: flatSchema,
+        KeyColumns:        keyColumns,
     })
     if err != nil {
         return fmt.Errorf("failed to configure topic %s.%s: %v", namespace, topicName, err)
     }
@@ -433,15 +417,21 @@ func (c *BrokerClient) ListTopicPartitions(ctx context.Context, namespace, topic
 // Uses buffer_start metadata from disk files for precise deduplication
 // This prevents double-counting when combining with disk-based data
 func (c *BrokerClient) GetUnflushedMessages(ctx context.Context, namespace, topicName string, partition topic.Partition, startTimeNs int64) ([]*filer_pb.LogEntry, error) {
+    glog.V(2).Infof("GetUnflushedMessages called for %s/%s, partition: RangeStart=%d, RangeStop=%d",
+        namespace, topicName, partition.RangeStart, partition.RangeStop)
+
     // Step 1: Find the broker that hosts this partition
     if err := c.findBrokerBalancer(); err != nil {
+        glog.V(2).Infof("Failed to find broker balancer: %v", err)
         // Return empty slice if we can't find broker - prevents double-counting
         return []*filer_pb.LogEntry{}, nil
     }
+    glog.V(2).Infof("Found broker at address: %s", c.brokerAddress)
 
     // Step 2: Connect to broker
     conn, err := grpc.Dial(c.brokerAddress, c.grpcDialOption)
     if err != nil {
+        glog.V(2).Infof("Failed to connect to broker %s: %v", c.brokerAddress, err)
        // Return empty slice if connection fails - prevents double-counting
         return []*filer_pb.LogEntry{}, nil
     }
@@ -449,16 +439,20 @@ func (c *BrokerClient) GetUnflushedMessages(ctx context.Context, namespace, topi
 
     client := mq_pb.NewSeaweedMessagingClient(conn)
 
-    // Step 3: Get earliest buffer_start from disk files for precise deduplication
+    // Step 3: For unflushed messages, always start from 0 to get all in-memory data
+    // The buffer_start metadata in log files uses timestamp-based indices for uniqueness,
+    // but the broker's LogBuffer uses sequential indices internally (0, 1, 2, 3...)
+    // For unflushed data queries, we want all messages in the buffer regardless of their
+    // timestamp-based buffer indices, so we always use 0.
     topicObj := topic.Topic{Namespace: namespace, Name: topicName}
     partitionPath := topic.PartitionDir(topicObj, partition)
-    earliestBufferIndex, err := c.getEarliestBufferStart(ctx, partitionPath)
-    if err != nil {
-        // If we can't get buffer info, use 0 (get all unflushed data)
-        earliestBufferIndex = 0
-    }
+    glog.V(2).Infof("Getting buffer start from partition path: %s", partitionPath)
+
+    // Always use 0 for unflushed messages to ensure we get all in-memory data
+    earliestBufferOffset := int64(0)
+    glog.V(2).Infof("Using StartBufferOffset=0 for unflushed messages (buffer offsets are sequential internally)")
 
-    // Step 4: Prepare request using buffer index filtering only
+    // Step 4: Prepare request using buffer offset filtering only
     request := &mq_pb.GetUnflushedMessagesRequest{
         Topic: &schema_pb.Topic{
             Namespace: namespace,
@@ -470,12 +464,14 @@ func (c *BrokerClient) GetUnflushedMessages(ctx context.Context, namespace, topi
             RangeStop:  partition.RangeStop,
             UnixTimeNs: partition.UnixTimeNs,
         },
-        StartBufferIndex: earliestBufferIndex,
+        StartBufferOffset: earliestBufferOffset,
     }
 
     // Step 5: Call the broker streaming API
+    glog.V(2).Infof("Calling GetUnflushedMessages gRPC with StartBufferOffset=%d", earliestBufferOffset)
     stream, err := client.GetUnflushedMessages(ctx, request)
     if err != nil {
+        glog.V(2).Infof("GetUnflushedMessages gRPC call failed: %v", err)
         // Return empty slice if gRPC call fails - prevents double-counting
         return []*filer_pb.LogEntry{}, nil
     }
@@ -558,19 +554,6 @@ func (c *BrokerClient) getEarliestBufferStart(ctx context.Context, partitionPath
         return nil
     })
 
-    // Debug: Show buffer_start determination logic in EXPLAIN mode
-    if isDebugMode(ctx) && len(bufferStartSources) > 0 {
-        if logFileCount == 0 && parquetFileCount > 0 {
-            fmt.Printf("Debug: Using Parquet buffer_start metadata (binary format, no log files) - sources: %v\n", bufferStartSources)
-        } else if logFileCount > 0 && parquetFileCount > 0 {
-            fmt.Printf("Debug: Using mixed sources for buffer_start (binary format) - log files: %d, Parquet files: %d, sources: %v\n",
-                logFileCount, parquetFileCount, bufferStartSources)
-        } else {
-            fmt.Printf("Debug: Using log file buffer_start metadata (binary format) - sources: %v\n", bufferStartSources)
-        }
-        fmt.Printf("Debug: Earliest buffer_start index: %d\n", earliestBufferIndex)
-    }
-
     if err != nil {
         return 0, fmt.Errorf("failed to scan partition directory: %v", err)
     }
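For reference, a minimal sketch of the address conversion that replaces the deleted convertHTTPToGRPC helper. It assumes the convention described in the removed code (gRPC port = HTTP port + 10000) and uses the default master port 9333 purely for illustration:

package main

import (
    "fmt"

    "github.com/seaweedfs/seaweedfs/weed/pb"
)

func main() {
    // pb.ServerAddress is a string type; ToGrpcAddress derives the gRPC
    // endpoint, which the old helper computed by adding 10000 to the HTTP port.
    httpAddr := pb.ServerAddress("localhost:9333") // example master HTTP address
    fmt.Println(httpAddr.ToGrpcAddress())          // expected: localhost:19333
}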
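The schema-related signatures also change, as the hunks above show. The following hypothetical caller (topic name, key column, and partition count are made up, and it is assumed to live alongside broker_client.go in the engine package) illustrates the new flat-schema round trip:

package engine // assumed: placed alongside broker_client.go

import (
    "context"
    "fmt"

    "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
)

// exampleSchemaRoundTrip is a hypothetical sketch, not part of the change:
// it exercises the new ConfigureTopic and GetTopicSchema signatures.
func exampleSchemaRoundTrip(ctx context.Context, flatSchema *schema_pb.RecordType) error {
    client := NewBrokerClient("localhost:9333") // master HTTP address; gRPC address derived internally

    // ConfigureTopic now takes the flat message schema plus explicit key columns.
    if err := client.ConfigureTopic(ctx, "ns", "events", 4, flatSchema, []string{"user_id"}); err != nil {
        return err
    }

    // GetTopicSchema now returns (flatSchema, keyColumns, schemaFormat, error).
    gotSchema, keyColumns, schemaFormat, err := client.GetTopicSchema(ctx, "ns", "events")
    if err != nil {
        return err
    }
    fmt.Printf("fields=%d keyColumns=%v format=%s\n", len(gotSchema.Fields), keyColumns, schemaFormat)
    return nil
}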
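Finally, a hedged sketch of how the reworked GetUnflushedMessages path might be driven from the query side; the namespace, topic, and partition range are placeholders. The point it illustrates is the deduplication contract in the diff: failures surface as an empty slice rather than an error, so in-memory rows are never double-counted against disk data:

package engine // assumed: placed alongside broker_client.go

import (
    "context"

    "github.com/seaweedfs/seaweedfs/weed/glog"
    "github.com/seaweedfs/seaweedfs/weed/mq/topic"
)

// exampleReadUnflushed is illustrative only.
func exampleReadUnflushed(ctx context.Context, c *BrokerClient) {
    p := topic.Partition{RangeStart: 0, RangeStop: 1023} // placeholder partition range
    entries, err := c.GetUnflushedMessages(ctx, "ns", "events", p, 0)
    if err != nil {
        // Not expected with the current implementation: broker discovery and
        // connection failures already return an empty slice instead of an error.
        glog.V(2).Infof("unexpected error: %v", err)
        return
    }
    glog.V(2).Infof("got %d unflushed entries (StartBufferOffset pinned to 0 internally)", len(entries))
}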
