aboutsummaryrefslogtreecommitdiff
path: root/weed/filer_client/filer_client_accessor.go
blob: 955a295cca9f0ab1d6c8dd1d8c1cb1af4d09deb9 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
package filer_client

import (
	"fmt"
	"math/rand"
	"sync"
	"sync/atomic"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/mq/topic"
	"github.com/seaweedfs/seaweedfs/weed/pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
	"google.golang.org/grpc"
)

// filerHealth tracks the health status of a single filer so that failing
// filers can be skipped (with exponential backoff) during failover.
type filerHealth struct {
	address      pb.ServerAddress
	mu           sync.Mutex // guards lastFailure and backoffUntil against concurrent reads/writes
	failureCount int32      // updated atomically
	lastFailure  time.Time
	backoffUntil time.Time
}

// isHealthy reports whether the filer is currently outside its backoff window.
func (fh *filerHealth) isHealthy() bool {
	fh.mu.Lock()
	defer fh.mu.Unlock()
	return time.Now().After(fh.backoffUntil)
}

// recordFailure increments the failure count and extends the backoff window
// using exponential backoff: 1s, 2s, 4s, 8s, 16s, then capped at 30s.
func (fh *filerHealth) recordFailure() {
	count := atomic.AddInt32(&fh.failureCount, 1)

	// Cap the shift explicitly: in Go a variable shift of 1 << n yields 0
	// once n reaches the int bit width, which would silently disable the
	// backoff after many consecutive failures.
	backoffSeconds := 30
	if count >= 1 && count <= 5 {
		backoffSeconds = 1 << (count - 1)
	}

	fh.mu.Lock()
	fh.lastFailure = time.Now()
	fh.backoffUntil = time.Now().Add(time.Duration(backoffSeconds) * time.Second)
	fh.mu.Unlock()

	glog.V(1).Infof("Filer %v failed %d times, backing off for %ds", fh.address, count, backoffSeconds)
}

// recordSuccess resets the failure count and clears any backoff window.
func (fh *filerHealth) recordSuccess() {
	atomic.StoreInt32(&fh.failureCount, 0)
	fh.mu.Lock()
	fh.backoffUntil = time.Time{}
	fh.mu.Unlock()
}

// FilerClientAccessor provides gRPC access to one or more filer servers,
// preferring healthy filers and backing off from recently failed ones.
type FilerClientAccessor struct {
	GetGrpcDialOption func() grpc.DialOption
	GetFilers         func() []pb.ServerAddress // Returns multiple filer addresses for failover

	// Health tracking for smart failover; lazily populated per address.
	filerHealthMap sync.Map // map[pb.ServerAddress]*filerHealth
}

// getOrCreateFilerHealth returns the health tracker for a filer, creating
// one on first use. Safe for concurrent callers: LoadOrStore guarantees
// all of them end up sharing the same tracker instance.
func (fca *FilerClientAccessor) getOrCreateFilerHealth(address pb.ServerAddress) *filerHealth {
	// Fast path: tracker already exists, no allocation needed.
	if existing, found := fca.filerHealthMap.Load(address); found {
		return existing.(*filerHealth)
	}

	// Slow path: publish a fresh tracker; if another goroutine raced us,
	// keep whichever one actually landed in the map.
	candidate := &filerHealth{address: address}
	stored, _ := fca.filerHealthMap.LoadOrStore(address, candidate)
	return stored.(*filerHealth)
}

// partitionFilers splits the given filers into those currently healthy and
// those still inside their backoff window.
func (fca *FilerClientAccessor) partitionFilers(filers []pb.ServerAddress) (healthy, backoff []pb.ServerAddress) {
	for _, address := range filers {
		if fca.getOrCreateFilerHealth(address).isHealthy() {
			healthy = append(healthy, address)
			continue
		}
		backoff = append(backoff, address)
	}
	return healthy, backoff
}

// shuffleFilers returns a randomly ordered copy of filers to distribute load
// across them. The input slice is never mutated; slices of length 0 or 1 are
// returned as-is.
func (fca *FilerClientAccessor) shuffleFilers(filers []pb.ServerAddress) []pb.ServerAddress {
	if len(filers) <= 1 {
		return filers
	}

	shuffled := make([]pb.ServerAddress, len(filers))
	copy(shuffled, filers)

	// Use the stdlib shuffle instead of a hand-rolled Fisher-Yates loop.
	rand.Shuffle(len(shuffled), func(i, j int) {
		shuffled[i], shuffled[j] = shuffled[j], shuffled[i]
	})

	return shuffled
}

// WithFilerClient runs fn against one of the configured filers, failing over
// between them; see withMultipleFilers for the selection and backoff logic.
func (fca *FilerClientAccessor) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error {
	return fca.withMultipleFilers(streamingMode, fn)
}

// tryOneFiler dials a single filer, runs fn against it, and updates that
// filer's health record based on the outcome. Returns nil on success.
func (fca *FilerClientAccessor) tryOneFiler(streamingMode bool, filerAddress pb.ServerAddress, fn func(filer_pb.SeaweedFilerClient) error) error {
	health := fca.getOrCreateFilerHealth(filerAddress)

	err := pb.WithFilerClient(streamingMode, 0, filerAddress, fca.GetGrpcDialOption(), fn)
	if err == nil {
		health.recordSuccess()
		return nil
	}

	health.recordFailure()
	return err
}

