diff options
| author | chrislu <chris.lu@gmail.com> | 2024-06-04 21:12:30 -0700 |
|---|---|---|
| committer | chrislu <chris.lu@gmail.com> | 2024-06-04 21:12:30 -0700 |
| commit | 9f02bf4ede081d59dd12f5203518e61e689b1ddb (patch) | |
| tree | 2cba26b062dfdb2dcd58f475bb736da68287405e /weed | |
| parent | 41cc825ddbeaea3659e6899f1753be83554ae9fe (diff) | |
| parent | 1f2dc54647f8ab84f2e8e9cd9aa3632760cee869 (diff) | |
| download | seaweedfs-9f02bf4ede081d59dd12f5203518e61e689b1ddb.tar.xz seaweedfs-9f02bf4ede081d59dd12f5203518e61e689b1ddb.zip | |
Merge branch 'master' into mq
Diffstat (limited to 'weed')
| -rw-r--r-- | weed/s3api/s3api_object_handlers_postpolicy.go | 4 | ||||
| -rw-r--r-- | weed/server/master_grpc_server.go | 8 | ||||
| -rw-r--r-- | weed/shell/command_ec_encode.go | 4 | ||||
| -rw-r--r-- | weed/shell/command_volume_fsck.go | 51 | ||||
| -rw-r--r-- | weed/shell/command_volume_tier_upload.go | 6 | ||||
| -rw-r--r-- | weed/stats/disk_common.go | 17 | ||||
| -rw-r--r-- | weed/stats/disk_notsupported.go | 4 | ||||
| -rw-r--r-- | weed/stats/disk_openbsd.go | 6 | ||||
| -rw-r--r-- | weed/stats/disk_solaris.go | 24 | ||||
| -rw-r--r-- | weed/stats/disk_windows.go | 4 | ||||
| -rw-r--r-- | weed/storage/needle_map_sorted_file.go | 2 |
11 files changed, 102 insertions, 28 deletions
diff --git a/weed/s3api/s3api_object_handlers_postpolicy.go b/weed/s3api/s3api_object_handlers_postpolicy.go index cd80e0ad3..e77d734ac 100644 --- a/weed/s3api/s3api_object_handlers_postpolicy.go +++ b/weed/s3api/s3api_object_handlers_postpolicy.go @@ -166,8 +166,10 @@ func (s3a *S3ApiServer) PostPolicyBucketHandler(w http.ResponseWriter, r *http.R s3err.PostLog(r, http.StatusCreated, s3err.ErrNone) case "200": s3err.WriteEmptyResponse(w, r, http.StatusOK) + case "204": + s3err.WriteEmptyResponse(w, r, http.StatusNoContent) default: - writeSuccessResponseEmpty(w, r) + s3err.WriteEmptyResponse(w, r, http.StatusNoContent) } } diff --git a/weed/server/master_grpc_server.go b/weed/server/master_grpc_server.go index b989da424..91f69fef4 100644 --- a/weed/server/master_grpc_server.go +++ b/weed/server/master_grpc_server.go @@ -292,6 +292,12 @@ func (ms *MasterServer) KeepConnected(stream master_pb.Seaweed_KeepConnectedServ _, err := stream.Recv() if err != nil { glog.V(2).Infof("- client %v: %v", clientName, err) + go func() { + // consume message chan to avoid deadlock, go routine exit when message chan is closed + for range messageChan { + // no op + } + }() close(stopChan) return } @@ -367,6 +373,8 @@ func (ms *MasterServer) addClient(filerGroup, clientType string, clientAddress p func (ms *MasterServer) deleteClient(clientName string) { glog.V(0).Infof("- client %v", clientName) ms.clientChansLock.Lock() + // close message chan, so that the KeepConnected go routine can exit + close(ms.clientChans[clientName]) delete(ms.clientChans, clientName) ms.clientChansLock.Unlock() } diff --git a/weed/shell/command_ec_encode.go b/weed/shell/command_ec_encode.go index c2b2074e4..16de2ce73 100644 --- a/weed/shell/command_ec_encode.go +++ b/weed/shell/command_ec_encode.go @@ -304,6 +304,10 @@ func collectVolumeIdsForEcEncode(commandEnv *CommandEnv, selectedCollection stri eachDataNode(topologyInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) { for _, diskInfo := range dn.DiskInfos { for _, v := range diskInfo.VolumeInfos { + // ignore remote volumes + if v.RemoteStorageName != "" && v.RemoteStorageKey != "" { + continue + } if v.Collection == selectedCollection && v.ModifiedAtSecond+quietSeconds < nowUnixSeconds { if float64(v.Size) > fullPercentage/100*float64(volumeSizeLimitMb)*1024*1024 { vidMap[v.Id] = true diff --git a/weed/shell/command_volume_fsck.go b/weed/shell/command_volume_fsck.go index 8916e90bd..d85a9e13f 100644 --- a/weed/shell/command_volume_fsck.go +++ b/weed/shell/command_volume_fsck.go @@ -19,6 +19,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/storage/needle_map" "github.com/seaweedfs/seaweedfs/weed/storage/types" "github.com/seaweedfs/seaweedfs/weed/util" + "golang.org/x/sync/errgroup" "io" "math" "net/http" @@ -137,32 +138,46 @@ func (c *commandVolumeFsck) Do(args []string, commandEnv *CommandEnv, writer io. return fmt.Errorf("read filer buckets path: %v", err) } - collectCutoffFromAtNs := time.Now().Add(-*cutoffTimeAgo).UnixNano() + var collectCutoffFromAtNs int64 = 0 + if cutoffTimeAgo.Seconds() != 0 { + collectCutoffFromAtNs = time.Now().Add(-*cutoffTimeAgo).UnixNano() + } var collectModifyFromAtNs int64 = 0 if modifyTimeAgo.Seconds() != 0 { collectModifyFromAtNs = time.Now().Add(-*modifyTimeAgo).UnixNano() } // collect each volume file ids - for dataNodeId, volumeIdToVInfo := range dataNodeVolumeIdToVInfo { - for volumeId, vinfo := range volumeIdToVInfo { - if len(c.volumeIds) > 0 { - if _, ok := c.volumeIds[volumeId]; !ok { + eg, gCtx := errgroup.WithContext(context.Background()) + _ = gCtx + for _dataNodeId, _volumeIdToVInfo := range dataNodeVolumeIdToVInfo { + dataNodeId, volumeIdToVInfo := _dataNodeId, _volumeIdToVInfo + eg.Go(func() error { + for volumeId, vinfo := range volumeIdToVInfo { + if len(c.volumeIds) > 0 { + if _, ok := c.volumeIds[volumeId]; !ok { + delete(volumeIdToVInfo, volumeId) + continue + } + } + if *c.collection != "" && vinfo.collection != *c.collection { delete(volumeIdToVInfo, volumeId) continue } + err = c.collectOneVolumeFileIds(dataNodeId, volumeId, vinfo, uint64(collectModifyFromAtNs), uint64(collectCutoffFromAtNs)) + if err != nil { + return fmt.Errorf("failed to collect file ids from volume %d on %s: %v", volumeId, vinfo.server, err) + } } - if *c.collection != "" && vinfo.collection != *c.collection { - delete(volumeIdToVInfo, volumeId) - continue - } - err = c.collectOneVolumeFileIds(dataNodeId, volumeId, vinfo, uint64(collectModifyFromAtNs), uint64(collectCutoffFromAtNs)) - if err != nil { - return fmt.Errorf("failed to collect file ids from volume %d on %s: %v", volumeId, vinfo.server, err) + if *c.verbose { + fmt.Fprintf(c.writer, "dn %+v filtred %d volumes and locations.\n", dataNodeId, len(dataNodeVolumeIdToVInfo[dataNodeId])) } - } - if *c.verbose { - fmt.Fprintf(c.writer, "dn %+v filtred %d volumes and locations.\n", dataNodeId, len(dataNodeVolumeIdToVInfo[dataNodeId])) - } + return nil + }) + } + err = eg.Wait() + if err != nil { + fmt.Fprintf(c.writer, "got error: %v", err) + return err } if *c.findMissingChunksInFiler { @@ -416,7 +431,7 @@ func (c *commandVolumeFsck) collectOneVolumeFileIds(dataNodeId string, volumeId } buf.Write(resp.FileContent) } - if !vinfo.isReadOnly { + if !vinfo.isReadOnly && (modifyFrom != 0 || cutoffFrom != 0) { index, err := idx.FirstInvalidIndex(buf.Bytes(), func(key types.NeedleId, offset types.Offset, size types.Size) (bool, error) { resp, err := volumeServerClient.ReadNeedleMeta(context.Background(), &volume_server_pb.ReadNeedleMetaRequest{ @@ -428,7 +443,7 @@ func (c *commandVolumeFsck) collectOneVolumeFileIds(dataNodeId string, volumeId if err != nil { return false, fmt.Errorf("read needle meta with id %d from volume %d: %v", key, volumeId, err) } - if (modifyFrom == 0 || modifyFrom <= resp.AppendAtNs) && (resp.AppendAtNs <= cutoffFrom) { + if (modifyFrom == 0 || modifyFrom <= resp.AppendAtNs) && (cutoffFrom == 0 || resp.AppendAtNs <= cutoffFrom) { return true, nil } return false, nil diff --git a/weed/shell/command_volume_tier_upload.go b/weed/shell/command_volume_tier_upload.go index c109d59d8..6932317ab 100644 --- a/weed/shell/command_volume_tier_upload.go +++ b/weed/shell/command_volume_tier_upload.go @@ -139,6 +139,12 @@ func uploadDatToRemoteTier(grpcDialOption grpc.DialOption, writer io.Writer, vol KeepLocalDatFile: keepLocalDatFile, }) + if stream == nil && copyErr == nil { + // when the volume is already uploaded, VolumeTierMoveDatToRemote will return nil stream and nil error + // so we should directly return in this case + fmt.Fprintf(writer, "volume %v already uploaded", volumeId) + return nil + } var lastProcessed int64 for { resp, recvErr := stream.Recv() diff --git a/weed/stats/disk_common.go b/weed/stats/disk_common.go new file mode 100644 index 000000000..936c77e91 --- /dev/null +++ b/weed/stats/disk_common.go @@ -0,0 +1,17 @@ +package stats + +import "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb" + +func calculateDiskRemaining(disk *volume_server_pb.DiskStatus) { + disk.Used = disk.All - disk.Free + + if disk.All > 0 { + disk.PercentFree = float32((float64(disk.Free) / float64(disk.All)) * 100) + disk.PercentUsed = float32((float64(disk.Used) / float64(disk.All)) * 100) + } else { + disk.PercentFree = 0 + disk.PercentUsed = 0 + } + + return +} diff --git a/weed/stats/disk_notsupported.go b/weed/stats/disk_notsupported.go index 418164546..956c7bf9f 100644 --- a/weed/stats/disk_notsupported.go +++ b/weed/stats/disk_notsupported.go @@ -1,5 +1,5 @@ -//go:build netbsd || plan9 || solaris -// +build netbsd plan9 solaris +//go:build netbsd || plan9 +// +build netbsd plan9 package stats diff --git a/weed/stats/disk_openbsd.go b/weed/stats/disk_openbsd.go index bb03126bf..1be41e5c7 100644 --- a/weed/stats/disk_openbsd.go +++ b/weed/stats/disk_openbsd.go @@ -15,10 +15,10 @@ func fillInDiskStatus(disk *volume_server_pb.DiskStatus) { if err != nil { return } + disk.All = fs.F_blocks * uint64(fs.F_bsize) disk.Free = fs.F_bfree * uint64(fs.F_bsize) - disk.Used = disk.All - disk.Free - disk.PercentFree = float32((float64(disk.Free) / float64(disk.All)) * 100) - disk.PercentUsed = float32((float64(disk.Used) / float64(disk.All)) * 100) + calculateDiskRemaining(disk) + return } diff --git a/weed/stats/disk_solaris.go b/weed/stats/disk_solaris.go new file mode 100644 index 000000000..cce20e4cc --- /dev/null +++ b/weed/stats/disk_solaris.go @@ -0,0 +1,24 @@ +//go:build solaris +// +build solaris + +package stats + +import ( + "golang.org/x/sys/unix" + + "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb" +) + +func fillInDiskStatus(disk *volume_server_pb.DiskStatus) { + var stat unix.Statvfs_t + err := unix.Statvfs(disk.Dir, &stat) + if err != nil { + return + } + + disk.All = stat.Blocks * uint64(stat.Bsize) + disk.Free = stat.Bfree * uint64(stat.Bsize) + calculateDiskRemaining(disk) + + return +} diff --git a/weed/stats/disk_windows.go b/weed/stats/disk_windows.go index 16e6d3326..93c703134 100644 --- a/weed/stats/disk_windows.go +++ b/weed/stats/disk_windows.go @@ -39,9 +39,7 @@ func fillInDiskStatus(disk *volume_server_pb.DiskStatus) { return } - disk.Used = disk.All - disk.Free - disk.PercentFree = float32((float64(disk.Free) / float64(disk.All)) * 100) - disk.PercentUsed = float32((float64(disk.Used) / float64(disk.All)) * 100) + calculateDiskRemaining(disk) return } diff --git a/weed/storage/needle_map_sorted_file.go b/weed/storage/needle_map_sorted_file.go index 0433ffa0d..5bd67ea86 100644 --- a/weed/storage/needle_map_sorted_file.go +++ b/weed/storage/needle_map_sorted_file.go @@ -27,7 +27,7 @@ func NewSortedFileNeedleMap(indexBaseFileName string, indexFile *os.File) (m *So } glog.V(1).Infof("Opening %s...", fileName) - if m.dbFile, err = os.Open(indexBaseFileName + ".sdx"); err != nil { + if m.dbFile, err = os.OpenFile(indexBaseFileName+".sdx", os.O_RDWR, 0); err != nil { return } dbStat, _ := m.dbFile.Stat() |
