diff options
| author | chrislu <chris.lu@gmail.com> | 2025-05-22 09:54:31 -0700 |
|---|---|---|
| committer | chrislu <chris.lu@gmail.com> | 2025-05-22 09:54:31 -0700 |
| commit | 0d62be44846354c3c37b857028297edd4b8df17b (patch) | |
| tree | c89320a7d58351030f1b740c7267f56bf0206429 /weed/server/filer_grpc_server_sub_meta.go | |
| parent | d8c574a5ef1a811f9a0d447097d9edfcc0c1d84c (diff) | |
| download | seaweedfs-origin/changing-to-zap.tar.xz seaweedfs-origin/changing-to-zap.zip | |
Diffstat (limited to 'weed/server/filer_grpc_server_sub_meta.go')
| -rw-r--r-- | weed/server/filer_grpc_server_sub_meta.go | 36 |
1 files changed, 18 insertions, 18 deletions
diff --git a/weed/server/filer_grpc_server_sub_meta.go b/weed/server/filer_grpc_server_sub_meta.go index dfe594b46..583fd06c6 100644 --- a/weed/server/filer_grpc_server_sub_meta.go +++ b/weed/server/filer_grpc_server_sub_meta.go @@ -12,7 +12,7 @@ import ( "google.golang.org/protobuf/proto" "github.com/seaweedfs/seaweedfs/weed/filer" - "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/util/log" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/util" "github.com/seaweedfs/seaweedfs/weed/util/log_buffer" @@ -36,13 +36,13 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest, return fmt.Errorf("duplicated subscription detected for client %s id %d", clientName, req.ClientId) } defer func() { - glog.V(0).Infof("disconnect %v subscriber %s clientId:%d", clientName, req.PathPrefix, req.ClientId) + log.V(3).Infof("disconnect %v subscriber %s clientId:%d", clientName, req.PathPrefix, req.ClientId) fs.deleteClient("", clientName, req.ClientId, req.ClientEpoch) fs.filer.MetaAggregator.ListenersCond.Broadcast() // nudges the subscribers that are waiting }() lastReadTime := log_buffer.NewMessagePosition(req.SinceNs, -2) - glog.V(0).Infof(" %v starts to subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime) + log.V(3).Infof(" %v starts to subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime) eachEventNotificationFn := fs.eachEventNotificationFn(req, stream, clientName) @@ -55,7 +55,7 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest, for { - glog.V(4).Infof("read on disk %v aggregated subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime) + log.V(-1).Infof("read on disk %v aggregated subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime) processedTsNs, isDone, readPersistedLogErr = fs.filer.ReadPersistedLogBuffer(lastReadTime, req.UntilNs, eachLogEntryFn) if readPersistedLogErr != nil { @@ -65,7 +65,7 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest, return nil } - glog.V(4).Infof("processed to %v: %v", clientName, processedTsNs) + log.V(-1).Infof("processed to %v: %v", clientName, processedTsNs) if processedTsNs != 0 { lastReadTime = log_buffer.NewMessagePosition(processedTsNs, -2) } else { @@ -80,7 +80,7 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest, } } - glog.V(4).Infof("read in memory %v aggregated subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime) + log.V(-1).Infof("read in memory %v aggregated subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime) lastReadTime, isDone, readInMemoryLogErr = fs.filer.MetaAggregator.MetaLogBuffer.LoopProcessLogData("aggMeta:"+clientName, lastReadTime, req.UntilNs, func() bool { // Check if the client has disconnected by monitoring the context @@ -99,7 +99,7 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest, if errors.Is(readInMemoryLogErr, log_buffer.ResumeFromDiskError) { continue } - glog.Errorf("processed to %v: %v", lastReadTime, readInMemoryLogErr) + log.Errorf("processed to %v: %v", lastReadTime, readInMemoryLogErr) if !errors.Is(readInMemoryLogErr, log_buffer.ResumeError) { break } @@ -108,7 +108,7 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest, return nil } if !fs.hasClient(req.ClientId, req.ClientEpoch) { - glog.V(0).Infof("client %v is closed", clientName) + log.V(3).Infof("client %v is closed", clientName) return nil } @@ -134,13 +134,13 @@ func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataReq return fmt.Errorf("duplicated local subscription detected for client %s clientId:%d", clientName, req.ClientId) } defer func() { - glog.V(0).Infof("disconnect %v local subscriber %s clientId:%d", clientName, req.PathPrefix, req.ClientId) + log.V(3).Infof("disconnect %v local subscriber %s clientId:%d", clientName, req.PathPrefix, req.ClientId) fs.deleteClient("local", clientName, req.ClientId, req.ClientEpoch) fs.listenersCond.Broadcast() // nudges the subscribers that are waiting }() lastReadTime := log_buffer.NewMessagePosition(req.SinceNs, -2) - glog.V(0).Infof(" + %v local subscribe %s from %+v clientId:%d", clientName, req.PathPrefix, lastReadTime, req.ClientId) + log.V(3).Infof(" + %v local subscribe %s from %+v clientId:%d", clientName, req.PathPrefix, lastReadTime, req.ClientId) eachEventNotificationFn := fs.eachEventNotificationFn(req, stream, clientName) @@ -153,10 +153,10 @@ func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataReq for { // println("reading from persisted logs ...") - glog.V(0).Infof("read on disk %v local subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime) + log.V(3).Infof("read on disk %v local subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime) processedTsNs, isDone, readPersistedLogErr = fs.filer.ReadPersistedLogBuffer(lastReadTime, req.UntilNs, eachLogEntryFn) if readPersistedLogErr != nil { - glog.V(0).Infof("read on disk %v local subscribe %s from %+v: %v", clientName, req.PathPrefix, lastReadTime, readPersistedLogErr) + log.V(3).Infof("read on disk %v local subscribe %s from %+v: %v", clientName, req.PathPrefix, lastReadTime, readPersistedLogErr) return fmt.Errorf("reading from persisted logs: %v", readPersistedLogErr) } if isDone { @@ -172,7 +172,7 @@ func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataReq } } - glog.V(0).Infof("read in memory %v local subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime) + log.V(3).Infof("read in memory %v local subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime) lastReadTime, isDone, readInMemoryLogErr = fs.filer.LocalMetaLogBuffer.LoopProcessLogData("localMeta:"+clientName, lastReadTime, req.UntilNs, func() bool { @@ -197,7 +197,7 @@ func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataReq if readInMemoryLogErr == log_buffer.ResumeFromDiskError { continue } - glog.Errorf("processed to %v: %v", lastReadTime, readInMemoryLogErr) + log.Errorf("processed to %v: %v", lastReadTime, readInMemoryLogErr) if readInMemoryLogErr != log_buffer.ResumeError { break } @@ -218,7 +218,7 @@ func eachLogEntryFn(eachEventNotificationFn func(dirPath string, eventNotificati return func(logEntry *filer_pb.LogEntry) (bool, error) { event := &filer_pb.SubscribeMetadataResponse{} if err := proto.Unmarshal(logEntry.Data, event); err != nil { - glog.Errorf("unexpected unmarshal filer_pb.SubscribeMetadataResponse: %v", err) + log.Errorf("unexpected unmarshal filer_pb.SubscribeMetadataResponse: %v", err) return false, fmt.Errorf("unexpected unmarshal filer_pb.SubscribeMetadataResponse: %v", err) } @@ -301,7 +301,7 @@ func (fs *FilerServer) eachEventNotificationFn(req *filer_pb.SubscribeMetadataRe } // println("sending", dirPath, entryName) if err := stream.Send(message); err != nil { - glog.V(0).Infof("=> client %v: %+v", clientName, err) + log.V(3).Infof("=> client %v: %+v", clientName, err) return err } filtered = 0 @@ -329,7 +329,7 @@ func matchByDirectory(dirPath string, directories []string) bool { func (fs *FilerServer) addClient(prefix string, clientType string, clientAddress string, clientId int32, clientEpoch int32) (isReplacing, alreadyKnown bool, clientName string) { clientName = clientType + "@" + clientAddress - glog.V(0).Infof("+ %v listener %v clientId %v clientEpoch %v", prefix, clientName, clientId, clientEpoch) + log.V(3).Infof("+ %v listener %v clientId %v clientEpoch %v", prefix, clientName, clientId, clientEpoch) if clientId != 0 { fs.knownListenersLock.Lock() defer fs.knownListenersLock.Unlock() @@ -345,7 +345,7 @@ func (fs *FilerServer) addClient(prefix string, clientType string, clientAddress } func (fs *FilerServer) deleteClient(prefix string, clientName string, clientId int32, clientEpoch int32) { - glog.V(0).Infof("- %v listener %v clientId %v clientEpoch %v", prefix, clientName, clientId, clientEpoch) + log.V(3).Infof("- %v listener %v clientId %v clientEpoch %v", prefix, clientName, clientId, clientEpoch) if clientId != 0 { fs.knownListenersLock.Lock() defer fs.knownListenersLock.Unlock() |
