 weed/command/filer_sync.go                     | 18 +++++++++++++++---
 weed/filer/filer_notify.go                     |  9 +++++++++
 weed/replication/sink/filersink/fetch_write.go | 16 ++++++++++------
 weed/replication/sink/filersink/filer_sink.go  |  8 +++++---
 4 files changed, 39 insertions(+), 12 deletions(-)
diff --git a/weed/command/filer_sync.go b/weed/command/filer_sync.go
index 00afab535..992b9dd4e 100644
--- a/weed/command/filer_sync.go
+++ b/weed/command/filer_sync.go
@@ -399,7 +399,11 @@ func genProcessFunction(sourcePath string, targetPath string, excludePaths []str
return nil
}
key := buildKey(dataSink, message, targetPath, sourceNewKey, sourcePath)
- return dataSink.CreateEntry(key, message.NewEntry, message.Signatures)
+ if err := dataSink.CreateEntry(key, message.NewEntry, message.Signatures); err != nil {
+ return fmt.Errorf("create entry1 : %v", err)
+ } else {
+ return nil
+ }
}
// this is something special?
@@ -427,7 +431,11 @@ func genProcessFunction(sourcePath string, targetPath string, excludePaths []str
}
// create the new entry
newKey := buildKey(dataSink, message, targetPath, sourceNewKey, sourcePath)
- return dataSink.CreateEntry(newKey, message.NewEntry, message.Signatures)
+ if err := dataSink.CreateEntry(newKey, message.NewEntry, message.Signatures); err != nil {
+ return fmt.Errorf("create entry2 : %v", err)
+ } else {
+ return nil
+ }
} else {
// new key is outside of the watched directory
@@ -441,7 +449,11 @@ func genProcessFunction(sourcePath string, targetPath string, excludePaths []str
if strings.HasPrefix(string(sourceNewKey), sourcePath) {
// new key is in the watched directory
key := buildKey(dataSink, message, targetPath, sourceNewKey, sourcePath)
- return dataSink.CreateEntry(key, message.NewEntry, message.Signatures)
+ if err := dataSink.CreateEntry(key, message.NewEntry, message.Signatures); err != nil {
+ return fmt.Errorf("create entry3 : %v", err)
+ } else {
+ return nil
+ }
} else {
// new key is also outside of the watched directory
// skip
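
The three filer_sync.go hunks apply one pattern: instead of returning the error from dataSink.CreateEntry directly, each call site wraps it with its own label ("create entry1", "create entry2", "create entry3"), so a sync failure in the logs points back to the exact branch that produced it. A minimal sketch of how the wrapped errors read; createEntry here is a hypothetical stand-in for the sink call, not the real interface:

```go
package main

import (
	"errors"
	"fmt"
)

// createEntry stands in for dataSink.CreateEntry and always fails,
// so the wrapped message can be shown.
func createEntry(key string) error {
	return errors.New("connection refused")
}

func main() {
	// Each call site adds its own label before returning the error upward.
	if err := createEntry("/buckets/b1/obj"); err != nil {
		fmt.Println(fmt.Errorf("create entry1 : %v", err))
		// prints: create entry1 : connection refused
	}
}
```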
diff --git a/weed/filer/filer_notify.go b/weed/filer/filer_notify.go
index 77b659288..f8a1dd603 100644
--- a/weed/filer/filer_notify.go
+++ b/weed/filer/filer_notify.go
@@ -5,6 +5,7 @@ import (
"fmt"
"io"
"math"
+ "regexp"
"strings"
"time"
@@ -108,6 +109,10 @@ func (f *Filer) logFlushFunc(startTime, stopTime time.Time, buf []byte) {
}
}
+var (
+ VolumeNotFoundPattern = regexp.MustCompile(`volume \d+? not found`)
+)
+
func (f *Filer) ReadPersistedLogBuffer(startTime time.Time, stopTsNs int64, eachLogEntryFn func(logEntry *filer_pb.LogEntry) error) (lastTsNs int64, isDone bool, err error) {
startTime = startTime.UTC()
@@ -159,6 +164,10 @@ func (f *Filer) ReadPersistedLogBuffer(startTime time.Time, stopTsNs int64, each
if err == io.EOF {
continue
}
+ if VolumeNotFoundPattern.MatchString(err.Error()) {
+ glog.Warningf("skipping reading %s: %v", hourMinuteEntry.FullPath, err)
+ continue
+ }
return lastTsNs, isDone, fmt.Errorf("reading %s: %v", hourMinuteEntry.FullPath, err)
}
chunkedFileReader.Close()
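
With this change, ReadPersistedLogBuffer no longer aborts when a persisted log file points at a volume that has since disappeared: if the read error matches the new VolumeNotFoundPattern, that file is skipped with a warning and the scan continues; any other error is still returned as before. A small, self-contained check of the pattern; the sample error strings are made up for illustration:

```go
package main

import (
	"fmt"
	"regexp"
)

// Same pattern as the one added in filer_notify.go; the lazy \d+? matches any volume id.
var volumeNotFoundPattern = regexp.MustCompile(`volume \d+? not found`)

func main() {
	samples := []string{
		"read log file: volume 47 not found", // matches -> skip this log file, keep scanning
		"read log file: connection timed out", // no match -> still a hard error
	}
	for _, s := range samples {
		fmt.Printf("skip=%v  %s\n", volumeNotFoundPattern.MatchString(s), s)
	}
}
```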
diff --git a/weed/replication/sink/filersink/fetch_write.go b/weed/replication/sink/filersink/fetch_write.go
index cd961f147..7d526513e 100644
--- a/weed/replication/sink/filersink/fetch_write.go
+++ b/weed/replication/sink/filersink/fetch_write.go
@@ -27,12 +27,16 @@ func (fs *FilerSink) replicateChunks(sourceChunks []*filer_pb.FileChunk, path st
index, source := chunkIndex, sourceChunk
fs.executor.Execute(func() {
defer wg.Done()
- replicatedChunk, e := fs.replicateOneChunk(source, path)
- if e != nil {
- err = e
- return
- }
- replicatedChunks[index] = replicatedChunk
+ util.Retry("replicate chunks", func() error {
+ replicatedChunk, e := fs.replicateOneChunk(source, path)
+ if e != nil {
+ err = e
+ return e
+ }
+ replicatedChunks[index] = replicatedChunk
+ err = nil
+ return nil
+ })
})
}
wg.Wait()
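
Previously a single failed replicateOneChunk call immediately set the shared err for the whole batch; wrapping the call in util.Retry gives each chunk several attempts before the error is surfaced. For readers unfamiliar with the helper, below is a minimal sketch of a wrapper with the same shape (a name plus a job returning error). It is not the SeaweedFS implementation; the attempt count and backoff here are invented:

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// retry re-runs job up to three times with a doubling wait between attempts.
func retry(name string, job func() error) (err error) {
	wait := 100 * time.Millisecond
	for attempt := 0; attempt < 3; attempt++ {
		if err = job(); err == nil {
			return nil
		}
		fmt.Printf("%s attempt %d failed: %v, retrying in %v\n", name, attempt+1, err, wait)
		time.Sleep(wait)
		wait *= 2
	}
	return err
}

func main() {
	calls := 0
	_ = retry("replicate chunks", func() error {
		calls++
		if calls < 2 {
			return errors.New("transient fetch error")
		}
		return nil // succeeds on the second attempt
	})
}
```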
diff --git a/weed/replication/sink/filersink/filer_sink.go b/weed/replication/sink/filersink/filer_sink.go
index de5ff55cc..35b6ffa73 100644
--- a/weed/replication/sink/filersink/filer_sink.go
+++ b/weed/replication/sink/filersink/filer_sink.go
@@ -112,7 +112,7 @@ func (fs *FilerSink) CreateEntry(key string, entry *filer_pb.Entry, signatures [
Directory: dir,
Name: name,
}
- glog.V(1).Infof("lookup: %v", lookupRequest)
+ // glog.V(1).Infof("lookup: %v", lookupRequest)
if resp, err := filer_pb.LookupEntry(client, lookupRequest); err == nil {
if filer.ETag(resp.Entry) == filer.ETag(entry) {
glog.V(3).Infof("already replicated %s", key)
@@ -125,9 +125,10 @@ func (fs *FilerSink) CreateEntry(key string, entry *filer_pb.Entry, signatures [
if err != nil {
// only warning here since the source chunk may have been deleted already
glog.Warningf("replicate entry chunks %s: %v", key, err)
+ return nil
}
- glog.V(4).Infof("replicated %s %+v ===> %+v", key, entry.GetChunks(), replicatedChunks)
+ // glog.V(4).Infof("replicated %s %+v ===> %+v", key, entry.GetChunks(), replicatedChunks)
request := &filer_pb.CreateEntryRequest{
Directory: dir,
@@ -205,7 +206,8 @@ func (fs *FilerSink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParent
// replicate the chunks that are new in the source
replicatedChunks, err := fs.replicateChunks(newChunks, key)
if err != nil {
- return true, fmt.Errorf("replicate %s chunks error: %v", key, err)
+ glog.Warningf("replicate entry chunks %s: %v", key, err)
+ return true, nil
}
existingEntry.Chunks = append(existingEntry.GetChunks(), replicatedChunks...)
existingEntry.Attributes = newEntry.Attributes
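
Both CreateEntry and UpdateEntry in filer_sink.go now treat a chunk-replication failure as a warning rather than a fatal error, on the assumption that the source chunks may already have been deleted; returning nil (or true, nil) marks the event as handled so replication does not keep retrying chunks that no longer exist. A hedged sketch of that best-effort flow, with hypothetical stand-ins for the filer calls:

```go
package main

import (
	"errors"
	"log"
)

// replicateChunks is a hypothetical stand-in for fs.replicateChunks.
func replicateChunks() error { return errors.New("source chunk already deleted") }

// createEntry mirrors the new behavior: a replication failure is logged
// and the event is treated as done instead of being propagated as an error.
func createEntry(key string) error {
	if err := replicateChunks(); err != nil {
		// only warn here, since the source chunks may have been deleted already
		log.Printf("replicate entry chunks %s: %v", key, err)
		return nil
	}
	// ... otherwise build and send the CreateEntryRequest ...
	return nil
}

func main() {
	_ = createEntry("/buckets/b1/obj")
}
```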