diff options
Diffstat (limited to 'weed/mq/offset/integration.go')
| -rw-r--r-- | weed/mq/offset/integration.go | 7 |
1 files changed, 7 insertions, 0 deletions
diff --git a/weed/mq/offset/integration.go b/weed/mq/offset/integration.go index 4b9ee6183..53bc113e7 100644 --- a/weed/mq/offset/integration.go +++ b/weed/mq/offset/integration.go @@ -12,6 +12,7 @@ import ( // SMQOffsetIntegration provides integration between offset management and SMQ broker type SMQOffsetIntegration struct { mu sync.RWMutex + registry *PartitionOffsetRegistry offsetAssigner *OffsetAssigner offsetSubscriber *OffsetSubscriber offsetSeeker *OffsetSeeker @@ -23,12 +24,18 @@ func NewSMQOffsetIntegration(storage OffsetStorage) *SMQOffsetIntegration { assigner := &OffsetAssigner{registry: registry} return &SMQOffsetIntegration{ + registry: registry, offsetAssigner: assigner, offsetSubscriber: NewOffsetSubscriber(registry), offsetSeeker: NewOffsetSeeker(registry), } } +// Close stops all background checkpoint goroutines and performs final checkpoints +func (integration *SMQOffsetIntegration) Close() error { + return integration.registry.Close() +} + // PublishRecord publishes a record and assigns it an offset func (integration *SMQOffsetIntegration) PublishRecord( namespace, topicName string, |
