diff options
| author | chrislu <chris.lu@gmail.com> | 2024-06-04 21:12:30 -0700 |
|---|---|---|
| committer | chrislu <chris.lu@gmail.com> | 2024-06-04 21:12:30 -0700 |
| commit | 9f02bf4ede081d59dd12f5203518e61e689b1ddb (patch) | |
| tree | 2cba26b062dfdb2dcd58f475bb736da68287405e /weed/shell | |
| parent | 41cc825ddbeaea3659e6899f1753be83554ae9fe (diff) | |
| parent | 1f2dc54647f8ab84f2e8e9cd9aa3632760cee869 (diff) | |
| download | seaweedfs-9f02bf4ede081d59dd12f5203518e61e689b1ddb.tar.xz seaweedfs-9f02bf4ede081d59dd12f5203518e61e689b1ddb.zip | |
Merge branch 'master' into mq
Diffstat (limited to 'weed/shell')
| -rw-r--r-- | weed/shell/command_ec_encode.go | 4 | ||||
| -rw-r--r-- | weed/shell/command_volume_fsck.go | 51 | ||||
| -rw-r--r-- | weed/shell/command_volume_tier_upload.go | 6 |
3 files changed, 43 insertions, 18 deletions
diff --git a/weed/shell/command_ec_encode.go b/weed/shell/command_ec_encode.go index c2b2074e4..16de2ce73 100644 --- a/weed/shell/command_ec_encode.go +++ b/weed/shell/command_ec_encode.go @@ -304,6 +304,10 @@ func collectVolumeIdsForEcEncode(commandEnv *CommandEnv, selectedCollection stri eachDataNode(topologyInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) { for _, diskInfo := range dn.DiskInfos { for _, v := range diskInfo.VolumeInfos { + // ignore remote volumes + if v.RemoteStorageName != "" && v.RemoteStorageKey != "" { + continue + } if v.Collection == selectedCollection && v.ModifiedAtSecond+quietSeconds < nowUnixSeconds { if float64(v.Size) > fullPercentage/100*float64(volumeSizeLimitMb)*1024*1024 { vidMap[v.Id] = true diff --git a/weed/shell/command_volume_fsck.go b/weed/shell/command_volume_fsck.go index 8916e90bd..d85a9e13f 100644 --- a/weed/shell/command_volume_fsck.go +++ b/weed/shell/command_volume_fsck.go @@ -19,6 +19,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/storage/needle_map" "github.com/seaweedfs/seaweedfs/weed/storage/types" "github.com/seaweedfs/seaweedfs/weed/util" + "golang.org/x/sync/errgroup" "io" "math" "net/http" @@ -137,32 +138,46 @@ func (c *commandVolumeFsck) Do(args []string, commandEnv *CommandEnv, writer io. return fmt.Errorf("read filer buckets path: %v", err) } - collectCutoffFromAtNs := time.Now().Add(-*cutoffTimeAgo).UnixNano() + var collectCutoffFromAtNs int64 = 0 + if cutoffTimeAgo.Seconds() != 0 { + collectCutoffFromAtNs = time.Now().Add(-*cutoffTimeAgo).UnixNano() + } var collectModifyFromAtNs int64 = 0 if modifyTimeAgo.Seconds() != 0 { collectModifyFromAtNs = time.Now().Add(-*modifyTimeAgo).UnixNano() } // collect each volume file ids - for dataNodeId, volumeIdToVInfo := range dataNodeVolumeIdToVInfo { - for volumeId, vinfo := range volumeIdToVInfo { - if len(c.volumeIds) > 0 { - if _, ok := c.volumeIds[volumeId]; !ok { + eg, gCtx := errgroup.WithContext(context.Background()) + _ = gCtx + for _dataNodeId, _volumeIdToVInfo := range dataNodeVolumeIdToVInfo { + dataNodeId, volumeIdToVInfo := _dataNodeId, _volumeIdToVInfo + eg.Go(func() error { + for volumeId, vinfo := range volumeIdToVInfo { + if len(c.volumeIds) > 0 { + if _, ok := c.volumeIds[volumeId]; !ok { + delete(volumeIdToVInfo, volumeId) + continue + } + } + if *c.collection != "" && vinfo.collection != *c.collection { delete(volumeIdToVInfo, volumeId) continue } + err = c.collectOneVolumeFileIds(dataNodeId, volumeId, vinfo, uint64(collectModifyFromAtNs), uint64(collectCutoffFromAtNs)) + if err != nil { + return fmt.Errorf("failed to collect file ids from volume %d on %s: %v", volumeId, vinfo.server, err) + } } - if *c.collection != "" && vinfo.collection != *c.collection { - delete(volumeIdToVInfo, volumeId) - continue - } - err = c.collectOneVolumeFileIds(dataNodeId, volumeId, vinfo, uint64(collectModifyFromAtNs), uint64(collectCutoffFromAtNs)) - if err != nil { - return fmt.Errorf("failed to collect file ids from volume %d on %s: %v", volumeId, vinfo.server, err) + if *c.verbose { + fmt.Fprintf(c.writer, "dn %+v filtred %d volumes and locations.\n", dataNodeId, len(dataNodeVolumeIdToVInfo[dataNodeId])) } - } - if *c.verbose { - fmt.Fprintf(c.writer, "dn %+v filtred %d volumes and locations.\n", dataNodeId, len(dataNodeVolumeIdToVInfo[dataNodeId])) - } + return nil + }) + } + err = eg.Wait() + if err != nil { + fmt.Fprintf(c.writer, "got error: %v", err) + return err } if *c.findMissingChunksInFiler { @@ -416,7 +431,7 @@ func (c *commandVolumeFsck) collectOneVolumeFileIds(dataNodeId string, volumeId } buf.Write(resp.FileContent) } - if !vinfo.isReadOnly { + if !vinfo.isReadOnly && (modifyFrom != 0 || cutoffFrom != 0) { index, err := idx.FirstInvalidIndex(buf.Bytes(), func(key types.NeedleId, offset types.Offset, size types.Size) (bool, error) { resp, err := volumeServerClient.ReadNeedleMeta(context.Background(), &volume_server_pb.ReadNeedleMetaRequest{ @@ -428,7 +443,7 @@ func (c *commandVolumeFsck) collectOneVolumeFileIds(dataNodeId string, volumeId if err != nil { return false, fmt.Errorf("read needle meta with id %d from volume %d: %v", key, volumeId, err) } - if (modifyFrom == 0 || modifyFrom <= resp.AppendAtNs) && (resp.AppendAtNs <= cutoffFrom) { + if (modifyFrom == 0 || modifyFrom <= resp.AppendAtNs) && (cutoffFrom == 0 || resp.AppendAtNs <= cutoffFrom) { return true, nil } return false, nil diff --git a/weed/shell/command_volume_tier_upload.go b/weed/shell/command_volume_tier_upload.go index c109d59d8..6932317ab 100644 --- a/weed/shell/command_volume_tier_upload.go +++ b/weed/shell/command_volume_tier_upload.go @@ -139,6 +139,12 @@ func uploadDatToRemoteTier(grpcDialOption grpc.DialOption, writer io.Writer, vol KeepLocalDatFile: keepLocalDatFile, }) + if stream == nil && copyErr == nil { + // when the volume is already uploaded, VolumeTierMoveDatToRemote will return nil stream and nil error + // so we should directly return in this case + fmt.Fprintf(writer, "volume %v already uploaded", volumeId) + return nil + } var lastProcessed int64 for { resp, recvErr := stream.Recv() |
