aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--.github/workflows/binaries_dev.yml3
-rw-r--r--weed/server/volume_grpc_copy.go1
-rw-r--r--weed/shell/command_volume_check_disk.go4
-rw-r--r--weed/shell/command_volume_fsck.go8
-rw-r--r--weed/storage/disk_location.go2
-rw-r--r--weed/storage/volume.go18
-rw-r--r--weed/storage/volume_vacuum.go4
-rw-r--r--weed/storage/volume_write.go2
8 files changed, 33 insertions, 9 deletions
diff --git a/.github/workflows/binaries_dev.yml b/.github/workflows/binaries_dev.yml
index 29b1cf6ab..b897732f6 100644
--- a/.github/workflows/binaries_dev.yml
+++ b/.github/workflows/binaries_dev.yml
@@ -82,7 +82,8 @@ jobs:
strategy:
matrix:
goos: [darwin]
- goarch: [amd64, arm64]
+ # goarch: [amd64, arm64]
+ goarch: [amd64]
steps:
diff --git a/weed/server/volume_grpc_copy.go b/weed/server/volume_grpc_copy.go
index 6ed2724f1..e3ec5b724 100644
--- a/weed/server/volume_grpc_copy.go
+++ b/weed/server/volume_grpc_copy.go
@@ -279,6 +279,7 @@ func (vs *VolumeServer) CopyFile(req *volume_server_pb.CopyFileRequest, stream v
if uint32(v.CompactionRevision) != req.CompactionRevision && req.CompactionRevision != math.MaxUint32 {
return fmt.Errorf("volume %d is compacted", req.VolumeId)
}
+ v.SyncToDisk()
fileName = v.FileName(req.Ext)
} else {
baseFileName := erasure_coding.EcShardBaseFileName(req.Collection, int(req.VolumeId)) + req.Ext
diff --git a/weed/shell/command_volume_check_disk.go b/weed/shell/command_volume_check_disk.go
index 53284096d..daa64c1bd 100644
--- a/weed/shell/command_volume_check_disk.go
+++ b/weed/shell/command_volume_check_disk.go
@@ -45,6 +45,7 @@ func (c *commandVolumeCheckDisk) Do(args []string, commandEnv *CommandEnv, write
fsckCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
slowMode := fsckCommand.Bool("slow", false, "slow mode checks all replicas even file counts are the same")
verbose := fsckCommand.Bool("v", false, "verbose mode")
+ volumeId := fsckCommand.Uint("volumeId", 0, "the volume id")
applyChanges := fsckCommand.Bool("force", false, "apply the fix")
nonRepairThreshold := fsckCommand.Float64("nonRepairThreshold", 0.3, "repair when missing keys is not more than this limit")
if err = fsckCommand.Parse(args); err != nil {
@@ -70,6 +71,9 @@ func (c *commandVolumeCheckDisk) Do(args []string, commandEnv *CommandEnv, write
}
for _, replicas := range volumeReplicas {
+ if *volumeId > 0 && replicas[0].info.Id != uint32(*volumeId) {
+ continue
+ }
slices.SortFunc(replicas, func(a, b *VolumeReplica) bool {
return fileCount(a) > fileCount(b)
})
diff --git a/weed/shell/command_volume_fsck.go b/weed/shell/command_volume_fsck.go
index 1aa33e054..584ce722b 100644
--- a/weed/shell/command_volume_fsck.go
+++ b/weed/shell/command_volume_fsck.go
@@ -33,7 +33,8 @@ func init() {
}
type commandVolumeFsck struct {
- env *CommandEnv
+ env *CommandEnv
+ forcePurging *bool
}
func (c *commandVolumeFsck) Name() string {
@@ -68,6 +69,7 @@ func (c *commandVolumeFsck) Do(args []string, commandEnv *CommandEnv, writer io.
findMissingChunksInFilerPath := fsckCommand.String("findMissingChunksInFilerPath", "/", "used together with findMissingChunksInFiler")
findMissingChunksInVolumeId := fsckCommand.Int("findMissingChunksInVolumeId", 0, "used together with findMissingChunksInFiler")
applyPurging := fsckCommand.Bool("reallyDeleteFromVolume", false, "<expert only!> after detection, delete missing data from volumes / delete missing file entries from filer")
+ c.forcePurging = fsckCommand.Bool("forcePurging", false, "delete missing data from volumes in one replica used together with applyPurging")
purgeAbsent := fsckCommand.Bool("reallyDeleteFilerEntries", false, "<expert only!> delete missing file entries from filer if the corresponding volume is missing for any reason, please ensure all still existing/expected volumes are connected! used together with findMissingChunksInFiler")
tempPath := fsckCommand.String("tempPath", path.Join(os.TempDir()), "path for temporary idx files")
@@ -293,7 +295,7 @@ func (c *commandVolumeFsck) findExtraChunksInVolumeServers(dataNodeVolumeIdToVIn
}
orphanFileIds := []string{}
for fid, foundInAllReplicas := range orphanReplicaFileIds {
- if !isSeveralReplicas[volumeId] || (isSeveralReplicas[volumeId] && foundInAllReplicas) {
+ if !isSeveralReplicas[volumeId] || *c.forcePurging || (isSeveralReplicas[volumeId] && foundInAllReplicas) {
orphanFileIds = append(orphanFileIds, fid)
}
}
@@ -301,7 +303,7 @@ func (c *commandVolumeFsck) findExtraChunksInVolumeServers(dataNodeVolumeIdToVIn
continue
}
if verbose {
- fmt.Fprintf(writer, "purging process for volume %d", volumeId)
+ fmt.Fprintf(writer, "purging process for volume %d.\n", volumeId)
}
if isEcVolumeReplicas[volumeId] {
diff --git a/weed/storage/disk_location.go b/weed/storage/disk_location.go
index d618db296..e92810022 100644
--- a/weed/storage/disk_location.go
+++ b/weed/storage/disk_location.go
@@ -283,7 +283,7 @@ func (l *DiskLocation) UnloadVolume(vid needle.VolumeId) error {
func (l *DiskLocation) unmountVolumeByCollection(collectionName string) map[needle.VolumeId]*Volume {
deltaVols := make(map[needle.VolumeId]*Volume, 0)
for k, v := range l.volumes {
- if v.Collection == collectionName && !v.isCompacting {
+ if v.Collection == collectionName && !v.isCompacting && !v.isCommitCompacting {
deltaVols[k] = v
}
}
diff --git a/weed/storage/volume.go b/weed/storage/volume.go
index 2dfba55c8..bebeded54 100644
--- a/weed/storage/volume.go
+++ b/weed/storage/volume.go
@@ -42,7 +42,8 @@ type Volume struct {
lastCompactIndexOffset uint64
lastCompactRevision uint16
- isCompacting bool
+ isCompacting bool
+ isCommitCompacting bool
volumeInfo *volume_server_pb.VolumeInfo
location *DiskLocation
@@ -190,6 +191,21 @@ func (v *Volume) SetStopping() {
}
}
+func (v *Volume) SyncToDisk() {
+ v.dataFileAccessLock.Lock()
+ defer v.dataFileAccessLock.Unlock()
+ if v.nm != nil {
+ if err := v.nm.Sync(); err != nil {
+ glog.Warningf("Volume Close fail to sync volume idx %d", v.Id)
+ }
+ }
+ if v.DataBackend != nil {
+ if err := v.DataBackend.Sync(); err != nil {
+ glog.Warningf("Volume Close fail to sync volume %d", v.Id)
+ }
+ }
+}
+
// Close cleanly shuts down this volume
func (v *Volume) Close() {
v.dataFileAccessLock.Lock()
diff --git a/weed/storage/volume_vacuum.go b/weed/storage/volume_vacuum.go
index 7651420aa..ac6f8f8d5 100644
--- a/weed/storage/volume_vacuum.go
+++ b/weed/storage/volume_vacuum.go
@@ -94,9 +94,9 @@ func (v *Volume) CommitCompact() error {
}
glog.V(0).Infof("Committing volume %d vacuuming...", v.Id)
- v.isCompacting = true
+ v.isCommitCompacting = true
defer func() {
- v.isCompacting = false
+ v.isCommitCompacting = false
}()
v.dataFileAccessLock.Lock()
diff --git a/weed/storage/volume_write.go b/weed/storage/volume_write.go
index 794b1c125..7a96c9695 100644
--- a/weed/storage/volume_write.go
+++ b/weed/storage/volume_write.go
@@ -54,7 +54,7 @@ func (v *Volume) isFileUnchanged(n *needle.Needle) bool {
// Destroy removes everything related to this volume
func (v *Volume) Destroy() (err error) {
- if v.isCompacting {
+ if v.isCompacting || v.isCommitCompacting {
err = fmt.Errorf("volume %d is compacting", v.Id)
return
}