Diffstat (limited to 'weed/query/engine/broker_client.go')
| -rw-r--r-- | weed/query/engine/broker_client.go | 603 |
1 files changed, 603 insertions, 0 deletions
diff --git a/weed/query/engine/broker_client.go b/weed/query/engine/broker_client.go
new file mode 100644
index 000000000..9b5f9819c
--- /dev/null
+++ b/weed/query/engine/broker_client.go
@@ -0,0 +1,603 @@
+package engine
+
+import (
+	"context"
+	"encoding/binary"
+	"fmt"
+	"io"
+	"strconv"
+	"strings"
+	"time"
+
+	"github.com/seaweedfs/seaweedfs/weed/cluster"
+	"github.com/seaweedfs/seaweedfs/weed/filer"
+	"github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer"
+	"github.com/seaweedfs/seaweedfs/weed/mq/topic"
+	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
+	"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
+	"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
+	"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
+	"github.com/seaweedfs/seaweedfs/weed/util"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/credentials/insecure"
+	jsonpb "google.golang.org/protobuf/encoding/protojson"
+)
+
+// BrokerClient handles communication with the SeaweedFS MQ broker.
+// Implements BrokerClientInterface for production use.
+// Assumptions:
+// 1. Service discovery via master server (discovers filers and brokers)
+// 2. gRPC connection with default timeout of 30 seconds
+// 3. Topics and namespaces are managed via the SeaweedMessaging service
+type BrokerClient struct {
+	masterAddress  string
+	filerAddress   string
+	brokerAddress  string
+	grpcDialOption grpc.DialOption
+}
+
+// NewBrokerClient creates a new MQ broker client.
+// Uses the master HTTP address and converts it to a gRPC address for service discovery.
+func NewBrokerClient(masterHTTPAddress string) *BrokerClient {
+	// Convert HTTP address to gRPC address (typically HTTP port + 10000)
+	masterGRPCAddress := convertHTTPToGRPC(masterHTTPAddress)
+
+	return &BrokerClient{
+		masterAddress:  masterGRPCAddress,
+		grpcDialOption: grpc.WithTransportCredentials(insecure.NewCredentials()),
+	}
+}
+
+// convertHTTPToGRPC converts an HTTP address to a gRPC address.
+// Follows SeaweedFS convention: gRPC port = HTTP port + 10000
+func convertHTTPToGRPC(httpAddress string) string {
+	if strings.Contains(httpAddress, ":") {
+		parts := strings.Split(httpAddress, ":")
+		if len(parts) == 2 {
+			if port, err := strconv.Atoi(parts[1]); err == nil {
+				return fmt.Sprintf("%s:%d", parts[0], port+10000)
+			}
+		}
+	}
+	// Fallback: return original address if conversion fails
+	return httpAddress
+}
+
+// discoverFiler finds a filer from the master server
+func (c *BrokerClient) discoverFiler() error {
+	if c.filerAddress != "" {
+		return nil // already discovered
+	}
+
+	conn, err := grpc.Dial(c.masterAddress, c.grpcDialOption)
+	if err != nil {
+		return fmt.Errorf("failed to connect to master at %s: %v", c.masterAddress, err)
+	}
+	defer conn.Close()
+
+	client := master_pb.NewSeaweedClient(conn)
+	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+	defer cancel()
+
+	resp, err := client.ListClusterNodes(ctx, &master_pb.ListClusterNodesRequest{
+		ClientType: cluster.FilerType,
+	})
+	if err != nil {
+		return fmt.Errorf("failed to list filers from master: %v", err)
+	}
+
+	if len(resp.ClusterNodes) == 0 {
+		return fmt.Errorf("no filers found in cluster")
+	}
+
+	// Use the first available filer and convert HTTP address to gRPC
+	filerHTTPAddress := resp.ClusterNodes[0].Address
+	c.filerAddress = convertHTTPToGRPC(filerHTTPAddress)
+
+	return nil
+}
+
+// findBrokerBalancer discovers the broker balancer using the filer lock mechanism.
+// First discovers a filer from the master, then uses the filer to find the broker balancer.
+func (c *BrokerClient) findBrokerBalancer() error {
+	if c.brokerAddress != "" {
+		return nil // already found
+	}
+
+	// First discover filer from master
+	if err := c.discoverFiler(); err != nil {
+		return fmt.Errorf("failed to discover filer: %v", err)
+	}
+
+	conn, err := grpc.Dial(c.filerAddress, c.grpcDialOption)
+	if err != nil {
+		return fmt.Errorf("failed to connect to filer at %s: %v", c.filerAddress, err)
+	}
+	defer conn.Close()
+
+	client := filer_pb.NewSeaweedFilerClient(conn)
+
+	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+	defer cancel()
+
+	resp, err := client.FindLockOwner(ctx, &filer_pb.FindLockOwnerRequest{
+		Name: pub_balancer.LockBrokerBalancer,
+	})
+	if err != nil {
+		return fmt.Errorf("failed to find broker balancer: %v", err)
+	}
+
+	c.brokerAddress = resp.Owner
+	return nil
+}
+
+// GetFilerClient creates a filer client for accessing MQ data files.
+// Discovers the filer from the master if not already known.
+func (c *BrokerClient) GetFilerClient() (filer_pb.FilerClient, error) {
+	// Ensure filer is discovered
+	if err := c.discoverFiler(); err != nil {
+		return nil, fmt.Errorf("failed to discover filer: %v", err)
+	}
+
+	return &filerClientImpl{
+		filerAddress:   c.filerAddress,
+		grpcDialOption: c.grpcDialOption,
+	}, nil
+}
+
+// filerClientImpl implements the filer_pb.FilerClient interface for MQ data access
type filerClientImpl struct {
+	filerAddress   string
+	grpcDialOption grpc.DialOption
+}
+
+// WithFilerClient executes a function with a connected filer client
+func (f *filerClientImpl) WithFilerClient(followRedirect bool, fn func(client filer_pb.SeaweedFilerClient) error) error {
+	conn, err := grpc.Dial(f.filerAddress, f.grpcDialOption)
+	if err != nil {
+		return fmt.Errorf("failed to connect to filer at %s: %v", f.filerAddress, err)
+	}
+	defer conn.Close()
+
+	client := filer_pb.NewSeaweedFilerClient(conn)
+	return fn(client)
+}
+
+// AdjustedUrl implements the FilerClient interface (placeholder implementation)
+func (f *filerClientImpl) AdjustedUrl(location *filer_pb.Location) string {
+	return location.Url
+}
+
+// GetDataCenter implements the FilerClient interface (placeholder implementation)
+func (f *filerClientImpl) GetDataCenter() string {
+	// Return empty string as we don't have data center information for this simple client
+	return ""
+}
+
+// ListNamespaces retrieves all MQ namespaces (databases) from the filer.
+// RESOLVED: Now queries actual topic directories instead of hardcoded values
+func (c *BrokerClient) ListNamespaces(ctx context.Context) ([]string, error) {
+	// Get filer client to list directories under /topics
+	filerClient, err := c.GetFilerClient()
+	if err != nil {
+		return []string{}, fmt.Errorf("failed to get filer client: %v", err)
+	}
+
+	var namespaces []string
+	err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
+		// List directories under /topics to get namespaces
+		request := &filer_pb.ListEntriesRequest{
+			Directory: "/topics", // filer.TopicsDir constant value
+		}
+
+		stream, streamErr := client.ListEntries(ctx, request)
+		if streamErr != nil {
+			return fmt.Errorf("failed to list topics directory: %v", streamErr)
+		}
+
+		for {
+			resp, recvErr := stream.Recv()
+			if recvErr != nil {
+				if recvErr == io.EOF {
+					break // End of stream
+				}
+				return fmt.Errorf("failed to receive entry: %v", recvErr)
+			}
+
+			// Only include directories (namespaces), skip files
+			if resp.Entry != nil && resp.Entry.IsDirectory {
+				namespaces = append(namespaces, resp.Entry.Name)
+			}
+		}
+
+		return nil
+	})
+
+	if err != nil {
+		return []string{}, fmt.Errorf("failed to list namespaces from /topics: %v", err)
+	}
+
+	// Return actual namespaces found (may be empty if no topics exist)
+	return namespaces, nil
+}
+
+// ListTopics retrieves all topics in a namespace from the filer.
+// RESOLVED: Now queries actual topic directories instead of hardcoded values
+func (c *BrokerClient) ListTopics(ctx context.Context, namespace string) ([]string, error) {
+	// Get filer client to list directories under /topics/{namespace}
+	filerClient, err := c.GetFilerClient()
+	if err != nil {
+		// Return empty list if filer unavailable - no fallback sample data
+		return []string{}, nil
+	}
+
+	var topics []string
+	err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
+		// List directories under /topics/{namespace} to get topics
+		namespaceDir := fmt.Sprintf("/topics/%s", namespace)
+		request := &filer_pb.ListEntriesRequest{
+			Directory: namespaceDir,
+		}
+
+		stream, streamErr := client.ListEntries(ctx, request)
+		if streamErr != nil {
+			return fmt.Errorf("failed to list namespace directory %s: %v", namespaceDir, streamErr)
+		}
+
+		for {
+			resp, recvErr := stream.Recv()
+			if recvErr != nil {
+				if recvErr == io.EOF {
+					break // End of stream
+				}
+				return fmt.Errorf("failed to receive entry: %v", recvErr)
+			}
+
+			// Only include directories (topics), skip files
+			if resp.Entry != nil && resp.Entry.IsDirectory {
+				topics = append(topics, resp.Entry.Name)
+			}
+		}
+
+		return nil
+	})
+
+	if err != nil {
+		// Return empty list if directory listing fails - no fallback sample data
+		return []string{}, nil
+	}
+
+	// Return actual topics found (may be empty if no topics exist in namespace)
+	return topics, nil
+}
+
+// GetTopicSchema retrieves schema information for a specific topic.
+// Reads the actual schema from the topic configuration stored in the filer.
+func (c *BrokerClient) GetTopicSchema(ctx context.Context, namespace, topicName string) (*schema_pb.RecordType, error) {
+	// Get filer client to read topic configuration
+	filerClient, err := c.GetFilerClient()
+	if err != nil {
+		return nil, fmt.Errorf("failed to get filer client: %v", err)
+	}
+
+	var recordType *schema_pb.RecordType
+	err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
+		// Read topic.conf file from /topics/{namespace}/{topic}/topic.conf
+		topicDir := fmt.Sprintf("/topics/%s/%s", namespace, topicName)
+
+		// First check that topic.conf exists in the topic directory
+		_, err := client.LookupDirectoryEntry(ctx, &filer_pb.LookupDirectoryEntryRequest{
+			Directory: topicDir,
+			Name:      "topic.conf",
+		})
+		if err != nil {
+			return fmt.Errorf("topic %s.%s not found: %v", namespace, topicName, err)
+		}
+
+		// Read the topic.conf file content
+		data, err := filer.ReadInsideFiler(client, topicDir, "topic.conf")
+		if err != nil {
+			return fmt.Errorf("failed to read topic.conf for %s.%s: %v", namespace, topicName, err)
+		}
+
+		// Parse the configuration
+		conf := &mq_pb.ConfigureTopicResponse{}
+		if err = jsonpb.Unmarshal(data, conf); err != nil {
+			return fmt.Errorf("failed to unmarshal topic %s.%s configuration: %v", namespace, topicName, err)
+		}
+
+		// Extract the record type (schema)
+		if conf.RecordType != nil {
+			recordType = conf.RecordType
+		} else {
+			return fmt.Errorf("no schema found for topic %s.%s", namespace, topicName)
+		}
+
+		return nil
+	})
+
+	if err != nil {
+		return nil, err
+	}
+
+	if recordType == nil {
+		return nil, fmt.Errorf("no record type found for topic %s.%s", namespace, topicName)
+	}
+
+	return recordType, nil
+}
+
+// ConfigureTopic creates or modifies a topic configuration.
+// Assumption: Uses the existing ConfigureTopic gRPC method for topic management
+func (c *BrokerClient) ConfigureTopic(ctx context.Context, namespace, topicName string, partitionCount int32, recordType *schema_pb.RecordType) error {
+	if err := c.findBrokerBalancer(); err != nil {
+		return err
+	}
+
+	conn, err := grpc.Dial(c.brokerAddress, grpc.WithTransportCredentials(insecure.NewCredentials()))
+	if err != nil {
+		return fmt.Errorf("failed to connect to broker at %s: %v", c.brokerAddress, err)
+	}
+	defer conn.Close()
+
+	client := mq_pb.NewSeaweedMessagingClient(conn)
+
+	// Create topic configuration
+	_, err = client.ConfigureTopic(ctx, &mq_pb.ConfigureTopicRequest{
+		Topic: &schema_pb.Topic{
+			Namespace: namespace,
+			Name:      topicName,
+		},
+		PartitionCount: partitionCount,
+		RecordType:     recordType,
+	})
+	if err != nil {
+		return fmt.Errorf("failed to configure topic %s.%s: %v", namespace, topicName, err)
+	}
+
+	return nil
+}
+
+// DeleteTopic removes a topic and all its data.
+// Assumption: There's a delete/drop topic method (may need to be implemented in broker)
+func (c *BrokerClient) DeleteTopic(ctx context.Context, namespace, topicName string) error {
+	if err := c.findBrokerBalancer(); err != nil {
+		return err
+	}
+
+	// TODO: Implement topic deletion
+	// This may require a new gRPC method in the broker service
+
+	return fmt.Errorf("topic deletion not yet implemented in broker - need to add DeleteTopic gRPC method")
+}
+
+// ListTopicPartitions discovers the actual partitions for a given topic via the MQ broker
+func (c *BrokerClient) ListTopicPartitions(ctx context.Context, namespace, topicName string) ([]topic.Partition, error) {
+	if err := c.findBrokerBalancer(); err != nil {
+		// Fallback to default partition when broker unavailable
+		return []topic.Partition{{RangeStart: 0, RangeStop: 1000}}, nil
+	}
+
+	// Get topic configuration to determine actual partitions
+	topicObj := topic.Topic{Namespace: namespace, Name: topicName}
+
+	// Use filer client to read topic configuration
+	filerClient, err := c.GetFilerClient()
+	if err != nil {
+		// Fallback to default partition
+		return []topic.Partition{{RangeStart: 0, RangeStop: 1000}}, nil
+	}
+
+	var topicConf *mq_pb.ConfigureTopicResponse
+	err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
+		topicConf, err = topicObj.ReadConfFile(client)
+		return err
+	})
+
+	if err != nil {
+		// Topic doesn't exist or can't read config, use default
+		return []topic.Partition{{RangeStart: 0, RangeStop: 1000}}, nil
+	}
+
+	// Generate partitions based on topic configuration
+	partitionCount := int32(4) // Default partition count for topics
+	if len(topicConf.BrokerPartitionAssignments) > 0 {
+		partitionCount = int32(len(topicConf.BrokerPartitionAssignments))
+	}
+
+	// Create partition ranges - simplified approach
+	// Each partition covers an equal range of the hash space
+	rangeSize := topic.PartitionCount / partitionCount
+	var partitions []topic.Partition
+
+	for i := int32(0); i < partitionCount; i++ {
+		rangeStart := i * rangeSize
+		rangeStop := (i + 1) * rangeSize
+		if i == partitionCount-1 {
+			// Last partition covers remaining range
+			rangeStop = topic.PartitionCount
+		}
+
+		partitions = append(partitions, topic.Partition{
+			RangeStart: rangeStart,
+			RangeStop:  rangeStop,
+			RingSize:   topic.PartitionCount,
+			UnixTimeNs: time.Now().UnixNano(),
+		})
+	}
+
+	return partitions, nil
+}
+
+// GetUnflushedMessages returns only messages that haven't been flushed to disk yet.
+// Uses buffer_start metadata from disk files for precise deduplication.
+// This prevents double-counting when combining with disk-based data.
+func (c *BrokerClient) GetUnflushedMessages(ctx context.Context, namespace, topicName string, partition topic.Partition, startTimeNs int64) ([]*filer_pb.LogEntry, error) {
+	// Step 1: Find the broker that hosts this partition
+	if err := c.findBrokerBalancer(); err != nil {
+		// Return empty slice if we can't find broker - prevents double-counting
+		return []*filer_pb.LogEntry{}, nil
+	}
+
+	// Step 2: Connect to broker
+	conn, err := grpc.Dial(c.brokerAddress, c.grpcDialOption)
+	if err != nil {
+		// Return empty slice if connection fails - prevents double-counting
+		return []*filer_pb.LogEntry{}, nil
+	}
+	defer conn.Close()
+
+	client := mq_pb.NewSeaweedMessagingClient(conn)
+
+	// Step 3: Get earliest buffer_start from disk files for precise deduplication
+	topicObj := topic.Topic{Namespace: namespace, Name: topicName}
+	partitionPath := topic.PartitionDir(topicObj, partition)
+	earliestBufferIndex, err := c.getEarliestBufferStart(ctx, partitionPath)
+	if err != nil {
+		// If we can't get buffer info, use 0 (get all unflushed data)
+		earliestBufferIndex = 0
+	}
+
+	// Step 4: Prepare request using buffer index filtering only
+	request := &mq_pb.GetUnflushedMessagesRequest{
+		Topic: &schema_pb.Topic{
+			Namespace: namespace,
+			Name:      topicName,
+		},
+		Partition: &schema_pb.Partition{
+			RingSize:   partition.RingSize,
+			RangeStart: partition.RangeStart,
+			RangeStop:  partition.RangeStop,
+			UnixTimeNs: partition.UnixTimeNs,
+		},
+		StartBufferIndex: earliestBufferIndex,
+	}
+
+	// Step 5: Call the broker streaming API
+	stream, err := client.GetUnflushedMessages(ctx, request)
+	if err != nil {
+		// Return empty slice if gRPC call fails - prevents double-counting
+		return []*filer_pb.LogEntry{}, nil
+	}
+
+	// Step 6: Receive streaming responses
+	var logEntries []*filer_pb.LogEntry
+	for {
+		response, err := stream.Recv()
+		if err != nil {
+			// End of stream or error - return what we have to prevent double-counting
+			break
+		}
+
+		// Handle error messages
+		if response.Error != "" {
+			// Log the error but return empty slice - prevents double-counting
+			// (In debug mode, this would be visible)
+			return []*filer_pb.LogEntry{}, nil
+		}
+
+		// Check for end of stream
+		if response.EndOfStream {
+			break
+		}
+
+		// Convert and collect the message
+		if response.Message != nil {
+			logEntries = append(logEntries, &filer_pb.LogEntry{
+				TsNs:             response.Message.TsNs,
+				Key:              response.Message.Key,
+				Data:             response.Message.Data,
+				PartitionKeyHash: int32(response.Message.PartitionKeyHash), // Convert uint32 to int32
+			})
+		}
+	}
+
+	return logEntries, nil
+}
+
+// getEarliestBufferStart finds the earliest buffer_start index from disk files in the partition.
+//
+// This method handles three scenarios for seamless broker querying:
+// 1. Live log files exist: Uses their buffer_start metadata (most recent boundaries)
+// 2. Only Parquet files exist: Uses Parquet buffer_start metadata (preserved from archived sources)
+// 3. Mixed files: Uses earliest buffer_start from all sources for comprehensive coverage
+//
+// This ensures continuous real-time querying capability even after log file compaction/archival.
+func (c *BrokerClient) getEarliestBufferStart(ctx context.Context, partitionPath string) (int64, error) {
+	filerClient, err := c.GetFilerClient()
+	if err != nil {
+		return 0, fmt.Errorf("failed to get filer client: %v", err)
+	}
+
+	var earliestBufferIndex int64 = -1 // -1 means no buffer_start found
+	var logFileCount, parquetFileCount int
+	var bufferStartSources []string // Track which files provide buffer_start
+
+	err = filer_pb.ReadDirAllEntries(ctx, filerClient, util.FullPath(partitionPath), "", func(entry *filer_pb.Entry, isLast bool) error {
+		// Skip directories
+		if entry.IsDirectory {
+			return nil
+		}
+
+		// Count file types for scenario detection
+		if strings.HasSuffix(entry.Name, ".parquet") {
+			parquetFileCount++
+		} else {
+			logFileCount++
+		}
+
+		// Extract buffer_start from file extended attributes (both log files and parquet files)
+		bufferStart := c.getBufferStartFromEntry(entry)
+		if bufferStart != nil && bufferStart.StartIndex > 0 {
+			if earliestBufferIndex == -1 || bufferStart.StartIndex < earliestBufferIndex {
+				earliestBufferIndex = bufferStart.StartIndex
+			}
+			bufferStartSources = append(bufferStartSources, entry.Name)
+		}
+
+		return nil
+	})
+
+	// Debug: Show buffer_start determination logic in EXPLAIN mode
+	if isDebugMode(ctx) && len(bufferStartSources) > 0 {
+		if logFileCount == 0 && parquetFileCount > 0 {
+			fmt.Printf("Debug: Using Parquet buffer_start metadata (binary format, no log files) - sources: %v\n", bufferStartSources)
+		} else if logFileCount > 0 && parquetFileCount > 0 {
+			fmt.Printf("Debug: Using mixed sources for buffer_start (binary format) - log files: %d, Parquet files: %d, sources: %v\n",
+				logFileCount, parquetFileCount, bufferStartSources)
+		} else {
+			fmt.Printf("Debug: Using log file buffer_start metadata (binary format) - sources: %v\n", bufferStartSources)
+		}
+		fmt.Printf("Debug: Earliest buffer_start index: %d\n", earliestBufferIndex)
+	}
+
+	if err != nil {
+		return 0, fmt.Errorf("failed to scan partition directory: %v", err)
+	}
+
+	if earliestBufferIndex == -1 {
+		return 0, fmt.Errorf("no buffer_start metadata found in partition")
+	}
+
+	return earliestBufferIndex, nil
+}
+
+// getBufferStartFromEntry extracts LogBufferStart from file entry metadata.
+// Only supports binary format (used by both log files and Parquet files)
+func (c *BrokerClient) getBufferStartFromEntry(entry *filer_pb.Entry) *LogBufferStart {
+	if entry.Extended == nil {
+		return nil
+	}
+
+	if startData, exists := entry.Extended["buffer_start"]; exists {
+		// Only support binary format
+		if len(startData) == 8 {
+			startIndex := int64(binary.BigEndian.Uint64(startData))
+			if startIndex > 0 {
+				return &LogBufferStart{StartIndex: startIndex}
+			}
+		}
+	}
+
+	return nil
+}
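
For reference, a minimal usage sketch of the client added in this diff (not part of the change itself). It assumes the package import path github.com/seaweedfs/seaweedfs/weed/query/engine and a master reachable on the default HTTP port 9333; the address and the wrapping main program are illustrative only.

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/seaweedfs/seaweedfs/weed/query/engine"
)

func main() {
	// NewBrokerClient converts the master HTTP address to its gRPC port (+10000)
	// and lazily discovers a filer and the broker balancer from there.
	client := engine.NewBrokerClient("localhost:9333")

	ctx := context.Background()
	namespaces, err := client.ListNamespaces(ctx)
	if err != nil {
		log.Fatalf("list namespaces: %v", err)
	}
	for _, ns := range namespaces {
		// ListTopics returns an empty list (not an error) if the filer is unavailable
		// or the namespace has no topics.
		topics, _ := client.ListTopics(ctx, ns)
		fmt.Printf("namespace %s: topics %v\n", ns, topics)
	}
}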
