aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKonstantin Lebedev <9497591+kmlebedev@users.noreply.github.com>2022-09-14 10:33:35 +0500
committerGitHub <noreply@github.com>2022-09-13 22:33:35 -0700
commitf8ef25099c63a5b0603f781e5d514e0c07bbf325 (patch)
tree37d75f295568c930d2a8a55215210aca93a1688f
parent0baf3d38c2e18e9477f835a470462ac6123562ea (diff)
downloadseaweedfs-f8ef25099c63a5b0603f781e5d514e0c07bbf325.tar.xz
seaweedfs-f8ef25099c63a5b0603f781e5d514e0c07bbf325.zip
Removing chunks on failed to write to replicas (#3591)
* Removing chunks on failed to write to replicas https://github.com/seaweedfs/seaweedfs/issues/3578 * put with in the util.Retry * just purge on any errors
-rw-r--r--weed/server/filer_server_handlers_write_upload.go27
-rw-r--r--weed/topology/store_replicate.go2
2 files changed, 19 insertions, 10 deletions
diff --git a/weed/server/filer_server_handlers_write_upload.go b/weed/server/filer_server_handlers_write_upload.go
index 4dc588055..95920583d 100644
--- a/weed/server/filer_server_handlers_write_upload.go
+++ b/weed/server/filer_server_handlers_write_upload.go
@@ -107,7 +107,7 @@ func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Reque
wg.Done()
}()
- chunk, toChunkErr := fs.dataToChunk(fileName, contentType, bytesBuffer.Bytes(), offset, so)
+ chunks, toChunkErr := fs.dataToChunk(fileName, contentType, bytesBuffer.Bytes(), offset, so)
if toChunkErr != nil {
uploadErrLock.Lock()
if uploadErr == nil {
@@ -115,12 +115,14 @@ func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Reque
}
uploadErrLock.Unlock()
}
- if chunk != nil {
+ if chunks != nil {
fileChunksLock.Lock()
- fileChunks = append(fileChunks, chunk)
- fileChunksSize := len(fileChunks)
+ fileChunksSize := len(fileChunks) + len(chunks)
+ for _, chunk := range chunks {
+ fileChunks = append(fileChunks, chunk)
+ glog.V(4).Infof("uploaded %s chunk %d to %s [%d,%d)", fileName, fileChunksSize, chunk.FileId, offset, offset+int64(chunk.Size))
+ }
fileChunksLock.Unlock()
- glog.V(4).Infof("uploaded %s chunk %d to %s [%d,%d)", fileName, fileChunksSize, chunk.FileId, offset, offset+int64(chunk.Size))
}
}(chunkOffset)
@@ -169,7 +171,7 @@ func (fs *FilerServer) doUpload(urlLocation string, limitedReader io.Reader, fil
return uploadResult, err, data
}
-func (fs *FilerServer) dataToChunk(fileName, contentType string, data []byte, chunkOffset int64, so *operation.StorageOption) (*filer_pb.FileChunk, error) {
+func (fs *FilerServer) dataToChunk(fileName, contentType string, data []byte, chunkOffset int64, so *operation.StorageOption) ([]*filer_pb.FileChunk, error) {
dataReader := util.NewBytesReader(data)
// retry to assign a different file id
@@ -177,6 +179,7 @@ func (fs *FilerServer) dataToChunk(fileName, contentType string, data []byte, ch
var auth security.EncodedJwt
var uploadErr error
var uploadResult *operation.UploadResult
+ var failedFileChunks []*filer_pb.FileChunk
err := util.Retry("filerDataToChunk", func() error {
// assign one file id for one chunk
@@ -191,19 +194,25 @@ func (fs *FilerServer) dataToChunk(fileName, contentType string, data []byte, ch
if uploadErr != nil {
glog.V(4).Infof("retry later due to upload error: %v", uploadErr)
stats.FilerRequestCounter.WithLabelValues(stats.ChunkDoUploadRetry).Inc()
+ fid, _ := filer_pb.ToFileIdObject(fileId)
+ fileChunk := filer_pb.FileChunk{
+ FileId: fileId,
+ Offset: chunkOffset,
+ Fid: fid,
+ }
+ failedFileChunks = append(failedFileChunks, &fileChunk)
return uploadErr
}
return nil
})
if err != nil {
glog.Errorf("upload error: %v", err)
- return nil, err
+ return failedFileChunks, err
}
// if last chunk exhausted the reader exactly at the border
if uploadResult.Size == 0 {
return nil, nil
}
-
- return uploadResult.ToPbFileChunk(fileId, chunkOffset), nil
+ return []*filer_pb.FileChunk{uploadResult.ToPbFileChunk(fileId, chunkOffset)}, nil
}
diff --git a/weed/topology/store_replicate.go b/weed/topology/store_replicate.go
index 816f1fbe4..bbca859ac 100644
--- a/weed/topology/store_replicate.go
+++ b/weed/topology/store_replicate.go
@@ -197,7 +197,7 @@ func GetWritableRemoteReplications(s *storage.Store, grpcDialOption grpc.DialOpt
}
}
} else {
- err = fmt.Errorf("failed to lookup for %d: %v", volumeId, lookupErr)
+ err = fmt.Errorf("replicating lookup failed for %d: %v", volumeId, lookupErr)
return
}