diff options
| -rw-r--r-- | weed/filer/filechunks.go | 15 | ||||
| -rw-r--r-- | weed/replication/sink/filersink/filer_sink.go | 2 | ||||
| -rw-r--r-- | weed/s3api/s3api_object_multipart_handlers.go | 4 | ||||
| -rw-r--r-- | weed/storage/store_vacuum.go | 3 | ||||
| -rw-r--r-- | weed/storage/volume_vacuum.go | 9 |
5 files changed, 30 insertions, 3 deletions
diff --git a/weed/filer/filechunks.go b/weed/filer/filechunks.go index be18d45ac..d18d06f2c 100644 --- a/weed/filer/filechunks.go +++ b/weed/filer/filechunks.go @@ -101,6 +101,21 @@ func DoMinusChunks(as, bs []*filer_pb.FileChunk) (delta []*filer_pb.FileChunk) { return } +func DoMinusChunksBySourceFileId(as, bs []*filer_pb.FileChunk) (delta []*filer_pb.FileChunk) { + + fileIds := make(map[string]bool) + for _, interval := range bs { + fileIds[interval.GetFileIdString()] = true + } + for _, chunk := range as { + if _, found := fileIds[chunk.GetSourceFileId()]; !found { + delta = append(delta, chunk) + } + } + + return +} + type ChunkView struct { FileId string Offset int64 diff --git a/weed/replication/sink/filersink/filer_sink.go b/weed/replication/sink/filersink/filer_sink.go index c48ab2368..345c7f13b 100644 --- a/weed/replication/sink/filersink/filer_sink.go +++ b/weed/replication/sink/filersink/filer_sink.go @@ -199,7 +199,7 @@ func (fs *FilerSink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParent // delete the chunks that are deleted from the source if deleteIncludeChunks { // remove the deleted chunks. Actual data deletion happens in filer UpdateEntry FindUnusedFileChunks - existingEntry.Chunks = filer.DoMinusChunks(existingEntry.Chunks, deletedChunks) + existingEntry.Chunks = filer.DoMinusChunksBySourceFileId(existingEntry.Chunks, deletedChunks) } // replicate the chunks that are new in the source diff --git a/weed/s3api/s3api_object_multipart_handlers.go b/weed/s3api/s3api_object_multipart_handlers.go index 8cbaf9393..99c280e13 100644 --- a/weed/s3api/s3api_object_multipart_handlers.go +++ b/weed/s3api/s3api_object_multipart_handlers.go @@ -163,13 +163,13 @@ func (s3a *S3ApiServer) ListObjectPartsHandler(w http.ResponseWriter, r *http.Re UploadId: aws.String(uploadID), }) - glog.V(2).Infof("ListObjectPartsHandler %s count=%d", string(s3err.EncodeXMLResponse(response)), len(response.Part)) - if errCode != s3err.ErrNone { s3err.WriteErrorResponse(w, r, errCode) return } + glog.V(2).Infof("ListObjectPartsHandler %s count=%d", string(s3err.EncodeXMLResponse(response)), len(response.Part)) + writeSuccessResponseXML(w, r, response) } diff --git a/weed/storage/store_vacuum.go b/weed/storage/store_vacuum.go index 0d6e0b0f1..cbd716b32 100644 --- a/weed/storage/store_vacuum.go +++ b/weed/storage/store_vacuum.go @@ -26,6 +26,9 @@ func (s *Store) CompactVolume(vid needle.VolumeId, preallocate int64, compaction return fmt.Errorf("volume id %d is not found during compact", vid) } func (s *Store) CommitCompactVolume(vid needle.VolumeId) (bool, error) { + if s.isStopping { + return false, fmt.Errorf("volume id %d skips compact because volume is stopping", vid) + } if v := s.findVolume(vid); v != nil { return v.IsReadOnly(), v.CommitCompact() } diff --git a/weed/storage/volume_vacuum.go b/weed/storage/volume_vacuum.go index d686e2b09..56e8beddb 100644 --- a/weed/storage/volume_vacuum.go +++ b/weed/storage/volume_vacuum.go @@ -289,6 +289,9 @@ func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldI return fmt.Errorf("ReadNeedleBlob %s key %d offset %d size %d failed: %v", oldDatFile.Name(), key, increIdxEntry.offset.ToActualOffset(), increIdxEntry.size, err) } dstDatBackend.Write(needleBytes) + if err := dstDatBackend.Sync(); err != nil { + return fmt.Errorf("cannot sync needle %s: %v", dstDatBackend.File.Name(), err) + } util.Uint32toBytes(idxEntryBytes[8:12], uint32(offset/NeedlePaddingSize)) } else { //deleted needle //fakeDelNeedle 's default Data field is nil @@ -308,6 +311,12 @@ func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldI newIdxFileName, err) } _, err = idx.Write(idxEntryBytes) + if err != nil { + return fmt.Errorf("cannot write indexfile %s: %v", newIdxFileName, err) + } + if err := idx.Sync(); err != nil { + return fmt.Errorf("cannot sync indexfile %s: %v", newIdxFileName, err) + } } return nil |
