diff options
Diffstat (limited to 'weed/mq/topic')
| -rw-r--r-- | weed/mq/topic/local_partition.go | 20 |
1 files changed, 10 insertions, 10 deletions
diff --git a/weed/mq/topic/local_partition.go b/weed/mq/topic/local_partition.go index d1433775a..158551747 100644 --- a/weed/mq/topic/local_partition.go +++ b/weed/mq/topic/local_partition.go @@ -3,7 +3,7 @@ package topic import ( "context" "fmt" - "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/util/log" "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "github.com/seaweedfs/seaweedfs/weed/util/log_buffer" @@ -82,7 +82,7 @@ func (p *LocalPartition) Subscribe(clientName string, startPosition log_buffer.M for { processedPosition, isDone, readPersistedLogErr = p.LogBuffer.ReadFromDiskFn(startPosition, 0, eachMessageFn) if readPersistedLogErr != nil { - glog.V(0).Infof("%s read %v persisted log: %v", clientName, p.Partition, readPersistedLogErr) + log.V(3).Infof("%s read %v persisted log: %v", clientName, p.Partition, readPersistedLogErr) return readPersistedLogErr } if isDone { @@ -104,7 +104,7 @@ func (p *LocalPartition) Subscribe(clientName string, startPosition log_buffer.M continue } if readInMemoryLogErr != nil { - glog.V(0).Infof("%s read %v in memory log: %v", clientName, p.Partition, readInMemoryLogErr) + log.V(3).Infof("%s read %v in memory log: %v", clientName, p.Partition, readInMemoryLogErr) return readInMemoryLogErr } } @@ -179,10 +179,10 @@ func (p *LocalPartition) MaybeConnectToFollowers(initMessage *mq_pb.PublishMessa if err != nil { e, _ := status.FromError(err) if e.Code() == codes.Canceled { - glog.V(0).Infof("local partition %v follower %v stopped", p.Partition, p.Follower) + log.V(3).Infof("local partition %v follower %v stopped", p.Partition, p.Follower) return } - glog.Errorf("Receiving local partition %v follower %s ack: %v", p.Partition, p.Follower, err) + log.Errorf("Receiving local partition %v follower %s ack: %v", p.Partition, p.Follower, err) return } atomic.StoreInt64(&p.AckTsNs, ack.AckTsNs) @@ -206,9 +206,9 @@ func (p *LocalPartition) MaybeShutdownLocalPartition() (hasShutdown bool) { Close: &mq_pb.PublishFollowMeRequest_CloseMessage{}, }, }); followErr != nil { - glog.Errorf("Error closing follower stream: %v", followErr) + log.Errorf("Error closing follower stream: %v", followErr) } - glog.V(4).Infof("closing grpcConnection to follower") + log.V(-1).Infof("closing grpcConnection to follower") p.followerGrpcConnection.Close() p.publishFolloweMeStream = nil p.Follower = "" @@ -217,7 +217,7 @@ func (p *LocalPartition) MaybeShutdownLocalPartition() (hasShutdown bool) { hasShutdown = true } - glog.V(0).Infof("local partition %v Publisher:%d Subscriber:%d follower:%s shutdown %v", p.Partition, p.Publishers.Size(), p.Subscribers.Size(), p.Follower, hasShutdown) + log.V(3).Infof("local partition %v Publisher:%d Subscriber:%d follower:%s shutdown %v", p.Partition, p.Publishers.Size(), p.Subscribers.Size(), p.Follower, hasShutdown) return } @@ -225,7 +225,7 @@ func (p *LocalPartition) Shutdown() { p.closePublishers() p.closeSubscribers() p.LogBuffer.ShutdownLogBuffer() - glog.V(0).Infof("local partition %v shutting down", p.Partition) + log.V(3).Infof("local partition %v shutting down", p.Partition) } func (p *LocalPartition) NotifyLogFlushed(flushTsNs int64) { @@ -237,7 +237,7 @@ func (p *LocalPartition) NotifyLogFlushed(flushTsNs int64) { }, }, }); followErr != nil { - glog.Errorf("send follower %s flush message: %v", p.Follower, followErr) + log.Errorf("send follower %s flush message: %v", p.Follower, followErr) } // println("notifying", p.Follower, "flushed at", flushTsNs) } |
