aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/topic/local_partition.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/topic/local_partition.go')
-rw-r--r--weed/mq/topic/local_partition.go20
1 files changed, 10 insertions, 10 deletions
diff --git a/weed/mq/topic/local_partition.go b/weed/mq/topic/local_partition.go
index d1433775a..158551747 100644
--- a/weed/mq/topic/local_partition.go
+++ b/weed/mq/topic/local_partition.go
@@ -3,7 +3,7 @@ package topic
import (
"context"
"fmt"
- "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/util/log"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
@@ -82,7 +82,7 @@ func (p *LocalPartition) Subscribe(clientName string, startPosition log_buffer.M
for {
processedPosition, isDone, readPersistedLogErr = p.LogBuffer.ReadFromDiskFn(startPosition, 0, eachMessageFn)
if readPersistedLogErr != nil {
- glog.V(0).Infof("%s read %v persisted log: %v", clientName, p.Partition, readPersistedLogErr)
+ log.V(3).Infof("%s read %v persisted log: %v", clientName, p.Partition, readPersistedLogErr)
return readPersistedLogErr
}
if isDone {
@@ -104,7 +104,7 @@ func (p *LocalPartition) Subscribe(clientName string, startPosition log_buffer.M
continue
}
if readInMemoryLogErr != nil {
- glog.V(0).Infof("%s read %v in memory log: %v", clientName, p.Partition, readInMemoryLogErr)
+ log.V(3).Infof("%s read %v in memory log: %v", clientName, p.Partition, readInMemoryLogErr)
return readInMemoryLogErr
}
}
@@ -179,10 +179,10 @@ func (p *LocalPartition) MaybeConnectToFollowers(initMessage *mq_pb.PublishMessa
if err != nil {
e, _ := status.FromError(err)
if e.Code() == codes.Canceled {
- glog.V(0).Infof("local partition %v follower %v stopped", p.Partition, p.Follower)
+ log.V(3).Infof("local partition %v follower %v stopped", p.Partition, p.Follower)
return
}
- glog.Errorf("Receiving local partition %v follower %s ack: %v", p.Partition, p.Follower, err)
+ log.Errorf("Receiving local partition %v follower %s ack: %v", p.Partition, p.Follower, err)
return
}
atomic.StoreInt64(&p.AckTsNs, ack.AckTsNs)
@@ -206,9 +206,9 @@ func (p *LocalPartition) MaybeShutdownLocalPartition() (hasShutdown bool) {
Close: &mq_pb.PublishFollowMeRequest_CloseMessage{},
},
}); followErr != nil {
- glog.Errorf("Error closing follower stream: %v", followErr)
+ log.Errorf("Error closing follower stream: %v", followErr)
}
- glog.V(4).Infof("closing grpcConnection to follower")
+ log.V(-1).Infof("closing grpcConnection to follower")
p.followerGrpcConnection.Close()
p.publishFolloweMeStream = nil
p.Follower = ""
@@ -217,7 +217,7 @@ func (p *LocalPartition) MaybeShutdownLocalPartition() (hasShutdown bool) {
hasShutdown = true
}
- glog.V(0).Infof("local partition %v Publisher:%d Subscriber:%d follower:%s shutdown %v", p.Partition, p.Publishers.Size(), p.Subscribers.Size(), p.Follower, hasShutdown)
+ log.V(3).Infof("local partition %v Publisher:%d Subscriber:%d follower:%s shutdown %v", p.Partition, p.Publishers.Size(), p.Subscribers.Size(), p.Follower, hasShutdown)
return
}
@@ -225,7 +225,7 @@ func (p *LocalPartition) Shutdown() {
p.closePublishers()
p.closeSubscribers()
p.LogBuffer.ShutdownLogBuffer()
- glog.V(0).Infof("local partition %v shutting down", p.Partition)
+ log.V(3).Infof("local partition %v shutting down", p.Partition)
}
func (p *LocalPartition) NotifyLogFlushed(flushTsNs int64) {
@@ -237,7 +237,7 @@ func (p *LocalPartition) NotifyLogFlushed(flushTsNs int64) {
},
},
}); followErr != nil {
- glog.Errorf("send follower %s flush message: %v", p.Follower, followErr)
+ log.Errorf("send follower %s flush message: %v", p.Follower, followErr)
}
// println("notifying", p.Follower, "flushed at", flushTsNs)
}