aboutsummaryrefslogtreecommitdiff
path: root/weed/shell
diff options
context:
space:
mode:
Diffstat (limited to 'weed/shell')
-rw-r--r--weed/shell/command_ec_balance.go8
-rw-r--r--weed/shell/command_ec_rebuild.go2
-rw-r--r--weed/shell/command_fs_merge_volumes.go23
-rw-r--r--weed/shell/command_fs_verify.go203
-rw-r--r--weed/shell/command_remote_uncache.go5
-rw-r--r--weed/shell/command_s3_bucket_quota_check.go2
-rw-r--r--weed/shell/command_s3_clean_uploads.go3
-rw-r--r--weed/shell/command_volume_check_disk.go2
-rw-r--r--weed/shell/command_volume_fsck.go93
-rw-r--r--weed/shell/command_volume_grow.go64
-rw-r--r--weed/shell/command_volume_server_evacuate.go7
-rw-r--r--weed/shell/command_volume_tier_upload.go5
12 files changed, 281 insertions, 136 deletions
diff --git a/weed/shell/command_ec_balance.go b/weed/shell/command_ec_balance.go
index 17ba63cfe..217e5750e 100644
--- a/weed/shell/command_ec_balance.go
+++ b/weed/shell/command_ec_balance.go
@@ -455,16 +455,20 @@ func doBalanceEcRack(commandEnv *CommandEnv, ecRack *EcRack, applyBalancing bool
func pickOneEcNodeAndMoveOneShard(commandEnv *CommandEnv, averageShardsPerEcNode int, existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, possibleDestinationEcNodes []*EcNode, applyBalancing bool) error {
sortEcNodesByFreeslotsDescending(possibleDestinationEcNodes)
-
+ skipReason := ""
for _, destEcNode := range possibleDestinationEcNodes {
+
if destEcNode.info.Id == existingLocation.info.Id {
continue
}
if destEcNode.freeEcSlot <= 0 {
+ skipReason += fmt.Sprintf(" Skipping %s because it has no free slots\n", destEcNode.info.Id)
continue
}
if findEcVolumeShards(destEcNode, vid).ShardIdCount() >= averageShardsPerEcNode {
+ skipReason += fmt.Sprintf(" Skipping %s because it %d >= avernageShards (%d)\n",
+ destEcNode.info.Id, findEcVolumeShards(destEcNode, vid).ShardIdCount(), averageShardsPerEcNode)
continue
}
@@ -477,7 +481,7 @@ func pickOneEcNodeAndMoveOneShard(commandEnv *CommandEnv, averageShardsPerEcNode
return nil
}
-
+ fmt.Printf("WARNING: Could not find suitable taget node for %d.%d:\n%s", vid, shardId, skipReason)
return nil
}
diff --git a/weed/shell/command_ec_rebuild.go b/weed/shell/command_ec_rebuild.go
index 2131c5649..a4dfac67c 100644
--- a/weed/shell/command_ec_rebuild.go
+++ b/weed/shell/command_ec_rebuild.go
@@ -224,7 +224,7 @@ func prepareDataToRecover(commandEnv *CommandEnv, rebuilder *EcNode, collection
Collection: collection,
ShardIds: []uint32{uint32(shardId)},
CopyEcxFile: needEcxFile,
- CopyEcjFile: needEcxFile,
+ CopyEcjFile: true,
CopyVifFile: needEcxFile,
SourceDataNode: ecNodes[0].info.Id,
})
diff --git a/weed/shell/command_fs_merge_volumes.go b/weed/shell/command_fs_merge_volumes.go
index 4a6048a43..b77feb8e3 100644
--- a/weed/shell/command_fs_merge_volumes.go
+++ b/weed/shell/command_fs_merge_volumes.go
@@ -19,14 +19,10 @@ import (
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
-)
-
-var (
- client *http.Client
+ util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
)
func init() {
- client = &http.Client{}
Commands = append(Commands, &commandFsMergeVolumes{})
}
@@ -104,7 +100,7 @@ func (c *commandFsMergeVolumes) Do(args []string, commandEnv *CommandEnv, writer
return nil
}
- defer client.CloseIdleConnections()
+ defer util_http.GetGlobalHttpClient().CloseIdleConnections()
return commandEnv.WithFilerClient(false, func(filerClient filer_pb.SeaweedFilerClient) error {
return filer_pb.TraverseBfs(commandEnv, util.FullPath(dir), func(parentPath util.FullPath, entry *filer_pb.Entry) {
@@ -304,7 +300,7 @@ func moveChunk(chunk *filer_pb.FileChunk, toVolumeId needle.VolumeId, masterClie
if err != nil {
return err
}
- defer util.CloseResponse(resp)
+ defer util_http.CloseResponse(resp)
defer reader.Close()
var filename string
@@ -322,7 +318,12 @@ func moveChunk(chunk *filer_pb.FileChunk, toVolumeId needle.VolumeId, masterClie
isCompressed := resp.Header.Get("Content-Encoding") == "gzip"
md5 := resp.Header.Get("Content-MD5")
- _, err, _ = operation.Upload(reader, &operation.UploadOption{
+ uploader, err := operation.NewUploader()
+ if err != nil {
+ return err
+ }
+
+ _, err, _ = uploader.Upload(reader, &operation.UploadOption{
UploadUrl: uploadURL,
Filename: filename,
IsInputCompressed: isCompressed,
@@ -342,18 +343,18 @@ func moveChunk(chunk *filer_pb.FileChunk, toVolumeId needle.VolumeId, masterClie
func readUrl(fileUrl string) (*http.Response, io.ReadCloser, error) {
- req, err := http.NewRequest("GET", fileUrl, nil)
+ req, err := http.NewRequest(http.MethodGet, fileUrl, nil)
if err != nil {
return nil, nil, err
}
req.Header.Add("Accept-Encoding", "gzip")
- r, err := client.Do(req)
+ r, err := util_http.GetGlobalHttpClient().Do(req)
if err != nil {
return nil, nil, err
}
if r.StatusCode >= 400 {
- util.CloseResponse(r)
+ util_http.CloseResponse(r)
return nil, nil, fmt.Errorf("%s: %s", fileUrl, r.Status)
}
diff --git a/weed/shell/command_fs_verify.go b/weed/shell/command_fs_verify.go
index 32d498202..47052cca0 100644
--- a/weed/shell/command_fs_verify.go
+++ b/weed/shell/command_fs_verify.go
@@ -1,6 +1,7 @@
package shell
import (
+ "bytes"
"context"
"flag"
"fmt"
@@ -30,6 +31,7 @@ type commandFsVerify struct {
volumeServers []pb.ServerAddress
volumeIds map[uint32][]pb.ServerAddress
verbose *bool
+ metadataFromLog *bool
concurrency *int
modifyTimeAgoAtSec int64
writer io.Writer
@@ -56,7 +58,7 @@ func (c *commandFsVerify) Do(args []string, commandEnv *CommandEnv, writer io.Wr
c.verbose = fsVerifyCommand.Bool("v", false, "print out each processed files")
modifyTimeAgo := fsVerifyCommand.Duration("modifyTimeAgo", 0, "only include files after this modify time to verify")
c.concurrency = fsVerifyCommand.Int("concurrency", 0, "number of parallel verification per volume server")
-
+ c.metadataFromLog = fsVerifyCommand.Bool("metadataFromLog", false, "Using filer log to get metadata")
if err = fsVerifyCommand.Parse(args); err != nil {
return err
}
@@ -88,14 +90,19 @@ func (c *commandFsVerify) Do(args []string, commandEnv *CommandEnv, writer io.Wr
defer close(c.waitChan[volumeServerStr])
}
}
-
- fCount, eConut, terr := c.verifyTraverseBfs(path)
- if terr == nil {
- fmt.Fprintf(writer, "verified %d files, error %d files \n", fCount, eConut)
+ var fCount, eCount uint64
+ if *c.metadataFromLog {
+ var wg sync.WaitGroup
+ fCount, eCount, err = c.verifyProcessMetadata(path, &wg)
+ wg.Wait()
+ if err != nil {
+ return err
+ }
+ } else {
+ fCount, eCount, err = c.verifyTraverseBfs(path)
}
-
- return terr
-
+ fmt.Fprintf(writer, "verified %d files, error %d files \n", fCount, eCount)
+ return err
}
func (c *commandFsVerify) collectVolumeIds() error {
@@ -117,7 +124,7 @@ func (c *commandFsVerify) collectVolumeIds() error {
return nil
}
-func (c *commandFsVerify) verifyEntry(volumeServer pb.ServerAddress, fileId *filer_pb.FileId) error {
+func (c *commandFsVerify) verifyChunk(volumeServer pb.ServerAddress, fileId *filer_pb.FileId) error {
err := operation.WithVolumeServerClient(false, volumeServer, c.env.option.GrpcDialOption,
func(client volume_server_pb.VolumeServerClient) error {
_, err := client.VolumeNeedleStatus(context.Background(),
@@ -138,6 +145,126 @@ type ItemEntry struct {
path util.FullPath
}
+func (c *commandFsVerify) verifyProcessMetadata(path string, wg *sync.WaitGroup) (fileCount uint64, errCount uint64, err error) {
+ processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error {
+ message := resp.EventNotification
+ if resp.EventNotification.NewEntry == nil {
+ return nil
+ }
+ chunkCount := len(message.NewEntry.Chunks)
+ if chunkCount == 0 {
+ return nil
+ }
+ entryPath := fmt.Sprintf("%s/%s", message.NewParentPath, message.NewEntry.Name)
+ errorChunksCount := atomic.NewUint64(0)
+ if !c.verifyEntry(entryPath, message.NewEntry.Chunks, errorChunksCount, wg) {
+ if err = c.env.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
+ entryResp, errReq := client.LookupDirectoryEntry(context.Background(), &filer_pb.LookupDirectoryEntryRequest{
+ Directory: message.NewParentPath,
+ Name: message.NewEntry.Name,
+ })
+ if errReq != nil {
+ if strings.HasSuffix(errReq.Error(), "no entry is found in filer store") {
+ return nil
+ }
+ return errReq
+ }
+ if entryResp.Entry.Attributes.Mtime == message.NewEntry.Attributes.Mtime &&
+ bytes.Equal(entryResp.Entry.Attributes.Md5, message.NewEntry.Attributes.Md5) {
+ fmt.Fprintf(c.writer, "file: %s needles:%d failed:%d\n", entryPath, chunkCount, errorChunksCount.Load())
+ errCount++
+ }
+ return nil
+ }); err != nil {
+ return err
+ }
+ return nil
+ }
+ if *c.verbose {
+ fmt.Fprintf(c.writer, "file: %s needles:%d verifed\n", entryPath, chunkCount)
+ }
+ fileCount++
+ return nil
+ }
+ metadataFollowOption := &pb.MetadataFollowOption{
+ ClientName: "shell_verify",
+ ClientId: util.RandomInt32(),
+ ClientEpoch: 0,
+ SelfSignature: 0,
+ PathPrefix: path,
+ AdditionalPathPrefixes: nil,
+ DirectoriesToWatch: nil,
+ StartTsNs: time.Now().Add(-1 * time.Second * time.Duration(c.modifyTimeAgoAtSec)).UnixNano(),
+ StopTsNs: time.Now().UnixNano(),
+ EventErrorType: pb.DontLogError,
+ }
+ return fileCount, errCount, pb.FollowMetadata(c.env.option.FilerAddress, c.env.option.GrpcDialOption, metadataFollowOption, processEventFn)
+}
+
+func (c *commandFsVerify) verifyEntry(path string, chunks []*filer_pb.FileChunk, errorCount *atomic.Uint64, wg *sync.WaitGroup) bool {
+ fileMsg := fmt.Sprintf("file:%s", path)
+ itemIsVerifed := atomic.NewBool(true)
+ for _, chunk := range chunks {
+ if volumeIds, ok := c.volumeIds[chunk.Fid.VolumeId]; ok {
+ for _, volumeServer := range volumeIds {
+ if *c.concurrency == 0 {
+ if err := c.verifyChunk(volumeServer, chunk.Fid); err != nil {
+ if !(*c.metadataFromLog && strings.HasSuffix(err.Error(), "not found")) {
+ fmt.Fprintf(c.writer, "%s failed verify fileId %s: %+v\n",
+ fileMsg, chunk.GetFileIdString(), err)
+ }
+ if itemIsVerifed.Load() {
+ itemIsVerifed.Store(false)
+ errorCount.Add(1)
+ }
+ }
+ continue
+ }
+ c.waitChanLock.RLock()
+ waitChan, ok := c.waitChan[string(volumeServer)]
+ c.waitChanLock.RUnlock()
+ if !ok {
+ fmt.Fprintf(c.writer, "%s failed to get channel for %s fileId: %s\n",
+ string(volumeServer), fileMsg, chunk.GetFileIdString())
+ if itemIsVerifed.Load() {
+ itemIsVerifed.Store(false)
+ errorCount.Add(1)
+ }
+ continue
+ }
+ wg.Add(1)
+ waitChan <- struct{}{}
+ go func(fChunk *filer_pb.FileChunk, path string, volumeServer pb.ServerAddress, msg string) {
+ defer wg.Done()
+ if err := c.verifyChunk(volumeServer, fChunk.Fid); err != nil {
+ if !(*c.metadataFromLog && strings.HasSuffix(err.Error(), "not found")) {
+ fmt.Fprintf(c.writer, "%s failed verify fileId %s: %+v\n",
+ msg, fChunk.GetFileIdString(), err)
+ }
+ if itemIsVerifed.Load() {
+ itemIsVerifed.Store(false)
+ errorCount.Add(1)
+ }
+ }
+ <-waitChan
+ }(chunk, path, volumeServer, fileMsg)
+ }
+ } else {
+ if !*c.metadataFromLog {
+ err := fmt.Errorf("volumeId %d not found", chunk.Fid.VolumeId)
+ fmt.Fprintf(c.writer, "%s failed verify fileId %s: %+v\n",
+ fileMsg, chunk.GetFileIdString(), err)
+ }
+ if itemIsVerifed.Load() {
+ itemIsVerifed.Store(false)
+ errorCount.Add(1)
+ }
+ break
+ }
+ }
+ return itemIsVerifed.Load()
+}
+
func (c *commandFsVerify) verifyTraverseBfs(path string) (fileCount uint64, errCount uint64, err error) {
timeNowAtSec := time.Now().Unix()
return fileCount, errCount, doTraverseBfsAndSaving(c.env, c.writer, path, false,
@@ -166,63 +293,9 @@ func (c *commandFsVerify) verifyTraverseBfs(path string) (fileCount uint64, errC
for itemEntry := range outputChan {
i := itemEntry.(*ItemEntry)
itemPath := string(i.path)
- fileMsg := fmt.Sprintf("file:%s", itemPath)
- itemIsVerifed := atomic.NewBool(true)
- for _, chunk := range i.chunks {
- if volumeIds, ok := c.volumeIds[chunk.Fid.VolumeId]; ok {
- for _, volumeServer := range volumeIds {
- if *c.concurrency == 0 {
- if err = c.verifyEntry(volumeServer, chunk.Fid); err != nil {
- fmt.Fprintf(c.writer, "%s failed verify fileId %s: %+v\n",
- fileMsg, chunk.GetFileIdString(), err)
- if itemIsVerifed.Load() {
- itemIsVerifed.Store(false)
- itemErrCount.Add(1)
- }
- }
- continue
- }
- c.waitChanLock.RLock()
- waitChan, ok := c.waitChan[string(volumeServer)]
- c.waitChanLock.RUnlock()
- if !ok {
- fmt.Fprintf(c.writer, "%s failed to get channel for %s fileId: %s: %+v\n",
- string(volumeServer), fileMsg, chunk.GetFileIdString(), err)
- if itemIsVerifed.Load() {
- itemIsVerifed.Store(false)
- itemErrCount.Add(1)
- }
- continue
- }
- wg.Add(1)
- waitChan <- struct{}{}
- go func(fChunk *filer_pb.FileChunk, path string, volumeServer pb.ServerAddress, msg string) {
- defer wg.Done()
- if err = c.verifyEntry(volumeServer, fChunk.Fid); err != nil {
- fmt.Fprintf(c.writer, "%s failed verify fileId %s: %+v\n",
- msg, fChunk.GetFileIdString(), err)
- if itemIsVerifed.Load() {
- itemIsVerifed.Store(false)
- itemErrCount.Add(1)
- }
- }
- <-waitChan
- }(chunk, itemPath, volumeServer, fileMsg)
- }
- } else {
- err = fmt.Errorf("volumeId %d not found", chunk.Fid.VolumeId)
- fmt.Fprintf(c.writer, "%s failed verify fileId %s: %+v\n",
- fileMsg, chunk.GetFileIdString(), err)
- if itemIsVerifed.Load() {
- itemIsVerifed.Store(false)
- itemErrCount.Add(1)
- }
- break
- }
- }
- if itemIsVerifed.Load() {
+ if c.verifyEntry(itemPath, i.chunks, itemErrCount, &wg) {
if *c.verbose {
- fmt.Fprintf(c.writer, "%s needles:%d verifed\n", fileMsg, len(i.chunks))
+ fmt.Fprintf(c.writer, "file: %s needles:%d verifed\n", itemPath, len(i.chunks))
}
fileCount++
}
diff --git a/weed/shell/command_remote_uncache.go b/weed/shell/command_remote_uncache.go
index 34269ce4e..25e51ff74 100644
--- a/weed/shell/command_remote_uncache.go
+++ b/weed/shell/command_remote_uncache.go
@@ -7,6 +7,7 @@ import (
"io"
"path/filepath"
"strings"
+ "time"
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
@@ -164,12 +165,12 @@ func (ff *FileFilter) matches(entry *filer_pb.Entry) bool {
}
}
if *ff.minAge != -1 {
- if entry.Attributes.Crtime < *ff.minAge {
+ if entry.Attributes.Crtime + *ff.minAge > time.Now().Unix() {
return false
}
}
if *ff.maxAge != -1 {
- if entry.Attributes.Crtime > *ff.maxAge {
+ if entry.Attributes.Crtime + *ff.maxAge < time.Now().Unix() {
return false
}
}
diff --git a/weed/shell/command_s3_bucket_quota_check.go b/weed/shell/command_s3_bucket_quota_check.go
index bc0d838f7..b130e4fad 100644
--- a/weed/shell/command_s3_bucket_quota_check.go
+++ b/weed/shell/command_s3_bucket_quota_check.go
@@ -130,7 +130,7 @@ func (c *commandS3BucketQuotaEnforce) processEachBucket(fc *filer.FilerConf, fil
} else {
fmt.Fprintf(writer, " changing bucket %s to writable.\n", entry.Name)
}
- fc.AddLocationConf(locConf)
+ fc.SetLocationConf(locConf)
}
return
diff --git a/weed/shell/command_s3_clean_uploads.go b/weed/shell/command_s3_clean_uploads.go
index 2be61f72a..accce60ba 100644
--- a/weed/shell/command_s3_clean_uploads.go
+++ b/weed/shell/command_s3_clean_uploads.go
@@ -12,6 +12,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/util"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
+ util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
)
func init() {
@@ -90,7 +91,7 @@ func (c *commandS3CleanUploads) cleanupUploads(commandEnv *CommandEnv, writer io
deleteUrl := fmt.Sprintf("http://%s%s/%s?recursive=true&ignoreRecursiveError=true", commandEnv.option.FilerAddress.ToHttpAddress(), uploadsDir, staleUpload)
fmt.Fprintf(writer, "purge %s\n", deleteUrl)
- err = util.Delete(deleteUrl, string(encodedJwt))
+ err = util_http.Delete(deleteUrl, string(encodedJwt))
if err != nil && err.Error() != "" {
return fmt.Errorf("purge %s/%s: %v", uploadsDir, staleUpload, err)
}
diff --git a/weed/shell/command_volume_check_disk.go b/weed/shell/command_volume_check_disk.go
index 3e2512bdd..0e76f6ac9 100644
--- a/weed/shell/command_volume_check_disk.go
+++ b/weed/shell/command_volume_check_disk.go
@@ -279,7 +279,7 @@ func doVolumeCheckDisk(minuend, subtrahend *needle_map.MemDb, source, target *Vo
fmt.Fprintf(writer, "delete %s %s => %s\n", needleValue.Key.FileId(source.info.Id), source.location.dataNode.Id, target.location.dataNode.Id)
}
}
- deleteResults, deleteErr := operation.DeleteFilesAtOneVolumeServer(
+ deleteResults, deleteErr := operation.DeleteFileIdsAtOneVolumeServer(
pb.NewServerAddressFromDataNode(target.location.dataNode),
grpcDialOption, fidList, false)
if deleteErr != nil {
diff --git a/weed/shell/command_volume_fsck.go b/weed/shell/command_volume_fsck.go
index 1d27fae1d..acb0ee5ad 100644
--- a/weed/shell/command_volume_fsck.go
+++ b/weed/shell/command_volume_fsck.go
@@ -7,6 +7,18 @@ import (
"errors"
"flag"
"fmt"
+ "io"
+ "math"
+ "net/http"
+ "net/url"
+ "os"
+ "path"
+ "path/filepath"
+ "strconv"
+ "strings"
+ "sync"
+ "time"
+
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/operation"
"github.com/seaweedfs/seaweedfs/weed/pb"
@@ -14,23 +26,12 @@ import (
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
"github.com/seaweedfs/seaweedfs/weed/storage"
- "github.com/seaweedfs/seaweedfs/weed/storage/idx"
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
"github.com/seaweedfs/seaweedfs/weed/storage/needle_map"
"github.com/seaweedfs/seaweedfs/weed/storage/types"
"github.com/seaweedfs/seaweedfs/weed/util"
+ util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
"golang.org/x/sync/errgroup"
- "io"
- "math"
- "net/http"
- "net/url"
- "os"
- "path"
- "path/filepath"
- "strconv"
- "strings"
- "sync"
- "time"
)
func init() {
@@ -163,7 +164,7 @@ func (c *commandVolumeFsck) Do(args []string, commandEnv *CommandEnv, writer io.
delete(volumeIdToVInfo, volumeId)
continue
}
- err = c.collectOneVolumeFileIds(dataNodeId, volumeId, vinfo, uint64(collectModifyFromAtNs), uint64(collectCutoffFromAtNs))
+ err = c.collectOneVolumeFileIds(dataNodeId, volumeId, vinfo)
if err != nil {
return fmt.Errorf("failed to collect file ids from volume %d on %s: %v", volumeId, vinfo.server, err)
}
@@ -198,7 +199,7 @@ func (c *commandVolumeFsck) Do(args []string, commandEnv *CommandEnv, writer io.
return fmt.Errorf("failed to collect file ids from filer: %v", err)
}
// volume file ids subtract filer file ids
- if err = c.findExtraChunksInVolumeServers(dataNodeVolumeIdToVInfo, *applyPurging); err != nil {
+ if err = c.findExtraChunksInVolumeServers(dataNodeVolumeIdToVInfo, *applyPurging, uint64(collectModifyFromAtNs), uint64(collectCutoffFromAtNs)); err != nil {
return fmt.Errorf("findExtraChunksInVolumeServers: %v", err)
}
}
@@ -288,7 +289,7 @@ func (c *commandVolumeFsck) findFilerChunksMissingInVolumeServers(volumeIdToVInf
return nil
}
-func (c *commandVolumeFsck) findExtraChunksInVolumeServers(dataNodeVolumeIdToVInfo map[string]map[uint32]VInfo, applyPurging bool) error {
+func (c *commandVolumeFsck) findExtraChunksInVolumeServers(dataNodeVolumeIdToVInfo map[string]map[uint32]VInfo, applyPurging bool, modifyFrom, cutoffFrom uint64) error {
var totalInUseCount, totalOrphanChunkCount, totalOrphanDataSize uint64
volumeIdOrphanFileIds := make(map[uint32]map[string]bool)
@@ -298,7 +299,7 @@ func (c *commandVolumeFsck) findExtraChunksInVolumeServers(dataNodeVolumeIdToVIn
serverReplicas := make(map[uint32][]pb.ServerAddress)
for dataNodeId, volumeIdToVInfo := range dataNodeVolumeIdToVInfo {
for volumeId, vinfo := range volumeIdToVInfo {
- inUseCount, orphanFileIds, orphanDataSize, checkErr := c.oneVolumeFileIdsSubtractFilerFileIds(dataNodeId, volumeId, &vinfo)
+ inUseCount, orphanFileIds, orphanDataSize, checkErr := c.oneVolumeFileIdsSubtractFilerFileIds(dataNodeId, volumeId, &vinfo, modifyFrom, cutoffFrom)
if checkErr != nil {
return fmt.Errorf("failed to collect file ids from volume %d on %s: %v", volumeId, vinfo.server, checkErr)
}
@@ -394,7 +395,7 @@ func (c *commandVolumeFsck) findExtraChunksInVolumeServers(dataNodeVolumeIdToVIn
return nil
}
-func (c *commandVolumeFsck) collectOneVolumeFileIds(dataNodeId string, volumeId uint32, vinfo VInfo, modifyFrom uint64, cutoffFrom uint64) error {
+func (c *commandVolumeFsck) collectOneVolumeFileIds(dataNodeId string, volumeId uint32, vinfo VInfo) error {
if *c.verbose {
fmt.Fprintf(c.writer, "collecting volume %d file ids from %s ...\n", volumeId, vinfo.server)
@@ -431,29 +432,6 @@ func (c *commandVolumeFsck) collectOneVolumeFileIds(dataNodeId string, volumeId
}
buf.Write(resp.FileContent)
}
- if !vinfo.isReadOnly && (modifyFrom != 0 || cutoffFrom != 0) {
- index, err := idx.FirstInvalidIndex(buf.Bytes(),
- func(key types.NeedleId, offset types.Offset, size types.Size) (bool, error) {
- resp, err := volumeServerClient.ReadNeedleMeta(context.Background(), &volume_server_pb.ReadNeedleMetaRequest{
- VolumeId: volumeId,
- NeedleId: uint64(key),
- Offset: offset.ToActualOffset(),
- Size: int32(size),
- })
- if err != nil {
- return false, fmt.Errorf("read needle meta with id %d from volume %d: %v", key, volumeId, err)
- }
- if (modifyFrom == 0 || modifyFrom <= resp.AppendAtNs) && (cutoffFrom == 0 || resp.AppendAtNs <= cutoffFrom) {
- return true, nil
- }
- return false, nil
- })
- if err != nil {
- fmt.Fprintf(c.writer, "Failed to search for last valid index on volume %d with error %v\n", volumeId, err)
- } else {
- buf.Truncate(index * types.NeedleMapEntrySize)
- }
- }
idxFilename := getVolumeFileIdFile(c.tempFolder, dataNodeId, volumeId)
err = writeToFile(buf.Bytes(), idxFilename)
if err != nil {
@@ -552,9 +530,7 @@ func (c *commandVolumeFsck) httpDelete(path util.FullPath) {
fmt.Fprintf(c.writer, "HTTP delete request error: %v\n", err)
}
- client := &http.Client{}
-
- resp, err := client.Do(req)
+ resp, err := util_http.GetGlobalHttpClient().Do(req)
if err != nil {
fmt.Fprintf(c.writer, "DELETE fetch error: %v\n", err)
}
@@ -571,7 +547,7 @@ func (c *commandVolumeFsck) httpDelete(path util.FullPath) {
}
}
-func (c *commandVolumeFsck) oneVolumeFileIdsSubtractFilerFileIds(dataNodeId string, volumeId uint32, vinfo *VInfo) (inUseCount uint64, orphanFileIds []string, orphanDataSize uint64, err error) {
+func (c *commandVolumeFsck) oneVolumeFileIdsSubtractFilerFileIds(dataNodeId string, volumeId uint32, vinfo *VInfo, modifyFrom, cutoffFrom uint64) (inUseCount uint64, orphanFileIds []string, orphanDataSize uint64, err error) {
volumeFileIdDb := needle_map.NewMemDb()
defer volumeFileIdDb.Close()
@@ -611,9 +587,30 @@ func (c *commandVolumeFsck) oneVolumeFileIdsSubtractFilerFileIds(dataNodeId stri
if n.Size.IsDeleted() {
return nil
}
- orphanFileIds = append(orphanFileIds, n.Key.FileId(volumeId))
- orphanFileCount++
- orphanDataSize += uint64(n.Size)
+ if cutoffFrom > 0 || modifyFrom > 0 {
+ return operation.WithVolumeServerClient(false, vinfo.server, c.env.option.GrpcDialOption,
+ func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ resp, err := volumeServerClient.ReadNeedleMeta(context.Background(), &volume_server_pb.ReadNeedleMetaRequest{
+ VolumeId: volumeId,
+ NeedleId: types.NeedleIdToUint64(n.Key),
+ Offset: n.Offset.ToActualOffset(),
+ Size: int32(n.Size),
+ })
+ if err != nil {
+ return fmt.Errorf("read needle meta with id %d from volume %d: %v", n.Key, volumeId, err)
+ }
+ if (modifyFrom == 0 || modifyFrom <= resp.AppendAtNs) && (cutoffFrom == 0 || resp.AppendAtNs <= cutoffFrom) {
+ orphanFileIds = append(orphanFileIds, n.Key.FileId(volumeId))
+ orphanFileCount++
+ orphanDataSize += uint64(n.Size)
+ }
+ return nil
+ })
+ } else {
+ orphanFileIds = append(orphanFileIds, n.Key.FileId(volumeId))
+ orphanFileCount++
+ orphanDataSize += uint64(n.Size)
+ }
return nil
}); err != nil {
err = fmt.Errorf("failed to AscendingVisit %+v", err)
@@ -697,7 +694,7 @@ func (c *commandVolumeFsck) purgeFileIdsForOneVolume(volumeId uint32, fileIds []
go func(server pb.ServerAddress, fidList []string) {
defer wg.Done()
- if deleteResults, deleteErr := operation.DeleteFilesAtOneVolumeServer(server, c.env.option.GrpcDialOption, fidList, false); deleteErr != nil {
+ if deleteResults, deleteErr := operation.DeleteFileIdsAtOneVolumeServer(server, c.env.option.GrpcDialOption, fidList, false); deleteErr != nil {
err = deleteErr
} else if deleteResults != nil {
resultChan <- deleteResults
diff --git a/weed/shell/command_volume_grow.go b/weed/shell/command_volume_grow.go
new file mode 100644
index 000000000..21d98dddd
--- /dev/null
+++ b/weed/shell/command_volume_grow.go
@@ -0,0 +1,64 @@
+package shell
+
+import (
+ "context"
+ "flag"
+ "fmt"
+ "github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
+ "io"
+)
+
+func init() {
+ Commands = append(Commands, &commandGrow{})
+}
+
+type commandGrow struct {
+}
+
+func (c *commandGrow) Name() string {
+ return "volume.grow"
+}
+
+func (c *commandGrow) Help() string {
+ return `grow volumes
+
+ volume.grow [-collection=<collection name>] [-dataCenter=<data center name>]
+
+`
+}
+
+func (c *commandGrow) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
+
+ volumeVacuumCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
+ growCount := volumeVacuumCommand.Uint("count", 2, "")
+ collection := volumeVacuumCommand.String("collection", "", "grow this collection")
+ dataCenter := volumeVacuumCommand.String("dataCenter", "", "grow volumes only from the specified data center")
+
+ if err = volumeVacuumCommand.Parse(args); err != nil {
+ return nil
+ }
+
+ assignRequest := &master_pb.AssignRequest{
+ Count: 0,
+ Collection: *collection,
+ WritableVolumeCount: uint32(*growCount),
+ }
+ if *dataCenter != "" {
+ assignRequest.DataCenter = *dataCenter
+ }
+
+ err = commandEnv.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
+ _, err := client.Assign(context.Background(), assignRequest)
+
+ if err != nil {
+ return fmt.Errorf("Assign: %v", err)
+ }
+ return nil
+ })
+
+ if err != nil {
+ return
+ }
+
+ return nil
+}
diff --git a/weed/shell/command_volume_server_evacuate.go b/weed/shell/command_volume_server_evacuate.go
index 57eb6fc45..bad695cd7 100644
--- a/weed/shell/command_volume_server_evacuate.go
+++ b/weed/shell/command_volume_server_evacuate.go
@@ -3,14 +3,15 @@ package shell
import (
"flag"
"fmt"
+ "io"
+ "os"
+
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
"github.com/seaweedfs/seaweedfs/weed/storage/super_block"
"github.com/seaweedfs/seaweedfs/weed/storage/types"
"golang.org/x/exp/slices"
- "io"
- "os"
)
func init() {
@@ -219,7 +220,7 @@ func moveAwayOneNormalVolume(commandEnv *CommandEnv, volumeReplicas map[uint32][
})
for i := 0; i < len(otherNodes); i++ {
emptyNode := otherNodes[i]
- if freeVolumeCountfn(emptyNode.info) < 0 {
+ if freeVolumeCountfn(emptyNode.info) <= 0 {
continue
}
hasMoved, err = maybeMoveOneVolume(commandEnv, volumeReplicas, thisNode, vol, emptyNode, applyChange)
diff --git a/weed/shell/command_volume_tier_upload.go b/weed/shell/command_volume_tier_upload.go
index 6932317ab..cb805b0cf 100644
--- a/weed/shell/command_volume_tier_upload.go
+++ b/weed/shell/command_volume_tier_upload.go
@@ -113,11 +113,14 @@ func doVolumeTierUpload(commandEnv *CommandEnv, writer io.Writer, collection str
return fmt.Errorf("copy dat file for volume %d on %s to %s: %v", vid, existingLocations[0].Url, dest, err)
}
+ if keepLocalDatFile {
+ return nil
+ }
// now the first replica has the .idx and .vif files.
// ask replicas on other volume server to delete its own local copy
for i, location := range existingLocations {
if i == 0 {
- break
+ continue
}
fmt.Printf("delete volume %d from %s\n", vid, location.Url)
err = deleteVolume(commandEnv.option.GrpcDialOption, vid, location.ServerAddress(), false)