aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2024-03-07 10:50:09 -0800
committerchrislu <chris.lu@gmail.com>2024-03-07 10:50:09 -0800
commit34f2b600ac5d4850e5f4f6d9d0ce0273150542cf (patch)
tree7c7592ea7415a85b0afba5ca00172156af041ca1
parent62397f23715062b6e8e710568dc8f88b0bab50d8 (diff)
downloadseaweedfs-34f2b600ac5d4850e5f4f6d9d0ce0273150542cf.tar.xz
seaweedfs-34f2b600ac5d4850e5f4f6d9d0ce0273150542cf.zip
each log function adds a "done" return parameter
-rw-r--r--weed/filer/filer_notify.go8
-rw-r--r--weed/mq/broker/broker_grpc_sub.go6
-rw-r--r--weed/mq/broker/broker_topic_partition_read_write.go4
-rw-r--r--weed/server/filer_grpc_server_sub_meta.go10
-rw-r--r--weed/util/log_buffer/log_buffer.go6
-rw-r--r--weed/util/log_buffer/log_read.go8
6 files changed, 23 insertions, 19 deletions
diff --git a/weed/filer/filer_notify.go b/weed/filer/filer_notify.go
index 8fa3eec2d..db78b3d3d 100644
--- a/weed/filer/filer_notify.go
+++ b/weed/filer/filer_notify.go
@@ -87,7 +87,7 @@ func (f *Filer) logMetaEvent(ctx context.Context, fullpath string, eventNotifica
}
-func (f *Filer) logFlushFunc(startTime, stopTime time.Time, buf []byte) {
+func (f *Filer) logFlushFunc(logBuffer *log_buffer.LogBuffer, startTime, stopTime time.Time, buf []byte) {
if len(buf) == 0 {
return
@@ -114,7 +114,7 @@ var (
VolumeNotFoundPattern = regexp.MustCompile(`volume \d+? not found`)
)
-func (f *Filer) ReadPersistedLogBuffer(startPosition log_buffer.MessagePosition, stopTsNs int64, eachLogEntryFn func(logEntry *filer_pb.LogEntry) error) (lastTsNs int64, isDone bool, err error) {
+func (f *Filer) ReadPersistedLogBuffer(startPosition log_buffer.MessagePosition, stopTsNs int64, eachLogEntryFn log_buffer.EachLogEntryFuncType) (lastTsNs int64, isDone bool, err error) {
startDate := fmt.Sprintf("%04d-%02d-%02d", startPosition.Year(), startPosition.Month(), startPosition.Day())
startHourMinute := fmt.Sprintf("%02d-%02d", startPosition.Hour(), startPosition.Minute())
@@ -177,7 +177,7 @@ func (f *Filer) ReadPersistedLogBuffer(startPosition log_buffer.MessagePosition,
return lastTsNs, isDone, nil
}
-func ReadEachLogEntry(r io.Reader, sizeBuf []byte, startTsNs, stopTsNs int64, eachLogEntryFn func(logEntry *filer_pb.LogEntry) error) (lastTsNs int64, err error) {
+func ReadEachLogEntry(r io.Reader, sizeBuf []byte, startTsNs, stopTsNs int64, eachLogEntryFn log_buffer.EachLogEntryFuncType) (lastTsNs int64, err error) {
for {
n, err := r.Read(sizeBuf)
if err != nil {
@@ -207,7 +207,7 @@ func ReadEachLogEntry(r io.Reader, sizeBuf []byte, startTsNs, stopTsNs int64, ea
return lastTsNs, err
}
// println("each log: ", logEntry.TsNs)
- if err := eachLogEntryFn(logEntry); err != nil {
+ if _, err := eachLogEntryFn(logEntry); err != nil {
return lastTsNs, err
} else {
lastTsNs = logEntry.TsNs
diff --git a/weed/mq/broker/broker_grpc_sub.go b/weed/mq/broker/broker_grpc_sub.go
index da61ed05d..ade94caf3 100644
--- a/weed/mq/broker/broker_grpc_sub.go
+++ b/weed/mq/broker/broker_grpc_sub.go
@@ -105,7 +105,7 @@ func (b *MessageQueueBroker) SubscribeMessage(req *mq_pb.SubscribeMessageRequest
}
return true
- }, func(logEntry *filer_pb.LogEntry) error {
+ }, func(logEntry *filer_pb.LogEntry) (bool, error) {
// reset the sleep interval count
sleepIntervalCount = 0
@@ -118,10 +118,10 @@ func (b *MessageQueueBroker) SubscribeMessage(req *mq_pb.SubscribeMessageRequest
},
}}); err != nil {
glog.Errorf("Error sending setup response: %v", err)
- return err
+ return false, err
}
counter++
- return nil
+ return false, nil
})
}
diff --git a/weed/mq/broker/broker_topic_partition_read_write.go b/weed/mq/broker/broker_topic_partition_read_write.go
index 0f53c28c3..4ebb62000 100644
--- a/weed/mq/broker/broker_topic_partition_read_write.go
+++ b/weed/mq/broker/broker_topic_partition_read_write.go
@@ -19,7 +19,7 @@ func (b *MessageQueueBroker) genLogFlushFunc(t topic.Topic, partition *mq_pb.Par
partitionGeneration := time.Unix(0, partition.UnixTimeNs).UTC().Format(topic.TIME_FORMAT)
partitionDir := fmt.Sprintf("%s/%s/%04d-%04d", topicDir, partitionGeneration, partition.RangeStart, partition.RangeStop)
- return func(startTime, stopTime time.Time, buf []byte) {
+ return func(logBuffer *log_buffer.LogBuffer, startTime, stopTime time.Time, buf []byte) {
if len(buf) == 0 {
return
}
@@ -75,7 +75,7 @@ func (b *MessageQueueBroker) genLogOnDiskReadFunc(t topic.Topic, partition *mq_p
return
}
- if err = eachLogEntryFn(logEntry); err != nil {
+ if _, err = eachLogEntryFn(logEntry); err != nil {
err = fmt.Errorf("process log entry %v: %v", logEntry, err)
return
}
diff --git a/weed/server/filer_grpc_server_sub_meta.go b/weed/server/filer_grpc_server_sub_meta.go
index 22929879e..8e8b4e5c4 100644
--- a/weed/server/filer_grpc_server_sub_meta.go
+++ b/weed/server/filer_grpc_server_sub_meta.go
@@ -178,19 +178,19 @@ func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataReq
}
-func eachLogEntryFn(eachEventNotificationFn func(dirPath string, eventNotification *filer_pb.EventNotification, tsNs int64) error) func(logEntry *filer_pb.LogEntry) error {
- return func(logEntry *filer_pb.LogEntry) error {
+func eachLogEntryFn(eachEventNotificationFn func(dirPath string, eventNotification *filer_pb.EventNotification, tsNs int64) error) log_buffer.EachLogEntryFuncType {
+ return func(logEntry *filer_pb.LogEntry) (bool, error) {
event := &filer_pb.SubscribeMetadataResponse{}
if err := proto.Unmarshal(logEntry.Data, event); err != nil {
glog.Errorf("unexpected unmarshal filer_pb.SubscribeMetadataResponse: %v", err)
- return fmt.Errorf("unexpected unmarshal filer_pb.SubscribeMetadataResponse: %v", err)
+ return false, fmt.Errorf("unexpected unmarshal filer_pb.SubscribeMetadataResponse: %v", err)
}
if err := eachEventNotificationFn(event.Directory, event.EventNotification, event.TsNs); err != nil {
- return err
+ return false, err
}
- return nil
+ return false, nil
}
}
diff --git a/weed/util/log_buffer/log_buffer.go b/weed/util/log_buffer/log_buffer.go
index 273df5593..cfd6c94cd 100644
--- a/weed/util/log_buffer/log_buffer.go
+++ b/weed/util/log_buffer/log_buffer.go
@@ -22,8 +22,8 @@ type dataToFlush struct {
data *bytes.Buffer
}
-type EachLogEntryFuncType func(logEntry *filer_pb.LogEntry) error
-type LogFlushFuncType func(startTime, stopTime time.Time, buf []byte)
+type EachLogEntryFuncType func(logEntry *filer_pb.LogEntry) (isDone bool, err error)
+type LogFlushFuncType func(logBuffer *LogBuffer, startTime, stopTime time.Time, buf []byte)
type LogReadFromDiskFuncType func(startPosition MessagePosition, stopTsNs int64, eachLogEntryFn EachLogEntryFuncType) (lastReadPosition MessagePosition, isDone bool, err error)
type LogBuffer struct {
@@ -146,7 +146,7 @@ func (logBuffer *LogBuffer) loopFlush() {
for d := range logBuffer.flushChan {
if d != nil {
// glog.V(4).Infof("%s flush [%v, %v] size %d", m.name, d.startTime, d.stopTime, len(d.data.Bytes()))
- logBuffer.flushFn(d.startTime, d.stopTime, d.data.Bytes())
+ logBuffer.flushFn(logBuffer, d.startTime, d.stopTime, d.data.Bytes())
d.releaseMemory()
// local logbuffer is different from aggregate logbuffer here
logBuffer.lastFlushTime = d.stopTime
diff --git a/weed/util/log_buffer/log_read.go b/weed/util/log_buffer/log_read.go
index 8a4d2d851..5529a6691 100644
--- a/weed/util/log_buffer/log_read.go
+++ b/weed/util/log_buffer/log_read.go
@@ -30,7 +30,7 @@ func NewMessagePosition(tsNs int64, batchIndex int64) MessagePosition {
}
func (logBuffer *LogBuffer) LoopProcessLogData(readerName string, startPosition MessagePosition, stopTsNs int64,
- waitForDataFn func() bool, eachLogDataFn func(logEntry *filer_pb.LogEntry) error) (lastReadPosition MessagePosition, isDone bool, err error) {
+ waitForDataFn func() bool, eachLogDataFn EachLogEntryFuncType) (lastReadPosition MessagePosition, isDone bool, err error) {
// loop through all messages
var bytesBuf *bytes.Buffer
var batchIndex int64
@@ -69,6 +69,7 @@ func (logBuffer *LogBuffer) LoopProcessLogData(readerName string, startPosition
if waitForDataFn() {
continue
} else {
+ isDone = true
return
}
}
@@ -101,10 +102,13 @@ func (logBuffer *LogBuffer) LoopProcessLogData(readerName string, startPosition
}
lastReadPosition = NewMessagePosition(logEntry.TsNs, batchIndex)
- if err = eachLogDataFn(logEntry); err != nil {
+ if isDone, err = eachLogDataFn(logEntry); err != nil {
glog.Errorf("LoopProcessLogData: %s process log entry %d %v: %v", readerName, batchSize+1, logEntry, err)
return
}
+ if isDone {
+ return
+ }
pos += 4 + int(size)
batchSize++