aboutsummaryrefslogtreecommitdiff
path: root/weed/filer/meta_aggregator.go
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2025-05-22 09:54:31 -0700
committerchrislu <chris.lu@gmail.com>2025-05-22 09:54:31 -0700
commit0d62be44846354c3c37b857028297edd4b8df17b (patch)
treec89320a7d58351030f1b740c7267f56bf0206429 /weed/filer/meta_aggregator.go
parentd8c574a5ef1a811f9a0d447097d9edfcc0c1d84c (diff)
downloadseaweedfs-origin/changing-to-zap.tar.xz
seaweedfs-origin/changing-to-zap.zip
Diffstat (limited to 'weed/filer/meta_aggregator.go')
-rw-r--r--weed/filer/meta_aggregator.go40
1 files changed, 20 insertions, 20 deletions
diff --git a/weed/filer/meta_aggregator.go b/weed/filer/meta_aggregator.go
index 976822ad1..9ce94ba05 100644
--- a/weed/filer/meta_aggregator.go
+++ b/weed/filer/meta_aggregator.go
@@ -14,7 +14,7 @@ import (
"google.golang.org/grpc"
"google.golang.org/protobuf/proto"
- "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/util/log"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
@@ -73,23 +73,23 @@ func (ma *MetaAggregator) OnPeerUpdate(update *master_pb.ClusterNodeUpdate, star
func (ma *MetaAggregator) loopSubscribeToOneFiler(f *Filer, self pb.ServerAddress, peer pb.ServerAddress, startFrom time.Time, stopChan chan struct{}) {
lastTsNs := startFrom.UnixNano()
for {
- glog.V(0).Infof("loopSubscribeToOneFiler read %s start from %v %d", peer, time.Unix(0, lastTsNs), lastTsNs)
+ log.V(3).Infof("loopSubscribeToOneFiler read %s start from %v %d", peer, time.Unix(0, lastTsNs), lastTsNs)
nextLastTsNs, err := ma.doSubscribeToOneFiler(f, self, peer, lastTsNs)
// check stopChan to see if we should stop
select {
case <-stopChan:
- glog.V(0).Infof("stop subscribing peer %s meta change", peer)
+ log.V(3).Infof("stop subscribing peer %s meta change", peer)
return
default:
}
if err != nil {
- errLvl := glog.Level(0)
+ errLvl := log.Level(0)
if strings.Contains(err.Error(), "duplicated local subscription detected") {
- errLvl = glog.Level(4)
+ errLvl = log.Level(4)
}
- glog.V(errLvl).Infof("subscribing remote %s meta change: %v", peer, err)
+ log.V(errLvl).Infof("subscribing remote %s meta change: %v", peer, err)
}
if lastTsNs < nextLastTsNs {
lastTsNs = nextLastTsNs
@@ -126,35 +126,35 @@ func (ma *MetaAggregator) doSubscribeToOneFiler(f *Filer, self pb.ServerAddress,
defer func(prevTsNs int64) {
if lastTsNs != prevTsNs && lastTsNs != lastPersistTime.UnixNano() {
if err := ma.updateOffset(f, peer, peerSignature, lastTsNs); err == nil {
- glog.V(0).Infof("last sync time with %s at %v (%d)", peer, time.Unix(0, lastTsNs), lastTsNs)
+ log.V(3).Infof("last sync time with %s at %v (%d)", peer, time.Unix(0, lastTsNs), lastTsNs)
} else {
- glog.Errorf("failed to save last sync time with %s at %v (%d)", peer, time.Unix(0, lastTsNs), lastTsNs)
+ log.Errorf("failed to save last sync time with %s at %v (%d)", peer, time.Unix(0, lastTsNs), lastTsNs)
}
}
}(prevTsNs)
}
- glog.V(0).Infof("follow peer: %v, last %v (%d)", peer, time.Unix(0, lastTsNs), lastTsNs)
+ log.V(3).Infof("follow peer: %v, last %v (%d)", peer, time.Unix(0, lastTsNs), lastTsNs)
var counter int64
var synced bool
maybeReplicateMetadataChange = func(event *filer_pb.SubscribeMetadataResponse) {
if err := Replay(f.Store, event); err != nil {
- glog.Errorf("failed to reply metadata change from %v: %v", peer, err)
+ log.Errorf("failed to reply metadata change from %v: %v", peer, err)
return
}
counter++
if lastPersistTime.Add(time.Minute).Before(time.Now()) {
if err := ma.updateOffset(f, peer, peerSignature, event.TsNs); err == nil {
if event.TsNs < time.Now().Add(-2*time.Minute).UnixNano() {
- glog.V(0).Infof("sync with %s progressed to: %v %0.2f/sec", peer, time.Unix(0, event.TsNs), float64(counter)/60.0)
+ log.V(3).Infof("sync with %s progressed to: %v %0.2f/sec", peer, time.Unix(0, event.TsNs), float64(counter)/60.0)
} else if !synced {
synced = true
- glog.V(0).Infof("synced with %s", peer)
+ log.V(3).Infof("synced with %s", peer)
}
lastPersistTime = time.Now()
counter = 0
} else {
- glog.V(0).Infof("failed to update offset for %v: %v", peer, err)
+ log.V(3).Infof("failed to update offset for %v: %v", peer, err)
}
}
}
@@ -163,7 +163,7 @@ func (ma *MetaAggregator) doSubscribeToOneFiler(f *Filer, self pb.ServerAddress,
processEventFn := func(event *filer_pb.SubscribeMetadataResponse) error {
data, err := proto.Marshal(event)
if err != nil {
- glog.Errorf("failed to marshal subscribed filer_pb.SubscribeMetadataResponse %+v: %v", event, err)
+ log.Errorf("failed to marshal subscribed filer_pb.SubscribeMetadataResponse %+v: %v", event, err)
return err
}
dir := event.Directory
@@ -175,7 +175,7 @@ func (ma *MetaAggregator) doSubscribeToOneFiler(f *Filer, self pb.ServerAddress,
return nil
}
- glog.V(0).Infof("subscribing remote %s meta change: %v, clientId:%d", peer, time.Unix(0, lastTsNs), ma.filer.UniqueFilerId)
+ log.V(3).Infof("subscribing remote %s meta change: %v, clientId:%d", peer, time.Unix(0, lastTsNs), ma.filer.UniqueFilerId)
err = pb.WithFilerClient(true, 0, peer, ma.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@@ -188,7 +188,7 @@ func (ma *MetaAggregator) doSubscribeToOneFiler(f *Filer, self pb.ServerAddress,
ClientEpoch: atomic.LoadInt32(&ma.filer.UniqueFilerEpoch),
})
if err != nil {
- glog.V(0).Infof("SubscribeLocalMetadata %v: %v", peer, err)
+ log.V(3).Infof("SubscribeLocalMetadata %v: %v", peer, err)
return fmt.Errorf("subscribe: %v", err)
}
@@ -198,12 +198,12 @@ func (ma *MetaAggregator) doSubscribeToOneFiler(f *Filer, self pb.ServerAddress,
return nil
}
if listenErr != nil {
- glog.V(0).Infof("SubscribeLocalMetadata stream %v: %v", peer, listenErr)
+ log.V(3).Infof("SubscribeLocalMetadata stream %v: %v", peer, listenErr)
return listenErr
}
if err := processEventFn(resp); err != nil {
- glog.V(0).Infof("SubscribeLocalMetadata process %v: %v", resp, err)
+ log.V(3).Infof("SubscribeLocalMetadata process %v: %v", resp, err)
return fmt.Errorf("process %v: %v", resp, err)
}
@@ -248,7 +248,7 @@ func (ma *MetaAggregator) readOffset(f *Filer, peer pb.ServerAddress, peerSignat
lastTsNs = int64(util.BytesToUint64(value))
- glog.V(0).Infof("readOffset %s : %d", peer, lastTsNs)
+ log.V(3).Infof("readOffset %s : %d", peer, lastTsNs)
return
}
@@ -266,7 +266,7 @@ func (ma *MetaAggregator) updateOffset(f *Filer, peer pb.ServerAddress, peerSign
return fmt.Errorf("updateOffset %s : %v", peer, err)
}
- glog.V(4).Infof("updateOffset %s : %d", peer, lastTsNs)
+ log.V(-1).Infof("updateOffset %s : %d", peer, lastTsNs)
return
}