aboutsummaryrefslogtreecommitdiff
path: root/weed/pb/grpc_client_server.go
diff options
context:
space:
mode:
authorAleksey Kosov <rusyak777@list.ru>2025-05-21 17:57:39 +0300
committerGitHub <noreply@github.com>2025-05-21 07:57:39 -0700
commit5182d46e22d2458b16f1f2fb0358f6b5f3e18b5d (patch)
tree1f95adace7c8954512150f205005a347aff3653b /weed/pb/grpc_client_server.go
parent140b7a7402109a55072458e42a32bc1ef4a608a9 (diff)
downloadseaweedfs-5182d46e22d2458b16f1f2fb0358f6b5f3e18b5d.tar.xz
seaweedfs-5182d46e22d2458b16f1f2fb0358f6b5f3e18b5d.zip
Added middleware for processing request_id grpc and http requests (#6805)
Diffstat (limited to 'weed/pb/grpc_client_server.go')
-rw-r--r--weed/pb/grpc_client_server.go26
1 files changed, 26 insertions, 0 deletions
diff --git a/weed/pb/grpc_client_server.go b/weed/pb/grpc_client_server.go
index 777dfb402..ba0d7d0cc 100644
--- a/weed/pb/grpc_client_server.go
+++ b/weed/pb/grpc_client_server.go
@@ -3,6 +3,7 @@ package pb
import (
"context"
"fmt"
+ "github.com/google/uuid"
"google.golang.org/grpc/metadata"
"math/rand/v2"
"net/http"
@@ -58,6 +59,7 @@ func NewGrpcServer(opts ...grpc.ServerOption) *grpc.Server {
}),
grpc.MaxRecvMsgSize(Max_Message_Size),
grpc.MaxSendMsgSize(Max_Message_Size),
+ grpc.UnaryInterceptor(requestIDUnaryInterceptor()),
)
for _, opt := range opts {
if opt != nil {
@@ -118,6 +120,30 @@ func getOrCreateConnection(address string, waitForReady bool, opts ...grpc.DialO
return vgc, nil
}
+func requestIDUnaryInterceptor() grpc.UnaryServerInterceptor {
+ return func(
+ ctx context.Context,
+ req interface{},
+ info *grpc.UnaryServerInfo,
+ handler grpc.UnaryHandler,
+ ) (interface{}, error) {
+ md, _ := metadata.FromIncomingContext(ctx)
+ idList := md.Get(util.RequestIDKey)
+ var reqID string
+ if len(idList) > 0 {
+ reqID = idList[0]
+ }
+ if reqID == "" {
+ reqID = uuid.New().String()
+ }
+
+ ctx = util.WithRequestID(ctx, reqID)
+ grpc.SetTrailer(ctx, metadata.Pairs(util.RequestIDKey, reqID))
+
+ return handler(ctx, req)
+ }
+}
+
// WithGrpcClient In streamingMode, always use a fresh connection. Otherwise, try to reuse an existing connection.
func WithGrpcClient(streamingMode bool, signature int32, fn func(*grpc.ClientConn) error, address string, waitForReady bool, opts ...grpc.DialOption) error {