aboutsummaryrefslogtreecommitdiff
path: root/weed/filer
diff options
context:
space:
mode:
Diffstat (limited to 'weed/filer')
-rw-r--r--weed/filer/filechunk_manifest.go41
-rw-r--r--weed/filer/filer.go4
-rw-r--r--weed/filer/filer_notify.go16
-rw-r--r--weed/filer/meta_aggregator.go5
-rw-r--r--weed/filer/reader_cache.go3
-rw-r--r--weed/filer/stream.go2
6 files changed, 18 insertions, 53 deletions
diff --git a/weed/filer/filechunk_manifest.go b/weed/filer/filechunk_manifest.go
index d9d0331be..60a5c538b 100644
--- a/weed/filer/filechunk_manifest.go
+++ b/weed/filer/filechunk_manifest.go
@@ -5,8 +5,6 @@ import (
"fmt"
"io"
"math"
- "net/url"
- "strings"
"sync"
"time"
@@ -122,44 +120,7 @@ func fetchChunkRange(buffer []byte, lookupFileIdFn wdclient.LookupFileIdFunction
glog.Errorf("operation LookupFileId %s failed, err: %v", fileId, err)
return 0, err
}
- return retriedFetchChunkData(buffer, urlStrings, cipherKey, isGzipped, false, offset)
-}
-
-func retriedFetchChunkData(buffer []byte, urlStrings []string, cipherKey []byte, isGzipped bool, isFullChunk bool, offset int64) (n int, err error) {
-
- var shouldRetry bool
-
- for waitTime := time.Second; waitTime < util.RetryWaitTime; waitTime += waitTime / 2 {
- for _, urlString := range urlStrings {
- n = 0
- if strings.Contains(urlString, "%") {
- urlString = url.PathEscape(urlString)
- }
- shouldRetry, err = util.ReadUrlAsStream(urlString+"?readDeleted=true", cipherKey, isGzipped, isFullChunk, offset, len(buffer), func(data []byte) {
- if n < len(buffer) {
- x := copy(buffer[n:], data)
- n += x
- }
- })
- if !shouldRetry {
- break
- }
- if err != nil {
- glog.V(0).Infof("read %s failed, err: %v", urlString, err)
- } else {
- break
- }
- }
- if err != nil && shouldRetry {
- glog.V(0).Infof("retry reading in %v", waitTime)
- time.Sleep(waitTime)
- } else {
- break
- }
- }
-
- return n, err
-
+ return util.RetriedFetchChunkData(buffer, urlStrings, cipherKey, isGzipped, false, offset)
}
func retriedStreamFetchChunkData(writer io.Writer, urlStrings []string, cipherKey []byte, isGzipped bool, isFullChunk bool, offset int64, size int) (err error) {
diff --git a/weed/filer/filer.go b/weed/filer/filer.go
index 1c6b3c338..9be8d5259 100644
--- a/weed/filer/filer.go
+++ b/weed/filer/filer.go
@@ -69,7 +69,7 @@ func NewFiler(masters pb.ServerDiscovery, grpcDialOption grpc.DialOption, filerH
f.UniqueFilerId = -f.UniqueFilerId
}
- f.LocalMetaLogBuffer = log_buffer.NewLogBuffer("local", LogFlushInterval, f.logFlushFunc, notifyFn)
+ f.LocalMetaLogBuffer = log_buffer.NewLogBuffer("local", LogFlushInterval, f.logFlushFunc, nil, notifyFn)
f.metaLogCollection = collection
f.metaLogReplication = replication
@@ -377,6 +377,6 @@ func (f *Filer) doListDirectoryEntries(ctx context.Context, p util.FullPath, sta
}
func (f *Filer) Shutdown() {
- f.LocalMetaLogBuffer.Shutdown()
+ f.LocalMetaLogBuffer.ShutdownLogBuffer()
f.Store.Shutdown()
}
diff --git a/weed/filer/filer_notify.go b/weed/filer/filer_notify.go
index f8a1dd603..db78b3d3d 100644
--- a/weed/filer/filer_notify.go
+++ b/weed/filer/filer_notify.go
@@ -3,6 +3,7 @@ package filer
import (
"context"
"fmt"
+ "github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
"io"
"math"
"regexp"
@@ -86,7 +87,7 @@ func (f *Filer) logMetaEvent(ctx context.Context, fullpath string, eventNotifica
}
-func (f *Filer) logFlushFunc(startTime, stopTime time.Time, buf []byte) {
+func (f *Filer) logFlushFunc(logBuffer *log_buffer.LogBuffer, startTime, stopTime time.Time, buf []byte) {
if len(buf) == 0 {
return
@@ -113,11 +114,10 @@ var (
VolumeNotFoundPattern = regexp.MustCompile(`volume \d+? not found`)
)
-func (f *Filer) ReadPersistedLogBuffer(startTime time.Time, stopTsNs int64, eachLogEntryFn func(logEntry *filer_pb.LogEntry) error) (lastTsNs int64, isDone bool, err error) {
+func (f *Filer) ReadPersistedLogBuffer(startPosition log_buffer.MessagePosition, stopTsNs int64, eachLogEntryFn log_buffer.EachLogEntryFuncType) (lastTsNs int64, isDone bool, err error) {
- startTime = startTime.UTC()
- startDate := fmt.Sprintf("%04d-%02d-%02d", startTime.Year(), startTime.Month(), startTime.Day())
- startHourMinute := fmt.Sprintf("%02d-%02d", startTime.Hour(), startTime.Minute())
+ startDate := fmt.Sprintf("%04d-%02d-%02d", startPosition.Year(), startPosition.Month(), startPosition.Day())
+ startHourMinute := fmt.Sprintf("%02d-%02d", startPosition.Hour(), startPosition.Minute())
var stopDate, stopHourMinute string
if stopTsNs != 0 {
stopTime := time.Unix(0, stopTsNs+24*60*60*int64(time.Nanosecond)).UTC()
@@ -126,7 +126,7 @@ func (f *Filer) ReadPersistedLogBuffer(startTime time.Time, stopTsNs int64, each
}
sizeBuf := make([]byte, 4)
- startTsNs := startTime.UnixNano()
+ startTsNs := startPosition.UnixNano()
dayEntries, _, listDayErr := f.ListDirectoryEntries(context.Background(), SystemLogDir, startDate, true, math.MaxInt32, "", "", "")
if listDayErr != nil {
@@ -177,7 +177,7 @@ func (f *Filer) ReadPersistedLogBuffer(startTime time.Time, stopTsNs int64, each
return lastTsNs, isDone, nil
}
-func ReadEachLogEntry(r io.Reader, sizeBuf []byte, startTsNs, stopTsNs int64, eachLogEntryFn func(logEntry *filer_pb.LogEntry) error) (lastTsNs int64, err error) {
+func ReadEachLogEntry(r io.Reader, sizeBuf []byte, startTsNs, stopTsNs int64, eachLogEntryFn log_buffer.EachLogEntryFuncType) (lastTsNs int64, err error) {
for {
n, err := r.Read(sizeBuf)
if err != nil {
@@ -207,7 +207,7 @@ func ReadEachLogEntry(r io.Reader, sizeBuf []byte, startTsNs, stopTsNs int64, ea
return lastTsNs, err
}
// println("each log: ", logEntry.TsNs)
- if err := eachLogEntryFn(logEntry); err != nil {
+ if _, err := eachLogEntryFn(logEntry); err != nil {
return lastTsNs, err
} else {
lastTsNs = logEntry.TsNs
diff --git a/weed/filer/meta_aggregator.go b/weed/filer/meta_aggregator.go
index e18e69216..663fdfe9f 100644
--- a/weed/filer/meta_aggregator.go
+++ b/weed/filer/meta_aggregator.go
@@ -43,7 +43,7 @@ func NewMetaAggregator(filer *Filer, self pb.ServerAddress, grpcDialOption grpc.
peerChans: make(map[pb.ServerAddress]chan struct{}),
}
t.ListenersCond = sync.NewCond(&t.ListenersLock)
- t.MetaLogBuffer = log_buffer.NewLogBuffer("aggr", LogFlushInterval, nil, func() {
+ t.MetaLogBuffer = log_buffer.NewLogBuffer("aggr", LogFlushInterval, nil, nil, func() {
t.ListenersCond.Broadcast()
})
return t
@@ -188,6 +188,7 @@ func (ma *MetaAggregator) doSubscribeToOneFiler(f *Filer, self pb.ServerAddress,
ClientEpoch: atomic.LoadInt32(&ma.filer.UniqueFilerEpoch),
})
if err != nil {
+ glog.V(0).Infof("SubscribeLocalMetadata %v: %v", peer, err)
return fmt.Errorf("subscribe: %v", err)
}
@@ -197,10 +198,12 @@ func (ma *MetaAggregator) doSubscribeToOneFiler(f *Filer, self pb.ServerAddress,
return nil
}
if listenErr != nil {
+ glog.V(0).Infof("SubscribeLocalMetadata stream %v: %v", peer, listenErr)
return listenErr
}
if err := processEventFn(resp); err != nil {
+ glog.V(0).Infof("SubscribeLocalMetadata process %v: %v", resp, err)
return fmt.Errorf("process %v: %v", resp, err)
}
diff --git a/weed/filer/reader_cache.go b/weed/filer/reader_cache.go
index aabef6e1c..7be54b193 100644
--- a/weed/filer/reader_cache.go
+++ b/weed/filer/reader_cache.go
@@ -2,6 +2,7 @@ package filer
import (
"fmt"
+ "github.com/seaweedfs/seaweedfs/weed/util"
"sync"
"sync/atomic"
"time"
@@ -170,7 +171,7 @@ func (s *SingleChunkCacher) startCaching() {
s.data = mem.Allocate(s.chunkSize)
- _, s.err = retriedFetchChunkData(s.data, urlStrings, s.cipherKey, s.isGzipped, true, 0)
+ _, s.err = util.RetriedFetchChunkData(s.data, urlStrings, s.cipherKey, s.isGzipped, true, 0)
if s.err != nil {
mem.Free(s.data)
s.data = nil
diff --git a/weed/filer/stream.go b/weed/filer/stream.go
index a402bc30c..2686fd833 100644
--- a/weed/filer/stream.go
+++ b/weed/filer/stream.go
@@ -187,7 +187,7 @@ func ReadAll(buffer []byte, masterClient *wdclient.MasterClient, chunks []*filer
return err
}
- n, err := retriedFetchChunkData(buffer[idx:idx+int(chunkView.ViewSize)], urlStrings, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.OffsetInChunk)
+ n, err := util.RetriedFetchChunkData(buffer[idx:idx+int(chunkView.ViewSize)], urlStrings, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.OffsetInChunk)
if err != nil {
return err
}