// withMultipleFilers tries each filer with smart failover and backoff logic:
// healthy filers are attempted first (shuffled to spread load), and filers
// currently in backoff are only tried as a last resort.
func (fca *FilerClientAccessor) withMultipleFilers(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error {
	filers := fca.GetFilers()
	if len(filers) == 0 {
		return fmt.Errorf("no filer addresses available")
	}

	// Partition filers into healthy and backoff groups
	healthyFilers, backoffFilers := fca.partitionFilers(filers)

	// Shuffle healthy filers to distribute load evenly
	healthyFilers = fca.shuffleFilers(healthyFilers)

	// Try healthy filers first
	var lastErr error
	for _, filerAddress := range healthyFilers {
		err := fca.tryOneFiler(streamingMode, filerAddress, fn)
		if err == nil {
			glog.V(2).Infof("Filer %v succeeded", filerAddress)
			return nil
		}
		lastErr = err
		glog.V(1).Infof("Healthy filer %v failed: %v, trying next", filerAddress, err)
	}

	// If all healthy filers failed, try backoff filers as last resort
	if len(backoffFilers) > 0 {
		glog.V(1).Infof("All healthy filers failed, trying %d backoff filers", len(backoffFilers))

		for _, filerAddress := range backoffFilers {
			err := fca.tryOneFiler(streamingMode, filerAddress, fn)
			if err == nil {
				glog.V(1).Infof("Backoff filer %v recovered and succeeded", filerAddress)
				return nil
			}
			lastErr = err
			glog.V(1).Infof("Backoff filer %v still failing: %v", filerAddress, err)
		}
	}

	// %w keeps the underlying error inspectable via errors.Is / errors.As.
	return fmt.Errorf("all filer connections failed, last error: %w", lastErr)
}

// SaveTopicConfToFiler persists the configuration of topic t on a filer,
// using the accessor's failover logic to pick a reachable one.
func (fca *FilerClientAccessor) SaveTopicConfToFiler(t topic.Topic, conf *mq_pb.ConfigureTopicResponse) error {
	glog.V(0).Infof("save conf for topic %v to filer", t)

	// save the topic configuration on filer
	writeConf := func(client filer_pb.SeaweedFilerClient) error {
		return t.WriteConfFile(client, conf)
	}
	return fca.WithFilerClient(false, writeConf)
}

// ReadTopicConfFromFiler loads the configuration of topic t from a filer,
// using the accessor's failover logic to pick a reachable one.
func (fca *FilerClientAccessor) ReadTopicConfFromFiler(t topic.Topic) (conf *mq_pb.ConfigureTopicResponse, err error) {
	glog.V(1).Infof("load conf for topic %v from filer", t)

	readConf := func(client filer_pb.SeaweedFilerClient) error {
		conf, err = t.ReadConfFile(client)
		return err
	}
	if err = fca.WithFilerClient(false, readConf); err != nil {
		return nil, err
	}
	return conf, nil
}

// ReadTopicConfFromFilerWithMetadata loads the configuration of topic t from
// a filer along with the conf file's creation and modification times (in
// nanoseconds since epoch).
func (fca *FilerClientAccessor) ReadTopicConfFromFilerWithMetadata(t topic.Topic) (conf *mq_pb.ConfigureTopicResponse, createdAtNs, modifiedAtNs int64, err error) {
	glog.V(1).Infof("load conf with metadata for topic %v from filer", t)

	readConf := func(client filer_pb.SeaweedFilerClient) error {
		conf, createdAtNs, modifiedAtNs, err = t.ReadConfFileWithMetadata(client)
		return err
	}
	if err = fca.WithFilerClient(false, readConf); err != nil {
		return nil, 0, 0, err
	}
	return conf, createdAtNs, modifiedAtNs, nil
}

// NewFilerClientAccessor creates a FilerClientAccessor with one or more
// filers. It panics when the address list is empty, since the accessor
// would be unusable.
func NewFilerClientAccessor(filerAddresses []pb.ServerAddress, grpcDialOption grpc.DialOption) *FilerClientAccessor {
	if len(filerAddresses) == 0 {
		panic("at least one filer address is required")
	}

	// filerHealthMap needs no initialization: the sync.Map zero value is ready to use.
	accessor := &FilerClientAccessor{}
	accessor.GetGrpcDialOption = func() grpc.DialOption { return grpcDialOption }
	accessor.GetFilers = func() []pb.ServerAddress { return filerAddresses }
	return accessor
}

// AddFilerAddresses appends more filer addresses to the existing list.
// NOTE(review): replacing fca.GetFilers here is not synchronized against
// concurrent readers (e.g. withMultipleFilers) — confirm callers only
// invoke this during setup, or add struct-level locking.
func (fca *FilerClientAccessor) AddFilerAddresses(additionalFilers []pb.ServerAddress) {
	if len(additionalFilers) == 0 {
		return
	}

	// Combine the current list (if any) with the new addresses.
	var combined []pb.ServerAddress
	if fca.GetFilers != nil {
		combined = append(combined, fca.GetFilers()...)
	}
	combined = append(combined, additionalFilers...)

	// Swap in a closure over the combined snapshot.
	fca.GetFilers = func() []pb.ServerAddress {
		return combined
	}
}