aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/filer/filer.go2
-rw-r--r--weed/filer/filer_notify.go12
-rw-r--r--weed/messaging/broker/broker_grpc_server_subscribe.go6
-rw-r--r--weed/util/file_util.go8
4 files changed, 21 insertions, 7 deletions
diff --git a/weed/filer/filer.go b/weed/filer/filer.go
index f13782031..76d2f3f47 100644
--- a/weed/filer/filer.go
+++ b/weed/filer/filer.go
@@ -44,6 +44,7 @@ type Filer struct {
Signature int32
FilerConf *FilerConf
RemoteStorage *FilerRemoteStorage
+ UniqueFileId uint32
}
func NewFiler(masters []pb.ServerAddress, grpcDialOption grpc.DialOption,
@@ -54,6 +55,7 @@ func NewFiler(masters []pb.ServerAddress, grpcDialOption grpc.DialOption,
GrpcDialOption: grpcDialOption,
FilerConf: NewFilerConf(),
RemoteStorage: NewFilerRemoteStorage(),
+ UniqueFileId: uint32(util.RandomInt32()),
}
f.LocalMetaLogBuffer = log_buffer.NewLogBuffer("local", LogFlushInterval, f.logFlushFunc, notifyFn)
f.metaLogCollection = collection
diff --git a/weed/filer/filer_notify.go b/weed/filer/filer_notify.go
index 7ab101102..e44ddfd59 100644
--- a/weed/filer/filer_notify.go
+++ b/weed/filer/filer_notify.go
@@ -4,6 +4,7 @@ import (
"context"
"fmt"
"io"
+ "math"
"strings"
"time"
@@ -92,8 +93,8 @@ func (f *Filer) logFlushFunc(startTime, stopTime time.Time, buf []byte) {
startTime, stopTime = startTime.UTC(), stopTime.UTC()
- targetFile := fmt.Sprintf("%s/%04d-%02d-%02d/%02d-%02d.segment", SystemLogDir,
- startTime.Year(), startTime.Month(), startTime.Day(), startTime.Hour(), startTime.Minute(),
+ targetFile := fmt.Sprintf("%s/%04d-%02d-%02d/%02d-%02d.%08x", SystemLogDir,
+ startTime.Year(), startTime.Month(), startTime.Day(), startTime.Hour(), startTime.Minute(), f.UniqueFileId,
// startTime.Second(), startTime.Nanosecond(),
)
@@ -111,7 +112,7 @@ func (f *Filer) ReadPersistedLogBuffer(startTime time.Time, eachLogEntryFn func(
startTime = startTime.UTC()
startDate := fmt.Sprintf("%04d-%02d-%02d", startTime.Year(), startTime.Month(), startTime.Day())
- startHourMinute := fmt.Sprintf("%02d-%02d.segment", startTime.Hour(), startTime.Minute())
+ startHourMinute := fmt.Sprintf("%02d-%02d", startTime.Hour(), startTime.Minute())
sizeBuf := make([]byte, 4)
startTsNs := startTime.UnixNano()
@@ -122,14 +123,15 @@ func (f *Filer) ReadPersistedLogBuffer(startTime time.Time, eachLogEntryFn func(
}
for _, dayEntry := range dayEntries {
// println("checking day", dayEntry.FullPath)
- hourMinuteEntries, _, listHourMinuteErr := f.ListDirectoryEntries(context.Background(), util.NewFullPath(SystemLogDir, dayEntry.Name()), "", false, 24*60, "", "", "")
+ hourMinuteEntries, _, listHourMinuteErr := f.ListDirectoryEntries(context.Background(), util.NewFullPath(SystemLogDir, dayEntry.Name()), "", false, math.MaxInt32, "", "", "")
if listHourMinuteErr != nil {
return lastTsNs, fmt.Errorf("fail to list log %s by day: %v", dayEntry.Name(), listHourMinuteErr)
}
for _, hourMinuteEntry := range hourMinuteEntries {
// println("checking hh-mm", hourMinuteEntry.FullPath)
if dayEntry.Name() == startDate {
- if strings.Compare(hourMinuteEntry.Name(), startHourMinute) < 0 {
+ hourMinute := util.FileNameBase(hourMinuteEntry.Name())
+ if strings.Compare(hourMinute, startHourMinute) < 0 {
continue
}
}
diff --git a/weed/messaging/broker/broker_grpc_server_subscribe.go b/weed/messaging/broker/broker_grpc_server_subscribe.go
index d21fb351f..f07a961db 100644
--- a/weed/messaging/broker/broker_grpc_server_subscribe.go
+++ b/weed/messaging/broker/broker_grpc_server_subscribe.go
@@ -2,6 +2,7 @@ package broker
import (
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/util"
"github.com/chrislusf/seaweedfs/weed/util/log_buffer"
"io"
"strings"
@@ -141,7 +142,7 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs
func (broker *MessageBroker) readPersistedLogBuffer(tp *TopicPartition, startTime time.Time, eachLogEntryFn func(logEntry *filer_pb.LogEntry) error) (err error) {
startTime = startTime.UTC()
startDate := fmt.Sprintf("%04d-%02d-%02d", startTime.Year(), startTime.Month(), startTime.Day())
- startHourMinute := fmt.Sprintf("%02d-%02d.segment", startTime.Hour(), startTime.Minute())
+ startHourMinute := fmt.Sprintf("%02d-%02d", startTime.Hour(), startTime.Minute())
sizeBuf := make([]byte, 4)
startTsNs := startTime.UnixNano()
@@ -153,7 +154,8 @@ func (broker *MessageBroker) readPersistedLogBuffer(tp *TopicPartition, startTim
dayDir := fmt.Sprintf("%s/%s", topicDir, dayEntry.Name)
return filer_pb.List(broker, dayDir, "", func(hourMinuteEntry *filer_pb.Entry, isLast bool) error {
if dayEntry.Name == startDate {
- if strings.Compare(hourMinuteEntry.Name, startHourMinute) < 0 {
+ hourMinute := util.FileNameBase(hourMinuteEntry.Name)
+ if strings.Compare(hourMinute, startHourMinute) < 0 {
return nil
}
}
diff --git a/weed/util/file_util.go b/weed/util/file_util.go
index f83f80265..f9cc4f70b 100644
--- a/weed/util/file_util.go
+++ b/weed/util/file_util.go
@@ -87,3 +87,11 @@ func ResolvePath(path string) string {
return path
}
+
+func FileNameBase(filename string) string {
+ lastDotIndex := strings.LastIndex(filename, ".")
+ if lastDotIndex < 0 {
+ return filename
+ }
+ return filename[:lastDotIndex]
+}