1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
|
package s3api
import (
"encoding/base64"
"fmt"
"net/http"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
"google.golang.org/grpc"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
)
// Compile-time assertion that *S3ApiServer satisfies filer_pb.FilerClient.
var _ filer_pb.FilerClient = (*S3ApiServer)(nil)
// WithFilerClient runs fn with a SeaweedFilerClient bound to a filer gRPC
// connection and returns the resulting error.
//
// When s3a.filerClient is set, the call is delegated to
// withFilerClientFailover. Otherwise a direct connection to the address
// returned by s3a.getFilerAddress() is used; that path is expected only
// during initialization or testing.
func (s3a *S3ApiServer) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error {
	if s3a.filerClient == nil {
		// No filerClient available: dial the configured filer directly.
		invoke := func(conn *grpc.ClientConn) error {
			return fn(filer_pb.NewSeaweedFilerClient(conn))
		}
		return pb.WithGrpcClient(streamingMode, s3a.randomClientId, invoke, s3a.getFilerAddress().ToGrpcAddress(), false, s3a.option.GrpcDialOption)
	}

	// Normal path: let filerClient drive filer selection and failover.
	return s3a.withFilerClientFailover(streamingMode, fn)
}
// withFilerClientFailover runs fn against the current filer and, if that
// attempt returns an error, retries against the remaining filers in turn.
//
// Every attempt is reported to s3a.filerClient via RecordFilerSuccess or
// RecordFilerFailure. Filers for which ShouldSkipUnhealthyFiler returns true
// are not tried. The first fallback filer that succeeds becomes the current
// filer. If no attempt succeeds, the last error seen is returned wrapped with
// %w; when every fallback filer was skipped, that is the error from the
// current filer.
//
// fn may be invoked more than once, once per attempted filer.
// NOTE(review): the result of pb.WithGrpcClient is not inspected, so an error
// returned by fn itself presumably also triggers failover — confirm that
// callers' fn is safe to re-run.
func (s3a *S3ApiServer) withFilerClientFailover(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error {
	// One callback shared by every attempt: wrap the connection and run fn.
	invoke := func(conn *grpc.ClientConn) error {
		return fn(filer_pb.NewSeaweedFilerClient(conn))
	}

	// Fast path: the filer currently marked as active.
	primary := s3a.filerClient.GetCurrentFiler()
	lastErr := pb.WithGrpcClient(streamingMode, s3a.randomClientId, invoke, primary.ToGrpcAddress(), false, s3a.option.GrpcDialOption)
	if lastErr == nil {
		s3a.filerClient.RecordFilerSuccess(primary)
		return nil
	}
	s3a.filerClient.RecordFilerFailure(primary)

	// Slow path: walk the other known filers, honouring health tracking.
	for _, candidate := range s3a.filerClient.GetAllFilers() {
		if candidate == primary {
			// Already attempted above.
			continue
		}
		if s3a.filerClient.ShouldSkipUnhealthyFiler(candidate) {
			glog.V(2).Infof("WithFilerClient: skipping unhealthy filer %s", candidate)
			continue
		}

		attemptErr := pb.WithGrpcClient(streamingMode, s3a.randomClientId, invoke, candidate.ToGrpcAddress(), false, s3a.option.GrpcDialOption)
		if attemptErr != nil {
			s3a.filerClient.RecordFilerFailure(candidate)
			glog.V(2).Infof("WithFilerClient: failover to %s failed: %v", candidate, attemptErr)
			lastErr = attemptErr
			continue
		}

		// Promote the filer that worked so later requests start with it.
		s3a.filerClient.RecordFilerSuccess(candidate)
		s3a.filerClient.SetCurrentFiler(candidate)
		glog.V(1).Infof("WithFilerClient: failover from %s to %s succeeded", primary, candidate)
		return nil
	}

	return fmt.Errorf("all filers failed, last error: %w", lastErr)
}
// AdjustedUrl returns location.Url unchanged; no rewriting is applied here.
// location must be non-nil, since its Url field is read directly.
// NOTE(review): presumably required by the filer_pb.FilerClient interface
// that *S3ApiServer is asserted to implement — confirm.
func (s3a *S3ApiServer) AdjustedUrl(location *filer_pb.Location) string {
return location.Url
}
// GetDataCenter returns the DataCenter value held in the server's options
// (s3a.option.DataCenter).
// NOTE(review): presumably required by the filer_pb.FilerClient interface
// that *S3ApiServer is asserted to implement — confirm.
func (s3a *S3ApiServer) GetDataCenter() string {
return s3a.option.DataCenter
}
// writeSuccessResponseXML hands response to s3err.WriteXMLResponse with
// status http.StatusOK, then reports the request to s3err.PostLog with the
// same status and s3err.ErrNone.
func writeSuccessResponseXML(w http.ResponseWriter, r *http.Request, response interface{}) {
	// The write and the log entry must agree on the status code.
	const status = http.StatusOK

	s3err.WriteXMLResponse(w, r, status, response)
	s3err.PostLog(r, status, s3err.ErrNone)
}
// writeSuccessResponseEmpty delegates to s3err.WriteEmptyResponse with status
// http.StatusOK. Unlike writeSuccessResponseXML, it does not call
// s3err.PostLog itself.
func writeSuccessResponseEmpty(w http.ResponseWriter, r *http.Request) {
s3err.WriteEmptyResponse(w, r, http.StatusOK)
}
// writeFailureResponse delegates to s3err.WriteErrorResponse, passing errCode
// through unchanged.
func writeFailureResponse(w http.ResponseWriter, r *http.Request, errCode s3err.ErrorCode) {
s3err.WriteErrorResponse(w, r, errCode)
}
// validateContentMd5 reads the Content-Md5 entry of h and base64-decodes its
// first value.
//
// The header is looked up by the exact map key "Content-Md5", so h is expected
// to hold canonicalized keys.
//
// Returns:
//   - an empty, non-nil slice and a nil error when the key is absent;
//   - nil and an error when the key is present but carries no value or an
//     empty first value;
//   - otherwise the result of base64.StdEncoding.DecodeString on the first
//     value, including its error for malformed input.
//
// The decoded length is not checked here.
func validateContentMd5(h http.Header) ([]byte, error) {
	values, present := h["Content-Md5"]
	if !present {
		return []byte{}, nil
	}

	// A hand-built Header can hold the key with a zero-length value slice;
	// treat that like an empty value rather than panicking on values[0].
	if len(values) == 0 || values[0] == "" {
		return nil, fmt.Errorf("Content-Md5 header set to empty value")
	}

	return base64.StdEncoding.DecodeString(values[0])
}
|