aboutsummaryrefslogtreecommitdiff
path: root/weed
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2024-06-04 21:12:30 -0700
committerchrislu <chris.lu@gmail.com>2024-06-04 21:12:30 -0700
commit9f02bf4ede081d59dd12f5203518e61e689b1ddb (patch)
tree2cba26b062dfdb2dcd58f475bb736da68287405e /weed
parent41cc825ddbeaea3659e6899f1753be83554ae9fe (diff)
parent1f2dc54647f8ab84f2e8e9cd9aa3632760cee869 (diff)
downloadseaweedfs-9f02bf4ede081d59dd12f5203518e61e689b1ddb.tar.xz
seaweedfs-9f02bf4ede081d59dd12f5203518e61e689b1ddb.zip
Merge branch 'master' into mq
Diffstat (limited to 'weed')
-rw-r--r--weed/s3api/s3api_object_handlers_postpolicy.go4
-rw-r--r--weed/server/master_grpc_server.go8
-rw-r--r--weed/shell/command_ec_encode.go4
-rw-r--r--weed/shell/command_volume_fsck.go51
-rw-r--r--weed/shell/command_volume_tier_upload.go6
-rw-r--r--weed/stats/disk_common.go17
-rw-r--r--weed/stats/disk_notsupported.go4
-rw-r--r--weed/stats/disk_openbsd.go6
-rw-r--r--weed/stats/disk_solaris.go24
-rw-r--r--weed/stats/disk_windows.go4
-rw-r--r--weed/storage/needle_map_sorted_file.go2
11 files changed, 102 insertions, 28 deletions
diff --git a/weed/s3api/s3api_object_handlers_postpolicy.go b/weed/s3api/s3api_object_handlers_postpolicy.go
index cd80e0ad3..e77d734ac 100644
--- a/weed/s3api/s3api_object_handlers_postpolicy.go
+++ b/weed/s3api/s3api_object_handlers_postpolicy.go
@@ -166,8 +166,10 @@ func (s3a *S3ApiServer) PostPolicyBucketHandler(w http.ResponseWriter, r *http.R
s3err.PostLog(r, http.StatusCreated, s3err.ErrNone)
case "200":
s3err.WriteEmptyResponse(w, r, http.StatusOK)
+ case "204":
+ s3err.WriteEmptyResponse(w, r, http.StatusNoContent)
default:
- writeSuccessResponseEmpty(w, r)
+ s3err.WriteEmptyResponse(w, r, http.StatusNoContent)
}
}
diff --git a/weed/server/master_grpc_server.go b/weed/server/master_grpc_server.go
index b989da424..91f69fef4 100644
--- a/weed/server/master_grpc_server.go
+++ b/weed/server/master_grpc_server.go
@@ -292,6 +292,12 @@ func (ms *MasterServer) KeepConnected(stream master_pb.Seaweed_KeepConnectedServ
_, err := stream.Recv()
if err != nil {
glog.V(2).Infof("- client %v: %v", clientName, err)
+ go func() {
+ // consume message chan to avoid deadlock, go routine exit when message chan is closed
+ for range messageChan {
+ // no op
+ }
+ }()
close(stopChan)
return
}
@@ -367,6 +373,8 @@ func (ms *MasterServer) addClient(filerGroup, clientType string, clientAddress p
func (ms *MasterServer) deleteClient(clientName string) {
glog.V(0).Infof("- client %v", clientName)
ms.clientChansLock.Lock()
+ // close message chan, so that the KeepConnected go routine can exit
+ close(ms.clientChans[clientName])
delete(ms.clientChans, clientName)
ms.clientChansLock.Unlock()
}
diff --git a/weed/shell/command_ec_encode.go b/weed/shell/command_ec_encode.go
index c2b2074e4..16de2ce73 100644
--- a/weed/shell/command_ec_encode.go
+++ b/weed/shell/command_ec_encode.go
@@ -304,6 +304,10 @@ func collectVolumeIdsForEcEncode(commandEnv *CommandEnv, selectedCollection stri
eachDataNode(topologyInfo, func(dc string, rack RackId, dn *master_pb.DataNodeInfo) {
for _, diskInfo := range dn.DiskInfos {
for _, v := range diskInfo.VolumeInfos {
+ // ignore remote volumes
+ if v.RemoteStorageName != "" && v.RemoteStorageKey != "" {
+ continue
+ }
if v.Collection == selectedCollection && v.ModifiedAtSecond+quietSeconds < nowUnixSeconds {
if float64(v.Size) > fullPercentage/100*float64(volumeSizeLimitMb)*1024*1024 {
vidMap[v.Id] = true
diff --git a/weed/shell/command_volume_fsck.go b/weed/shell/command_volume_fsck.go
index 8916e90bd..d85a9e13f 100644
--- a/weed/shell/command_volume_fsck.go
+++ b/weed/shell/command_volume_fsck.go
@@ -19,6 +19,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/storage/needle_map"
"github.com/seaweedfs/seaweedfs/weed/storage/types"
"github.com/seaweedfs/seaweedfs/weed/util"
+ "golang.org/x/sync/errgroup"
"io"
"math"
"net/http"
@@ -137,32 +138,46 @@ func (c *commandVolumeFsck) Do(args []string, commandEnv *CommandEnv, writer io.
return fmt.Errorf("read filer buckets path: %v", err)
}
- collectCutoffFromAtNs := time.Now().Add(-*cutoffTimeAgo).UnixNano()
+ var collectCutoffFromAtNs int64 = 0
+ if cutoffTimeAgo.Seconds() != 0 {
+ collectCutoffFromAtNs = time.Now().Add(-*cutoffTimeAgo).UnixNano()
+ }
var collectModifyFromAtNs int64 = 0
if modifyTimeAgo.Seconds() != 0 {
collectModifyFromAtNs = time.Now().Add(-*modifyTimeAgo).UnixNano()
}
// collect each volume file ids
- for dataNodeId, volumeIdToVInfo := range dataNodeVolumeIdToVInfo {
- for volumeId, vinfo := range volumeIdToVInfo {
- if len(c.volumeIds) > 0 {
- if _, ok := c.volumeIds[volumeId]; !ok {
+ eg, gCtx := errgroup.WithContext(context.Background())
+ _ = gCtx
+ for _dataNodeId, _volumeIdToVInfo := range dataNodeVolumeIdToVInfo {
+ dataNodeId, volumeIdToVInfo := _dataNodeId, _volumeIdToVInfo
+ eg.Go(func() error {
+ for volumeId, vinfo := range volumeIdToVInfo {
+ if len(c.volumeIds) > 0 {
+ if _, ok := c.volumeIds[volumeId]; !ok {
+ delete(volumeIdToVInfo, volumeId)
+ continue
+ }
+ }
+ if *c.collection != "" && vinfo.collection != *c.collection {
delete(volumeIdToVInfo, volumeId)
continue
}
+ err = c.collectOneVolumeFileIds(dataNodeId, volumeId, vinfo, uint64(collectModifyFromAtNs), uint64(collectCutoffFromAtNs))
+ if err != nil {
+ return fmt.Errorf("failed to collect file ids from volume %d on %s: %v", volumeId, vinfo.server, err)
+ }
}
- if *c.collection != "" && vinfo.collection != *c.collection {
- delete(volumeIdToVInfo, volumeId)
- continue
- }
- err = c.collectOneVolumeFileIds(dataNodeId, volumeId, vinfo, uint64(collectModifyFromAtNs), uint64(collectCutoffFromAtNs))
- if err != nil {
- return fmt.Errorf("failed to collect file ids from volume %d on %s: %v", volumeId, vinfo.server, err)
+ if *c.verbose {
+ fmt.Fprintf(c.writer, "dn %+v filtred %d volumes and locations.\n", dataNodeId, len(dataNodeVolumeIdToVInfo[dataNodeId]))
}
- }
- if *c.verbose {
- fmt.Fprintf(c.writer, "dn %+v filtred %d volumes and locations.\n", dataNodeId, len(dataNodeVolumeIdToVInfo[dataNodeId]))
- }
+ return nil
+ })
+ }
+ err = eg.Wait()
+ if err != nil {
+ fmt.Fprintf(c.writer, "got error: %v", err)
+ return err
}
if *c.findMissingChunksInFiler {
@@ -416,7 +431,7 @@ func (c *commandVolumeFsck) collectOneVolumeFileIds(dataNodeId string, volumeId
}
buf.Write(resp.FileContent)
}
- if !vinfo.isReadOnly {
+ if !vinfo.isReadOnly && (modifyFrom != 0 || cutoffFrom != 0) {
index, err := idx.FirstInvalidIndex(buf.Bytes(),
func(key types.NeedleId, offset types.Offset, size types.Size) (bool, error) {
resp, err := volumeServerClient.ReadNeedleMeta(context.Background(), &volume_server_pb.ReadNeedleMetaRequest{
@@ -428,7 +443,7 @@ func (c *commandVolumeFsck) collectOneVolumeFileIds(dataNodeId string, volumeId
if err != nil {
return false, fmt.Errorf("read needle meta with id %d from volume %d: %v", key, volumeId, err)
}
- if (modifyFrom == 0 || modifyFrom <= resp.AppendAtNs) && (resp.AppendAtNs <= cutoffFrom) {
+ if (modifyFrom == 0 || modifyFrom <= resp.AppendAtNs) && (cutoffFrom == 0 || resp.AppendAtNs <= cutoffFrom) {
return true, nil
}
return false, nil
diff --git a/weed/shell/command_volume_tier_upload.go b/weed/shell/command_volume_tier_upload.go
index c109d59d8..6932317ab 100644
--- a/weed/shell/command_volume_tier_upload.go
+++ b/weed/shell/command_volume_tier_upload.go
@@ -139,6 +139,12 @@ func uploadDatToRemoteTier(grpcDialOption grpc.DialOption, writer io.Writer, vol
KeepLocalDatFile: keepLocalDatFile,
})
+ if stream == nil && copyErr == nil {
+ // when the volume is already uploaded, VolumeTierMoveDatToRemote will return nil stream and nil error
+ // so we should directly return in this case
+ fmt.Fprintf(writer, "volume %v already uploaded", volumeId)
+ return nil
+ }
var lastProcessed int64
for {
resp, recvErr := stream.Recv()
diff --git a/weed/stats/disk_common.go b/weed/stats/disk_common.go
new file mode 100644
index 000000000..936c77e91
--- /dev/null
+++ b/weed/stats/disk_common.go
@@ -0,0 +1,17 @@
+package stats
+
+import "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
+
+func calculateDiskRemaining(disk *volume_server_pb.DiskStatus) {
+ disk.Used = disk.All - disk.Free
+
+ if disk.All > 0 {
+ disk.PercentFree = float32((float64(disk.Free) / float64(disk.All)) * 100)
+ disk.PercentUsed = float32((float64(disk.Used) / float64(disk.All)) * 100)
+ } else {
+ disk.PercentFree = 0
+ disk.PercentUsed = 0
+ }
+
+ return
+}
diff --git a/weed/stats/disk_notsupported.go b/weed/stats/disk_notsupported.go
index 418164546..956c7bf9f 100644
--- a/weed/stats/disk_notsupported.go
+++ b/weed/stats/disk_notsupported.go
@@ -1,5 +1,5 @@
-//go:build netbsd || plan9 || solaris
-// +build netbsd plan9 solaris
+//go:build netbsd || plan9
+// +build netbsd plan9
package stats
diff --git a/weed/stats/disk_openbsd.go b/weed/stats/disk_openbsd.go
index bb03126bf..1be41e5c7 100644
--- a/weed/stats/disk_openbsd.go
+++ b/weed/stats/disk_openbsd.go
@@ -15,10 +15,10 @@ func fillInDiskStatus(disk *volume_server_pb.DiskStatus) {
if err != nil {
return
}
+
disk.All = fs.F_blocks * uint64(fs.F_bsize)
disk.Free = fs.F_bfree * uint64(fs.F_bsize)
- disk.Used = disk.All - disk.Free
- disk.PercentFree = float32((float64(disk.Free) / float64(disk.All)) * 100)
- disk.PercentUsed = float32((float64(disk.Used) / float64(disk.All)) * 100)
+ calculateDiskRemaining(disk)
+
return
}
diff --git a/weed/stats/disk_solaris.go b/weed/stats/disk_solaris.go
new file mode 100644
index 000000000..cce20e4cc
--- /dev/null
+++ b/weed/stats/disk_solaris.go
@@ -0,0 +1,24 @@
+//go:build solaris
+// +build solaris
+
+package stats
+
+import (
+ "golang.org/x/sys/unix"
+
+ "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
+)
+
+func fillInDiskStatus(disk *volume_server_pb.DiskStatus) {
+ var stat unix.Statvfs_t
+ err := unix.Statvfs(disk.Dir, &stat)
+ if err != nil {
+ return
+ }
+
+ disk.All = stat.Blocks * uint64(stat.Bsize)
+ disk.Free = stat.Bfree * uint64(stat.Bsize)
+ calculateDiskRemaining(disk)
+
+ return
+}
diff --git a/weed/stats/disk_windows.go b/weed/stats/disk_windows.go
index 16e6d3326..93c703134 100644
--- a/weed/stats/disk_windows.go
+++ b/weed/stats/disk_windows.go
@@ -39,9 +39,7 @@ func fillInDiskStatus(disk *volume_server_pb.DiskStatus) {
return
}
- disk.Used = disk.All - disk.Free
- disk.PercentFree = float32((float64(disk.Free) / float64(disk.All)) * 100)
- disk.PercentUsed = float32((float64(disk.Used) / float64(disk.All)) * 100)
+ calculateDiskRemaining(disk)
return
}
diff --git a/weed/storage/needle_map_sorted_file.go b/weed/storage/needle_map_sorted_file.go
index 0433ffa0d..5bd67ea86 100644
--- a/weed/storage/needle_map_sorted_file.go
+++ b/weed/storage/needle_map_sorted_file.go
@@ -27,7 +27,7 @@ func NewSortedFileNeedleMap(indexBaseFileName string, indexFile *os.File) (m *So
}
glog.V(1).Infof("Opening %s...", fileName)
- if m.dbFile, err = os.Open(indexBaseFileName + ".sdx"); err != nil {
+ if m.dbFile, err = os.OpenFile(indexBaseFileName+".sdx", os.O_RDWR, 0); err != nil {
return
}
dbStat, _ := m.dbFile.Stat()