aboutsummaryrefslogtreecommitdiff
path: root/weed/server/filer_grpc_server_sub_meta.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/server/filer_grpc_server_sub_meta.go')
-rw-r--r--weed/server/filer_grpc_server_sub_meta.go36
1 files changed, 18 insertions, 18 deletions
diff --git a/weed/server/filer_grpc_server_sub_meta.go b/weed/server/filer_grpc_server_sub_meta.go
index dfe594b46..583fd06c6 100644
--- a/weed/server/filer_grpc_server_sub_meta.go
+++ b/weed/server/filer_grpc_server_sub_meta.go
@@ -12,7 +12,7 @@ import (
"google.golang.org/protobuf/proto"
"github.com/seaweedfs/seaweedfs/weed/filer"
- "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/util/log"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
"github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
@@ -36,13 +36,13 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest,
return fmt.Errorf("duplicated subscription detected for client %s id %d", clientName, req.ClientId)
}
defer func() {
- glog.V(0).Infof("disconnect %v subscriber %s clientId:%d", clientName, req.PathPrefix, req.ClientId)
+ log.V(3).Infof("disconnect %v subscriber %s clientId:%d", clientName, req.PathPrefix, req.ClientId)
fs.deleteClient("", clientName, req.ClientId, req.ClientEpoch)
fs.filer.MetaAggregator.ListenersCond.Broadcast() // nudges the subscribers that are waiting
}()
lastReadTime := log_buffer.NewMessagePosition(req.SinceNs, -2)
- glog.V(0).Infof(" %v starts to subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime)
+ log.V(3).Infof(" %v starts to subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime)
eachEventNotificationFn := fs.eachEventNotificationFn(req, stream, clientName)
@@ -55,7 +55,7 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest,
for {
- glog.V(4).Infof("read on disk %v aggregated subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime)
+ log.V(-1).Infof("read on disk %v aggregated subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime)
processedTsNs, isDone, readPersistedLogErr = fs.filer.ReadPersistedLogBuffer(lastReadTime, req.UntilNs, eachLogEntryFn)
if readPersistedLogErr != nil {
@@ -65,7 +65,7 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest,
return nil
}
- glog.V(4).Infof("processed to %v: %v", clientName, processedTsNs)
+ log.V(-1).Infof("processed to %v: %v", clientName, processedTsNs)
if processedTsNs != 0 {
lastReadTime = log_buffer.NewMessagePosition(processedTsNs, -2)
} else {
@@ -80,7 +80,7 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest,
}
}
- glog.V(4).Infof("read in memory %v aggregated subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime)
+ log.V(-1).Infof("read in memory %v aggregated subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime)
lastReadTime, isDone, readInMemoryLogErr = fs.filer.MetaAggregator.MetaLogBuffer.LoopProcessLogData("aggMeta:"+clientName, lastReadTime, req.UntilNs, func() bool {
// Check if the client has disconnected by monitoring the context
@@ -99,7 +99,7 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest,
if errors.Is(readInMemoryLogErr, log_buffer.ResumeFromDiskError) {
continue
}
- glog.Errorf("processed to %v: %v", lastReadTime, readInMemoryLogErr)
+ log.Errorf("processed to %v: %v", lastReadTime, readInMemoryLogErr)
if !errors.Is(readInMemoryLogErr, log_buffer.ResumeError) {
break
}
@@ -108,7 +108,7 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest,
return nil
}
if !fs.hasClient(req.ClientId, req.ClientEpoch) {
- glog.V(0).Infof("client %v is closed", clientName)
+ log.V(3).Infof("client %v is closed", clientName)
return nil
}
@@ -134,13 +134,13 @@ func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataReq
return fmt.Errorf("duplicated local subscription detected for client %s clientId:%d", clientName, req.ClientId)
}
defer func() {
- glog.V(0).Infof("disconnect %v local subscriber %s clientId:%d", clientName, req.PathPrefix, req.ClientId)
+ log.V(3).Infof("disconnect %v local subscriber %s clientId:%d", clientName, req.PathPrefix, req.ClientId)
fs.deleteClient("local", clientName, req.ClientId, req.ClientEpoch)
fs.listenersCond.Broadcast() // nudges the subscribers that are waiting
}()
lastReadTime := log_buffer.NewMessagePosition(req.SinceNs, -2)
- glog.V(0).Infof(" + %v local subscribe %s from %+v clientId:%d", clientName, req.PathPrefix, lastReadTime, req.ClientId)
+ log.V(3).Infof(" + %v local subscribe %s from %+v clientId:%d", clientName, req.PathPrefix, lastReadTime, req.ClientId)
eachEventNotificationFn := fs.eachEventNotificationFn(req, stream, clientName)
@@ -153,10 +153,10 @@ func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataReq
for {
// println("reading from persisted logs ...")
- glog.V(0).Infof("read on disk %v local subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime)
+ log.V(3).Infof("read on disk %v local subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime)
processedTsNs, isDone, readPersistedLogErr = fs.filer.ReadPersistedLogBuffer(lastReadTime, req.UntilNs, eachLogEntryFn)
if readPersistedLogErr != nil {
- glog.V(0).Infof("read on disk %v local subscribe %s from %+v: %v", clientName, req.PathPrefix, lastReadTime, readPersistedLogErr)
+ log.V(3).Infof("read on disk %v local subscribe %s from %+v: %v", clientName, req.PathPrefix, lastReadTime, readPersistedLogErr)
return fmt.Errorf("reading from persisted logs: %v", readPersistedLogErr)
}
if isDone {
@@ -172,7 +172,7 @@ func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataReq
}
}
- glog.V(0).Infof("read in memory %v local subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime)
+ log.V(3).Infof("read in memory %v local subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime)
lastReadTime, isDone, readInMemoryLogErr = fs.filer.LocalMetaLogBuffer.LoopProcessLogData("localMeta:"+clientName, lastReadTime, req.UntilNs, func() bool {
@@ -197,7 +197,7 @@ func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataReq
if readInMemoryLogErr == log_buffer.ResumeFromDiskError {
continue
}
- glog.Errorf("processed to %v: %v", lastReadTime, readInMemoryLogErr)
+ log.Errorf("processed to %v: %v", lastReadTime, readInMemoryLogErr)
if readInMemoryLogErr != log_buffer.ResumeError {
break
}
@@ -218,7 +218,7 @@ func eachLogEntryFn(eachEventNotificationFn func(dirPath string, eventNotificati
return func(logEntry *filer_pb.LogEntry) (bool, error) {
event := &filer_pb.SubscribeMetadataResponse{}
if err := proto.Unmarshal(logEntry.Data, event); err != nil {
- glog.Errorf("unexpected unmarshal filer_pb.SubscribeMetadataResponse: %v", err)
+ log.Errorf("unexpected unmarshal filer_pb.SubscribeMetadataResponse: %v", err)
return false, fmt.Errorf("unexpected unmarshal filer_pb.SubscribeMetadataResponse: %v", err)
}
@@ -301,7 +301,7 @@ func (fs *FilerServer) eachEventNotificationFn(req *filer_pb.SubscribeMetadataRe
}
// println("sending", dirPath, entryName)
if err := stream.Send(message); err != nil {
- glog.V(0).Infof("=> client %v: %+v", clientName, err)
+ log.V(3).Infof("=> client %v: %+v", clientName, err)
return err
}
filtered = 0
@@ -329,7 +329,7 @@ func matchByDirectory(dirPath string, directories []string) bool {
func (fs *FilerServer) addClient(prefix string, clientType string, clientAddress string, clientId int32, clientEpoch int32) (isReplacing, alreadyKnown bool, clientName string) {
clientName = clientType + "@" + clientAddress
- glog.V(0).Infof("+ %v listener %v clientId %v clientEpoch %v", prefix, clientName, clientId, clientEpoch)
+ log.V(3).Infof("+ %v listener %v clientId %v clientEpoch %v", prefix, clientName, clientId, clientEpoch)
if clientId != 0 {
fs.knownListenersLock.Lock()
defer fs.knownListenersLock.Unlock()
@@ -345,7 +345,7 @@ func (fs *FilerServer) addClient(prefix string, clientType string, clientAddress
}
func (fs *FilerServer) deleteClient(prefix string, clientName string, clientId int32, clientEpoch int32) {
- glog.V(0).Infof("- %v listener %v clientId %v clientEpoch %v", prefix, clientName, clientId, clientEpoch)
+ log.V(3).Infof("- %v listener %v clientId %v clientEpoch %v", prefix, clientName, clientId, clientEpoch)
if clientId != 0 {
fs.knownListenersLock.Lock()
defer fs.knownListenersLock.Unlock()