diff options
Diffstat (limited to 'weed/filer')
| -rw-r--r-- | weed/filer/filechunk_manifest.go | 41 | ||||
| -rw-r--r-- | weed/filer/filer.go | 4 | ||||
| -rw-r--r-- | weed/filer/filer_notify.go | 16 | ||||
| -rw-r--r-- | weed/filer/meta_aggregator.go | 5 | ||||
| -rw-r--r-- | weed/filer/reader_cache.go | 3 | ||||
| -rw-r--r-- | weed/filer/stream.go | 2 |
6 files changed, 18 insertions, 53 deletions
diff --git a/weed/filer/filechunk_manifest.go b/weed/filer/filechunk_manifest.go index d9d0331be..60a5c538b 100644 --- a/weed/filer/filechunk_manifest.go +++ b/weed/filer/filechunk_manifest.go @@ -5,8 +5,6 @@ import ( "fmt" "io" "math" - "net/url" - "strings" "sync" "time" @@ -122,44 +120,7 @@ func fetchChunkRange(buffer []byte, lookupFileIdFn wdclient.LookupFileIdFunction glog.Errorf("operation LookupFileId %s failed, err: %v", fileId, err) return 0, err } - return retriedFetchChunkData(buffer, urlStrings, cipherKey, isGzipped, false, offset) -} - -func retriedFetchChunkData(buffer []byte, urlStrings []string, cipherKey []byte, isGzipped bool, isFullChunk bool, offset int64) (n int, err error) { - - var shouldRetry bool - - for waitTime := time.Second; waitTime < util.RetryWaitTime; waitTime += waitTime / 2 { - for _, urlString := range urlStrings { - n = 0 - if strings.Contains(urlString, "%") { - urlString = url.PathEscape(urlString) - } - shouldRetry, err = util.ReadUrlAsStream(urlString+"?readDeleted=true", cipherKey, isGzipped, isFullChunk, offset, len(buffer), func(data []byte) { - if n < len(buffer) { - x := copy(buffer[n:], data) - n += x - } - }) - if !shouldRetry { - break - } - if err != nil { - glog.V(0).Infof("read %s failed, err: %v", urlString, err) - } else { - break - } - } - if err != nil && shouldRetry { - glog.V(0).Infof("retry reading in %v", waitTime) - time.Sleep(waitTime) - } else { - break - } - } - - return n, err - + return util.RetriedFetchChunkData(buffer, urlStrings, cipherKey, isGzipped, false, offset) } func retriedStreamFetchChunkData(writer io.Writer, urlStrings []string, cipherKey []byte, isGzipped bool, isFullChunk bool, offset int64, size int) (err error) { diff --git a/weed/filer/filer.go b/weed/filer/filer.go index 1c6b3c338..9be8d5259 100644 --- a/weed/filer/filer.go +++ b/weed/filer/filer.go @@ -69,7 +69,7 @@ func NewFiler(masters pb.ServerDiscovery, grpcDialOption grpc.DialOption, filerH f.UniqueFilerId = -f.UniqueFilerId } - f.LocalMetaLogBuffer = log_buffer.NewLogBuffer("local", LogFlushInterval, f.logFlushFunc, notifyFn) + f.LocalMetaLogBuffer = log_buffer.NewLogBuffer("local", LogFlushInterval, f.logFlushFunc, nil, notifyFn) f.metaLogCollection = collection f.metaLogReplication = replication @@ -377,6 +377,6 @@ func (f *Filer) doListDirectoryEntries(ctx context.Context, p util.FullPath, sta } func (f *Filer) Shutdown() { - f.LocalMetaLogBuffer.Shutdown() + f.LocalMetaLogBuffer.ShutdownLogBuffer() f.Store.Shutdown() } diff --git a/weed/filer/filer_notify.go b/weed/filer/filer_notify.go index f8a1dd603..db78b3d3d 100644 --- a/weed/filer/filer_notify.go +++ b/weed/filer/filer_notify.go @@ -3,6 +3,7 @@ package filer import ( "context" "fmt" + "github.com/seaweedfs/seaweedfs/weed/util/log_buffer" "io" "math" "regexp" @@ -86,7 +87,7 @@ func (f *Filer) logMetaEvent(ctx context.Context, fullpath string, eventNotifica } -func (f *Filer) logFlushFunc(startTime, stopTime time.Time, buf []byte) { +func (f *Filer) logFlushFunc(logBuffer *log_buffer.LogBuffer, startTime, stopTime time.Time, buf []byte) { if len(buf) == 0 { return @@ -113,11 +114,10 @@ var ( VolumeNotFoundPattern = regexp.MustCompile(`volume \d+? not found`) ) -func (f *Filer) ReadPersistedLogBuffer(startTime time.Time, stopTsNs int64, eachLogEntryFn func(logEntry *filer_pb.LogEntry) error) (lastTsNs int64, isDone bool, err error) { +func (f *Filer) ReadPersistedLogBuffer(startPosition log_buffer.MessagePosition, stopTsNs int64, eachLogEntryFn log_buffer.EachLogEntryFuncType) (lastTsNs int64, isDone bool, err error) { - startTime = startTime.UTC() - startDate := fmt.Sprintf("%04d-%02d-%02d", startTime.Year(), startTime.Month(), startTime.Day()) - startHourMinute := fmt.Sprintf("%02d-%02d", startTime.Hour(), startTime.Minute()) + startDate := fmt.Sprintf("%04d-%02d-%02d", startPosition.Year(), startPosition.Month(), startPosition.Day()) + startHourMinute := fmt.Sprintf("%02d-%02d", startPosition.Hour(), startPosition.Minute()) var stopDate, stopHourMinute string if stopTsNs != 0 { stopTime := time.Unix(0, stopTsNs+24*60*60*int64(time.Nanosecond)).UTC() @@ -126,7 +126,7 @@ func (f *Filer) ReadPersistedLogBuffer(startTime time.Time, stopTsNs int64, each } sizeBuf := make([]byte, 4) - startTsNs := startTime.UnixNano() + startTsNs := startPosition.UnixNano() dayEntries, _, listDayErr := f.ListDirectoryEntries(context.Background(), SystemLogDir, startDate, true, math.MaxInt32, "", "", "") if listDayErr != nil { @@ -177,7 +177,7 @@ func (f *Filer) ReadPersistedLogBuffer(startTime time.Time, stopTsNs int64, each return lastTsNs, isDone, nil } -func ReadEachLogEntry(r io.Reader, sizeBuf []byte, startTsNs, stopTsNs int64, eachLogEntryFn func(logEntry *filer_pb.LogEntry) error) (lastTsNs int64, err error) { +func ReadEachLogEntry(r io.Reader, sizeBuf []byte, startTsNs, stopTsNs int64, eachLogEntryFn log_buffer.EachLogEntryFuncType) (lastTsNs int64, err error) { for { n, err := r.Read(sizeBuf) if err != nil { @@ -207,7 +207,7 @@ func ReadEachLogEntry(r io.Reader, sizeBuf []byte, startTsNs, stopTsNs int64, ea return lastTsNs, err } // println("each log: ", logEntry.TsNs) - if err := eachLogEntryFn(logEntry); err != nil { + if _, err := eachLogEntryFn(logEntry); err != nil { return lastTsNs, err } else { lastTsNs = logEntry.TsNs diff --git a/weed/filer/meta_aggregator.go b/weed/filer/meta_aggregator.go index e18e69216..663fdfe9f 100644 --- a/weed/filer/meta_aggregator.go +++ b/weed/filer/meta_aggregator.go @@ -43,7 +43,7 @@ func NewMetaAggregator(filer *Filer, self pb.ServerAddress, grpcDialOption grpc. peerChans: make(map[pb.ServerAddress]chan struct{}), } t.ListenersCond = sync.NewCond(&t.ListenersLock) - t.MetaLogBuffer = log_buffer.NewLogBuffer("aggr", LogFlushInterval, nil, func() { + t.MetaLogBuffer = log_buffer.NewLogBuffer("aggr", LogFlushInterval, nil, nil, func() { t.ListenersCond.Broadcast() }) return t @@ -188,6 +188,7 @@ func (ma *MetaAggregator) doSubscribeToOneFiler(f *Filer, self pb.ServerAddress, ClientEpoch: atomic.LoadInt32(&ma.filer.UniqueFilerEpoch), }) if err != nil { + glog.V(0).Infof("SubscribeLocalMetadata %v: %v", peer, err) return fmt.Errorf("subscribe: %v", err) } @@ -197,10 +198,12 @@ func (ma *MetaAggregator) doSubscribeToOneFiler(f *Filer, self pb.ServerAddress, return nil } if listenErr != nil { + glog.V(0).Infof("SubscribeLocalMetadata stream %v: %v", peer, listenErr) return listenErr } if err := processEventFn(resp); err != nil { + glog.V(0).Infof("SubscribeLocalMetadata process %v: %v", resp, err) return fmt.Errorf("process %v: %v", resp, err) } diff --git a/weed/filer/reader_cache.go b/weed/filer/reader_cache.go index aabef6e1c..7be54b193 100644 --- a/weed/filer/reader_cache.go +++ b/weed/filer/reader_cache.go @@ -2,6 +2,7 @@ package filer import ( "fmt" + "github.com/seaweedfs/seaweedfs/weed/util" "sync" "sync/atomic" "time" @@ -170,7 +171,7 @@ func (s *SingleChunkCacher) startCaching() { s.data = mem.Allocate(s.chunkSize) - _, s.err = retriedFetchChunkData(s.data, urlStrings, s.cipherKey, s.isGzipped, true, 0) + _, s.err = util.RetriedFetchChunkData(s.data, urlStrings, s.cipherKey, s.isGzipped, true, 0) if s.err != nil { mem.Free(s.data) s.data = nil diff --git a/weed/filer/stream.go b/weed/filer/stream.go index a402bc30c..2686fd833 100644 --- a/weed/filer/stream.go +++ b/weed/filer/stream.go @@ -187,7 +187,7 @@ func ReadAll(buffer []byte, masterClient *wdclient.MasterClient, chunks []*filer return err } - n, err := retriedFetchChunkData(buffer[idx:idx+int(chunkView.ViewSize)], urlStrings, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.OffsetInChunk) + n, err := util.RetriedFetchChunkData(buffer[idx:idx+int(chunkView.ViewSize)], urlStrings, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.OffsetInChunk) if err != nil { return err } |
