aboutsummaryrefslogtreecommitdiff
path: root/weed/filer/meta_aggregator.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/filer/meta_aggregator.go')
-rw-r--r--weed/filer/meta_aggregator.go14
1 files changed, 9 insertions, 5 deletions
diff --git a/weed/filer/meta_aggregator.go b/weed/filer/meta_aggregator.go
index 2ff62bf13..1ea334224 100644
--- a/weed/filer/meta_aggregator.go
+++ b/weed/filer/meta_aggregator.go
@@ -3,14 +3,15 @@ package filer
import (
"context"
"fmt"
- "github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
- "github.com/seaweedfs/seaweedfs/weed/util"
"io"
"strings"
"sync"
"sync/atomic"
"time"
+ "github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
+ "github.com/seaweedfs/seaweedfs/weed/util"
+
"google.golang.org/grpc"
"google.golang.org/protobuf/proto"
@@ -29,8 +30,9 @@ type MetaAggregator struct {
peerChans map[pb.ServerAddress]chan struct{}
peerChansLock sync.Mutex
// notifying clients
- ListenersLock sync.Mutex
- ListenersCond *sync.Cond
+ ListenersLock sync.Mutex
+ ListenersCond *sync.Cond
+ ListenersWaits int64 // Atomic counter
}
// MetaAggregator only aggregates data "on the fly". The logs are not re-persisted to disk.
@@ -44,7 +46,9 @@ func NewMetaAggregator(filer *Filer, self pb.ServerAddress, grpcDialOption grpc.
}
t.ListenersCond = sync.NewCond(&t.ListenersLock)
t.MetaLogBuffer = log_buffer.NewLogBuffer("aggr", LogFlushInterval, nil, nil, func() {
- t.ListenersCond.Broadcast()
+ if atomic.LoadInt64(&t.ListenersWaits) > 0 {
+ t.ListenersCond.Broadcast()
+ }
})
return t
}