diff options
| author | Chris Lu <chrislusf@users.noreply.github.com> | 2025-03-09 23:49:42 -0700 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2025-03-09 23:49:42 -0700 |
| commit | 02773a61074d1a130419318d05d4d0b027cac4b4 (patch) | |
| tree | 590918137bc7edfd23653e377249c45145ec7e54 /weed/util | |
| parent | 14cb8a24c68ce3fd0d3df716295805a8c5c1b8ef (diff) | |
| download | seaweedfs-02773a61074d1a130419318d05d4d0b027cac4b4.tar.xz seaweedfs-02773a61074d1a130419318d05d4d0b027cac4b4.zip | |
Accumulated changes for message queue (#6600)
* rename
* set agent address
* refactor
* add agent sub
* pub messages
* grpc new client
* can publish records via agent
* send init message with session id
* fmt
* check cancelled request while waiting
* use sessionId
* handle possible nil stream
* subscriber process messages
* separate debug port
* use atomic int64
* less logs
* minor
* skip io.EOF
* rename
* remove unused
* use saved offsets
* do not reuse session, since always session id is new after restart
remove last active ts from SessionEntry
* simplify printing
* purge unused
* just proxy the subscription, skipping the session step
* adjust offset types
* subscribe offset type and possible value
* start after the known tsns
* avoid wrongly set startPosition
* move
* remove
* refactor
* typo
* fix
* fix changed path
Diffstat (limited to 'weed/util')
| -rw-r--r-- | weed/util/log_buffer/log_buffer.go | 8 | ||||
| -rw-r--r-- | weed/util/log_buffer/log_read.go | 11 |
2 files changed, 6 insertions, 13 deletions
diff --git a/weed/util/log_buffer/log_buffer.go b/weed/util/log_buffer/log_buffer.go index 30498f92d..fb1f8dc2f 100644 --- a/weed/util/log_buffer/log_buffer.go +++ b/weed/util/log_buffer/log_buffer.go @@ -46,7 +46,7 @@ type LogBuffer struct { isStopping *atomic.Bool isAllFlushed bool flushChan chan *dataToFlush - LastTsNs int64 + LastTsNs atomic.Int64 sync.RWMutex } @@ -95,12 +95,12 @@ func (logBuffer *LogBuffer) AddDataToBuffer(partitionKey, data []byte, processin } else { ts = time.Unix(0, processingTsNs) } - if logBuffer.LastTsNs >= processingTsNs { + if logBuffer.LastTsNs.Load() >= processingTsNs { // this is unlikely to happen, but just in case - processingTsNs = logBuffer.LastTsNs + 1 + processingTsNs = logBuffer.LastTsNs.Add(1) ts = time.Unix(0, processingTsNs) } - logBuffer.LastTsNs = processingTsNs + logBuffer.LastTsNs.Store(processingTsNs) logEntry := &filer_pb.LogEntry{ TsNs: processingTsNs, PartitionKeyHash: util.HashToInt32(partitionKey), diff --git a/weed/util/log_buffer/log_read.go b/weed/util/log_buffer/log_read.go index 0d044fc14..cf83de1e5 100644 --- a/weed/util/log_buffer/log_read.go +++ b/weed/util/log_buffer/log_read.go @@ -66,17 +66,10 @@ func (logBuffer *LogBuffer) LoopProcessLogData(readerName string, startPosition isDone = true return } - logBuffer.RLock() - lastTsNs := logBuffer.LastTsNs - logBuffer.RUnlock() - loopTsNs := lastTsNs // make a copy + lastTsNs := logBuffer.LastTsNs.Load() - for lastTsNs == loopTsNs { + for lastTsNs == logBuffer.LastTsNs.Load() { if waitForDataFn() { - // Update loopTsNs and loop again - logBuffer.RLock() - loopTsNs = logBuffer.LastTsNs - logBuffer.RUnlock() continue } else { isDone = true |
