aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/filer/filechunks.go15
-rw-r--r--weed/replication/sink/filersink/filer_sink.go2
-rw-r--r--weed/s3api/s3api_object_multipart_handlers.go4
-rw-r--r--weed/storage/store_vacuum.go3
-rw-r--r--weed/storage/volume_vacuum.go9
5 files changed, 30 insertions, 3 deletions
diff --git a/weed/filer/filechunks.go b/weed/filer/filechunks.go
index be18d45ac..d18d06f2c 100644
--- a/weed/filer/filechunks.go
+++ b/weed/filer/filechunks.go
@@ -101,6 +101,21 @@ func DoMinusChunks(as, bs []*filer_pb.FileChunk) (delta []*filer_pb.FileChunk) {
return
}
+func DoMinusChunksBySourceFileId(as, bs []*filer_pb.FileChunk) (delta []*filer_pb.FileChunk) {
+
+ fileIds := make(map[string]bool)
+ for _, interval := range bs {
+ fileIds[interval.GetFileIdString()] = true
+ }
+ for _, chunk := range as {
+ if _, found := fileIds[chunk.GetSourceFileId()]; !found {
+ delta = append(delta, chunk)
+ }
+ }
+
+ return
+}
+
type ChunkView struct {
FileId string
Offset int64
diff --git a/weed/replication/sink/filersink/filer_sink.go b/weed/replication/sink/filersink/filer_sink.go
index c48ab2368..345c7f13b 100644
--- a/weed/replication/sink/filersink/filer_sink.go
+++ b/weed/replication/sink/filersink/filer_sink.go
@@ -199,7 +199,7 @@ func (fs *FilerSink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParent
// delete the chunks that are deleted from the source
if deleteIncludeChunks {
// remove the deleted chunks. Actual data deletion happens in filer UpdateEntry FindUnusedFileChunks
- existingEntry.Chunks = filer.DoMinusChunks(existingEntry.Chunks, deletedChunks)
+ existingEntry.Chunks = filer.DoMinusChunksBySourceFileId(existingEntry.Chunks, deletedChunks)
}
// replicate the chunks that are new in the source
diff --git a/weed/s3api/s3api_object_multipart_handlers.go b/weed/s3api/s3api_object_multipart_handlers.go
index 8cbaf9393..99c280e13 100644
--- a/weed/s3api/s3api_object_multipart_handlers.go
+++ b/weed/s3api/s3api_object_multipart_handlers.go
@@ -163,13 +163,13 @@ func (s3a *S3ApiServer) ListObjectPartsHandler(w http.ResponseWriter, r *http.Re
UploadId: aws.String(uploadID),
})
- glog.V(2).Infof("ListObjectPartsHandler %s count=%d", string(s3err.EncodeXMLResponse(response)), len(response.Part))
-
if errCode != s3err.ErrNone {
s3err.WriteErrorResponse(w, r, errCode)
return
}
+ glog.V(2).Infof("ListObjectPartsHandler %s count=%d", string(s3err.EncodeXMLResponse(response)), len(response.Part))
+
writeSuccessResponseXML(w, r, response)
}
diff --git a/weed/storage/store_vacuum.go b/weed/storage/store_vacuum.go
index 0d6e0b0f1..cbd716b32 100644
--- a/weed/storage/store_vacuum.go
+++ b/weed/storage/store_vacuum.go
@@ -26,6 +26,9 @@ func (s *Store) CompactVolume(vid needle.VolumeId, preallocate int64, compaction
return fmt.Errorf("volume id %d is not found during compact", vid)
}
func (s *Store) CommitCompactVolume(vid needle.VolumeId) (bool, error) {
+ if s.isStopping {
+ return false, fmt.Errorf("volume id %d skips compact because volume is stopping", vid)
+ }
if v := s.findVolume(vid); v != nil {
return v.IsReadOnly(), v.CommitCompact()
}
diff --git a/weed/storage/volume_vacuum.go b/weed/storage/volume_vacuum.go
index d686e2b09..56e8beddb 100644
--- a/weed/storage/volume_vacuum.go
+++ b/weed/storage/volume_vacuum.go
@@ -289,6 +289,9 @@ func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldI
return fmt.Errorf("ReadNeedleBlob %s key %d offset %d size %d failed: %v", oldDatFile.Name(), key, increIdxEntry.offset.ToActualOffset(), increIdxEntry.size, err)
}
dstDatBackend.Write(needleBytes)
+ if err := dstDatBackend.Sync(); err != nil {
+ return fmt.Errorf("cannot sync needle %s: %v", dstDatBackend.File.Name(), err)
+ }
util.Uint32toBytes(idxEntryBytes[8:12], uint32(offset/NeedlePaddingSize))
} else { //deleted needle
//fakeDelNeedle 's default Data field is nil
@@ -308,6 +311,12 @@ func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldI
newIdxFileName, err)
}
_, err = idx.Write(idxEntryBytes)
+ if err != nil {
+ return fmt.Errorf("cannot write indexfile %s: %v", newIdxFileName, err)
+ }
+ if err := idx.Sync(); err != nil {
+ return fmt.Errorf("cannot sync indexfile %s: %v", newIdxFileName, err)
+ }
}
return nil