aboutsummaryrefslogtreecommitdiff
path: root/weed
diff options
context:
space:
mode:
Diffstat (limited to 'weed')
-rw-r--r--weed/filer/entry.go7
-rw-r--r--weed/filer/entry_codec.go77
-rw-r--r--weed/filer/filer_conf.go65
-rw-r--r--weed/filer/filer_conf_test.go81
-rw-r--r--weed/filer/filerstore_wrapper.go12
-rw-r--r--weed/filer/foundationdb/foundationdb_store.go21
-rw-r--r--weed/pb/grpc_client_server.go24
-rw-r--r--weed/server/filer_server_handlers_write.go8
-rw-r--r--weed/shell/command_s3_bucket_quota_check.go10
-rw-r--r--weed/util/log_buffer/log_buffer.go24
10 files changed, 298 insertions, 31 deletions
diff --git a/weed/filer/entry.go b/weed/filer/entry.go
index 4757d5c9e..25fc26feb 100644
--- a/weed/filer/entry.go
+++ b/weed/filer/entry.go
@@ -92,7 +92,12 @@ func (entry *Entry) ToExistingProtoEntry(message *filer_pb.Entry) {
return
}
message.IsDirectory = entry.IsDirectory()
- message.Attributes = EntryAttributeToPb(entry)
+ // Reuse pre-allocated attributes if available, otherwise allocate
+ if message.Attributes != nil {
+ EntryAttributeToExistingPb(entry, message.Attributes)
+ } else {
+ message.Attributes = EntryAttributeToPb(entry)
+ }
message.Chunks = entry.GetChunks()
message.Extended = entry.Extended
message.HardLinkId = entry.HardLinkId
diff --git a/weed/filer/entry_codec.go b/weed/filer/entry_codec.go
index ce9c0484b..1c096c911 100644
--- a/weed/filer/entry_codec.go
+++ b/weed/filer/entry_codec.go
@@ -4,6 +4,7 @@ import (
"bytes"
"fmt"
"os"
+ "sync"
"time"
"google.golang.org/protobuf/proto"
@@ -11,15 +12,61 @@ import (
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
)
+// pbEntryPool reduces allocations in EncodeAttributesAndChunks and DecodeAttributesAndChunks
+// which are called on every filer store operation
+var pbEntryPool = sync.Pool{
+ New: func() interface{} {
+ return &filer_pb.Entry{
+ Attributes: &filer_pb.FuseAttributes{}, // Pre-allocate attributes
+ }
+ },
+}
+
+// resetPbEntry clears a protobuf Entry for reuse
+func resetPbEntry(e *filer_pb.Entry) {
+ // Use struct assignment to clear all fields including protobuf internal fields
+ // (unknownFields, sizeCache) that field-by-field reset would miss
+ attrs := e.Attributes
+ *e = filer_pb.Entry{}
+ if attrs == nil {
+ attrs = &filer_pb.FuseAttributes{}
+ } else {
+ resetFuseAttributes(attrs)
+ }
+ e.Attributes = attrs
+}
+
+// resetFuseAttributes clears FuseAttributes for reuse
+func resetFuseAttributes(a *filer_pb.FuseAttributes) {
+ // Use struct assignment to clear all fields including protobuf internal fields
+ *a = filer_pb.FuseAttributes{}
+}
+
func (entry *Entry) EncodeAttributesAndChunks() ([]byte, error) {
- message := &filer_pb.Entry{}
+ message := pbEntryPool.Get().(*filer_pb.Entry)
+ defer func() {
+ resetPbEntry(message)
+ pbEntryPool.Put(message)
+ }()
+
entry.ToExistingProtoEntry(message)
- return proto.Marshal(message)
+
+ data, err := proto.Marshal(message)
+ if err != nil {
+ return nil, err
+ }
+
+ // NOTE(review): proto.Marshal returns a freshly allocated slice that does not
+ // alias the message, so this defensive copy is likely unnecessary — confirm before keeping
+ return append([]byte(nil), data...), nil
}
func (entry *Entry) DecodeAttributesAndChunks(blob []byte) error {
-
- message := &filer_pb.Entry{}
+ message := pbEntryPool.Get().(*filer_pb.Entry)
+ defer func() {
+ resetPbEntry(message)
+ pbEntryPool.Put(message)
+ }()
if err := proto.Unmarshal(blob, message); err != nil {
return fmt.Errorf("decoding value blob for %s: %v", entry.FullPath, err)
@@ -50,6 +97,28 @@ func EntryAttributeToPb(entry *Entry) *filer_pb.FuseAttributes {
}
}
+// EntryAttributeToExistingPb fills an existing FuseAttributes to avoid allocation.
+// Safe to call with nil attr (will return early without populating).
+func EntryAttributeToExistingPb(entry *Entry, attr *filer_pb.FuseAttributes) {
+ if attr == nil {
+ return
+ }
+ attr.Crtime = entry.Attr.Crtime.Unix()
+ attr.Mtime = entry.Attr.Mtime.Unix()
+ attr.FileMode = uint32(entry.Attr.Mode)
+ attr.Uid = entry.Uid
+ attr.Gid = entry.Gid
+ attr.Mime = entry.Mime
+ attr.TtlSec = entry.Attr.TtlSec
+ attr.UserName = entry.Attr.UserName
+ attr.GroupName = entry.Attr.GroupNames
+ attr.SymlinkTarget = entry.Attr.SymlinkTarget
+ attr.Md5 = entry.Attr.Md5
+ attr.FileSize = entry.Attr.FileSize
+ attr.Rdev = entry.Attr.Rdev
+ attr.Inode = entry.Attr.Inode
+}
+
func PbToEntryAttribute(attr *filer_pb.FuseAttributes) Attr {
t := Attr{}
diff --git a/weed/filer/filer_conf.go b/weed/filer/filer_conf.go
index 869b3b93d..b5219df20 100644
--- a/weed/filer/filer_conf.go
+++ b/weed/filer/filer_conf.go
@@ -160,18 +160,79 @@ func (fc *FilerConf) DeleteLocationConf(locationPrefix string) {
return true
})
fc.rules = rules
- return
}
+// emptyPathConf is a singleton for paths with no matching rules
+// Callers must NOT mutate the returned value
+var emptyPathConf = &filer_pb.FilerConf_PathConf{}
+
func (fc *FilerConf) MatchStorageRule(path string) (pathConf *filer_pb.FilerConf_PathConf) {
+ // Convert once to avoid allocation in multi-match case
+ pathBytes := []byte(path)
+
+ // Fast path: check if any rules match before allocating
+ // This avoids allocation for paths with no configured rules (common case)
+ var firstMatch *filer_pb.FilerConf_PathConf
+ matchCount := 0
+
+ fc.rules.MatchPrefix(pathBytes, func(key []byte, value *filer_pb.FilerConf_PathConf) bool {
+ matchCount++
+ if matchCount == 1 {
+ firstMatch = value
+ return true // continue to check for more matches
+ }
+ // Stop after 2 matches - we only need to know if there are multiple
+ return false
+ })
+
+ // No rules match - return singleton (callers must NOT mutate)
+ if matchCount == 0 {
+ return emptyPathConf
+ }
+
+ // Single rule matches - return directly (callers must NOT mutate)
+ if matchCount == 1 {
+ return firstMatch
+ }
+
+ // Multiple rules match - need to merge (allocate new)
pathConf = &filer_pb.FilerConf_PathConf{}
- fc.rules.MatchPrefix([]byte(path), func(key []byte, value *filer_pb.FilerConf_PathConf) bool {
+ fc.rules.MatchPrefix(pathBytes, func(key []byte, value *filer_pb.FilerConf_PathConf) bool {
mergePathConf(pathConf, value)
return true
})
return pathConf
}
+// ClonePathConf creates a mutable copy of an existing PathConf.
+// Use this when you need to modify a config (e.g., before calling SetLocationConf).
+//
+// IMPORTANT: Keep in sync with filer_pb.FilerConf_PathConf fields.
+// When adding new fields to the protobuf, update this function accordingly.
+func ClonePathConf(src *filer_pb.FilerConf_PathConf) *filer_pb.FilerConf_PathConf {
+ if src == nil {
+ return &filer_pb.FilerConf_PathConf{}
+ }
+ return &filer_pb.FilerConf_PathConf{
+ LocationPrefix: src.LocationPrefix,
+ Collection: src.Collection,
+ Replication: src.Replication,
+ Ttl: src.Ttl,
+ DiskType: src.DiskType,
+ Fsync: src.Fsync,
+ VolumeGrowthCount: src.VolumeGrowthCount,
+ ReadOnly: src.ReadOnly,
+ MaxFileNameLength: src.MaxFileNameLength,
+ DataCenter: src.DataCenter,
+ Rack: src.Rack,
+ DataNode: src.DataNode,
+ DisableChunkDeletion: src.DisableChunkDeletion,
+ Worm: src.Worm,
+ WormGracePeriodSeconds: src.WormGracePeriodSeconds,
+ WormRetentionTimeSeconds: src.WormRetentionTimeSeconds,
+ }
+}
+
func (fc *FilerConf) GetCollectionTtls(collection string) (ttls map[string]string) {
ttls = make(map[string]string)
fc.rules.Walk(func(key []byte, value *filer_pb.FilerConf_PathConf) bool {
diff --git a/weed/filer/filer_conf_test.go b/weed/filer/filer_conf_test.go
index 02615b814..121ea7e18 100644
--- a/weed/filer/filer_conf_test.go
+++ b/weed/filer/filer_conf_test.go
@@ -1,6 +1,7 @@
package filer
import (
+ "reflect"
"testing"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
@@ -47,3 +48,83 @@ func TestFilerConf(t *testing.T) {
assert.Equal(t, false, fc.MatchStorageRule("/buckets/other").ReadOnly)
}
+
+// TestClonePathConf verifies that ClonePathConf copies all exported fields.
+// Uses reflection to automatically detect new fields added to the protobuf,
+// ensuring the test fails if ClonePathConf is not updated for new fields.
+func TestClonePathConf(t *testing.T) {
+ // Create a fully-populated PathConf with non-zero values for all fields
+ src := &filer_pb.FilerConf_PathConf{
+ LocationPrefix: "/test/path",
+ Collection: "test_collection",
+ Replication: "001",
+ Ttl: "7d",
+ DiskType: "ssd",
+ Fsync: true,
+ VolumeGrowthCount: 5,
+ ReadOnly: true,
+ MaxFileNameLength: 255,
+ DataCenter: "dc1",
+ Rack: "rack1",
+ DataNode: "node1",
+ DisableChunkDeletion: true,
+ Worm: true,
+ WormGracePeriodSeconds: 3600,
+ WormRetentionTimeSeconds: 86400,
+ }
+
+ clone := ClonePathConf(src)
+
+ // Verify it's a different object
+ assert.NotSame(t, src, clone, "ClonePathConf should return a new object, not the same pointer")
+
+ // Use reflection to compare all exported fields
+ // This will automatically catch any new fields added to the protobuf
+ srcVal := reflect.ValueOf(src).Elem()
+ cloneVal := reflect.ValueOf(clone).Elem()
+ srcType := srcVal.Type()
+
+ for i := 0; i < srcType.NumField(); i++ {
+ field := srcType.Field(i)
+
+ // Skip unexported fields (protobuf internal fields like sizeCache, unknownFields)
+ if !field.IsExported() {
+ continue
+ }
+
+ srcField := srcVal.Field(i)
+ cloneField := cloneVal.Field(i)
+
+ // Compare field values
+ if !reflect.DeepEqual(srcField.Interface(), cloneField.Interface()) {
+ t.Errorf("Field %s not copied correctly: src=%v, clone=%v",
+ field.Name, srcField.Interface(), cloneField.Interface())
+ }
+ }
+
+ // Additionally verify that all exported fields in src are non-zero
+ // This ensures we're testing with fully populated data
+ for i := 0; i < srcType.NumField(); i++ {
+ field := srcType.Field(i)
+ if !field.IsExported() {
+ continue
+ }
+
+ srcField := srcVal.Field(i)
+ if srcField.IsZero() {
+ t.Errorf("Test setup error: field %s has zero value, update test to set a non-zero value", field.Name)
+ }
+ }
+
+ // Verify mutation of clone doesn't affect source
+ clone.Collection = "modified"
+ clone.ReadOnly = false
+ assert.Equal(t, "test_collection", src.Collection, "Modifying clone should not affect source Collection")
+ assert.Equal(t, true, src.ReadOnly, "Modifying clone should not affect source ReadOnly")
+}
+
+func TestClonePathConfNil(t *testing.T) {
+ clone := ClonePathConf(nil)
+ assert.NotNil(t, clone, "ClonePathConf(nil) should return a non-nil empty PathConf")
+ assert.Equal(t, "", clone.LocationPrefix, "ClonePathConf(nil) should return empty PathConf")
+}
diff --git a/weed/filer/filerstore_wrapper.go b/weed/filer/filerstore_wrapper.go
index 8694db984..5114955c7 100644
--- a/weed/filer/filerstore_wrapper.go
+++ b/weed/filer/filerstore_wrapper.go
@@ -32,9 +32,10 @@ type VirtualFilerStore interface {
}
type FilerStoreWrapper struct {
- defaultStore FilerStore
- pathToStore ptrie.Trie[string]
- storeIdToStore map[string]FilerStore
+ defaultStore FilerStore
+ pathToStore ptrie.Trie[string]
+ storeIdToStore map[string]FilerStore
+ hasPathSpecificStore bool // fast check to skip MatchPrefix when no path-specific stores
}
func NewFilerStoreWrapper(store FilerStore) *FilerStoreWrapper {
@@ -82,10 +83,15 @@ func (fsw *FilerStoreWrapper) AddPathSpecificStore(path string, storeId string,
if err != nil {
glog.Fatalf("put path specific store: %v", err)
}
+ fsw.hasPathSpecificStore = true
}
func (fsw *FilerStoreWrapper) getActualStore(path util.FullPath) (store FilerStore) {
store = fsw.defaultStore
+ // Fast path: skip MatchPrefix if no path-specific stores are configured (common case)
+ if !fsw.hasPathSpecificStore {
+ return
+ }
if path == "/" || path == "//" {
return
}
diff --git a/weed/filer/foundationdb/foundationdb_store.go b/weed/filer/foundationdb/foundationdb_store.go
index 852ad2701..cbbdc96b2 100644
--- a/weed/filer/foundationdb/foundationdb_store.go
+++ b/weed/filer/foundationdb/foundationdb_store.go
@@ -730,9 +730,28 @@ func (store *FoundationDBStore) Shutdown() {
glog.V(0).Infof("FoundationDB store shutdown")
}
+// tuplePool reduces allocations in genKey which is called on every FDB operation
+var tuplePool = sync.Pool{
+ New: func() interface{} {
+ // Pre-allocate slice with capacity 2 for (dirPath, fileName)
+ t := make(tuple.Tuple, 2)
+ return &t
+ },
+}
+
// Helper functions
func (store *FoundationDBStore) genKey(dirPath, fileName string) fdb.Key {
- return store.seaweedfsDir.Pack(tuple.Tuple{dirPath, fileName})
+ // Get a tuple from pool to avoid slice allocation
+ tp := tuplePool.Get().(*tuple.Tuple)
+ defer func() {
+ // Clear references before returning to pool to avoid memory leaks
+ (*tp)[0] = nil
+ (*tp)[1] = nil
+ tuplePool.Put(tp)
+ }()
+ (*tp)[0] = dirPath
+ (*tp)[1] = fileName
+ return store.seaweedfsDir.Pack(*tp)
}
func (store *FoundationDBStore) extractFileName(key fdb.Key) (string, error) {
diff --git a/weed/pb/grpc_client_server.go b/weed/pb/grpc_client_server.go
index 8460d5949..e199cddbe 100644
--- a/weed/pb/grpc_client_server.go
+++ b/weed/pb/grpc_client_server.go
@@ -138,23 +138,25 @@ func requestIDUnaryInterceptor() grpc.UnaryServerInterceptor {
info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler,
) (interface{}, error) {
- incomingMd, _ := metadata.FromIncomingContext(ctx)
- idList := incomingMd.Get(request_id.AmzRequestIDHeader)
+ // Get request ID from incoming metadata
var reqID string
- if len(idList) > 0 {
- reqID = idList[0]
+ if incomingMd, ok := metadata.FromIncomingContext(ctx); ok {
+ if idList := incomingMd.Get(request_id.AmzRequestIDHeader); len(idList) > 0 {
+ reqID = idList[0]
+ }
}
if reqID == "" {
reqID = uuid.New().String()
}
- ctx = metadata.NewOutgoingContext(ctx,
- metadata.New(map[string]string{
- request_id.AmzRequestIDHeader: reqID,
- }))
-
+ // Store request ID in context for handlers to access
ctx = request_id.Set(ctx, reqID)
+ // Also set outgoing context so handlers making downstream gRPC calls
+ // will automatically propagate the request ID
+ ctx = metadata.AppendToOutgoingContext(ctx, request_id.AmzRequestIDHeader, reqID)
+
+ // Set trailer with request ID for response
grpc.SetTrailer(ctx, metadata.Pairs(request_id.AmzRequestIDHeader, reqID))
return handler(ctx, req)
@@ -187,8 +189,8 @@ func WithGrpcClient(streamingMode bool, signature int32, fn func(*grpc.ClientCon
} else {
ctx := context.Background()
if signature != 0 {
- md := metadata.New(map[string]string{"sw-client-id": fmt.Sprintf("%d", signature)})
- ctx = metadata.NewOutgoingContext(ctx, md)
+ // Optimize: Use AppendToOutgoingContext instead of creating new map
+ ctx = metadata.AppendToOutgoingContext(ctx, "sw-client-id", fmt.Sprintf("%d", signature))
}
grpcConnection, err := GrpcDial(ctx, address, waitForReady, opts...)
if err != nil {
diff --git a/weed/server/filer_server_handlers_write.go b/weed/server/filer_server_handlers_write.go
index a7bd73c35..ae9c46fb2 100644
--- a/weed/server/filer_server_handlers_write.go
+++ b/weed/server/filer_server_handlers_write.go
@@ -254,8 +254,10 @@ func (fs *FilerServer) detectStorageOption(ctx context.Context, requestURI, qCol
return nil, ErrReadOnly
}
- if rule.MaxFileNameLength == 0 {
- rule.MaxFileNameLength = fs.filer.MaxFilenameLength
+ // Use local variable instead of mutating shared rule
+ maxFileNameLength := rule.MaxFileNameLength
+ if maxFileNameLength == 0 {
+ maxFileNameLength = fs.filer.MaxFilenameLength
}
// required by buckets folder
@@ -282,7 +284,7 @@ func (fs *FilerServer) detectStorageOption(ctx context.Context, requestURI, qCol
DiskType: util.Nvl(diskType, rule.DiskType),
Fsync: rule.Fsync,
VolumeGrowthCount: rule.VolumeGrowthCount,
- MaxFileNameLength: rule.MaxFileNameLength,
+ MaxFileNameLength: maxFileNameLength,
}, nil
}
diff --git a/weed/shell/command_s3_bucket_quota_check.go b/weed/shell/command_s3_bucket_quota_check.go
index bb54b73a4..c92b52117 100644
--- a/weed/shell/command_s3_bucket_quota_check.go
+++ b/weed/shell/command_s3_bucket_quota_check.go
@@ -5,10 +5,11 @@ import (
"context"
"flag"
"fmt"
- "github.com/seaweedfs/seaweedfs/weed/filer"
- "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"io"
"math"
+
+ "github.com/seaweedfs/seaweedfs/weed/filer"
+ "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
)
func init() {
@@ -104,7 +105,10 @@ func (c *commandS3BucketQuotaEnforce) Do(args []string, commandEnv *CommandEnv,
func (c *commandS3BucketQuotaEnforce) processEachBucket(fc *filer.FilerConf, filerBucketsPath string, entry *filer_pb.Entry, writer io.Writer, collectionSize float64) (hasConfChanges bool) {
locPrefix := filerBucketsPath + "/" + entry.Name + "/"
- locConf := fc.MatchStorageRule(locPrefix)
+ existingConf := fc.MatchStorageRule(locPrefix)
+
+ // Create a mutable copy for modification
+ locConf := filer.ClonePathConf(existingConf)
locConf.LocationPrefix = locPrefix
if entry.Quota > 0 {
diff --git a/weed/util/log_buffer/log_buffer.go b/weed/util/log_buffer/log_buffer.go
index 853cbe475..1c65fc916 100644
--- a/weed/util/log_buffer/log_buffer.go
+++ b/weed/util/log_buffer/log_buffer.go
@@ -17,7 +17,7 @@ import (
)
const BufferSize = 8 * 1024 * 1024
-const PreviousBufferCount = 32
+const PreviousBufferCount = 4
// Errors that can be returned by log buffer operations
var (
@@ -862,6 +862,18 @@ var bufferPool = sync.Pool{
},
}
+// logEntryPool reduces allocations in readTs which is called frequently during binary search
+var logEntryPool = sync.Pool{
+ New: func() interface{} {
+ return &filer_pb.LogEntry{}
+ },
+}
+
+// resetLogEntry clears a LogEntry for pool reuse
+func resetLogEntry(e *filer_pb.LogEntry) {
+ proto.Reset(e)
+}
+
func copiedBytes(buf []byte) (copied *bytes.Buffer) {
copied = bufferPool.Get().(*bytes.Buffer)
copied.Reset()
@@ -883,7 +895,13 @@ func readTs(buf []byte, pos int) (size int, ts int64, err error) {
}
entryData := buf[pos+4 : pos+4+size]
- logEntry := &filer_pb.LogEntry{}
+
+ // Use pooled LogEntry to avoid allocation on every call
+ logEntry := logEntryPool.Get().(*filer_pb.LogEntry)
+ defer func() {
+ resetLogEntry(logEntry)
+ logEntryPool.Put(logEntry)
+ }()
err = proto.Unmarshal(entryData, logEntry)
if err != nil {
@@ -891,6 +909,6 @@ func readTs(buf []byte, pos int) (size int, ts int64, err error) {
// This allows caller to handle corruption gracefully
return 0, 0, fmt.Errorf("corrupted log buffer: failed to unmarshal LogEntry at pos %d, size %d: %w", pos, size, err)
}
- return size, logEntry.TsNs, nil
+ return size, logEntry.TsNs, nil
}