Diffstat (limited to 'weed/mq/kafka/protocol/offset_management.go')
-rw-r--r--  weed/mq/kafka/protocol/offset_management.go  89
1 file changed, 62 insertions(+), 27 deletions(-)
diff --git a/weed/mq/kafka/protocol/offset_management.go b/weed/mq/kafka/protocol/offset_management.go
index 0a6e724fb..72ad13267 100644
--- a/weed/mq/kafka/protocol/offset_management.go
+++ b/weed/mq/kafka/protocol/offset_management.go
@@ -5,6 +5,7 @@ import (
"fmt"
"time"
+ "github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/mq/kafka/consumer"
)
@@ -114,11 +115,10 @@ func (h *Handler) handleOffsetCommit(correlationID uint32, apiVersion uint16, re
return h.buildOffsetCommitErrorResponse(correlationID, ErrorCodeInvalidGroupID, apiVersion), nil
}
- // Get consumer group
- group := h.groupCoordinator.GetGroup(req.GroupID)
- if group == nil {
- return h.buildOffsetCommitErrorResponse(correlationID, ErrorCodeInvalidGroupID, apiVersion), nil
- }
+ // Get or create consumer group
+ // Some Kafka clients (like kafka-go Reader) commit offsets without formally joining
+ // the group via JoinGroup/SyncGroup. We need to support these "simple consumer" use cases.
+ group := h.groupCoordinator.GetOrCreateGroup(req.GroupID)
group.Mu.Lock()
defer group.Mu.Unlock()
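GetOrCreateGroup itself is not part of this patch; below is a minimal sketch of the get-or-create pattern it implies, assuming the coordinator keeps a mutex-guarded map. All type and field names are illustrative (the real struct lives under weed/mq/kafka/consumer), not the actual SeaweedFS implementation.

package coordinator // hypothetical sketch, not the SeaweedFS code

import (
    "sync"
    "time"
)

// Illustrative subset of the fields the diff touches
// (Generation, Members, LastActivity, Mu).
type ConsumerGroup struct {
    ID           string
    Generation   int32
    Members      map[string]struct{}
    LastActivity time.Time
    Mu           sync.RWMutex
}

type GroupCoordinator struct {
    mu     sync.Mutex
    groups map[string]*ConsumerGroup
}

// GetOrCreateGroup returns the existing group or registers an empty one at
// generation 0, so clients that never call JoinGroup can still commit/fetch.
func (gc *GroupCoordinator) GetOrCreateGroup(id string) *ConsumerGroup {
    gc.mu.Lock()
    defer gc.mu.Unlock()
    g, ok := gc.groups[id]
    if !ok {
        g = &ConsumerGroup{
            ID:           id,
            Members:      make(map[string]struct{}),
            LastActivity: time.Now(),
        }
        gc.groups[id] = g
    }
    return g
}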
@@ -126,8 +126,14 @@ func (h *Handler) handleOffsetCommit(correlationID uint32, apiVersion uint16, re
// Update group's last activity
group.LastActivity = time.Now()
- // Require matching generation to store commits; return IllegalGeneration otherwise
- generationMatches := (req.GenerationID == group.Generation)
+ // Check generation compatibility
+ // Allow commits for empty groups (no active members) to support simple consumers
+ // that commit offsets without formal group membership
+ groupIsEmpty := len(group.Members) == 0
+ generationMatches := groupIsEmpty || (req.GenerationID == group.Generation)
+
+ glog.V(3).Infof("[OFFSET_COMMIT] Group check: id=%s reqGen=%d groupGen=%d members=%d empty=%v matches=%v",
+ req.GroupID, req.GenerationID, group.Generation, len(group.Members), groupIsEmpty, generationMatches)
// Process offset commits
resp := OffsetCommitResponse{
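The relaxed generation check above boils down to a two-case predicate; a standalone sketch of the same logic (the function name is assumed):

// Mirrors the check introduced above: an empty group (nobody has joined via
// JoinGroup/SyncGroup) accepts any generation, otherwise it must match.
func generationAccepted(reqGen, groupGen int32, memberCount int) bool {
    if memberCount == 0 {
        return true // simple consumer: no formal membership, nothing to match
    }
    return reqGen == groupGen
}

Under this rule a simple consumer's commit against an empty group is accepted regardless of the generation it sends, while a stale generation in a group with active members is still rejected with IllegalGeneration (error code 22), as in the hunk below.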
@@ -143,7 +149,7 @@ func (h *Handler) handleOffsetCommit(correlationID uint32, apiVersion uint16, re
for _, p := range t.Partitions {
- // Create consumer offset key for SMQ storage
+ // Create consumer offset key for SMQ storage (used below when persisting)
key := ConsumerOffsetKey{
Topic: t.Name,
Partition: p.Index,
@@ -151,16 +157,33 @@ func (h *Handler) handleOffsetCommit(correlationID uint32, apiVersion uint16, re
ConsumerGroupInstance: req.GroupInstanceID,
}
- // Commit offset using SMQ storage (persistent to filer)
+ // Commit offset synchronously for immediate consistency
var errCode int16 = ErrorCodeNone
if generationMatches {
- if err := h.commitOffsetToSMQ(key, p.Offset, p.Metadata); err != nil {
+ // Store in the in-memory map for an immediate response
+ // This is the primary committed offset position for consumers
+ if err := h.commitOffset(group, t.Name, p.Index, p.Offset, p.Metadata); err != nil {
errCode = ErrorCodeOffsetMetadataTooLarge
+ glog.V(2).Infof("[OFFSET_COMMIT] Failed to commit offset: group=%s topic=%s partition=%d offset=%d err=%v",
+ req.GroupID, t.Name, p.Index, p.Offset, err)
} else {
+ // Also persist to SMQ storage for durability across broker restarts
+ // This is done synchronously to ensure offset is not lost
+ if err := h.commitOffsetToSMQ(key, p.Offset, p.Metadata); err != nil {
+ // Log the error but don't fail the commit
+ // In-memory commit is the source of truth for active consumers
+ // SMQ persistence is best-effort for crash recovery
+ glog.V(3).Infof("[OFFSET_COMMIT] SMQ persist failed (non-fatal): group=%s topic=%s partition=%d offset=%d err=%v",
+ req.GroupID, t.Name, p.Index, p.Offset, err)
+ }
+ glog.V(3).Infof("[OFFSET_COMMIT] Committed: group=%s topic=%s partition=%d offset=%d gen=%d",
+ req.GroupID, t.Name, p.Index, p.Offset, group.Generation)
}
} else {
// Do not store commit if generation mismatch
errCode = 22 // IllegalGeneration
+ glog.V(2).Infof("[OFFSET_COMMIT] Rejected - generation mismatch: group=%s expected=%d got=%d members=%d",
+ req.GroupID, group.Generation, req.GenerationID, len(group.Members))
}
topicResp.Partitions = append(topicResp.Partitions, OffsetCommitPartitionResponse{
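h.commitOffset, the new in-memory path, is not shown in this hunk; here is a plausible sketch under the assumption that the group carries a nested topic-to-partition offset map. The OffsetCommits field and OffsetEntry type are invented for illustration, and the caller already holds group.Mu (imports "fmt" assumed).

// Hypothetical in-memory commit helper; OffsetCommits/OffsetEntry are
// assumptions, not the actual SeaweedFS layout. Caller holds group.Mu.
type OffsetEntry struct {
    Offset   int64
    Metadata string
}

func (h *Handler) commitOffset(g *ConsumerGroup, topic string, partition int32, offset int64, metadata string) error {
    if len(metadata) > 4096 { // mirror the OffsetMetadataTooLarge error path above
        return fmt.Errorf("offset metadata too large: %d bytes", len(metadata))
    }
    if g.OffsetCommits == nil {
        g.OffsetCommits = make(map[string]map[int32]OffsetEntry)
    }
    if g.OffsetCommits[topic] == nil {
        g.OffsetCommits[topic] = make(map[int32]OffsetEntry)
    }
    g.OffsetCommits[topic][partition] = OffsetEntry{Offset: offset, Metadata: metadata}
    return nil
}

Writing memory first and treating commitOffsetToSMQ as best-effort keeps the commit fast and immediately visible to fetches; the worst case after a crash is replaying from the last offset that reached the filer, which at-least-once consumers already tolerate.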
@@ -187,15 +210,17 @@ func (h *Handler) handleOffsetFetch(correlationID uint32, apiVersion uint16, req
return h.buildOffsetFetchErrorResponse(correlationID, ErrorCodeInvalidGroupID), nil
}
- // Get consumer group
- group := h.groupCoordinator.GetGroup(request.GroupID)
- if group == nil {
- return h.buildOffsetFetchErrorResponse(correlationID, ErrorCodeInvalidGroupID), nil
- }
+ // Get or create consumer group
+ // IMPORTANT: Use GetOrCreateGroup (not GetGroup) to allow fetching persisted offsets
+ // even if the group doesn't exist in memory yet. This is critical for consumer restarts.
+ // Kafka allows offset fetches for groups that haven't joined yet (e.g., simple consumers).
+ group := h.groupCoordinator.GetOrCreateGroup(request.GroupID)
group.Mu.RLock()
defer group.Mu.RUnlock()
+ glog.V(4).Infof("[OFFSET_FETCH] Request: group=%s topics=%d", request.GroupID, len(request.Topics))
+
// Build response
response := OffsetFetchResponse{
CorrelationID: correlationID,
@@ -222,25 +247,35 @@ func (h *Handler) handleOffsetFetch(correlationID uint32, apiVersion uint16, req
// Fetch offsets for requested partitions
for _, partition := range partitionsToFetch {
- // Create consumer offset key for SMQ storage
- key := ConsumerOffsetKey{
- Topic: topic.Name,
- Partition: partition,
- ConsumerGroup: request.GroupID,
- ConsumerGroupInstance: request.GroupInstanceID,
- }
-
var fetchedOffset int64 = -1
var metadata string = ""
var errorCode int16 = ErrorCodeNone
- // Fetch offset directly from SMQ storage (persistent storage)
- // No cache needed - offset fetching is infrequent compared to commits
- if off, meta, err := h.fetchOffsetFromSMQ(key); err == nil && off >= 0 {
+ // Try fetching from the in-memory cache first (works for both mock and SMQ backends)
+ if off, meta, err := h.fetchOffset(group, topic.Name, partition); err == nil && off >= 0 {
fetchedOffset = off
metadata = meta
+ glog.V(4).Infof("[OFFSET_FETCH] Found in memory: group=%s topic=%s partition=%d offset=%d",
+ request.GroupID, topic.Name, partition, off)
} else {
- // No offset found in persistent storage (-1 indicates no committed offset)
+ // Fallback: try fetching from SMQ persistent storage
+ // This handles cases where offsets are stored in SMQ but not yet loaded into memory
+ key := ConsumerOffsetKey{
+ Topic: topic.Name,
+ Partition: partition,
+ ConsumerGroup: request.GroupID,
+ ConsumerGroupInstance: request.GroupInstanceID,
+ }
+ if off, meta, err := h.fetchOffsetFromSMQ(key); err == nil && off >= 0 {
+ fetchedOffset = off
+ metadata = meta
+ glog.V(3).Infof("[OFFSET_FETCH] Found in storage: group=%s topic=%s partition=%d offset=%d",
+ request.GroupID, topic.Name, partition, off)
+ } else {
+ glog.V(3).Infof("[OFFSET_FETCH] No offset found: group=%s topic=%s partition=%d (will start from auto.offset.reset)",
+ request.GroupID, topic.Name, partition)
+ }
+ // No offset found in either location (-1 indicates no committed offset)
}
partitionResponse := OffsetFetchPartitionResponse{
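The matching read side, h.fetchOffset, is also out of frame; under the same assumed layout it is a plain lookup returning the -1 sentinel the handler checks for before falling back to SMQ storage:

// Hypothetical counterpart to the commitOffset sketch above; -1 means no
// committed offset, which sends the handler to the SMQ fallback and,
// failing that, leaves the client to apply auto.offset.reset.
// Caller holds group.Mu.RLock.
func (h *Handler) fetchOffset(g *ConsumerGroup, topic string, partition int32) (int64, string, error) {
    if parts, ok := g.OffsetCommits[topic]; ok {
        if e, ok := parts[partition]; ok {
            return e.Offset, e.Metadata, nil
        }
    }
    return -1, "", nil
}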