diff options
Diffstat (limited to 'weed/shell')
| -rw-r--r-- | weed/shell/command_ec_balance.go | 8 | ||||
| -rw-r--r-- | weed/shell/command_ec_rebuild.go | 2 | ||||
| -rw-r--r-- | weed/shell/command_fs_merge_volumes.go | 23 | ||||
| -rw-r--r-- | weed/shell/command_fs_verify.go | 203 | ||||
| -rw-r--r-- | weed/shell/command_remote_uncache.go | 5 | ||||
| -rw-r--r-- | weed/shell/command_s3_bucket_quota_check.go | 2 | ||||
| -rw-r--r-- | weed/shell/command_s3_clean_uploads.go | 3 | ||||
| -rw-r--r-- | weed/shell/command_volume_check_disk.go | 2 | ||||
| -rw-r--r-- | weed/shell/command_volume_fsck.go | 93 | ||||
| -rw-r--r-- | weed/shell/command_volume_grow.go | 64 | ||||
| -rw-r--r-- | weed/shell/command_volume_server_evacuate.go | 7 | ||||
| -rw-r--r-- | weed/shell/command_volume_tier_upload.go | 5 |
12 files changed, 281 insertions, 136 deletions
diff --git a/weed/shell/command_ec_balance.go b/weed/shell/command_ec_balance.go index 17ba63cfe..217e5750e 100644 --- a/weed/shell/command_ec_balance.go +++ b/weed/shell/command_ec_balance.go @@ -455,16 +455,20 @@ func doBalanceEcRack(commandEnv *CommandEnv, ecRack *EcRack, applyBalancing bool func pickOneEcNodeAndMoveOneShard(commandEnv *CommandEnv, averageShardsPerEcNode int, existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, possibleDestinationEcNodes []*EcNode, applyBalancing bool) error { sortEcNodesByFreeslotsDescending(possibleDestinationEcNodes) - + skipReason := "" for _, destEcNode := range possibleDestinationEcNodes { + if destEcNode.info.Id == existingLocation.info.Id { continue } if destEcNode.freeEcSlot <= 0 { + skipReason += fmt.Sprintf(" Skipping %s because it has no free slots\n", destEcNode.info.Id) continue } if findEcVolumeShards(destEcNode, vid).ShardIdCount() >= averageShardsPerEcNode { + skipReason += fmt.Sprintf(" Skipping %s because it %d >= avernageShards (%d)\n", + destEcNode.info.Id, findEcVolumeShards(destEcNode, vid).ShardIdCount(), averageShardsPerEcNode) continue } @@ -477,7 +481,7 @@ func pickOneEcNodeAndMoveOneShard(commandEnv *CommandEnv, averageShardsPerEcNode return nil } - + fmt.Printf("WARNING: Could not find suitable taget node for %d.%d:\n%s", vid, shardId, skipReason) return nil } diff --git a/weed/shell/command_ec_rebuild.go b/weed/shell/command_ec_rebuild.go index 2131c5649..a4dfac67c 100644 --- a/weed/shell/command_ec_rebuild.go +++ b/weed/shell/command_ec_rebuild.go @@ -224,7 +224,7 @@ func prepareDataToRecover(commandEnv *CommandEnv, rebuilder *EcNode, collection Collection: collection, ShardIds: []uint32{uint32(shardId)}, CopyEcxFile: needEcxFile, - CopyEcjFile: needEcxFile, + CopyEcjFile: true, CopyVifFile: needEcxFile, SourceDataNode: ecNodes[0].info.Id, }) diff --git a/weed/shell/command_fs_merge_volumes.go b/weed/shell/command_fs_merge_volumes.go index 4a6048a43..b77feb8e3 100644 --- a/weed/shell/command_fs_merge_volumes.go +++ b/weed/shell/command_fs_merge_volumes.go @@ -19,14 +19,10 @@ import ( "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" "github.com/seaweedfs/seaweedfs/weed/util" -) - -var ( - client *http.Client + util_http "github.com/seaweedfs/seaweedfs/weed/util/http" ) func init() { - client = &http.Client{} Commands = append(Commands, &commandFsMergeVolumes{}) } @@ -104,7 +100,7 @@ func (c *commandFsMergeVolumes) Do(args []string, commandEnv *CommandEnv, writer return nil } - defer client.CloseIdleConnections() + defer util_http.GetGlobalHttpClient().CloseIdleConnections() return commandEnv.WithFilerClient(false, func(filerClient filer_pb.SeaweedFilerClient) error { return filer_pb.TraverseBfs(commandEnv, util.FullPath(dir), func(parentPath util.FullPath, entry *filer_pb.Entry) { @@ -304,7 +300,7 @@ func moveChunk(chunk *filer_pb.FileChunk, toVolumeId needle.VolumeId, masterClie if err != nil { return err } - defer util.CloseResponse(resp) + defer util_http.CloseResponse(resp) defer reader.Close() var filename string @@ -322,7 +318,12 @@ func moveChunk(chunk *filer_pb.FileChunk, toVolumeId needle.VolumeId, masterClie isCompressed := resp.Header.Get("Content-Encoding") == "gzip" md5 := resp.Header.Get("Content-MD5") - _, err, _ = operation.Upload(reader, &operation.UploadOption{ + uploader, err := operation.NewUploader() + if err != nil { + return err + } + + _, err, _ = uploader.Upload(reader, &operation.UploadOption{ UploadUrl: uploadURL, Filename: filename, IsInputCompressed: isCompressed, @@ -342,18 +343,18 @@ func moveChunk(chunk *filer_pb.FileChunk, toVolumeId needle.VolumeId, masterClie func readUrl(fileUrl string) (*http.Response, io.ReadCloser, error) { - req, err := http.NewRequest("GET", fileUrl, nil) + req, err := http.NewRequest(http.MethodGet, fileUrl, nil) if err != nil { return nil, nil, err } req.Header.Add("Accept-Encoding", "gzip") - r, err := client.Do(req) + r, err := util_http.GetGlobalHttpClient().Do(req) if err != nil { return nil, nil, err } if r.StatusCode >= 400 { - util.CloseResponse(r) + util_http.CloseResponse(r) return nil, nil, fmt.Errorf("%s: %s", fileUrl, r.Status) } diff --git a/weed/shell/command_fs_verify.go b/weed/shell/command_fs_verify.go index 32d498202..47052cca0 100644 --- a/weed/shell/command_fs_verify.go +++ b/weed/shell/command_fs_verify.go @@ -1,6 +1,7 @@ package shell import ( + "bytes" "context" "flag" "fmt" @@ -30,6 +31,7 @@ type commandFsVerify struct { volumeServers []pb.ServerAddress volumeIds map[uint32][]pb.ServerAddress verbose *bool + metadataFromLog *bool concurrency *int modifyTimeAgoAtSec int64 writer io.Writer @@ -56,7 +58,7 @@ func (c *commandFsVerify) Do(args []string, commandEnv *CommandEnv, writer io.Wr c.verbose = fsVerifyCommand.Bool("v", false, "print out each processed files") modifyTimeAgo := fsVerifyCommand.Duration("modifyTimeAgo", 0, "only include files after this modify time to verify") c.concurrency = fsVerifyCommand.Int("concurrency", 0, "number of parallel verification per volume server") - + c.metadataFromLog = fsVerifyCommand.Bool("metadataFromLog", false, "Using filer log to get metadata") if err = fsVerifyCommand.Parse(args); err != nil { return err } @@ -88,14 +90,19 @@ func (c *commandFsVerify) Do(args []string, commandEnv *CommandEnv, writer io.Wr defer close(c.waitChan[volumeServerStr]) } } - - fCount, eConut, terr := c.verifyTraverseBfs(path) - if terr == nil { - fmt.Fprintf(writer, "verified %d files, error %d files \n", fCount, eConut) + var fCount, eCount uint64 + if *c.metadataFromLog { + var wg sync.WaitGroup + fCount, eCount, err = c.verifyProcessMetadata(path, &wg) + wg.Wait() + if err != nil { + return err + } + } else { + fCount, eCount, err = c.verifyTraverseBfs(path) } - - return terr - + fmt.Fprintf(writer, "verified %d files, error %d files \n", fCount, eCount) + return err } func (c *commandFsVerify) collectVolumeIds() error { @@ -117,7 +124,7 @@ func (c *commandFsVerify) collectVolumeIds() error { return nil } -func (c *commandFsVerify) verifyEntry(volumeServer pb.ServerAddress, fileId *filer_pb.FileId) error { +func (c *commandFsVerify) verifyChunk(volumeServer pb.ServerAddress, fileId *filer_pb.FileId) error { err := operation.WithVolumeServerClient(false, volumeServer, c.env.option.GrpcDialOption, func(client volume_server_pb.VolumeServerClient) error { _, err := client.VolumeNeedleStatus(context.Background(), @@ -138,6 +145,126 @@ type ItemEntry struct { path util.FullPath } +func (c *commandFsVerify) verifyProcessMetadata(path string, wg *sync.WaitGroup) (fileCount uint64, errCount uint64, err error) { + processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error { + message := resp.EventNotification + if resp.EventNotification.NewEntry == nil { + return nil + } + chunkCount := len(message.NewEntry.Chunks) + if chunkCount == 0 { + return nil + } + entryPath := fmt.Sprintf("%s/%s", message.NewParentPath, message.NewEntry.Name) + errorChunksCount := atomic.NewUint64(0) + if !c.verifyEntry(entryPath, message.NewEntry.Chunks, errorChunksCount, wg) { + if err = c.env.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { + entryResp, errReq := client.LookupDirectoryEntry(context.Background(), &filer_pb.LookupDirectoryEntryRequest{ + Directory: message.NewParentPath, + Name: message.NewEntry.Name, + }) + if errReq != nil { + if strings.HasSuffix(errReq.Error(), "no entry is found in filer store") { + return nil + } + return errReq + } + if entryResp.Entry.Attributes.Mtime == message.NewEntry.Attributes.Mtime && + bytes.Equal(entryResp.Entry.Attributes.Md5, message.NewEntry.Attributes.Md5) { + fmt.Fprintf(c.writer, "file: %s needles:%d failed:%d\n", entryPath, chunkCount, errorChunksCount.Load()) + errCount++ + } + return nil + }); err != nil { + return err + } + return nil + } + if *c.verbose { + fmt.Fprintf(c.writer, "file: %s needles:%d verifed\n", entryPath, chunkCount) + } + fileCount++ + return nil + } + metadataFollowOption := &pb.MetadataFollowOption{ + ClientName: "shell_verify", + ClientId: util.RandomInt32(), + ClientEpoch: 0, + SelfSignature: 0, + PathPrefix: path, + AdditionalPathPrefixes: nil, + DirectoriesToWatch: nil, + StartTsNs: time.Now().Add(-1 * time.Second * time.Duration(c.modifyTimeAgoAtSec)).UnixNano(), + StopTsNs: time.Now().UnixNano(), + EventErrorType: pb.DontLogError, + } + return fileCount, errCount, pb.FollowMetadata(c.env.option.FilerAddress, c.env.option.GrpcDialOption, metadataFollowOption, processEventFn) +} + +func (c *commandFsVerify) verifyEntry(path string, chunks []*filer_pb.FileChunk, errorCount *atomic.Uint64, wg *sync.WaitGroup) bool { + fileMsg := fmt.Sprintf("file:%s", path) + itemIsVerifed := atomic.NewBool(true) + for _, chunk := range chunks { + if volumeIds, ok := c.volumeIds[chunk.Fid.VolumeId]; ok { + for _, volumeServer := range volumeIds { + if *c.concurrency == 0 { + if err := c.verifyChunk(volumeServer, chunk.Fid); err != nil { + if !(*c.metadataFromLog && strings.HasSuffix(err.Error(), "not found")) { + fmt.Fprintf(c.writer, "%s failed verify fileId %s: %+v\n", + fileMsg, chunk.GetFileIdString(), err) + } + if itemIsVerifed.Load() { + itemIsVerifed.Store(false) + errorCount.Add(1) + } + } + continue + } + c.waitChanLock.RLock() + waitChan, ok := c.waitChan[string(volumeServer)] + c.waitChanLock.RUnlock() + if !ok { + fmt.Fprintf(c.writer, "%s failed to get channel for %s fileId: %s\n", + string(volumeServer), fileMsg, chunk.GetFileIdString()) + if itemIsVerifed.Load() { + itemIsVerifed.Store(false) + errorCount.Add(1) + } + continue + } + wg.Add(1) + waitChan <- struct{}{} + go func(fChunk *filer_pb.FileChunk, path string, volumeServer pb.ServerAddress, msg string) { + defer wg.Done() + if err := c.verifyChunk(volumeServer, fChunk.Fid); err != nil { + if !(*c.metadataFromLog && strings.HasSuffix(err.Error(), "not found")) { + fmt.Fprintf(c.writer, "%s failed verify fileId %s: %+v\n", + msg, fChunk.GetFileIdString(), err) + } + if itemIsVerifed.Load() { + itemIsVerifed.Store(false) + errorCount.Add(1) + } + } + <-waitChan + }(chunk, path, volumeServer, fileMsg) + } + } else { + if !*c.metadataFromLog { + err := fmt.Errorf("volumeId %d not found", chunk.Fid.VolumeId) + fmt.Fprintf(c.writer, "%s failed verify fileId %s: %+v\n", + fileMsg, chunk.GetFileIdString(), err) + } + if itemIsVerifed.Load() { + itemIsVerifed.Store(false) + errorCount.Add(1) + } + break + } + } + return itemIsVerifed.Load() +} + func (c *commandFsVerify) verifyTraverseBfs(path string) (fileCount uint64, errCount uint64, err error) { timeNowAtSec := time.Now().Unix() return fileCount, errCount, doTraverseBfsAndSaving(c.env, c.writer, path, false, @@ -166,63 +293,9 @@ func (c *commandFsVerify) verifyTraverseBfs(path string) (fileCount uint64, errC for itemEntry := range outputChan { i := itemEntry.(*ItemEntry) itemPath := string(i.path) - fileMsg := fmt.Sprintf("file:%s", itemPath) - itemIsVerifed := atomic.NewBool(true) - for _, chunk := range i.chunks { - if volumeIds, ok := c.volumeIds[chunk.Fid.VolumeId]; ok { - for _, volumeServer := range volumeIds { - if *c.concurrency == 0 { - if err = c.verifyEntry(volumeServer, chunk.Fid); err != nil { - fmt.Fprintf(c.writer, "%s failed verify fileId %s: %+v\n", - fileMsg, chunk.GetFileIdString(), err) - if itemIsVerifed.Load() { - itemIsVerifed.Store(false) - itemErrCount.Add(1) - } - } - continue - } - c.waitChanLock.RLock() - waitChan, ok := c.waitChan[string(volumeServer)] - c.waitChanLock.RUnlock() - if !ok { - fmt.Fprintf(c.writer, "%s failed to get channel for %s fileId: %s: %+v\n", - string(volumeServer), fileMsg, chunk.GetFileIdString(), err) - if itemIsVerifed.Load() { - itemIsVerifed.Store(false) - itemErrCount.Add(1) - } - continue - } - wg.Add(1) - waitChan <- struct{}{} - go func(fChunk *filer_pb.FileChunk, path string, volumeServer pb.ServerAddress, msg string) { - defer wg.Done() - if err = c.verifyEntry(volumeServer, fChunk.Fid); err != nil { - fmt.Fprintf(c.writer, "%s failed verify fileId %s: %+v\n", - msg, fChunk.GetFileIdString(), err) - if itemIsVerifed.Load() { - itemIsVerifed.Store(false) - itemErrCount.Add(1) - } - } - <-waitChan - }(chunk, itemPath, volumeServer, fileMsg) - } - } else { - err = fmt.Errorf("volumeId %d not found", chunk.Fid.VolumeId) - fmt.Fprintf(c.writer, "%s failed verify fileId %s: %+v\n", - fileMsg, chunk.GetFileIdString(), err) - if itemIsVerifed.Load() { - itemIsVerifed.Store(false) - itemErrCount.Add(1) - } - break - } - } - if itemIsVerifed.Load() { + if c.verifyEntry(itemPath, i.chunks, itemErrCount, &wg) { if *c.verbose { - fmt.Fprintf(c.writer, "%s needles:%d verifed\n", fileMsg, len(i.chunks)) + fmt.Fprintf(c.writer, "file: %s needles:%d verifed\n", itemPath, len(i.chunks)) } fileCount++ } diff --git a/weed/shell/command_remote_uncache.go b/weed/shell/command_remote_uncache.go index 34269ce4e..25e51ff74 100644 --- a/weed/shell/command_remote_uncache.go +++ b/weed/shell/command_remote_uncache.go @@ -7,6 +7,7 @@ import ( "io" "path/filepath" "strings" + "time" "github.com/seaweedfs/seaweedfs/weed/filer" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" @@ -164,12 +165,12 @@ func (ff *FileFilter) matches(entry *filer_pb.Entry) bool { } } if *ff.minAge != -1 { - if entry.Attributes.Crtime < *ff.minAge { + if entry.Attributes.Crtime + *ff.minAge > time.Now().Unix() { return false } } if *ff.maxAge != -1 { - if entry.Attributes.Crtime > *ff.maxAge { + if entry.Attributes.Crtime + *ff.maxAge < time.Now().Unix() { return false } } diff --git a/weed/shell/command_s3_bucket_quota_check.go b/weed/shell/command_s3_bucket_quota_check.go index bc0d838f7..b130e4fad 100644 --- a/weed/shell/command_s3_bucket_quota_check.go +++ b/weed/shell/command_s3_bucket_quota_check.go @@ -130,7 +130,7 @@ func (c *commandS3BucketQuotaEnforce) processEachBucket(fc *filer.FilerConf, fil } else { fmt.Fprintf(writer, " changing bucket %s to writable.\n", entry.Name) } - fc.AddLocationConf(locConf) + fc.SetLocationConf(locConf) } return diff --git a/weed/shell/command_s3_clean_uploads.go b/weed/shell/command_s3_clean_uploads.go index 2be61f72a..accce60ba 100644 --- a/weed/shell/command_s3_clean_uploads.go +++ b/weed/shell/command_s3_clean_uploads.go @@ -12,6 +12,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/util" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + util_http "github.com/seaweedfs/seaweedfs/weed/util/http" ) func init() { @@ -90,7 +91,7 @@ func (c *commandS3CleanUploads) cleanupUploads(commandEnv *CommandEnv, writer io deleteUrl := fmt.Sprintf("http://%s%s/%s?recursive=true&ignoreRecursiveError=true", commandEnv.option.FilerAddress.ToHttpAddress(), uploadsDir, staleUpload) fmt.Fprintf(writer, "purge %s\n", deleteUrl) - err = util.Delete(deleteUrl, string(encodedJwt)) + err = util_http.Delete(deleteUrl, string(encodedJwt)) if err != nil && err.Error() != "" { return fmt.Errorf("purge %s/%s: %v", uploadsDir, staleUpload, err) } diff --git a/weed/shell/command_volume_check_disk.go b/weed/shell/command_volume_check_disk.go index 3e2512bdd..0e76f6ac9 100644 --- a/weed/shell/command_volume_check_disk.go +++ b/weed/shell/command_volume_check_disk.go @@ -279,7 +279,7 @@ func doVolumeCheckDisk(minuend, subtrahend *needle_map.MemDb, source, target *Vo fmt.Fprintf(writer, "delete %s %s => %s\n", needleValue.Key.FileId(source.info.Id), source.location.dataNode.Id, target.location.dataNode.Id) } } - deleteResults, deleteErr := operation.DeleteFilesAtOneVolumeServer( + deleteResults, deleteErr := operation.DeleteFileIdsAtOneVolumeServer( pb.NewServerAddressFromDataNode(target.location.dataNode), grpcDialOption, fidList, false) if deleteErr != nil { diff --git a/weed/shell/command_volume_fsck.go b/weed/shell/command_volume_fsck.go index 1d27fae1d..acb0ee5ad 100644 --- a/weed/shell/command_volume_fsck.go +++ b/weed/shell/command_volume_fsck.go @@ -7,6 +7,18 @@ import ( "errors" "flag" "fmt" + "io" + "math" + "net/http" + "net/url" + "os" + "path" + "path/filepath" + "strconv" + "strings" + "sync" + "time" + "github.com/seaweedfs/seaweedfs/weed/filer" "github.com/seaweedfs/seaweedfs/weed/operation" "github.com/seaweedfs/seaweedfs/weed/pb" @@ -14,23 +26,12 @@ import ( "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb" "github.com/seaweedfs/seaweedfs/weed/storage" - "github.com/seaweedfs/seaweedfs/weed/storage/idx" "github.com/seaweedfs/seaweedfs/weed/storage/needle" "github.com/seaweedfs/seaweedfs/weed/storage/needle_map" "github.com/seaweedfs/seaweedfs/weed/storage/types" "github.com/seaweedfs/seaweedfs/weed/util" + util_http "github.com/seaweedfs/seaweedfs/weed/util/http" "golang.org/x/sync/errgroup" - "io" - "math" - "net/http" - "net/url" - "os" - "path" - "path/filepath" - "strconv" - "strings" - "sync" - "time" ) func init() { @@ -163,7 +164,7 @@ func (c *commandVolumeFsck) Do(args []string, commandEnv *CommandEnv, writer io. delete(volumeIdToVInfo, volumeId) continue } - err = c.collectOneVolumeFileIds(dataNodeId, volumeId, vinfo, uint64(collectModifyFromAtNs), uint64(collectCutoffFromAtNs)) + err = c.collectOneVolumeFileIds(dataNodeId, volumeId, vinfo) if err != nil { return fmt.Errorf("failed to collect file ids from volume %d on %s: %v", volumeId, vinfo.server, err) } @@ -198,7 +199,7 @@ func (c *commandVolumeFsck) Do(args []string, commandEnv *CommandEnv, writer io. return fmt.Errorf("failed to collect file ids from filer: %v", err) } // volume file ids subtract filer file ids - if err = c.findExtraChunksInVolumeServers(dataNodeVolumeIdToVInfo, *applyPurging); err != nil { + if err = c.findExtraChunksInVolumeServers(dataNodeVolumeIdToVInfo, *applyPurging, uint64(collectModifyFromAtNs), uint64(collectCutoffFromAtNs)); err != nil { return fmt.Errorf("findExtraChunksInVolumeServers: %v", err) } } @@ -288,7 +289,7 @@ func (c *commandVolumeFsck) findFilerChunksMissingInVolumeServers(volumeIdToVInf return nil } -func (c *commandVolumeFsck) findExtraChunksInVolumeServers(dataNodeVolumeIdToVInfo map[string]map[uint32]VInfo, applyPurging bool) error { +func (c *commandVolumeFsck) findExtraChunksInVolumeServers(dataNodeVolumeIdToVInfo map[string]map[uint32]VInfo, applyPurging bool, modifyFrom, cutoffFrom uint64) error { var totalInUseCount, totalOrphanChunkCount, totalOrphanDataSize uint64 volumeIdOrphanFileIds := make(map[uint32]map[string]bool) @@ -298,7 +299,7 @@ func (c *commandVolumeFsck) findExtraChunksInVolumeServers(dataNodeVolumeIdToVIn serverReplicas := make(map[uint32][]pb.ServerAddress) for dataNodeId, volumeIdToVInfo := range dataNodeVolumeIdToVInfo { for volumeId, vinfo := range volumeIdToVInfo { - inUseCount, orphanFileIds, orphanDataSize, checkErr := c.oneVolumeFileIdsSubtractFilerFileIds(dataNodeId, volumeId, &vinfo) + inUseCount, orphanFileIds, orphanDataSize, checkErr := c.oneVolumeFileIdsSubtractFilerFileIds(dataNodeId, volumeId, &vinfo, modifyFrom, cutoffFrom) if checkErr != nil { return fmt.Errorf("failed to collect file ids from volume %d on %s: %v", volumeId, vinfo.server, checkErr) } @@ -394,7 +395,7 @@ func (c *commandVolumeFsck) findExtraChunksInVolumeServers(dataNodeVolumeIdToVIn return nil } -func (c *commandVolumeFsck) collectOneVolumeFileIds(dataNodeId string, volumeId uint32, vinfo VInfo, modifyFrom uint64, cutoffFrom uint64) error { +func (c *commandVolumeFsck) collectOneVolumeFileIds(dataNodeId string, volumeId uint32, vinfo VInfo) error { if *c.verbose { fmt.Fprintf(c.writer, "collecting volume %d file ids from %s ...\n", volumeId, vinfo.server) @@ -431,29 +432,6 @@ func (c *commandVolumeFsck) collectOneVolumeFileIds(dataNodeId string, volumeId } buf.Write(resp.FileContent) } - if !vinfo.isReadOnly && (modifyFrom != 0 || cutoffFrom != 0) { - index, err := idx.FirstInvalidIndex(buf.Bytes(), - func(key types.NeedleId, offset types.Offset, size types.Size) (bool, error) { - resp, err := volumeServerClient.ReadNeedleMeta(context.Background(), &volume_server_pb.ReadNeedleMetaRequest{ - VolumeId: volumeId, - NeedleId: uint64(key), - Offset: offset.ToActualOffset(), - Size: int32(size), - }) - if err != nil { - return false, fmt.Errorf("read needle meta with id %d from volume %d: %v", key, volumeId, err) - } - if (modifyFrom == 0 || modifyFrom <= resp.AppendAtNs) && (cutoffFrom == 0 || resp.AppendAtNs <= cutoffFrom) { - return true, nil - } - return false, nil - }) - if err != nil { - fmt.Fprintf(c.writer, "Failed to search for last valid index on volume %d with error %v\n", volumeId, err) - } else { - buf.Truncate(index * types.NeedleMapEntrySize) - } - } idxFilename := getVolumeFileIdFile(c.tempFolder, dataNodeId, volumeId) err = writeToFile(buf.Bytes(), idxFilename) if err != nil { @@ -552,9 +530,7 @@ func (c *commandVolumeFsck) httpDelete(path util.FullPath) { fmt.Fprintf(c.writer, "HTTP delete request error: %v\n", err) } - client := &http.Client{} - - resp, err := client.Do(req) + resp, err := util_http.GetGlobalHttpClient().Do(req) if err != nil { fmt.Fprintf(c.writer, "DELETE fetch error: %v\n", err) } @@ -571,7 +547,7 @@ func (c *commandVolumeFsck) httpDelete(path util.FullPath) { } } -func (c *commandVolumeFsck) oneVolumeFileIdsSubtractFilerFileIds(dataNodeId string, volumeId uint32, vinfo *VInfo) (inUseCount uint64, orphanFileIds []string, orphanDataSize uint64, err error) { +func (c *commandVolumeFsck) oneVolumeFileIdsSubtractFilerFileIds(dataNodeId string, volumeId uint32, vinfo *VInfo, modifyFrom, cutoffFrom uint64) (inUseCount uint64, orphanFileIds []string, orphanDataSize uint64, err error) { volumeFileIdDb := needle_map.NewMemDb() defer volumeFileIdDb.Close() @@ -611,9 +587,30 @@ func (c *commandVolumeFsck) oneVolumeFileIdsSubtractFilerFileIds(dataNodeId stri if n.Size.IsDeleted() { return nil } - orphanFileIds = append(orphanFileIds, n.Key.FileId(volumeId)) - orphanFileCount++ - orphanDataSize += uint64(n.Size) + if cutoffFrom > 0 || modifyFrom > 0 { + return operation.WithVolumeServerClient(false, vinfo.server, c.env.option.GrpcDialOption, + func(volumeServerClient volume_server_pb.VolumeServerClient) error { + resp, err := volumeServerClient.ReadNeedleMeta(context.Background(), &volume_server_pb.ReadNeedleMetaRequest{ + VolumeId: volumeId, + NeedleId: types.NeedleIdToUint64(n.Key), + Offset: n.Offset.ToActualOffset(), + Size: int32(n.Size), + }) + if err != nil { + return fmt.Errorf("read needle meta with id %d from volume %d: %v", n.Key, volumeId, err) + } + if (modifyFrom == 0 || modifyFrom <= resp.AppendAtNs) && (cutoffFrom == 0 || resp.AppendAtNs <= cutoffFrom) { + orphanFileIds = append(orphanFileIds, n.Key.FileId(volumeId)) + orphanFileCount++ + orphanDataSize += uint64(n.Size) + } + return nil + }) + } else { + orphanFileIds = append(orphanFileIds, n.Key.FileId(volumeId)) + orphanFileCount++ + orphanDataSize += uint64(n.Size) + } return nil }); err != nil { err = fmt.Errorf("failed to AscendingVisit %+v", err) @@ -697,7 +694,7 @@ func (c *commandVolumeFsck) purgeFileIdsForOneVolume(volumeId uint32, fileIds [] go func(server pb.ServerAddress, fidList []string) { defer wg.Done() - if deleteResults, deleteErr := operation.DeleteFilesAtOneVolumeServer(server, c.env.option.GrpcDialOption, fidList, false); deleteErr != nil { + if deleteResults, deleteErr := operation.DeleteFileIdsAtOneVolumeServer(server, c.env.option.GrpcDialOption, fidList, false); deleteErr != nil { err = deleteErr } else if deleteResults != nil { resultChan <- deleteResults diff --git a/weed/shell/command_volume_grow.go b/weed/shell/command_volume_grow.go new file mode 100644 index 000000000..21d98dddd --- /dev/null +++ b/weed/shell/command_volume_grow.go @@ -0,0 +1,64 @@ +package shell + +import ( + "context" + "flag" + "fmt" + "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" + "io" +) + +func init() { + Commands = append(Commands, &commandGrow{}) +} + +type commandGrow struct { +} + +func (c *commandGrow) Name() string { + return "volume.grow" +} + +func (c *commandGrow) Help() string { + return `grow volumes + + volume.grow [-collection=<collection name>] [-dataCenter=<data center name>] + +` +} + +func (c *commandGrow) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) { + + volumeVacuumCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) + growCount := volumeVacuumCommand.Uint("count", 2, "") + collection := volumeVacuumCommand.String("collection", "", "grow this collection") + dataCenter := volumeVacuumCommand.String("dataCenter", "", "grow volumes only from the specified data center") + + if err = volumeVacuumCommand.Parse(args); err != nil { + return nil + } + + assignRequest := &master_pb.AssignRequest{ + Count: 0, + Collection: *collection, + WritableVolumeCount: uint32(*growCount), + } + if *dataCenter != "" { + assignRequest.DataCenter = *dataCenter + } + + err = commandEnv.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error { + _, err := client.Assign(context.Background(), assignRequest) + + if err != nil { + return fmt.Errorf("Assign: %v", err) + } + return nil + }) + + if err != nil { + return + } + + return nil +} diff --git a/weed/shell/command_volume_server_evacuate.go b/weed/shell/command_volume_server_evacuate.go index 57eb6fc45..bad695cd7 100644 --- a/weed/shell/command_volume_server_evacuate.go +++ b/weed/shell/command_volume_server_evacuate.go @@ -3,14 +3,15 @@ package shell import ( "flag" "fmt" + "io" + "os" + "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding" "github.com/seaweedfs/seaweedfs/weed/storage/needle" "github.com/seaweedfs/seaweedfs/weed/storage/super_block" "github.com/seaweedfs/seaweedfs/weed/storage/types" "golang.org/x/exp/slices" - "io" - "os" ) func init() { @@ -219,7 +220,7 @@ func moveAwayOneNormalVolume(commandEnv *CommandEnv, volumeReplicas map[uint32][ }) for i := 0; i < len(otherNodes); i++ { emptyNode := otherNodes[i] - if freeVolumeCountfn(emptyNode.info) < 0 { + if freeVolumeCountfn(emptyNode.info) <= 0 { continue } hasMoved, err = maybeMoveOneVolume(commandEnv, volumeReplicas, thisNode, vol, emptyNode, applyChange) diff --git a/weed/shell/command_volume_tier_upload.go b/weed/shell/command_volume_tier_upload.go index 6932317ab..cb805b0cf 100644 --- a/weed/shell/command_volume_tier_upload.go +++ b/weed/shell/command_volume_tier_upload.go @@ -113,11 +113,14 @@ func doVolumeTierUpload(commandEnv *CommandEnv, writer io.Writer, collection str return fmt.Errorf("copy dat file for volume %d on %s to %s: %v", vid, existingLocations[0].Url, dest, err) } + if keepLocalDatFile { + return nil + } // now the first replica has the .idx and .vif files. // ask replicas on other volume server to delete its own local copy for i, location := range existingLocations { if i == 0 { - break + continue } fmt.Printf("delete volume %d from %s\n", vid, location.Url) err = deleteVolume(commandEnv.option.GrpcDialOption, vid, location.ServerAddress(), false) |
