aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2021-09-25 01:18:44 -0700
committerChris Lu <chris.lu@gmail.com>2021-09-25 01:18:44 -0700
commit2baed2e1e995ad331985a7b8c359e732b223ad3a (patch)
treeb891a9330cb355d55cd8e35424d1eb0101c4b47e
parenta814f3f0a80ac511132bd3ac97356f333f128b1c (diff)
downloadseaweedfs-2baed2e1e995ad331985a7b8c359e732b223ad3a.tar.xz
seaweedfs-2baed2e1e995ad331985a7b8c359e732b223ad3a.zip
avoid possible metadata subscription data loss
Previous implementation append filer logs into one file. So one file is not always sorted, which can lead to miss reading some entries, especially when different filers have different write throughput.
-rw-r--r--weed/filer/filer.go2
-rw-r--r--weed/filer/filer_notify.go12
-rw-r--r--weed/messaging/broker/broker_grpc_server_subscribe.go6
-rw-r--r--weed/util/file_util.go8
4 files changed, 21 insertions, 7 deletions
diff --git a/weed/filer/filer.go b/weed/filer/filer.go
index f13782031..76d2f3f47 100644
--- a/weed/filer/filer.go
+++ b/weed/filer/filer.go
@@ -44,6 +44,7 @@ type Filer struct {
Signature int32
FilerConf *FilerConf
RemoteStorage *FilerRemoteStorage
+ UniqueFileId uint32
}
func NewFiler(masters []pb.ServerAddress, grpcDialOption grpc.DialOption,
@@ -54,6 +55,7 @@ func NewFiler(masters []pb.ServerAddress, grpcDialOption grpc.DialOption,
GrpcDialOption: grpcDialOption,
FilerConf: NewFilerConf(),
RemoteStorage: NewFilerRemoteStorage(),
+ UniqueFileId: uint32(util.RandomInt32()),
}
f.LocalMetaLogBuffer = log_buffer.NewLogBuffer("local", LogFlushInterval, f.logFlushFunc, notifyFn)
f.metaLogCollection = collection
diff --git a/weed/filer/filer_notify.go b/weed/filer/filer_notify.go
index 7ab101102..e44ddfd59 100644
--- a/weed/filer/filer_notify.go
+++ b/weed/filer/filer_notify.go
@@ -4,6 +4,7 @@ import (
"context"
"fmt"
"io"
+ "math"
"strings"
"time"
@@ -92,8 +93,8 @@ func (f *Filer) logFlushFunc(startTime, stopTime time.Time, buf []byte) {
startTime, stopTime = startTime.UTC(), stopTime.UTC()
- targetFile := fmt.Sprintf("%s/%04d-%02d-%02d/%02d-%02d.segment", SystemLogDir,
- startTime.Year(), startTime.Month(), startTime.Day(), startTime.Hour(), startTime.Minute(),
+ targetFile := fmt.Sprintf("%s/%04d-%02d-%02d/%02d-%02d.%08x", SystemLogDir,
+ startTime.Year(), startTime.Month(), startTime.Day(), startTime.Hour(), startTime.Minute(), f.UniqueFileId,
// startTime.Second(), startTime.Nanosecond(),
)
@@ -111,7 +112,7 @@ func (f *Filer) ReadPersistedLogBuffer(startTime time.Time, eachLogEntryFn func(
startTime = startTime.UTC()
startDate := fmt.Sprintf("%04d-%02d-%02d", startTime.Year(), startTime.Month(), startTime.Day())
- startHourMinute := fmt.Sprintf("%02d-%02d.segment", startTime.Hour(), startTime.Minute())
+ startHourMinute := fmt.Sprintf("%02d-%02d", startTime.Hour(), startTime.Minute())
sizeBuf := make([]byte, 4)
startTsNs := startTime.UnixNano()
@@ -122,14 +123,15 @@ func (f *Filer) ReadPersistedLogBuffer(startTime time.Time, eachLogEntryFn func(
}
for _, dayEntry := range dayEntries {
// println("checking day", dayEntry.FullPath)
- hourMinuteEntries, _, listHourMinuteErr := f.ListDirectoryEntries(context.Background(), util.NewFullPath(SystemLogDir, dayEntry.Name()), "", false, 24*60, "", "", "")
+ hourMinuteEntries, _, listHourMinuteErr := f.ListDirectoryEntries(context.Background(), util.NewFullPath(SystemLogDir, dayEntry.Name()), "", false, math.MaxInt32, "", "", "")
if listHourMinuteErr != nil {
return lastTsNs, fmt.Errorf("fail to list log %s by day: %v", dayEntry.Name(), listHourMinuteErr)
}
for _, hourMinuteEntry := range hourMinuteEntries {
// println("checking hh-mm", hourMinuteEntry.FullPath)
if dayEntry.Name() == startDate {
- if strings.Compare(hourMinuteEntry.Name(), startHourMinute) < 0 {
+ hourMinute := util.FileNameBase(hourMinuteEntry.Name())
+ if strings.Compare(hourMinute, startHourMinute) < 0 {
continue
}
}
diff --git a/weed/messaging/broker/broker_grpc_server_subscribe.go b/weed/messaging/broker/broker_grpc_server_subscribe.go
index d21fb351f..f07a961db 100644
--- a/weed/messaging/broker/broker_grpc_server_subscribe.go
+++ b/weed/messaging/broker/broker_grpc_server_subscribe.go
@@ -2,6 +2,7 @@ package broker
import (
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/util"
"github.com/chrislusf/seaweedfs/weed/util/log_buffer"
"io"
"strings"
@@ -141,7 +142,7 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs
func (broker *MessageBroker) readPersistedLogBuffer(tp *TopicPartition, startTime time.Time, eachLogEntryFn func(logEntry *filer_pb.LogEntry) error) (err error) {
startTime = startTime.UTC()
startDate := fmt.Sprintf("%04d-%02d-%02d", startTime.Year(), startTime.Month(), startTime.Day())
- startHourMinute := fmt.Sprintf("%02d-%02d.segment", startTime.Hour(), startTime.Minute())
+ startHourMinute := fmt.Sprintf("%02d-%02d", startTime.Hour(), startTime.Minute())
sizeBuf := make([]byte, 4)
startTsNs := startTime.UnixNano()
@@ -153,7 +154,8 @@ func (broker *MessageBroker) readPersistedLogBuffer(tp *TopicPartition, startTim
dayDir := fmt.Sprintf("%s/%s", topicDir, dayEntry.Name)
return filer_pb.List(broker, dayDir, "", func(hourMinuteEntry *filer_pb.Entry, isLast bool) error {
if dayEntry.Name == startDate {
- if strings.Compare(hourMinuteEntry.Name, startHourMinute) < 0 {
+ hourMinute := util.FileNameBase(hourMinuteEntry.Name)
+ if strings.Compare(hourMinute, startHourMinute) < 0 {
return nil
}
}
diff --git a/weed/util/file_util.go b/weed/util/file_util.go
index f83f80265..f9cc4f70b 100644
--- a/weed/util/file_util.go
+++ b/weed/util/file_util.go
@@ -87,3 +87,11 @@ func ResolvePath(path string) string {
return path
}
+
+func FileNameBase(filename string) string {
+ lastDotIndex := strings.LastIndex(filename, ".")
+ if lastDotIndex < 0 {
+ return filename
+ }
+ return filename[:lastDotIndex]
+}