aboutsummaryrefslogtreecommitdiff
path: root/weed/filer/meta_aggregator.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/filer/meta_aggregator.go')
-rw-r--r--weed/filer/meta_aggregator.go16
1 files changed, 8 insertions, 8 deletions
diff --git a/weed/filer/meta_aggregator.go b/weed/filer/meta_aggregator.go
index 913cbd454..008fd33a7 100644
--- a/weed/filer/meta_aggregator.go
+++ b/weed/filer/meta_aggregator.go
@@ -18,7 +18,7 @@ import (
)
type MetaAggregator struct {
- filers []string
+ filers []pb.ServerAddress
grpcDialOption grpc.DialOption
MetaLogBuffer *log_buffer.LogBuffer
// notifying clients
@@ -28,7 +28,7 @@ type MetaAggregator struct {
// MetaAggregator only aggregates data "on the fly". The logs are not re-persisted to disk.
// The old data comes from what each LocalMetadata persisted on disk.
-func NewMetaAggregator(filers []string, grpcDialOption grpc.DialOption) *MetaAggregator {
+func NewMetaAggregator(filers []pb.ServerAddress, grpcDialOption grpc.DialOption) *MetaAggregator {
t := &MetaAggregator{
filers: filers,
grpcDialOption: grpcDialOption,
@@ -40,13 +40,13 @@ func NewMetaAggregator(filers []string, grpcDialOption grpc.DialOption) *MetaAgg
return t
}
-func (ma *MetaAggregator) StartLoopSubscribe(f *Filer, self string) {
+func (ma *MetaAggregator) StartLoopSubscribe(f *Filer, self pb.ServerAddress) {
for _, filer := range ma.filers {
go ma.subscribeToOneFiler(f, self, filer)
}
}
-func (ma *MetaAggregator) subscribeToOneFiler(f *Filer, self string, peer string) {
+func (ma *MetaAggregator) subscribeToOneFiler(f *Filer, self pb.ServerAddress, peer pb.ServerAddress) {
/*
Each filer reads the "filer.store.id", which is the store's signature when filer starts.
@@ -123,7 +123,7 @@ func (ma *MetaAggregator) subscribeToOneFiler(f *Filer, self string, peer string
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
stream, err := client.SubscribeLocalMetadata(ctx, &filer_pb.SubscribeMetadataRequest{
- ClientName: "filer:" + self,
+ ClientName: "filer:" + string(self),
PathPrefix: "/",
SinceNs: lastTsNs,
})
@@ -156,7 +156,7 @@ func (ma *MetaAggregator) subscribeToOneFiler(f *Filer, self string, peer string
}
}
-func (ma *MetaAggregator) readFilerStoreSignature(peer string) (sig int32, err error) {
+func (ma *MetaAggregator) readFilerStoreSignature(peer pb.ServerAddress) (sig int32, err error) {
err = pb.WithFilerClient(peer, ma.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
if err != nil {
@@ -172,7 +172,7 @@ const (
MetaOffsetPrefix = "Meta"
)
-func (ma *MetaAggregator) readOffset(f *Filer, peer string, peerSignature int32) (lastTsNs int64, err error) {
+func (ma *MetaAggregator) readOffset(f *Filer, peer pb.ServerAddress, peerSignature int32) (lastTsNs int64, err error) {
key := []byte(MetaOffsetPrefix + "xxxx")
util.Uint32toBytes(key[len(MetaOffsetPrefix):], uint32(peerSignature))
@@ -195,7 +195,7 @@ func (ma *MetaAggregator) readOffset(f *Filer, peer string, peerSignature int32)
return
}
-func (ma *MetaAggregator) updateOffset(f *Filer, peer string, peerSignature int32, lastTsNs int64) (err error) {
+func (ma *MetaAggregator) updateOffset(f *Filer, peer pb.ServerAddress, peerSignature int32, lastTsNs int64) (err error) {
key := []byte(MetaOffsetPrefix + "xxxx")
util.Uint32toBytes(key[len(MetaOffsetPrefix):], uint32(peerSignature))