diff options
Diffstat (limited to 'weed/shell')
| -rw-r--r-- | weed/shell/command_collection_list.go | 21 | ||||
| -rw-r--r-- | weed/shell/command_ec_encode_test.go | 3 | ||||
| -rw-r--r-- | weed/shell/command_fs_configure.go | 2 | ||||
| -rw-r--r-- | weed/shell/command_s3_bucket_list.go | 4 | ||||
| -rw-r--r-- | weed/shell/command_s3_bucket_quota_check.go | 12 | ||||
| -rw-r--r-- | weed/shell/command_volume_fsck.go | 147 | ||||
| -rw-r--r-- | weed/shell/command_volume_vacuum.go | 2 | ||||
| -rw-r--r-- | weed/shell/commands.go | 2 |
8 files changed, 127 insertions, 66 deletions
diff --git a/weed/shell/command_collection_list.go b/weed/shell/command_collection_list.go index 55fd6d9b9..47a4da553 100644 --- a/weed/shell/command_collection_list.go +++ b/weed/shell/command_collection_list.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "github.com/chrislusf/seaweedfs/weed/pb/master_pb" + "github.com/chrislusf/seaweedfs/weed/storage/super_block" "io" ) @@ -23,10 +24,10 @@ func (c *commandCollectionList) Help() string { } type CollectionInfo struct { - FileCount uint64 - DeleteCount uint64 - DeletedByteCount uint64 - Size uint64 + FileCount float64 + DeleteCount float64 + DeletedByteCount float64 + Size float64 VolumeCount int } @@ -52,7 +53,7 @@ func (c *commandCollectionList) Do(args []string, commandEnv *CommandEnv, writer if !found { continue } - fmt.Fprintf(writer, "collection:\"%s\"\tvolumeCount:%d\tsize:%d\tfileCount:%d\tdeletedBytes:%d\tdeletion:%d\n", c, cif.VolumeCount, cif.Size, cif.FileCount, cif.DeletedByteCount, cif.DeleteCount) + fmt.Fprintf(writer, "collection:\"%s\"\tvolumeCount:%d\tsize:%.0f\tfileCount:%.0f\tdeletedBytes:%.0f\tdeletion:%.0f\n", c, cif.VolumeCount, cif.Size, cif.FileCount, cif.DeletedByteCount, cif.DeleteCount) } fmt.Fprintf(writer, "Total %d collections.\n", len(collections)) @@ -85,10 +86,12 @@ func addToCollection(collectionInfos map[string]*CollectionInfo, vif *master_pb. cif = &CollectionInfo{} collectionInfos[c] = cif } - cif.Size += vif.Size - cif.DeleteCount += vif.DeleteCount - cif.FileCount += vif.FileCount - cif.DeletedByteCount += vif.DeletedByteCount + replicaPlacement, _ := super_block.NewReplicaPlacementFromByte(byte(vif.ReplicaPlacement)) + copyCount := float64(replicaPlacement.GetCopyCount()) + cif.Size += float64(vif.Size) / copyCount + cif.DeleteCount += float64(vif.DeleteCount) / copyCount + cif.FileCount += float64(vif.FileCount) / copyCount + cif.DeletedByteCount += float64(vif.DeletedByteCount) / copyCount cif.VolumeCount++ } diff --git a/weed/shell/command_ec_encode_test.go b/weed/shell/command_ec_encode_test.go index d5e341e5b..940c64266 100644 --- a/weed/shell/command_ec_encode_test.go +++ b/weed/shell/command_ec_encode_test.go @@ -24,7 +24,8 @@ func TestEcDistribution(t *testing.T) { } for _, dn := range allocatedDataNodes { - fmt.Printf("info %+v %+v\n", dn.info, dn) + // fmt.Printf("info %+v %+v\n", dn.info, dn) + fmt.Printf("=> %+v %+v\n", dn.info.Id, dn.freeEcSlot) } } diff --git a/weed/shell/command_fs_configure.go b/weed/shell/command_fs_configure.go index 73bb8e5c6..068e83a5b 100644 --- a/weed/shell/command_fs_configure.go +++ b/weed/shell/command_fs_configure.go @@ -62,7 +62,7 @@ func (c *commandFsConfigure) Do(args []string, commandEnv *CommandEnv, writer io isDelete := fsConfigureCommand.Bool("delete", false, "delete the configuration by locationPrefix") apply := fsConfigureCommand.Bool("apply", false, "update and apply filer configuration") if err = fsConfigureCommand.Parse(args); err != nil { - return nil + return err } fc, err := filer.ReadFilerConf(commandEnv.option.FilerAddress, commandEnv.option.GrpcDialOption, commandEnv.MasterClient) diff --git a/weed/shell/command_s3_bucket_list.go b/weed/shell/command_s3_bucket_list.go index 65297e239..cd5a92888 100644 --- a/weed/shell/command_s3_bucket_list.go +++ b/weed/shell/command_s3_bucket_list.go @@ -58,12 +58,12 @@ func (c *commandS3BucketList) Do(args []string, commandEnv *CommandEnv, writer i return nil } collection := entry.Name - var collectionSize, fileCount uint64 + var collectionSize, fileCount float64 if collectionInfo, found := collectionInfos[collection]; found { collectionSize = collectionInfo.Size fileCount = collectionInfo.FileCount - collectionInfo.DeleteCount } - fmt.Fprintf(writer, " %s\tsize:%d\tfile:%d", entry.Name, collectionSize, fileCount) + fmt.Fprintf(writer, " %s\tsize:%.0f\tfile:%.0f", entry.Name, collectionSize, fileCount) if entry.Quota > 0 { fmt.Fprintf(writer, "\tquota:%d\tusage:%.2f%%", entry.Quota, float64(collectionSize)*100/float64(entry.Quota)) } diff --git a/weed/shell/command_s3_bucket_quota_check.go b/weed/shell/command_s3_bucket_quota_check.go index 066ef6909..0e0665cc6 100644 --- a/weed/shell/command_s3_bucket_quota_check.go +++ b/weed/shell/command_s3_bucket_quota_check.go @@ -65,7 +65,7 @@ func (c *commandS3BucketQuotaEnforce) Do(args []string, commandEnv *CommandEnv, return nil } collection := entry.Name - var collectionSize uint64 + var collectionSize float64 if collectionInfo, found := collectionInfos[collection]; found { collectionSize = collectionInfo.Size } @@ -95,7 +95,7 @@ func (c *commandS3BucketQuotaEnforce) Do(args []string, commandEnv *CommandEnv, } -func (c *commandS3BucketQuotaEnforce) processEachBucket(fc *filer.FilerConf, filerBucketsPath string, entry *filer_pb.Entry, writer io.Writer, collectionSize uint64) (hasConfChanges bool) { +func (c *commandS3BucketQuotaEnforce) processEachBucket(fc *filer.FilerConf, filerBucketsPath string, entry *filer_pb.Entry, writer io.Writer, collectionSize float64) (hasConfChanges bool) { locPrefix := filerBucketsPath + "/" + entry.Name + "/" locConf := fc.MatchStorageRule(locPrefix) @@ -103,12 +103,12 @@ func (c *commandS3BucketQuotaEnforce) processEachBucket(fc *filer.FilerConf, fil if entry.Quota > 0 { if locConf.ReadOnly { - if collectionSize < uint64(entry.Quota) { + if collectionSize < float64(entry.Quota) { locConf.ReadOnly = false hasConfChanges = true } } else { - if collectionSize > uint64(entry.Quota) { + if collectionSize > float64(entry.Quota) { locConf.ReadOnly = true hasConfChanges = true } @@ -121,8 +121,8 @@ func (c *commandS3BucketQuotaEnforce) processEachBucket(fc *filer.FilerConf, fil } if hasConfChanges { - fmt.Fprintf(writer, " %s\tsize:%d", entry.Name, collectionSize) - fmt.Fprintf(writer, "\tquota:%d\tusage:%.2f%%", entry.Quota, float64(collectionSize)*100/float64(entry.Quota)) + fmt.Fprintf(writer, " %s\tsize:%.0f", entry.Name, collectionSize) + fmt.Fprintf(writer, "\tquota:%d\tusage:%.2f%%", entry.Quota, collectionSize*100/float64(entry.Quota)) fmt.Fprintln(writer) if locConf.ReadOnly { fmt.Fprintf(writer, " changing bucket %s to read only!\n", entry.Name) diff --git a/weed/shell/command_volume_fsck.go b/weed/shell/command_volume_fsck.go index e6adf043d..1b3d7bf0d 100644 --- a/weed/shell/command_volume_fsck.go +++ b/weed/shell/command_volume_fsck.go @@ -6,7 +6,10 @@ import ( "flag" "fmt" "io" + "io/ioutil" "math" + "net/http" + "net/url" "os" "path/filepath" "sync" @@ -61,7 +64,8 @@ func (c *commandVolumeFsck) Do(args []string, commandEnv *CommandEnv, writer io. verbose := fsckCommand.Bool("v", false, "verbose mode") findMissingChunksInFiler := fsckCommand.Bool("findMissingChunksInFiler", false, "see \"help volume.fsck\"") findMissingChunksInFilerPath := fsckCommand.String("findMissingChunksInFilerPath", "/", "used together with findMissingChunksInFiler") - applyPurging := fsckCommand.Bool("reallyDeleteFromVolume", false, "<expert only> delete data not referenced by the filer") + applyPurging := fsckCommand.Bool("reallyDeleteFromVolume", false, "<expert only!> after detection, delete missing data from volumes / delete missing file entries from filer") + purgeAbsent := fsckCommand.Bool("reallyDeleteFilerEntries", false, "<expert only!> delete missing file entries from filer if the corresponding volume is missing for any reason, please ensure all still existing/expected volumes are connected! used together with findMissingChunksInFiler") if err = fsckCommand.Parse(args); err != nil { return nil } @@ -98,20 +102,20 @@ func (c *commandVolumeFsck) Do(args []string, commandEnv *CommandEnv, writer io. if *findMissingChunksInFiler { // collect all filer file ids and paths - if err = c.collectFilerFileIdAndPaths(volumeIdToVInfo, tempFolder, writer, *findMissingChunksInFilerPath, *verbose, applyPurging); err != nil { + if err = c.collectFilerFileIdAndPaths(volumeIdToVInfo, tempFolder, writer, *findMissingChunksInFilerPath, *verbose, *purgeAbsent); err != nil { return fmt.Errorf("collectFilerFileIdAndPaths: %v", err) } // for each volume, check filer file ids - if err = c.findFilerChunksMissingInVolumeServers(volumeIdToVInfo, tempFolder, writer, *verbose, applyPurging); err != nil { + if err = c.findFilerChunksMissingInVolumeServers(volumeIdToVInfo, tempFolder, writer, *verbose, *applyPurging); err != nil { return fmt.Errorf("findFilerChunksMissingInVolumeServers: %v", err) } } else { // collect all filer file ids - if err = c.collectFilerFileIds(tempFolder, volumeIdToVInfo, *verbose, writer); err != nil { + if err = c.collectFilerFileIds(volumeIdToVInfo, tempFolder, writer, *verbose); err != nil { return fmt.Errorf("failed to collect file ids from filer: %v", err) } - // volume file ids substract filer file ids - if err = c.findExtraChunksInVolumeServers(volumeIdToVInfo, tempFolder, writer, *verbose, applyPurging); err != nil { + // volume file ids subtract filer file ids + if err = c.findExtraChunksInVolumeServers(volumeIdToVInfo, tempFolder, writer, *verbose, *applyPurging); err != nil { return fmt.Errorf("findExtraChunksInVolumeServers: %v", err) } } @@ -119,7 +123,7 @@ func (c *commandVolumeFsck) Do(args []string, commandEnv *CommandEnv, writer io. return nil } -func (c *commandVolumeFsck) collectFilerFileIdAndPaths(volumeIdToServer map[uint32]VInfo, tempFolder string, writer io.Writer, filerPath string, verbose bool, applyPurging *bool) error { +func (c *commandVolumeFsck) collectFilerFileIdAndPaths(volumeIdToServer map[uint32]VInfo, tempFolder string, writer io.Writer, filerPath string, verbose bool, purgeAbsent bool) error { if verbose { fmt.Fprintf(writer, "checking each file from filer ...\n") @@ -149,12 +153,12 @@ func (c *commandVolumeFsck) collectFilerFileIdAndPaths(volumeIdToServer map[uint if verbose && entry.Entry.IsDirectory { fmt.Fprintf(writer, "checking directory %s\n", util.NewFullPath(entry.Dir, entry.Entry.Name)) } - dChunks, mChunks, resolveErr := filer.ResolveChunkManifest(filer.LookupFn(c.env), entry.Entry.Chunks, 0, math.MaxInt64) + dataChunks, manifestChunks, resolveErr := filer.ResolveChunkManifest(filer.LookupFn(c.env), entry.Entry.Chunks, 0, math.MaxInt64) if resolveErr != nil { return nil } - dChunks = append(dChunks, mChunks...) - for _, chunk := range dChunks { + dataChunks = append(dataChunks, manifestChunks...) + for _, chunk := range dataChunks { outputChan <- &Item{ vid: chunk.Fid.VolumeId, fileKey: chunk.Fid.FileKey, @@ -176,16 +180,20 @@ func (c *commandVolumeFsck) collectFilerFileIdAndPaths(volumeIdToServer map[uint // fmt.Fprintf(writer, "%d,%x%08x %d %s\n", i.vid, i.fileKey, i.cookie, len(i.path), i.path) } else { fmt.Fprintf(writer, "%d,%x%08x %s volume not found\n", i.vid, i.fileKey, i.cookie, i.path) + if purgeAbsent { + fmt.Printf("deleting path %s after volume not found", i.path) + c.httpDelete(i.path, verbose) + } } } }) } -func (c *commandVolumeFsck) findFilerChunksMissingInVolumeServers(volumeIdToVInfo map[uint32]VInfo, tempFolder string, writer io.Writer, verbose bool, applyPurging *bool) error { +func (c *commandVolumeFsck) findFilerChunksMissingInVolumeServers(volumeIdToVInfo map[uint32]VInfo, tempFolder string, writer io.Writer, verbose bool, applyPurging bool) error { for volumeId, vinfo := range volumeIdToVInfo { - checkErr := c.oneVolumeFileIdsCheckOneVolume(tempFolder, volumeId, writer, verbose) + checkErr := c.oneVolumeFileIdsCheckOneVolume(tempFolder, volumeId, writer, verbose, applyPurging) if checkErr != nil { return fmt.Errorf("failed to collect file ids from volume %d on %s: %v", volumeId, vinfo.server, checkErr) } @@ -193,8 +201,10 @@ func (c *commandVolumeFsck) findFilerChunksMissingInVolumeServers(volumeIdToVInf return nil } -func (c *commandVolumeFsck) findExtraChunksInVolumeServers(volumeIdToVInfo map[uint32]VInfo, tempFolder string, writer io.Writer, verbose bool, applyPurging *bool) error { +func (c *commandVolumeFsck) findExtraChunksInVolumeServers(volumeIdToVInfo map[uint32]VInfo, tempFolder string, writer io.Writer, verbose bool, applyPurging bool) error { + var totalInUseCount, totalOrphanChunkCount, totalOrphanDataSize uint64 + for volumeId, vinfo := range volumeIdToVInfo { inUseCount, orphanFileIds, orphanDataSize, checkErr := c.oneVolumeFileIdsSubtractFilerFileIds(tempFolder, volumeId, writer, verbose) if checkErr != nil { @@ -210,39 +220,53 @@ func (c *commandVolumeFsck) findExtraChunksInVolumeServers(volumeIdToVInfo map[u } } - if *applyPurging && len(orphanFileIds) > 0 { + if applyPurging && len(orphanFileIds) > 0 { + if verbose { + fmt.Fprintf(writer, "purging process for volume %d", volumeId) + } + if vinfo.isEcVolume { - fmt.Fprintf(writer, "Skip purging for Erasure Coded volume %d.\n", volumeId) + fmt.Fprintf(writer, "skip purging for Erasure Coded volume %d.\n", volumeId) continue } + + needleVID := needle.VolumeId(volumeId) + if vinfo.isReadOnly { - fmt.Fprintf(writer, "Skip purging for read only volume %d.\n", volumeId) - continue - } - if inUseCount == 0 { - if err := deleteVolume(c.env.option.GrpcDialOption, needle.VolumeId(volumeId), vinfo.server); err != nil { - return fmt.Errorf("delete volume %d: %v", volumeId, err) - } - } else { - if err := c.purgeFileIdsForOneVolume(volumeId, orphanFileIds, writer); err != nil { - return fmt.Errorf("purge for volume %d: %v", volumeId, err) + err := markVolumeWritable(c.env.option.GrpcDialOption, needleVID, vinfo.server, true) + if err != nil { + return fmt.Errorf("mark volume %d read/write: %v", volumeId, err) } + + fmt.Fprintf(writer, "temporarily marked %d on server %v writable for forced purge\n", volumeId, vinfo.server) + defer markVolumeWritable(c.env.option.GrpcDialOption, needleVID, vinfo.server, false) } - } - } - if totalOrphanChunkCount == 0 { - fmt.Fprintf(writer, "no orphan data\n") - return nil + fmt.Fprintf(writer, "marked %d on server %v writable for forced purge\n", volumeId, vinfo.server) + + if verbose { + fmt.Fprintf(writer, "purging files from volume %d\n", volumeId) + } + + if err := c.purgeFileIdsForOneVolume(volumeId, orphanFileIds, writer); err != nil { + return fmt.Errorf("purging volume %d: %v", volumeId, err) + } + } } - if !*applyPurging { + if !applyPurging { pct := float64(totalOrphanChunkCount*100) / (float64(totalOrphanChunkCount + totalInUseCount)) fmt.Fprintf(writer, "\nTotal\t\tentries:%d\torphan:%d\t%.2f%%\t%dB\n", totalOrphanChunkCount+totalInUseCount, totalOrphanChunkCount, pct, totalOrphanDataSize) fmt.Fprintf(writer, "This could be normal if multiple filers or no filers are used.\n") } + + if totalOrphanChunkCount == 0 { + fmt.Fprintf(writer, "no orphan data\n") + //return nil + } + return nil } @@ -283,7 +307,7 @@ func (c *commandVolumeFsck) collectOneVolumeFileIds(tempFolder string, volumeId } -func (c *commandVolumeFsck) collectFilerFileIds(tempFolder string, volumeIdToServer map[uint32]VInfo, verbose bool, writer io.Writer) error { +func (c *commandVolumeFsck) collectFilerFileIds(volumeIdToServer map[uint32]VInfo, tempFolder string, writer io.Writer, verbose bool) error { if verbose { fmt.Fprintf(writer, "collecting file ids from filer ...\n") @@ -308,15 +332,15 @@ func (c *commandVolumeFsck) collectFilerFileIds(tempFolder string, volumeIdToSer fileKey uint64 } return doTraverseBfsAndSaving(c.env, nil, "/", false, func(entry *filer_pb.FullEntry, outputChan chan interface{}) (err error) { - dChunks, mChunks, resolveErr := filer.ResolveChunkManifest(filer.LookupFn(c.env), entry.Entry.Chunks, 0, math.MaxInt64) + dataChunks, manifestChunks, resolveErr := filer.ResolveChunkManifest(filer.LookupFn(c.env), entry.Entry.Chunks, 0, math.MaxInt64) if resolveErr != nil { if verbose { fmt.Fprintf(writer, "resolving manifest chunks in %s: %v\n", util.NewFullPath(entry.Dir, entry.Entry.Name), resolveErr) } return nil } - dChunks = append(dChunks, mChunks...) - for _, chunk := range dChunks { + dataChunks = append(dataChunks, manifestChunks...) + for _, chunk := range dataChunks { outputChan <- &Item{ vid: chunk.Fid.VolumeId, fileKey: chunk.Fid.FileKey, @@ -333,10 +357,10 @@ func (c *commandVolumeFsck) collectFilerFileIds(tempFolder string, volumeIdToSer }) } -func (c *commandVolumeFsck) oneVolumeFileIdsCheckOneVolume(tempFolder string, volumeId uint32, writer io.Writer, verbose bool) (err error) { +func (c *commandVolumeFsck) oneVolumeFileIdsCheckOneVolume(tempFolder string, volumeId uint32, writer io.Writer, verbose bool, applyPurging bool) (err error) { if verbose { - fmt.Fprintf(writer, "find missing file chuns in volume %d ...\n", volumeId) + fmt.Fprintf(writer, "find missing file chunks in volume %d ...\n", volumeId) } db := needle_map.NewMemDb() @@ -366,11 +390,7 @@ func (c *commandVolumeFsck) oneVolumeFileIdsCheckOneVolume(tempFolder string, vo for { readSize, err = io.ReadFull(br, buffer) if err != nil || readSize != 16 { - if err == io.EOF { - return nil - } else { - break - } + break } item.fileKey = util.BytesToUint64(buffer[:8]) @@ -386,14 +406,51 @@ func (c *commandVolumeFsck) oneVolumeFileIdsCheckOneVolume(tempFolder string, vo } item.path = util.FullPath(string(pathBytes)) - if _, found := db.Get(types.NeedleId(item.fileKey)); !found { - fmt.Fprintf(writer, "%d,%x%08x in %s %d not found\n", volumeId, item.fileKey, item.cookie, item.path, pathSize) + needleId := types.NeedleId(item.fileKey) + if _, found := db.Get(needleId); !found { + fmt.Fprintf(writer, "%s\n", item.path) + + if applyPurging { + // defining the URL this way automatically escapes complex path names + c.httpDelete(item.path, verbose) + } } + } + return nil +} + +func (c *commandVolumeFsck) httpDelete(path util.FullPath, verbose bool) { + req, err := http.NewRequest(http.MethodDelete, "", nil) + req.URL = &url.URL{ + Scheme: "http", + Host: c.env.option.FilerAddress.ToHttpAddress(), + Path: string(path), + } + if verbose { + fmt.Printf("full HTTP delete request to be sent: %v\n", req) + } + if err != nil { + fmt.Errorf("HTTP delete request error: %v\n", err) } - return + client := &http.Client{} + + resp, err := client.Do(req) + if err != nil { + fmt.Errorf("DELETE fetch error: %v\n", err) + } + defer resp.Body.Close() + + _, err = ioutil.ReadAll(resp.Body) + if err != nil { + fmt.Errorf("DELETE response error: %v\n", err) + } + if verbose { + fmt.Println("delete response Status : ", resp.Status) + fmt.Println("delete response Headers : ", resp.Header) + } } func (c *commandVolumeFsck) oneVolumeFileIdsSubtractFilerFileIds(tempFolder string, volumeId uint32, writer io.Writer, verbose bool) (inUseCount uint64, orphanFileIds []string, orphanDataSize uint64, err error) { diff --git a/weed/shell/command_volume_vacuum.go b/weed/shell/command_volume_vacuum.go index a09bf5d56..61b1f06fa 100644 --- a/weed/shell/command_volume_vacuum.go +++ b/weed/shell/command_volume_vacuum.go @@ -32,7 +32,7 @@ func (c *commandVacuum) Do(args []string, commandEnv *CommandEnv, writer io.Writ volumeVacuumCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) garbageThreshold := volumeVacuumCommand.Float64("garbageThreshold", 0.3, "vacuum when garbage is more than this limit") if err = volumeVacuumCommand.Parse(args); err != nil { - return nil + return } if err = commandEnv.confirmIsLocked(args); err != nil { diff --git a/weed/shell/commands.go b/weed/shell/commands.go index ec71edee0..3ff49f1d2 100644 --- a/weed/shell/commands.go +++ b/weed/shell/commands.go @@ -46,7 +46,7 @@ var ( func NewCommandEnv(options *ShellOptions) *CommandEnv { ce := &CommandEnv{ env: make(map[string]string), - MasterClient: wdclient.NewMasterClient(options.GrpcDialOption, pb.AdminShellClient, "", "", pb.ServerAddresses(*options.Masters).ToAddresses()), + MasterClient: wdclient.NewMasterClient(options.GrpcDialOption, pb.AdminShellClient, "", "", pb.ServerAddresses(*options.Masters).ToAddressMap()), option: options, } ce.locker = exclusive_locks.NewExclusiveLocker(ce.MasterClient, "admin") |
