aboutsummaryrefslogtreecommitdiff
path: root/weed
diff options
context:
space:
mode:
Diffstat (limited to 'weed')
-rw-r--r--weed/filer/meta_aggregator.go2
-rw-r--r--weed/pb/filer_pb/filer.pb.go4
-rw-r--r--weed/pb/filer_pb/filer_grpc.pb.go1
-rw-r--r--weed/pb/iam_pb/iam.pb.go4
-rw-r--r--weed/pb/iam_pb/iam_grpc.pb.go1
-rw-r--r--weed/pb/master_pb/master.pb.go4
-rw-r--r--weed/pb/master_pb/master_grpc.pb.go1
-rw-r--r--weed/pb/mount_pb/mount.pb.go4
-rw-r--r--weed/pb/mount_pb/mount_grpc.pb.go1
-rw-r--r--weed/pb/mq_pb/mq.pb.go4
-rw-r--r--weed/pb/mq_pb/mq_grpc.pb.go1
-rw-r--r--weed/pb/remote_pb/remote.pb.go4
-rw-r--r--weed/pb/s3_pb/s3.pb.go4
-rw-r--r--weed/pb/s3_pb/s3_grpc.pb.go1
-rw-r--r--weed/pb/volume_server.proto1
-rw-r--r--weed/pb/volume_server_pb/volume_server.pb.go20
-rw-r--r--weed/pb/volume_server_pb/volume_server_grpc.pb.go1
-rw-r--r--weed/server/master_server.go62
-rw-r--r--weed/server/volume_grpc_vacuum.go11
-rw-r--r--weed/storage/volume_vacuum.go10
-rw-r--r--weed/topology/topology_vacuum.go6
21 files changed, 66 insertions, 81 deletions
diff --git a/weed/filer/meta_aggregator.go b/weed/filer/meta_aggregator.go
index e684e9bd8..b48ef01c6 100644
--- a/weed/filer/meta_aggregator.go
+++ b/weed/filer/meta_aggregator.go
@@ -102,7 +102,7 @@ func (ma *MetaAggregator) loopSubscribeToOneFiler(f *Filer, self pb.ServerAddres
if err != nil {
errLvl := glog.Level(0)
if strings.Contains(err.Error(), "duplicated local subscription detected") {
- errLvl = glog.Level(1)
+ errLvl = glog.Level(4)
}
glog.V(errLvl).Infof("subscribing remote %s meta change: %v", peer, err)
}
diff --git a/weed/pb/filer_pb/filer.pb.go b/weed/pb/filer_pb/filer.pb.go
index beefbca10..95f400a92 100644
--- a/weed/pb/filer_pb/filer.pb.go
+++ b/weed/pb/filer_pb/filer.pb.go
@@ -1,7 +1,7 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
-// protoc-gen-go v1.26.0
-// protoc v3.17.3
+// protoc-gen-go v1.28.0
+// protoc v3.21.4
// source: filer.proto
package filer_pb
diff --git a/weed/pb/filer_pb/filer_grpc.pb.go b/weed/pb/filer_pb/filer_grpc.pb.go
index 270e13e6f..ea918ab2a 100644
--- a/weed/pb/filer_pb/filer_grpc.pb.go
+++ b/weed/pb/filer_pb/filer_grpc.pb.go
@@ -11,7 +11,6 @@ import (
// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
-// Requires gRPC-Go v1.32.0 or later.
const _ = grpc.SupportPackageIsVersion7
// SeaweedFilerClient is the client API for SeaweedFiler service.
diff --git a/weed/pb/iam_pb/iam.pb.go b/weed/pb/iam_pb/iam.pb.go
index 9f0c2184d..775fcc61c 100644
--- a/weed/pb/iam_pb/iam.pb.go
+++ b/weed/pb/iam_pb/iam.pb.go
@@ -1,7 +1,7 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
-// protoc-gen-go v1.26.0
-// protoc v3.17.3
+// protoc-gen-go v1.28.0
+// protoc v3.21.4
// source: iam.proto
package iam_pb
diff --git a/weed/pb/iam_pb/iam_grpc.pb.go b/weed/pb/iam_pb/iam_grpc.pb.go
index b9438a295..3688cf423 100644
--- a/weed/pb/iam_pb/iam_grpc.pb.go
+++ b/weed/pb/iam_pb/iam_grpc.pb.go
@@ -8,7 +8,6 @@ import (
// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
-// Requires gRPC-Go v1.32.0 or later.
const _ = grpc.SupportPackageIsVersion7
// SeaweedIdentityAccessManagementClient is the client API for SeaweedIdentityAccessManagement service.
diff --git a/weed/pb/master_pb/master.pb.go b/weed/pb/master_pb/master.pb.go
index d865bb1d7..683296fff 100644
--- a/weed/pb/master_pb/master.pb.go
+++ b/weed/pb/master_pb/master.pb.go
@@ -1,7 +1,7 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
-// protoc-gen-go v1.26.0
-// protoc v3.17.3
+// protoc-gen-go v1.28.0
+// protoc v3.21.4
// source: master.proto
package master_pb
diff --git a/weed/pb/master_pb/master_grpc.pb.go b/weed/pb/master_pb/master_grpc.pb.go
index f8b11e8c5..ac6ff545d 100644
--- a/weed/pb/master_pb/master_grpc.pb.go
+++ b/weed/pb/master_pb/master_grpc.pb.go
@@ -11,7 +11,6 @@ import (
// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
-// Requires gRPC-Go v1.32.0 or later.
const _ = grpc.SupportPackageIsVersion7
// SeaweedClient is the client API for Seaweed service.
diff --git a/weed/pb/mount_pb/mount.pb.go b/weed/pb/mount_pb/mount.pb.go
index 55684e782..826aa4fdd 100644
--- a/weed/pb/mount_pb/mount.pb.go
+++ b/weed/pb/mount_pb/mount.pb.go
@@ -1,7 +1,7 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
-// protoc-gen-go v1.26.0
-// protoc v3.17.3
+// protoc-gen-go v1.28.0
+// protoc v3.21.4
// source: mount.proto
package mount_pb
diff --git a/weed/pb/mount_pb/mount_grpc.pb.go b/weed/pb/mount_pb/mount_grpc.pb.go
index 41737aa21..0dba35218 100644
--- a/weed/pb/mount_pb/mount_grpc.pb.go
+++ b/weed/pb/mount_pb/mount_grpc.pb.go
@@ -11,7 +11,6 @@ import (
// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
-// Requires gRPC-Go v1.32.0 or later.
const _ = grpc.SupportPackageIsVersion7
// SeaweedMountClient is the client API for SeaweedMount service.
diff --git a/weed/pb/mq_pb/mq.pb.go b/weed/pb/mq_pb/mq.pb.go
index 1f837d7bf..a9a3cb890 100644
--- a/weed/pb/mq_pb/mq.pb.go
+++ b/weed/pb/mq_pb/mq.pb.go
@@ -1,7 +1,7 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
-// protoc-gen-go v1.26.0
-// protoc v3.17.3
+// protoc-gen-go v1.28.0
+// protoc v3.21.4
// source: mq.proto
package mq_pb
diff --git a/weed/pb/mq_pb/mq_grpc.pb.go b/weed/pb/mq_pb/mq_grpc.pb.go
index 55b2b4fc2..4793f060f 100644
--- a/weed/pb/mq_pb/mq_grpc.pb.go
+++ b/weed/pb/mq_pb/mq_grpc.pb.go
@@ -11,7 +11,6 @@ import (
// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
-// Requires gRPC-Go v1.32.0 or later.
const _ = grpc.SupportPackageIsVersion7
// SeaweedMessagingClient is the client API for SeaweedMessaging service.
diff --git a/weed/pb/remote_pb/remote.pb.go b/weed/pb/remote_pb/remote.pb.go
index fbe77a4ac..33f295657 100644
--- a/weed/pb/remote_pb/remote.pb.go
+++ b/weed/pb/remote_pb/remote.pb.go
@@ -1,7 +1,7 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
-// protoc-gen-go v1.26.0
-// protoc v3.17.3
+// protoc-gen-go v1.28.0
+// protoc v3.21.4
// source: remote.proto
package remote_pb
diff --git a/weed/pb/s3_pb/s3.pb.go b/weed/pb/s3_pb/s3.pb.go
index 640d05853..13d996adb 100644
--- a/weed/pb/s3_pb/s3.pb.go
+++ b/weed/pb/s3_pb/s3.pb.go
@@ -1,7 +1,7 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
-// protoc-gen-go v1.26.0
-// protoc v3.17.3
+// protoc-gen-go v1.28.0
+// protoc v3.21.4
// source: s3.proto
package s3_pb
diff --git a/weed/pb/s3_pb/s3_grpc.pb.go b/weed/pb/s3_pb/s3_grpc.pb.go
index 1bc956be6..7808026e0 100644
--- a/weed/pb/s3_pb/s3_grpc.pb.go
+++ b/weed/pb/s3_pb/s3_grpc.pb.go
@@ -11,7 +11,6 @@ import (
// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
-// Requires gRPC-Go v1.32.0 or later.
const _ = grpc.SupportPackageIsVersion7
// SeaweedS3Client is the client API for SeaweedS3 service.
diff --git a/weed/pb/volume_server.proto b/weed/pb/volume_server.proto
index bd741d134..7953e772d 100644
--- a/weed/pb/volume_server.proto
+++ b/weed/pb/volume_server.proto
@@ -147,6 +147,7 @@ message VacuumVolumeCompactRequest {
}
message VacuumVolumeCompactResponse {
int64 processed_bytes = 1;
+ float load_avg_1m = 2;
}
message VacuumVolumeCommitRequest {
diff --git a/weed/pb/volume_server_pb/volume_server.pb.go b/weed/pb/volume_server_pb/volume_server.pb.go
index 2bfe69453..1107f71d9 100644
--- a/weed/pb/volume_server_pb/volume_server.pb.go
+++ b/weed/pb/volume_server_pb/volume_server.pb.go
@@ -1,7 +1,7 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
-// protoc-gen-go v1.26.0
-// protoc v3.17.3
+// protoc-gen-go v1.28.0
+// protoc v3.21.4
// source: volume_server.proto
package volume_server_pb
@@ -394,7 +394,8 @@ type VacuumVolumeCompactResponse struct {
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
- ProcessedBytes int64 `protobuf:"varint,1,opt,name=processed_bytes,json=processedBytes,proto3" json:"processed_bytes,omitempty"`
+ ProcessedBytes int64 `protobuf:"varint,1,opt,name=processed_bytes,json=processedBytes,proto3" json:"processed_bytes,omitempty"`
+ LoadAvg_1M float32 `protobuf:"fixed32,2,opt,name=load_avg_1m,json=loadAvg1m,proto3" json:"load_avg_1m,omitempty"`
}
func (x *VacuumVolumeCompactResponse) Reset() {
@@ -436,6 +437,13 @@ func (x *VacuumVolumeCompactResponse) GetProcessedBytes() int64 {
return 0
}
+func (x *VacuumVolumeCompactResponse) GetLoadAvg_1M() float32 {
+ if x != nil {
+ return x.LoadAvg_1M
+ }
+ return 0
+}
+
type VacuumVolumeCommitRequest struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
@@ -5611,12 +5619,14 @@ var file_volume_server_proto_rawDesc = []byte{
0x6f, 0x6c, 0x75, 0x6d, 0x65, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x08,
0x76, 0x6f, 0x6c, 0x75, 0x6d, 0x65, 0x49, 0x64, 0x12, 0x20, 0x0a, 0x0b, 0x70, 0x72, 0x65, 0x61,
0x6c, 0x6c, 0x6f, 0x63, 0x61, 0x74, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0b, 0x70,
- 0x72, 0x65, 0x61, 0x6c, 0x6c, 0x6f, 0x63, 0x61, 0x74, 0x65, 0x22, 0x46, 0x0a, 0x1b, 0x56, 0x61,
+ 0x72, 0x65, 0x61, 0x6c, 0x6c, 0x6f, 0x63, 0x61, 0x74, 0x65, 0x22, 0x66, 0x0a, 0x1b, 0x56, 0x61,
0x63, 0x75, 0x75, 0x6d, 0x56, 0x6f, 0x6c, 0x75, 0x6d, 0x65, 0x43, 0x6f, 0x6d, 0x70, 0x61, 0x63,
0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x27, 0x0a, 0x0f, 0x70, 0x72, 0x6f,
0x63, 0x65, 0x73, 0x73, 0x65, 0x64, 0x5f, 0x62, 0x79, 0x74, 0x65, 0x73, 0x18, 0x01, 0x20, 0x01,
0x28, 0x03, 0x52, 0x0e, 0x70, 0x72, 0x6f, 0x63, 0x65, 0x73, 0x73, 0x65, 0x64, 0x42, 0x79, 0x74,
- 0x65, 0x73, 0x22, 0x38, 0x0a, 0x19, 0x56, 0x61, 0x63, 0x75, 0x75, 0x6d, 0x56, 0x6f, 0x6c, 0x75,
+ 0x65, 0x73, 0x12, 0x1e, 0x0a, 0x0b, 0x6c, 0x6f, 0x61, 0x64, 0x5f, 0x61, 0x76, 0x67, 0x5f, 0x31,
+ 0x6d, 0x18, 0x02, 0x20, 0x01, 0x28, 0x02, 0x52, 0x09, 0x6c, 0x6f, 0x61, 0x64, 0x41, 0x76, 0x67,
+ 0x31, 0x6d, 0x22, 0x38, 0x0a, 0x19, 0x56, 0x61, 0x63, 0x75, 0x75, 0x6d, 0x56, 0x6f, 0x6c, 0x75,
0x6d, 0x65, 0x43, 0x6f, 0x6d, 0x6d, 0x69, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12,
0x1b, 0x0a, 0x09, 0x76, 0x6f, 0x6c, 0x75, 0x6d, 0x65, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01,
0x28, 0x0d, 0x52, 0x08, 0x76, 0x6f, 0x6c, 0x75, 0x6d, 0x65, 0x49, 0x64, 0x22, 0x3e, 0x0a, 0x1a,
diff --git a/weed/pb/volume_server_pb/volume_server_grpc.pb.go b/weed/pb/volume_server_pb/volume_server_grpc.pb.go
index e1b162457..4025c9b63 100644
--- a/weed/pb/volume_server_pb/volume_server_grpc.pb.go
+++ b/weed/pb/volume_server_pb/volume_server_grpc.pb.go
@@ -11,7 +11,6 @@ import (
// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
-// Requires gRPC-Go v1.32.0 or later.
const _ = grpc.SupportPackageIsVersion7
// VolumeServerClient is the client API for VolumeServer service.
diff --git a/weed/server/master_server.go b/weed/server/master_server.go
index 45981bbb4..57103f166 100644
--- a/weed/server/master_server.go
+++ b/weed/server/master_server.go
@@ -1,7 +1,6 @@
package weed_server
import (
- "context"
"fmt"
"github.com/seaweedfs/seaweedfs/weed/stats"
"net/http"
@@ -32,9 +31,8 @@ import (
)
const (
- SequencerType = "master.sequencer.type"
- SequencerSnowflakeId = "master.sequencer.sequencer_snowflake_id"
- RaftServerRemovalTime = 72 * time.Minute
+ SequencerType = "master.sequencer.type"
+ SequencerSnowflakeId = "master.sequencer.sequencer_snowflake_id"
)
type MasterOption struct {
@@ -65,9 +63,6 @@ type MasterServer struct {
boundedLeaderChan chan int
- onPeerUpdateDoneCn chan string
- onPeerUpdateDoneCnExist bool
-
// notifying clients
clientChansLock sync.RWMutex
clientChans map[string]chan *master_pb.KeepConnectedResponse
@@ -118,7 +113,6 @@ func NewMasterServer(r *mux.Router, option *MasterOption, peers map[string]pb.Se
Cluster: cluster.NewCluster(),
}
ms.boundedLeaderChan = make(chan int, 16)
- ms.onPeerUpdateDoneCn = make(chan string)
ms.MasterClient.OnPeerUpdate = ms.OnPeerUpdate
@@ -351,50 +345,18 @@ func (ms *MasterServer) OnPeerUpdate(update *master_pb.ClusterNodeUpdate, startF
peerAddress := pb.ServerAddress(update.Address)
peerName := string(peerAddress)
isLeader := ms.Topo.HashicorpRaft.State() == hashicorpRaft.Leader
- if update.IsAdd {
- if isLeader {
- raftServerFound := false
- for _, server := range ms.Topo.HashicorpRaft.GetConfiguration().Configuration().Servers {
- if string(server.ID) == peerName {
- raftServerFound = true
- }
- }
- if !raftServerFound {
- glog.V(0).Infof("adding new raft server: %s", peerName)
- ms.Topo.HashicorpRaft.AddVoter(
- hashicorpRaft.ServerID(peerName),
- hashicorpRaft.ServerAddress(peerAddress.ToGrpcAddress()), 0, 0)
+ if update.IsAdd && isLeader {
+ raftServerFound := false
+ for _, server := range ms.Topo.HashicorpRaft.GetConfiguration().Configuration().Servers {
+ if string(server.ID) == peerName {
+ raftServerFound = true
}
}
- if ms.onPeerUpdateDoneCnExist {
- ms.onPeerUpdateDoneCn <- peerName
+ if !raftServerFound {
+ glog.V(0).Infof("adding new raft server: %s", peerName)
+ ms.Topo.HashicorpRaft.AddVoter(
+ hashicorpRaft.ServerID(peerName),
+ hashicorpRaft.ServerAddress(peerAddress.ToGrpcAddress()), 0, 0)
}
- } else if isLeader {
- go func(peerName string) {
- raftServerRemovalTimeAfter := time.After(RaftServerRemovalTime)
- for {
- select {
- case <-raftServerRemovalTimeAfter:
- err := ms.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
- _, err := client.RaftRemoveServer(context.Background(), &master_pb.RaftRemoveServerRequest{
- Id: peerName,
- Force: false,
- })
- return err
- })
- if err != nil {
- glog.Warningf("failed to removing old raft server %s: %v", peerName, err)
- }
- glog.V(0).Infof("old raft server %s removed", peerName)
- return
- case peerDone := <-ms.onPeerUpdateDoneCn:
- if peerName == peerDone {
- glog.V(0).Infof("raft server %s remove canceled", peerName)
- return
- }
- }
- }
- }(peerName)
- ms.onPeerUpdateDoneCnExist = true
}
}
diff --git a/weed/server/volume_grpc_vacuum.go b/weed/server/volume_grpc_vacuum.go
index f11764f3a..5252584e1 100644
--- a/weed/server/volume_grpc_vacuum.go
+++ b/weed/server/volume_grpc_vacuum.go
@@ -3,11 +3,15 @@ package weed_server
import (
"context"
+ "github.com/prometheus/procfs"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
+ "runtime"
)
+var numCPU = runtime.NumCPU()
+
func (vs *VolumeServer) VacuumVolumeCheck(ctx context.Context, req *volume_server_pb.VacuumVolumeCheckRequest) (*volume_server_pb.VacuumVolumeCheckResponse, error) {
resp := &volume_server_pb.VacuumVolumeCheckResponse{}
@@ -29,11 +33,16 @@ func (vs *VolumeServer) VacuumVolumeCompact(req *volume_server_pb.VacuumVolumeCo
resp := &volume_server_pb.VacuumVolumeCompactResponse{}
reportInterval := int64(1024 * 1024 * 128)
nextReportTarget := reportInterval
-
+ fs, fsErr := procfs.NewDefaultFS()
var sendErr error
err := vs.store.CompactVolume(needle.VolumeId(req.VolumeId), req.Preallocate, vs.compactionBytePerSecond, func(processed int64) bool {
if processed > nextReportTarget {
resp.ProcessedBytes = processed
+ if fsErr == nil && numCPU > 0 {
+ if fsLa, err := fs.LoadAvg(); err == nil {
+ resp.LoadAvg_1M = float32(fsLa.Load1 / float64(numCPU))
+ }
+ }
if sendErr = stream.Send(resp); sendErr != nil {
return false
}
diff --git a/weed/storage/volume_vacuum.go b/weed/storage/volume_vacuum.go
index 927885e17..ba00666bc 100644
--- a/weed/storage/volume_vacuum.go
+++ b/weed/storage/volume_vacuum.go
@@ -88,7 +88,15 @@ func (v *Volume) Compact2(preallocate int64, compactionBytePerSecond int64, prog
if err := v.nm.Sync(); err != nil {
glog.V(0).Infof("compact2 fail to sync volume idx %d: %v", v.Id, err)
}
- return copyDataBasedOnIndexFile(v.FileName(".dat"), v.FileName(".idx"), v.FileName(".cpd"), v.FileName(".cpx"), v.SuperBlock, v.Version(), preallocate, compactionBytePerSecond, progressFn)
+ return copyDataBasedOnIndexFile(
+ v.FileName(".dat"), v.FileName(".idx"),
+ v.FileName(".cpd"), v.FileName(".cpx"),
+ v.SuperBlock,
+ v.Version(),
+ preallocate,
+ compactionBytePerSecond,
+ progressFn,
+ )
}
func (v *Volume) CommitCompact() error {
diff --git a/weed/topology/topology_vacuum.go b/weed/topology/topology_vacuum.go
index 918bda902..717dc8f76 100644
--- a/weed/topology/topology_vacuum.go
+++ b/weed/topology/topology_vacuum.go
@@ -89,7 +89,8 @@ func (t *Topology) batchVacuumVolumeCompact(grpcDialOption grpc.DialOption, vl *
return recvErr
}
}
- glog.V(0).Infof("%d vacuum %d on %s processed %d bytes", index, vid, url, resp.ProcessedBytes)
+ glog.V(0).Infof("%d vacuum %d on %s processed %d bytes, loadAvg %.02f%%",
+ index, vid, url, resp.ProcessedBytes, resp.LoadAvg_1M*100)
}
return nil
})
@@ -217,7 +218,8 @@ func (t *Topology) vacuumOneVolumeLayout(grpcDialOption grpc.DialOption, volumeL
}
glog.V(2).Infof("check vacuum on collection:%s volume:%d", c.Name, vid)
- if vacuumLocationList, needVacuum := t.batchVacuumVolumeCheck(grpcDialOption, vid, locationList, garbageThreshold); needVacuum {
+ if vacuumLocationList, needVacuum := t.batchVacuumVolumeCheck(
+ grpcDialOption, vid, locationList, garbageThreshold); needVacuum {
if t.batchVacuumVolumeCompact(grpcDialOption, volumeLayout, vid, vacuumLocationList, preallocate) {
t.batchVacuumVolumeCommit(grpcDialOption, volumeLayout, vid, vacuumLocationList)
} else {