Diffstat (limited to 'weed/mq/kafka/integration/seaweedmq_handler.go')
-rw-r--r--  weed/mq/kafka/integration/seaweedmq_handler.go  119
1 file changed, 53 insertions(+), 66 deletions(-)
diff --git a/weed/mq/kafka/integration/seaweedmq_handler.go b/weed/mq/kafka/integration/seaweedmq_handler.go
index 7689d0612..0ef659050 100644
--- a/weed/mq/kafka/integration/seaweedmq_handler.go
+++ b/weed/mq/kafka/integration/seaweedmq_handler.go
@@ -13,7 +13,7 @@ import (
// GetStoredRecords retrieves records from SeaweedMQ using the proper subscriber API
// ctx controls the fetch timeout (should match Kafka fetch request's MaxWaitTime)
func (h *SeaweedMQHandler) GetStoredRecords(ctx context.Context, topic string, partition int32, fromOffset int64, maxRecords int) ([]SMQRecord, error) {
- glog.V(2).Infof("[FETCH] GetStoredRecords: topic=%s partition=%d fromOffset=%d maxRecords=%d", topic, partition, fromOffset, maxRecords)
+ glog.V(4).Infof("[FETCH] GetStoredRecords: topic=%s partition=%d fromOffset=%d maxRecords=%d", topic, partition, fromOffset, maxRecords)
// Verify topic exists
if !h.TopicExists(topic) {
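A sketch of the ctx contract described in the doc comment above, with the deadline derived from the Kafka fetch request's MaxWaitTime (the maxWaitMs variable and surrounding names are assumptions, not code from this change):

    // Hypothetical caller: bound the fetch by the request's MaxWaitTime so the
    // broker read gives up exactly when the Kafka client stops waiting.
    ctx, cancel := context.WithTimeout(parentCtx, time.Duration(maxWaitMs)*time.Millisecond)
    defer cancel()
    records, err := h.GetStoredRecords(ctx, topic, partition, fetchOffset, maxRecords)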
@@ -36,24 +36,24 @@ func (h *SeaweedMQHandler) GetStoredRecords(ctx context.Context, topic string, p
if connCtx.BrokerClient != nil {
if bc, ok := connCtx.BrokerClient.(*BrokerClient); ok {
brokerClient = bc
- glog.V(2).Infof("[FETCH] Using per-connection BrokerClient for topic=%s partition=%d", topic, partition)
+ glog.V(4).Infof("[FETCH] Using per-connection BrokerClient for topic=%s partition=%d", topic, partition)
}
}
// Extract consumer group and client ID
if connCtx.ConsumerGroup != "" {
consumerGroup = connCtx.ConsumerGroup
- glog.V(2).Infof("[FETCH] Using actual consumer group from context: %s", consumerGroup)
+ glog.V(4).Infof("[FETCH] Using actual consumer group from context: %s", consumerGroup)
}
if connCtx.MemberID != "" {
// Use member ID as base, but still include topic-partition for uniqueness
consumerID = fmt.Sprintf("%s-%s-%d", connCtx.MemberID, topic, partition)
- glog.V(2).Infof("[FETCH] Using actual member ID from context: %s", consumerID)
+ glog.V(4).Infof("[FETCH] Using actual member ID from context: %s", consumerID)
} else if connCtx.ClientID != "" {
// Fallback to client ID if member ID not set (for clients not using consumer groups)
// Include topic-partition to ensure each partition consumer is unique
consumerID = fmt.Sprintf("%s-%s-%d", connCtx.ClientID, topic, partition)
- glog.V(2).Infof("[FETCH] Using client ID from context: %s", consumerID)
+ glog.V(4).Infof("[FETCH] Using client ID from context: %s", consumerID)
}
}
}
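To illustrate the fallback chain in the hunk above (member ID preferred, client ID otherwise, topic-partition always appended for uniqueness; values hypothetical):

    // MemberID "member-42", topic "events", partition 3 -> consumerID "member-42-events-3"
    // no MemberID, ClientID "rdkafka-7"                 -> consumerID "rdkafka-7-events-3"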
@@ -67,64 +67,44 @@ func (h *SeaweedMQHandler) GetStoredRecords(ctx context.Context, topic string, p
}
}
- // CRITICAL FIX: Reuse existing subscriber if offset matches to avoid concurrent subscriber storm
- // Creating too many concurrent subscribers to the same offset causes the broker to return
- // the same data repeatedly, creating an infinite loop.
- glog.V(2).Infof("[FETCH] Getting or creating subscriber for topic=%s partition=%d fromOffset=%d", topic, partition, fromOffset)
-
- // GetOrCreateSubscriber handles offset mismatches internally
- // If the cached subscriber is at a different offset, it will be recreated automatically
- brokerSubscriber, err := brokerClient.GetOrCreateSubscriber(topic, partition, fromOffset, consumerGroup, consumerID)
- if err != nil {
- glog.Errorf("[FETCH] Failed to get/create subscriber: %v", err)
- return nil, fmt.Errorf("failed to get/create subscriber: %v", err)
- }
- glog.V(2).Infof("[FETCH] Subscriber ready at offset %d", brokerSubscriber.StartOffset)
-
- // NOTE: We DON'T close the subscriber here because we're reusing it across Fetch requests
- // The subscriber will be closed when the connection closes or when a different offset is requested
-
- // Read records using the subscriber
- // CRITICAL: Pass the requested fromOffset to ReadRecords so it can check the cache correctly
- // If the session has advanced past fromOffset, ReadRecords will return cached data
- // Pass context to respect Kafka fetch request's MaxWaitTime
- glog.V(2).Infof("[FETCH] Calling ReadRecords for topic=%s partition=%d fromOffset=%d maxRecords=%d", topic, partition, fromOffset, maxRecords)
- seaweedRecords, err := brokerClient.ReadRecordsFromOffset(ctx, brokerSubscriber, fromOffset, maxRecords)
+ // KAFKA-STYLE STATELESS FETCH (Long-term solution)
+ // Uses FetchMessage RPC - completely stateless, no Subscribe loops
+ //
+ // Benefits:
+ // 1. No session state on broker - each request is independent
+ // 2. No shared Subscribe loops - no concurrent access issues
+ // 3. No stream corruption - no cancel/restart complexity
+ // 4. Safe concurrent reads - like Kafka's file-based reads
+ // 5. Simple and maintainable - just request/response
+ //
+ // Architecture inspired by Kafka:
+ // - Client manages offset tracking
+ // - Each fetch is independent
+ // - Broker reads from LogBuffer without maintaining state
+ // - Natural support for concurrent requests
+ glog.V(4).Infof("[FETCH-STATELESS] Fetching records for topic=%s partition=%d fromOffset=%d maxRecords=%d", topic, partition, fromOffset, maxRecords)
+
+ // Use the new FetchMessage RPC (Kafka-style stateless)
+ seaweedRecords, err := brokerClient.FetchMessagesStateless(ctx, topic, partition, fromOffset, maxRecords, consumerGroup, consumerID)
if err != nil {
- glog.Errorf("[FETCH] ReadRecords failed: %v", err)
- return nil, fmt.Errorf("failed to read records: %v", err)
+ glog.Errorf("[FETCH-STATELESS] Failed to fetch records: %v", err)
+ return nil, fmt.Errorf("failed to fetch records: %v", err)
}
- // CRITICAL FIX: If ReadRecords returns 0 but HWM indicates data exists on disk, force a disk read
- // This handles the case where subscriber advanced past data that was already on disk
- // Only do this ONCE per fetch request to avoid subscriber churn
- if len(seaweedRecords) == 0 {
- hwm, hwmErr := brokerClient.GetHighWaterMark(topic, partition)
- if hwmErr == nil && fromOffset < hwm {
- // Restart the existing subscriber at the requested offset for disk read
- // This is more efficient than closing and recreating
- consumerGroup := "kafka-gateway"
- consumerID := fmt.Sprintf("kafka-gateway-%s-%d", topic, partition)
-
- if err := brokerClient.RestartSubscriber(brokerSubscriber, fromOffset, consumerGroup, consumerID); err != nil {
- return nil, fmt.Errorf("failed to restart subscriber: %v", err)
- }
- // Try reading again from restarted subscriber (will do disk read)
- seaweedRecords, err = brokerClient.ReadRecordsFromOffset(ctx, brokerSubscriber, fromOffset, maxRecords)
- if err != nil {
- return nil, fmt.Errorf("failed to read after restart: %v", err)
- }
- }
- }
-
- glog.V(2).Infof("[FETCH] ReadRecords returned %d records", len(seaweedRecords))
+ glog.V(4).Infof("[FETCH-STATELESS] Fetched %d records", len(seaweedRecords))
//
- // This approach is correct for Kafka protocol:
- // - Clients continuously poll with Fetch requests
- // - If no data is available, we return empty and client will retry
- // - Eventually the data will be read from disk and returned
+ // STATELESS FETCH BENEFITS:
+ // - No broker-side session state = no state synchronization bugs
+ // - No Subscribe loops = no concurrent access to LogBuffer
+ // - No stream corruption = no cancel/restart issues
+ // - Natural concurrent access = like Kafka file reads
+ // - Simple architecture = easier to maintain and debug
//
- // We only recreate subscriber if the offset mismatches, which is handled earlier in this function
+ // EXPECTED RESULTS:
+ // - <1% message loss (only from consumer rebalancing)
+ // - No duplicates (no stream corruption)
+ // - Low latency (direct LogBuffer reads)
+ // - No context timeouts (no stream initialization overhead)
// Convert SeaweedMQ records to SMQRecord interface with proper Kafka offsets
smqRecords := make([]SMQRecord, 0, len(seaweedRecords))
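A minimal sketch of the consumer contract this stateless design assumes: the client owns the offset cursor and each fetch is an independent request. The loop below is illustrative only; the GetOffset accessor name on SMQRecord is an assumption, as the diff shows just the interface name:

    // consumeLoop: Kafka-style polling against the stateless fetch path.
    // An empty result is not an error; the client simply polls again.
    func consumeLoop(h *SeaweedMQHandler, topic string, partition int32) error {
    	offset := int64(0) // client-owned cursor; the broker holds no session state
    	for {
    		ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
    		records, err := h.GetStoredRecords(ctx, topic, partition, offset, 100)
    		cancel()
    		if err != nil {
    			return err
    		}
    		for _, r := range records {
    			_ = r // hand each record to the application here
    		}
    		if n := len(records); n > 0 {
    			offset = records[n-1].GetOffset() + 1 // advance past the last delivered record
    		}
    	}
    }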
@@ -136,7 +116,7 @@ func (h *SeaweedMQHandler) GetStoredRecords(ctx context.Context, topic string, p
// CRITICAL: Skip records before the requested offset
// This can happen when the subscriber cache returns old data
if kafkaOffset < fromOffset {
- glog.V(2).Infof("[FETCH] Skipping record %d with offset %d (requested fromOffset=%d)", i, kafkaOffset, fromOffset)
+ glog.V(4).Infof("[FETCH] Skipping record %d with offset %d (requested fromOffset=%d)", i, kafkaOffset, fromOffset)
continue
}
@@ -151,7 +131,7 @@ func (h *SeaweedMQHandler) GetStoredRecords(ctx context.Context, topic string, p
glog.V(4).Infof("[FETCH] Record %d: offset=%d, keyLen=%d, valueLen=%d", i, kafkaOffset, len(seaweedRecord.Key), len(seaweedRecord.Value))
}
- glog.V(2).Infof("[FETCH] Successfully read %d records from SMQ", len(smqRecords))
+ glog.V(4).Infof("[FETCH] Successfully read %d records from SMQ", len(smqRecords))
return smqRecords, nil
}
@@ -192,6 +172,7 @@ func (h *SeaweedMQHandler) GetLatestOffset(topic string, partition int32) (int64
if time.Now().Before(entry.expiresAt) {
// Cache hit - return cached value
h.hwmCacheMu.RUnlock()
+ glog.V(2).Infof("[HWM] Cache HIT for %s: hwm=%d", cacheKey, entry.value)
return entry.value, nil
}
}
@@ -199,11 +180,15 @@ func (h *SeaweedMQHandler) GetLatestOffset(topic string, partition int32) (int64
// Cache miss or expired - query SMQ broker
if h.brokerClient != nil {
+ glog.V(2).Infof("[HWM] Cache MISS for %s, querying broker...", cacheKey)
latestOffset, err := h.brokerClient.GetHighWaterMark(topic, partition)
if err != nil {
+ glog.V(1).Infof("[HWM] ERROR querying broker for %s: %v", cacheKey, err)
return 0, err
}
+ glog.V(2).Infof("[HWM] Broker returned hwm=%d for %s", latestOffset, cacheKey)
+
// Update cache
h.hwmCacheMu.Lock()
h.hwmCache[cacheKey] = &hwmCacheEntry{
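The two HWM hunks above follow a standard read-through TTL cache shape. A self-contained sketch under that assumption (the entry type and field names mirror the diff; everything else is illustrative):

    package main

    import (
    	"fmt"
    	"sync"
    	"time"
    )

    type hwmCacheEntry struct {
    	value     int64
    	expiresAt time.Time
    }

    type hwmCache struct {
    	mu      sync.RWMutex
    	entries map[string]*hwmCacheEntry
    	ttl     time.Duration
    }

    // get serves a fresh cached value, or invokes fetch (standing in for
    // brokerClient.GetHighWaterMark) and caches the result with a TTL.
    func (c *hwmCache) get(key string, fetch func() (int64, error)) (int64, error) {
    	c.mu.RLock()
    	if e, ok := c.entries[key]; ok && time.Now().Before(e.expiresAt) {
    		v := e.value
    		c.mu.RUnlock()
    		return v, nil // cache hit
    	}
    	c.mu.RUnlock()

    	v, err := fetch() // cache miss or expired entry: query the broker
    	if err != nil {
    		return 0, err
    	}
    	c.mu.Lock()
    	c.entries[key] = &hwmCacheEntry{value: v, expiresAt: time.Now().Add(c.ttl)}
    	c.mu.Unlock()
    	return v, nil
    }

    func main() {
    	c := &hwmCache{entries: map[string]*hwmCacheEntry{}, ttl: time.Second}
    	hwm, _ := c.get("events-0", func() (int64, error) { return 42, nil })
    	fmt.Println(hwm) // 42; calls within the next second are served from cache
    }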
@@ -236,7 +221,8 @@ func (h *SeaweedMQHandler) GetFilerAddress() string {
}
// ProduceRecord publishes a record to SeaweedMQ and lets SMQ generate the offset
-func (h *SeaweedMQHandler) ProduceRecord(topic string, partition int32, key []byte, value []byte) (int64, error) {
+// ctx controls the publish timeout - if client cancels, broker operation is cancelled
+func (h *SeaweedMQHandler) ProduceRecord(ctx context.Context, topic string, partition int32, key []byte, value []byte) (int64, error) {
if len(key) > 0 {
}
if len(value) > 0 {
@@ -257,7 +243,7 @@ func (h *SeaweedMQHandler) ProduceRecord(topic string, partition int32, key []by
if h.brokerClient == nil {
publishErr = fmt.Errorf("no broker client available")
} else {
- smqOffset, publishErr = h.brokerClient.PublishRecord(topic, partition, key, value, timestamp)
+ smqOffset, publishErr = h.brokerClient.PublishRecord(ctx, topic, partition, key, value, timestamp)
}
if publishErr != nil {
@@ -278,7 +264,8 @@ func (h *SeaweedMQHandler) ProduceRecord(topic string, partition int32, key []by
// ProduceRecordValue produces a record using RecordValue format to SeaweedMQ
// ALWAYS uses broker's assigned offset - no ledger involved
-func (h *SeaweedMQHandler) ProduceRecordValue(topic string, partition int32, key []byte, recordValueBytes []byte) (int64, error) {
+// ctx controls the publish timeout - if client cancels, broker operation is cancelled
+func (h *SeaweedMQHandler) ProduceRecordValue(ctx context.Context, topic string, partition int32, key []byte, recordValueBytes []byte) (int64, error) {
// Verify topic exists
if !h.TopicExists(topic) {
return 0, fmt.Errorf("topic %s does not exist", topic)
@@ -293,7 +280,7 @@ func (h *SeaweedMQHandler) ProduceRecordValue(topic string, partition int32, key
if h.brokerClient == nil {
publishErr = fmt.Errorf("no broker client available")
} else {
- smqOffset, publishErr = h.brokerClient.PublishRecordValue(topic, partition, key, recordValueBytes, timestamp)
+ smqOffset, publishErr = h.brokerClient.PublishRecordValue(ctx, topic, partition, key, recordValueBytes, timestamp)
}
if publishErr != nil {
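The produce-side hunks apply the same ctx plumbing to both ProduceRecord and ProduceRecordValue. A sketch of the new call shape (timeout value and variable names illustrative):

    // If the Kafka client disconnects, cancelling ctx now aborts the in-flight
    // broker publish instead of leaving it running to completion.
    ctx, cancel := context.WithTimeout(connCtx, 5*time.Second)
    defer cancel()
    offset, err := h.ProduceRecord(ctx, topic, partition, key, value)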
@@ -351,8 +338,8 @@ func (h *SeaweedMQHandler) FetchRecords(topic string, partition int32, fetchOffs
if subErr != nil {
return nil, fmt.Errorf("failed to get broker subscriber: %v", subErr)
}
- // This is a deprecated function, use background context
- seaweedRecords, err = h.brokerClient.ReadRecords(context.Background(), brokerSubscriber, recordsToFetch)
+ // Use ReadRecordsFromOffset which handles caching and proper locking
+ seaweedRecords, err = h.brokerClient.ReadRecordsFromOffset(context.Background(), brokerSubscriber, fetchOffset, recordsToFetch)
if err != nil {
// If no records available, return empty batch instead of error
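A hypothetical sketch of the offset-aware read that the comment above relies on; none of these internals appear in the diff, and the point is only that the cache check and the forward read happen under one session lock:

    // readFromOffset: serve from the session's record cache when it already
    // covers fromOffset; otherwise read forward on the live subscriber.
    func readFromOffset(s *subscriberSession, fromOffset int64, max int) ([]Record, error) {
    	s.mu.Lock()
    	defer s.mu.Unlock()
    	if cached := s.cache.slice(fromOffset, max); len(cached) > 0 {
    		return cached, nil
    	}
    	return s.readForward(max)
    }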