aboutsummaryrefslogtreecommitdiff
path: root/weed/s3api/s3api_handlers.go
blob: 6c47e8256a3735458f3f43291c232c7261b408c7 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
package s3api

import (
	"encoding/base64"
	"fmt"
	"net/http"

	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
	"google.golang.org/grpc"

	"github.com/seaweedfs/seaweedfs/weed/pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
)

var _ = filer_pb.FilerClient(&S3ApiServer{})

func (s3a *S3ApiServer) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error {
	// Use filerClient for proper connection management and failover
	if s3a.filerClient != nil {
		return s3a.withFilerClientFailover(streamingMode, fn)
	}
	
	// Fallback to direct connection if filerClient not initialized
	// This should only happen during initialization or testing
	return pb.WithGrpcClient(streamingMode, s3a.randomClientId, func(grpcConnection *grpc.ClientConn) error {
		client := filer_pb.NewSeaweedFilerClient(grpcConnection)
		return fn(client)
	}, s3a.getFilerAddress().ToGrpcAddress(), false, s3a.option.GrpcDialOption)

}

// withFilerClientFailover attempts to execute fn with automatic failover to other filers
func (s3a *S3ApiServer) withFilerClientFailover(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error {
	// Get current filer as starting point
	currentFiler := s3a.filerClient.GetCurrentFiler()
	
	// Try current filer first (fast path)
	err := pb.WithGrpcClient(streamingMode, s3a.randomClientId, func(grpcConnection *grpc.ClientConn) error {
		client := filer_pb.NewSeaweedFilerClient(grpcConnection)
		return fn(client)
	}, currentFiler.ToGrpcAddress(), false, s3a.option.GrpcDialOption)
	
	if err == nil {
		s3a.filerClient.RecordFilerSuccess(currentFiler)
		return nil
	}
	
	// Record failure for current filer
	s3a.filerClient.RecordFilerFailure(currentFiler)
	
	// Current filer failed - try all other filers with health-aware selection
	filers := s3a.filerClient.GetAllFilers()
	var lastErr error = err
	
	for _, filer := range filers {
		if filer == currentFiler {
			continue // Already tried this one
		}
		
		// Skip filers known to be unhealthy (circuit breaker pattern)
		if s3a.filerClient.ShouldSkipUnhealthyFiler(filer) {
			glog.V(2).Infof("WithFilerClient: skipping unhealthy filer %s", filer)
			continue
		}
		
		err = pb.WithGrpcClient(streamingMode, s3a.randomClientId, func(grpcConnection *grpc.ClientConn) error {
			client := filer_pb.NewSeaweedFilerClient(grpcConnection)
			return fn(client)
		}, filer.ToGrpcAddress(), false, s3a.option.GrpcDialOption)
		
		if err == nil {
			// Success! Record success and update current filer for future requests
			s3a.filerClient.RecordFilerSuccess(filer)
			s3a.filerClient.SetCurrentFiler(filer)
			glog.V(1).Infof("WithFilerClient: failover from %s to %s succeeded", currentFiler, filer)
			return nil
		}
		
		// Record failure for health tracking
		s3a.filerClient.RecordFilerFailure(filer)
		glog.V(2).Infof("WithFilerClient: failover to %s failed: %v", filer, err)
		lastErr = err
	}
	
	// All filers failed
	return fmt.Errorf("all filers failed, last error: %w", lastErr)
}

func (s3a *S3ApiServer) AdjustedUrl(location *filer_pb.Location) string {
	return location.Url
}

func (s3a *S3ApiServer) GetDataCenter() string {
	return s3a.option.DataCenter
}

func writeSuccessResponseXML(w http.ResponseWriter, r *http.Request, response interface{}) {
	s3err.WriteXMLResponse(w, r, http.StatusOK, response)
	s3err.PostLog(r, http.StatusOK, s3err.ErrNone)
}

func writeSuccessResponseEmpty(w http.ResponseWriter, r *http.Request) {
	s3err.WriteEmptyResponse(w, r, http.StatusOK)
}

func writeFailureResponse(w http.ResponseWriter, r *http.Request, errCode s3err.ErrorCode) {
	s3err.WriteErrorResponse(w, r, errCode)
}

func validateContentMd5(h http.Header) ([]byte, error) {
	md5B64, ok := h["Content-Md5"]
	if ok {
		if md5B64[0] == "" {
			return nil, fmt.Errorf("Content-Md5 header set to empty value")
		}
		return base64.StdEncoding.DecodeString(md5B64[0])
	}
	return []byte{}, nil
}