aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/filer/filer_notify_read.go26
-rw-r--r--weed/server/filer_grpc_server_sub_meta.go19
-rw-r--r--weed/util/log_buffer/log_buffer.go6
-rw-r--r--weed/util/time.go13
4 files changed, 50 insertions, 14 deletions
diff --git a/weed/filer/filer_notify_read.go b/weed/filer/filer_notify_read.go
index 115a925e9..ac2c763e6 100644
--- a/weed/filer/filer_notify_read.go
+++ b/weed/filer/filer_notify_read.go
@@ -4,15 +4,16 @@ import (
"container/heap"
"context"
"fmt"
- "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
- "github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
- "github.com/seaweedfs/seaweedfs/weed/wdclient"
- "google.golang.org/protobuf/proto"
"io"
"math"
"strings"
"time"
+ "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
+ "github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
+ "github.com/seaweedfs/seaweedfs/weed/wdclient"
+ "google.golang.org/protobuf/proto"
+
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/util"
)
@@ -39,6 +40,19 @@ func (f *Filer) collectPersistedLogBuffer(startPosition log_buffer.MessagePositi
}
+func (f *Filer) HasPersistedLogFiles(startPosition log_buffer.MessagePosition) (bool, error) {
+ startDate := fmt.Sprintf("%04d-%02d-%02d", startPosition.Year(), startPosition.Month(), startPosition.Day())
+ dayEntries, _, listDayErr := f.ListDirectoryEntries(context.Background(), SystemLogDir, startDate, true, 1, "", "", "")
+
+ if listDayErr != nil {
+ return false, fmt.Errorf("fail to list log by day: %v", listDayErr)
+ }
+ if len(dayEntries) == 0 {
+ return false, nil
+ }
+ return true, nil
+}
+
// ----------
type LogEntryItem struct {
Entry *filer_pb.LogEntry
@@ -103,7 +117,7 @@ func (o *OrderedLogVisitor) GetNext() (logEntry *filer_pb.LogEntry, err error) {
if nextErr != nil {
if nextErr == io.EOF {
// do nothing since the filer has no more log entries
- }else {
+ } else {
return nil, fmt.Errorf("failed to get next log entry: %v", nextErr)
}
} else {
@@ -230,7 +244,7 @@ func (c *LogFileEntryCollector) collectMore(v *OrderedLogVisitor) (err error) {
if nextErr != nil {
if nextErr == io.EOF {
// do nothing since the filer has no more log entries
- }else {
+ } else {
return fmt.Errorf("failed to get next log entry for %v: %v", entryName, err)
}
} else {
diff --git a/weed/server/filer_grpc_server_sub_meta.go b/weed/server/filer_grpc_server_sub_meta.go
index 436d6746a..f4c6bfe9d 100644
--- a/weed/server/filer_grpc_server_sub_meta.go
+++ b/weed/server/filer_grpc_server_sub_meta.go
@@ -2,11 +2,12 @@ package weed_server
import (
"fmt"
- "github.com/seaweedfs/seaweedfs/weed/stats"
"strings"
"sync/atomic"
"time"
+ "github.com/seaweedfs/seaweedfs/weed/stats"
+
"google.golang.org/protobuf/proto"
"github.com/seaweedfs/seaweedfs/weed/filer"
@@ -62,8 +63,19 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest,
return nil
}
+ glog.V(4).Infof("processed to %v: %v", clientName, processedTsNs)
if processedTsNs != 0 {
lastReadTime = log_buffer.NewMessagePosition(processedTsNs, -2)
+ } else {
+ nextDayTs := util.GetNextDayTsNano(lastReadTime.UnixNano())
+ position := log_buffer.NewMessagePosition(nextDayTs, -2)
+ found, err := fs.filer.HasPersistedLogFiles(position)
+ if err != nil {
+ return fmt.Errorf("checking persisted log files: %v", err)
+ }
+ if found {
+ lastReadTime = position
+ }
}
glog.V(4).Infof("read in memory %v aggregated subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime)
@@ -72,10 +84,7 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest,
fs.filer.MetaAggregator.ListenersLock.Lock()
fs.filer.MetaAggregator.ListenersCond.Wait()
fs.filer.MetaAggregator.ListenersLock.Unlock()
- if !fs.hasClient(req.ClientId, req.ClientEpoch) {
- return false
- }
- return true
+ return fs.hasClient(req.ClientId, req.ClientEpoch)
}, eachLogEntryFn)
if readInMemoryLogErr != nil {
if readInMemoryLogErr == log_buffer.ResumeFromDiskError {
diff --git a/weed/util/log_buffer/log_buffer.go b/weed/util/log_buffer/log_buffer.go
index efe42176e..30498f92d 100644
--- a/weed/util/log_buffer/log_buffer.go
+++ b/weed/util/log_buffer/log_buffer.go
@@ -2,7 +2,6 @@ package log_buffer
import (
"bytes"
- "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"sync"
"sync/atomic"
"time"
@@ -11,11 +10,12 @@ import (
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
+ "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
)
-const BufferSize = 4 * 1024 * 1024
-const PreviousBufferCount = 3
+const BufferSize = 8 * 1024 * 1024
+const PreviousBufferCount = 32
type dataToFlush struct {
startTime time.Time
diff --git a/weed/util/time.go b/weed/util/time.go
new file mode 100644
index 000000000..8e237b72d
--- /dev/null
+++ b/weed/util/time.go
@@ -0,0 +1,13 @@
+package util
+
+import (
+ "time"
+)
+
+func GetNextDayTsNano(curTs int64) int64 {
+ curTime := time.Unix(0, curTs)
+ nextDay := curTime.AddDate(0, 0, 1).Truncate(24 * time.Hour)
+ nextDayNano := nextDay.UnixNano()
+
+ return nextDayNano
+}