aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2022-10-13 23:10:49 -0700
committerchrislu <chris.lu@gmail.com>2022-10-13 23:10:49 -0700
commite05637c42cbb7519aa97c3b46a0305872fa6d11a (patch)
treee2ade7d0cac8c386c84f1c8ec41375bd905ee429
parentdcd0743a35912dfa559ae912e5208f15dd186386 (diff)
parenta5b867af69ff7be5e0f0944b2ee4275524d542e9 (diff)
downloadseaweedfs-e05637c42cbb7519aa97c3b46a0305872fa6d11a.tar.xz
seaweedfs-e05637c42cbb7519aa97c3b46a0305872fa6d11a.zip
Merge branch 'master' of https://github.com/seaweedfs/seaweedfs
-rw-r--r--weed/s3api/s3acl/acl_helper.go4
-rw-r--r--weed/s3api/s3acl/acl_helper_test.go66
-rw-r--r--weed/server/volume_grpc_vacuum.go21
-rw-r--r--weed/stats/metrics.go28
-rw-r--r--weed/storage/backend/disk_file.go3
-rw-r--r--weed/storage/disk_location.go2
-rw-r--r--weed/storage/erasure_coding/ec_volume.go1
-rw-r--r--weed/storage/needle_map/memdb.go5
-rw-r--r--weed/storage/volume.go18
-rw-r--r--weed/storage/volume_vacuum.go25
10 files changed, 106 insertions, 67 deletions
diff --git a/weed/s3api/s3acl/acl_helper.go b/weed/s3api/s3acl/acl_helper.go
index e54e67556..bb956368e 100644
--- a/weed/s3api/s3acl/acl_helper.go
+++ b/weed/s3api/s3acl/acl_helper.go
@@ -411,6 +411,8 @@ func AssembleEntryWithAcp(objectEntry *filer_pb.Entry, objectOwner string, grant
if len(objectOwner) > 0 {
objectEntry.Extended[s3_constants.ExtAmzOwnerKey] = []byte(objectOwner)
+ } else {
+ delete(objectEntry.Extended, s3_constants.ExtAmzOwnerKey)
}
if len(grants) > 0 {
@@ -420,6 +422,8 @@ func AssembleEntryWithAcp(objectEntry *filer_pb.Entry, objectOwner string, grant
return s3err.ErrInvalidRequest
}
objectEntry.Extended[s3_constants.ExtAmzAclKey] = grantsBytes
+ } else {
+ delete(objectEntry.Extended, s3_constants.ExtAmzAclKey)
}
return s3err.ErrNone
diff --git a/weed/s3api/s3acl/acl_helper_test.go b/weed/s3api/s3acl/acl_helper_test.go
index efc137989..ce177595b 100644
--- a/weed/s3api/s3acl/acl_helper_test.go
+++ b/weed/s3api/s3acl/acl_helper_test.go
@@ -487,46 +487,44 @@ func TestDetermineReqGrants(t *testing.T) {
func TestAssembleEntryWithAcp(t *testing.T) {
defaultOwner := "admin"
- {
- //case1
- expectOwner := "accountS"
- expectGrants := []*s3.Grant{
- {
- Permission: &s3_constants.PermissionRead,
- Grantee: &s3.Grantee{
- Type: &s3_constants.GrantTypeGroup,
- ID: &s3account.AccountAdmin.Id,
- URI: &s3_constants.GranteeGroupAllUsers,
- },
+
+ //case1
+ //assemble with non-empty grants
+ expectOwner := "accountS"
+ expectGrants := []*s3.Grant{
+ {
+ Permission: &s3_constants.PermissionRead,
+ Grantee: &s3.Grantee{
+ Type: &s3_constants.GrantTypeGroup,
+ ID: &s3account.AccountAdmin.Id,
+ URI: &s3_constants.GranteeGroupAllUsers,
},
- }
- entry := &filer_pb.Entry{}
- AssembleEntryWithAcp(entry, expectOwner, expectGrants)
+ },
+ }
+ entry := &filer_pb.Entry{}
+ AssembleEntryWithAcp(entry, expectOwner, expectGrants)
- resultOwner := GetAcpOwner(entry.Extended, defaultOwner)
- if resultOwner != expectOwner {
- t.Fatalf("owner not expect")
- }
+ resultOwner := GetAcpOwner(entry.Extended, defaultOwner)
+ if resultOwner != expectOwner {
+ t.Fatalf("owner not expect")
+ }
- resultGrants := GetAcpGrants(entry.Extended)
- if !grantsEquals(resultGrants, expectGrants) {
- t.Fatal("grants not expect")
- }
+ resultGrants := GetAcpGrants(entry.Extended)
+ if !grantsEquals(resultGrants, expectGrants) {
+ t.Fatal("grants not expect")
}
- {
- //case2
- entry := &filer_pb.Entry{}
- AssembleEntryWithAcp(entry, "", nil)
- resultOwner := GetAcpOwner(entry.Extended, defaultOwner)
- if resultOwner != defaultOwner {
- t.Fatalf("owner not expect")
- }
+ //case2
+ //assemble with empty grants (override)
+ AssembleEntryWithAcp(entry, "", nil)
+ resultOwner = GetAcpOwner(entry.Extended, defaultOwner)
+ if resultOwner != defaultOwner {
+ t.Fatalf("owner not expect")
+ }
- resultGrants := GetAcpGrants(entry.Extended)
- if len(resultGrants) != 0 {
- t.Fatal("grants not expect")
- }
+ resultGrants = GetAcpGrants(entry.Extended)
+ if len(resultGrants) != 0 {
+ t.Fatal("grants not expect")
}
}
diff --git a/weed/server/volume_grpc_vacuum.go b/weed/server/volume_grpc_vacuum.go
index 5252584e1..296760ba6 100644
--- a/weed/server/volume_grpc_vacuum.go
+++ b/weed/server/volume_grpc_vacuum.go
@@ -2,6 +2,9 @@ package weed_server
import (
"context"
+ "github.com/seaweedfs/seaweedfs/weed/stats"
+ "strconv"
+ "time"
"github.com/prometheus/procfs"
"github.com/seaweedfs/seaweedfs/weed/glog"
@@ -29,6 +32,10 @@ func (vs *VolumeServer) VacuumVolumeCheck(ctx context.Context, req *volume_serve
}
func (vs *VolumeServer) VacuumVolumeCompact(req *volume_server_pb.VacuumVolumeCompactRequest, stream volume_server_pb.VolumeServer_VacuumVolumeCompactServer) error {
+ start := time.Now()
+ defer func(start time.Time) {
+ stats.VolumeServerVacuumingHistogram.WithLabelValues("compact").Observe(time.Since(start).Seconds())
+ }(start)
resp := &volume_server_pb.VacuumVolumeCompactResponse{}
reportInterval := int64(1024 * 1024 * 128)
@@ -51,12 +58,13 @@ func (vs *VolumeServer) VacuumVolumeCompact(req *volume_server_pb.VacuumVolumeCo
return true
})
+ stats.VolumeServerVacuumingCompactCounter.WithLabelValues(strconv.FormatBool(err == nil && sendErr == nil)).Inc()
if err != nil {
- glog.Errorf("compact volume %d: %v", req.VolumeId, err)
+ glog.Errorf("failed compact volume %d: %v", req.VolumeId, err)
return err
}
if sendErr != nil {
- glog.Errorf("compact volume %d report progress: %v", req.VolumeId, sendErr)
+ glog.Errorf("failed compact volume %d report progress: %v", req.VolumeId, sendErr)
return sendErr
}
@@ -66,16 +74,21 @@ func (vs *VolumeServer) VacuumVolumeCompact(req *volume_server_pb.VacuumVolumeCo
}
func (vs *VolumeServer) VacuumVolumeCommit(ctx context.Context, req *volume_server_pb.VacuumVolumeCommitRequest) (*volume_server_pb.VacuumVolumeCommitResponse, error) {
+ start := time.Now()
+ defer func(start time.Time) {
+ stats.VolumeServerVacuumingHistogram.WithLabelValues("commit").Observe(time.Since(start).Seconds())
+ }(start)
resp := &volume_server_pb.VacuumVolumeCommitResponse{}
readOnly, err := vs.store.CommitCompactVolume(needle.VolumeId(req.VolumeId))
if err != nil {
- glog.Errorf("commit volume %d: %v", req.VolumeId, err)
+ glog.Errorf("failed commit volume %d: %v", req.VolumeId, err)
} else {
glog.V(1).Infof("commit volume %d", req.VolumeId)
}
+ stats.VolumeServerVacuumingCommitCounter.WithLabelValues(strconv.FormatBool(err == nil)).Inc()
resp.IsReadOnly = readOnly
return resp, err
@@ -88,7 +101,7 @@ func (vs *VolumeServer) VacuumVolumeCleanup(ctx context.Context, req *volume_ser
err := vs.store.CommitCleanupVolume(needle.VolumeId(req.VolumeId))
if err != nil {
- glog.Errorf("cleanup volume %d: %v", req.VolumeId, err)
+ glog.Errorf("failed cleanup volume %d: %v", req.VolumeId, err)
} else {
glog.V(1).Infof("cleanup volume %d", req.VolumeId)
}
diff --git a/weed/stats/metrics.go b/weed/stats/metrics.go
index d1723fdc6..9f9c0c18d 100644
--- a/weed/stats/metrics.go
+++ b/weed/stats/metrics.go
@@ -137,6 +137,31 @@ var (
Help: "Counter of volume server requests.",
}, []string{"type"})
+ VolumeServerVacuumingCompactCounter = prometheus.NewCounterVec(
+ prometheus.CounterOpts{
+ Namespace: Namespace,
+ Subsystem: "volumeServer",
+ Name: "vacuuming_compact_count",
+ Help: "Counter of volume vacuuming Compact counter",
+ }, []string{"success"})
+
+ VolumeServerVacuumingCommitCounter = prometheus.NewCounterVec(
+ prometheus.CounterOpts{
+ Namespace: Namespace,
+ Subsystem: "volumeServer",
+ Name: "vacuuming_commit_count",
+ Help: "Counter of volume vacuuming commit counter",
+ }, []string{"success"})
+
+ VolumeServerVacuumingHistogram = prometheus.NewHistogramVec(
+ prometheus.HistogramOpts{
+ Namespace: Namespace,
+ Subsystem: "volumeServer",
+ Name: "vacuuming_seconds",
+ Help: "Bucketed histogram of volume server vacuuming processing time.",
+ Buckets: prometheus.ExponentialBuckets(0.0001, 2, 24),
+ }, []string{"type"})
+
VolumeServerRequestHistogram = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: Namespace,
@@ -223,6 +248,9 @@ func init() {
Gather.MustRegister(VolumeServerRequestCounter)
Gather.MustRegister(VolumeServerRequestHistogram)
+ Gather.MustRegister(VolumeServerVacuumingCompactCounter)
+ Gather.MustRegister(VolumeServerVacuumingCommitCounter)
+ Gather.MustRegister(VolumeServerVacuumingHistogram)
Gather.MustRegister(VolumeServerVolumeCounter)
Gather.MustRegister(VolumeServerMaxVolumeCounter)
Gather.MustRegister(VolumeServerReadOnlyVolumeGauge)
diff --git a/weed/storage/backend/disk_file.go b/weed/storage/backend/disk_file.go
index 7a3a40977..18dde8dca 100644
--- a/weed/storage/backend/disk_file.go
+++ b/weed/storage/backend/disk_file.go
@@ -69,6 +69,9 @@ func (df *DiskFile) Truncate(off int64) error {
}
func (df *DiskFile) Close() error {
+ if err := df.Sync(); err != nil {
+ return err
+ }
return df.File.Close()
}
diff --git a/weed/storage/disk_location.go b/weed/storage/disk_location.go
index 6f938da8f..b3be04703 100644
--- a/weed/storage/disk_location.go
+++ b/weed/storage/disk_location.go
@@ -364,7 +364,7 @@ func (l *DiskLocation) VolumesLen() int {
func (l *DiskLocation) SetStopping() {
l.volumesLock.Lock()
for _, v := range l.volumes {
- v.SetStopping()
+ v.SyncToDisk()
}
l.volumesLock.Unlock()
diff --git a/weed/storage/erasure_coding/ec_volume.go b/weed/storage/erasure_coding/ec_volume.go
index aa1e15722..ddee742a8 100644
--- a/weed/storage/erasure_coding/ec_volume.go
+++ b/weed/storage/erasure_coding/ec_volume.go
@@ -125,6 +125,7 @@ func (ev *EcVolume) Close() {
ev.ecjFileAccessLock.Unlock()
}
if ev.ecxFile != nil {
+ _ = ev.ecxFile.Sync()
_ = ev.ecxFile.Close()
ev.ecxFile = nil
}
diff --git a/weed/storage/needle_map/memdb.go b/weed/storage/needle_map/memdb.go
index 7fb98dcea..463245cd1 100644
--- a/weed/storage/needle_map/memdb.go
+++ b/weed/storage/needle_map/memdb.go
@@ -86,7 +86,10 @@ func (cm *MemDb) SaveToIdx(idxName string) (ret error) {
if err != nil {
return
}
- defer idxFile.Close()
+ defer func() {
+ idxFile.Sync()
+ idxFile.Close()
+ }()
return cm.AscendingVisit(func(value NeedleValue) error {
if value.Offset.IsZero() || value.Size.IsDeleted() {
diff --git a/weed/storage/volume.go b/weed/storage/volume.go
index 1a9c8bd24..ab8af91e2 100644
--- a/weed/storage/volume.go
+++ b/weed/storage/volume.go
@@ -180,21 +180,6 @@ func (v *Volume) DiskType() types.DiskType {
return v.location.DiskType
}
-func (v *Volume) SetStopping() {
- v.dataFileAccessLock.Lock()
- defer v.dataFileAccessLock.Unlock()
- if v.nm != nil {
- if err := v.nm.Sync(); err != nil {
- glog.Warningf("Volume SetStopping fail to sync volume idx %d", v.Id)
- }
- }
- if v.DataBackend != nil {
- if err := v.DataBackend.Sync(); err != nil {
- glog.Warningf("Volume SetStopping fail to sync volume %d", v.Id)
- }
- }
-}
-
func (v *Volume) SyncToDisk() {
v.dataFileAccessLock.Lock()
defer v.dataFileAccessLock.Unlock()
@@ -228,10 +213,9 @@ func (v *Volume) Close() {
v.nm = nil
}
if v.DataBackend != nil {
- if err := v.DataBackend.Sync(); err != nil {
+ if err := v.DataBackend.Close(); err != nil {
glog.Warningf("Volume Close fail to sync volume %d", v.Id)
}
- _ = v.DataBackend.Close()
v.DataBackend = nil
stats.VolumeServerVolumeCounter.WithLabelValues(v.Collection, "volume").Dec()
}
diff --git a/weed/storage/volume_vacuum.go b/weed/storage/volume_vacuum.go
index 47b0800eb..0eaca5ff4 100644
--- a/weed/storage/volume_vacuum.go
+++ b/weed/storage/volume_vacuum.go
@@ -55,10 +55,10 @@ func (v *Volume) Compact(preallocate int64, compactionBytePerSecond int64) error
v.lastCompactRevision = v.SuperBlock.CompactionRevision
glog.V(3).Infof("creating copies for volume %d ,last offset %d...", v.Id, v.lastCompactIndexOffset)
if err := v.DataBackend.Sync(); err != nil {
- glog.V(0).Infof("compact fail to sync volume %d", v.Id)
+ glog.V(0).Infof("compact failed to sync volume %d", v.Id)
}
if err := v.nm.Sync(); err != nil {
- glog.V(0).Infof("compact fail to sync volume idx %d", v.Id)
+ glog.V(0).Infof("compact failed to sync volume idx %d", v.Id)
}
return v.copyDataAndGenerateIndexFile(v.FileName(".cpd"), v.FileName(".cpx"), preallocate, compactionBytePerSecond)
}
@@ -83,10 +83,10 @@ func (v *Volume) Compact2(preallocate int64, compactionBytePerSecond int64, prog
return fmt.Errorf("volume %d backend is empty remote:%v", v.Id, v.HasRemoteFile())
}
if err := v.DataBackend.Sync(); err != nil {
- glog.V(0).Infof("compact2 fail to sync volume dat %d: %v", v.Id, err)
+ glog.V(0).Infof("compact2 failed to sync volume dat %d: %v", v.Id, err)
}
if err := v.nm.Sync(); err != nil {
- glog.V(0).Infof("compact2 fail to sync volume idx %d: %v", v.Id, err)
+ glog.V(0).Infof("compact2 failed to sync volume idx %d: %v", v.Id, err)
}
return v.copyDataBasedOnIndexFile(
v.FileName(".dat"), v.FileName(".idx"),
@@ -120,7 +120,7 @@ func (v *Volume) CommitCompact() error {
}
if v.DataBackend != nil {
if err := v.DataBackend.Close(); err != nil {
- glog.V(0).Infof("fail to close volume %d", v.Id)
+ glog.V(0).Infof("failed to close volume %d", v.Id)
}
}
v.DataBackend = nil
@@ -270,7 +270,11 @@ func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldI
return fmt.Errorf("open idx file %s failed: %v", newIdxFileName, err)
}
- defer idx.Close()
+ defer func() {
+ idx.Sync()
+ idx.Close()
+ }()
+
stat, err := idx.Stat()
if err != nil {
return fmt.Errorf("stat file %s: %v", idx.Name(), err)
@@ -387,9 +391,7 @@ func (scanner *VolumeFileScanner4Vacuum) VisitNeedle(n *needle.Needle, offset in
}
func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string, preallocate int64, compactionBytePerSecond int64) (err error) {
- var (
- dst backend.BackendStorageFile
- )
+ var dst backend.BackendStorageFile
if dst, err = backend.CreateVolumeFile(dstName, preallocate, 0); err != nil {
return err
}
@@ -493,7 +495,10 @@ func (v *Volume) copyDataBasedOnIndexFile(srcDatName, srcIdxName, dstDatName, da
glog.Errorf("cannot open Volume Index %s: %v", datIdxName, err)
return err
}
- defer indexFile.Close()
+ defer func() {
+ indexFile.Sync()
+ indexFile.Close()
+ }()
if v.tmpNm != nil {
v.tmpNm.Close()
v.tmpNm = nil