diff options
| -rw-r--r-- | .github/workflows/binaries_dev.yml | 3 | ||||
| -rw-r--r-- | weed/server/volume_grpc_copy.go | 1 | ||||
| -rw-r--r-- | weed/shell/command_volume_check_disk.go | 4 | ||||
| -rw-r--r-- | weed/shell/command_volume_fsck.go | 8 | ||||
| -rw-r--r-- | weed/storage/disk_location.go | 2 | ||||
| -rw-r--r-- | weed/storage/volume.go | 18 | ||||
| -rw-r--r-- | weed/storage/volume_vacuum.go | 4 | ||||
| -rw-r--r-- | weed/storage/volume_write.go | 2 |
8 files changed, 33 insertions, 9 deletions
diff --git a/.github/workflows/binaries_dev.yml b/.github/workflows/binaries_dev.yml index 29b1cf6ab..b897732f6 100644 --- a/.github/workflows/binaries_dev.yml +++ b/.github/workflows/binaries_dev.yml @@ -82,7 +82,8 @@ jobs: strategy: matrix: goos: [darwin] - goarch: [amd64, arm64] + # goarch: [amd64, arm64] + goarch: [amd64] steps: diff --git a/weed/server/volume_grpc_copy.go b/weed/server/volume_grpc_copy.go index 6ed2724f1..e3ec5b724 100644 --- a/weed/server/volume_grpc_copy.go +++ b/weed/server/volume_grpc_copy.go @@ -279,6 +279,7 @@ func (vs *VolumeServer) CopyFile(req *volume_server_pb.CopyFileRequest, stream v if uint32(v.CompactionRevision) != req.CompactionRevision && req.CompactionRevision != math.MaxUint32 { return fmt.Errorf("volume %d is compacted", req.VolumeId) } + v.SyncToDisk() fileName = v.FileName(req.Ext) } else { baseFileName := erasure_coding.EcShardBaseFileName(req.Collection, int(req.VolumeId)) + req.Ext diff --git a/weed/shell/command_volume_check_disk.go b/weed/shell/command_volume_check_disk.go index 53284096d..daa64c1bd 100644 --- a/weed/shell/command_volume_check_disk.go +++ b/weed/shell/command_volume_check_disk.go @@ -45,6 +45,7 @@ func (c *commandVolumeCheckDisk) Do(args []string, commandEnv *CommandEnv, write fsckCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) slowMode := fsckCommand.Bool("slow", false, "slow mode checks all replicas even file counts are the same") verbose := fsckCommand.Bool("v", false, "verbose mode") + volumeId := fsckCommand.Uint("volumeId", 0, "the volume id") applyChanges := fsckCommand.Bool("force", false, "apply the fix") nonRepairThreshold := fsckCommand.Float64("nonRepairThreshold", 0.3, "repair when missing keys is not more than this limit") if err = fsckCommand.Parse(args); err != nil { @@ -70,6 +71,9 @@ func (c *commandVolumeCheckDisk) Do(args []string, commandEnv *CommandEnv, write } for _, replicas := range volumeReplicas { + if *volumeId > 0 && replicas[0].info.Id != uint32(*volumeId) { + continue + } slices.SortFunc(replicas, func(a, b *VolumeReplica) bool { return fileCount(a) > fileCount(b) }) diff --git a/weed/shell/command_volume_fsck.go b/weed/shell/command_volume_fsck.go index 1aa33e054..584ce722b 100644 --- a/weed/shell/command_volume_fsck.go +++ b/weed/shell/command_volume_fsck.go @@ -33,7 +33,8 @@ func init() { } type commandVolumeFsck struct { - env *CommandEnv + env *CommandEnv + forcePurging *bool } func (c *commandVolumeFsck) Name() string { @@ -68,6 +69,7 @@ func (c *commandVolumeFsck) Do(args []string, commandEnv *CommandEnv, writer io. findMissingChunksInFilerPath := fsckCommand.String("findMissingChunksInFilerPath", "/", "used together with findMissingChunksInFiler") findMissingChunksInVolumeId := fsckCommand.Int("findMissingChunksInVolumeId", 0, "used together with findMissingChunksInFiler") applyPurging := fsckCommand.Bool("reallyDeleteFromVolume", false, "<expert only!> after detection, delete missing data from volumes / delete missing file entries from filer") + c.forcePurging = fsckCommand.Bool("forcePurging", false, "delete missing data from volumes in one replica used together with applyPurging") purgeAbsent := fsckCommand.Bool("reallyDeleteFilerEntries", false, "<expert only!> delete missing file entries from filer if the corresponding volume is missing for any reason, please ensure all still existing/expected volumes are connected! used together with findMissingChunksInFiler") tempPath := fsckCommand.String("tempPath", path.Join(os.TempDir()), "path for temporary idx files") @@ -293,7 +295,7 @@ func (c *commandVolumeFsck) findExtraChunksInVolumeServers(dataNodeVolumeIdToVIn } orphanFileIds := []string{} for fid, foundInAllReplicas := range orphanReplicaFileIds { - if !isSeveralReplicas[volumeId] || (isSeveralReplicas[volumeId] && foundInAllReplicas) { + if !isSeveralReplicas[volumeId] || *c.forcePurging || (isSeveralReplicas[volumeId] && foundInAllReplicas) { orphanFileIds = append(orphanFileIds, fid) } } @@ -301,7 +303,7 @@ func (c *commandVolumeFsck) findExtraChunksInVolumeServers(dataNodeVolumeIdToVIn continue } if verbose { - fmt.Fprintf(writer, "purging process for volume %d", volumeId) + fmt.Fprintf(writer, "purging process for volume %d.\n", volumeId) } if isEcVolumeReplicas[volumeId] { diff --git a/weed/storage/disk_location.go b/weed/storage/disk_location.go index d618db296..e92810022 100644 --- a/weed/storage/disk_location.go +++ b/weed/storage/disk_location.go @@ -283,7 +283,7 @@ func (l *DiskLocation) UnloadVolume(vid needle.VolumeId) error { func (l *DiskLocation) unmountVolumeByCollection(collectionName string) map[needle.VolumeId]*Volume { deltaVols := make(map[needle.VolumeId]*Volume, 0) for k, v := range l.volumes { - if v.Collection == collectionName && !v.isCompacting { + if v.Collection == collectionName && !v.isCompacting && !v.isCommitCompacting { deltaVols[k] = v } } diff --git a/weed/storage/volume.go b/weed/storage/volume.go index 2dfba55c8..bebeded54 100644 --- a/weed/storage/volume.go +++ b/weed/storage/volume.go @@ -42,7 +42,8 @@ type Volume struct { lastCompactIndexOffset uint64 lastCompactRevision uint16 - isCompacting bool + isCompacting bool + isCommitCompacting bool volumeInfo *volume_server_pb.VolumeInfo location *DiskLocation @@ -190,6 +191,21 @@ func (v *Volume) SetStopping() { } } +func (v *Volume) SyncToDisk() { + v.dataFileAccessLock.Lock() + defer v.dataFileAccessLock.Unlock() + if v.nm != nil { + if err := v.nm.Sync(); err != nil { + glog.Warningf("Volume Close fail to sync volume idx %d", v.Id) + } + } + if v.DataBackend != nil { + if err := v.DataBackend.Sync(); err != nil { + glog.Warningf("Volume Close fail to sync volume %d", v.Id) + } + } +} + // Close cleanly shuts down this volume func (v *Volume) Close() { v.dataFileAccessLock.Lock() diff --git a/weed/storage/volume_vacuum.go b/weed/storage/volume_vacuum.go index 7651420aa..ac6f8f8d5 100644 --- a/weed/storage/volume_vacuum.go +++ b/weed/storage/volume_vacuum.go @@ -94,9 +94,9 @@ func (v *Volume) CommitCompact() error { } glog.V(0).Infof("Committing volume %d vacuuming...", v.Id) - v.isCompacting = true + v.isCommitCompacting = true defer func() { - v.isCompacting = false + v.isCommitCompacting = false }() v.dataFileAccessLock.Lock() diff --git a/weed/storage/volume_write.go b/weed/storage/volume_write.go index 794b1c125..7a96c9695 100644 --- a/weed/storage/volume_write.go +++ b/weed/storage/volume_write.go @@ -54,7 +54,7 @@ func (v *Volume) isFileUnchanged(n *needle.Needle) bool { // Destroy removes everything related to this volume func (v *Volume) Destroy() (err error) { - if v.isCompacting { + if v.isCompacting || v.isCommitCompacting { err = fmt.Errorf("volume %d is compacting", v.Id) return } |
