aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/offset/integration.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/offset/integration.go')
-rw-r--r--weed/mq/offset/integration.go7
1 files changed, 7 insertions, 0 deletions
diff --git a/weed/mq/offset/integration.go b/weed/mq/offset/integration.go
index 4b9ee6183..53bc113e7 100644
--- a/weed/mq/offset/integration.go
+++ b/weed/mq/offset/integration.go
@@ -12,6 +12,7 @@ import (
// SMQOffsetIntegration provides integration between offset management and SMQ broker
type SMQOffsetIntegration struct {
mu sync.RWMutex
+ registry *PartitionOffsetRegistry
offsetAssigner *OffsetAssigner
offsetSubscriber *OffsetSubscriber
offsetSeeker *OffsetSeeker
@@ -23,12 +24,18 @@ func NewSMQOffsetIntegration(storage OffsetStorage) *SMQOffsetIntegration {
assigner := &OffsetAssigner{registry: registry}
return &SMQOffsetIntegration{
+ registry: registry,
offsetAssigner: assigner,
offsetSubscriber: NewOffsetSubscriber(registry),
offsetSeeker: NewOffsetSeeker(registry),
}
}
+// Close stops all background checkpoint goroutines and performs final checkpoints
+func (integration *SMQOffsetIntegration) Close() error {
+ return integration.registry.Close()
+}
+
// PublishRecord publishes a record and assigns it an offset
func (integration *SMQOffsetIntegration) PublishRecord(
namespace, topicName string,