diff options
| author | Chris Lu <chris.lu@gmail.com> | 2020-11-16 22:26:58 -0800 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2020-11-16 22:26:58 -0800 |
| commit | 6c9156b25f8b1c28fb0cc909310a20aeeec0e087 (patch) | |
| tree | 343e30d98e46a081aa57adfc334b807d0b3255dc /weed/replication | |
| parent | 9add554feb53706d1d878cc9636d234e622b8a80 (diff) | |
| download | seaweedfs-origin/logrus.tar.xz seaweedfs-origin/logrus.zip | |
switch to logrusorigin/logrus
losing filename and line number. Critical for debugging.
Diffstat (limited to 'weed/replication')
| -rw-r--r-- | weed/replication/repl_util/replication_utli.go | 6 | ||||
| -rw-r--r-- | weed/replication/replicator.go | 16 | ||||
| -rw-r--r-- | weed/replication/sink/azuresink/azure_sink.go | 4 | ||||
| -rw-r--r-- | weed/replication/sink/filersink/fetch_write.go | 10 | ||||
| -rw-r--r-- | weed/replication/sink/filersink/filer_sink.go | 28 | ||||
| -rw-r--r-- | weed/replication/sink/gcssink/gcs_sink.go | 6 | ||||
| -rw-r--r-- | weed/replication/sink/s3sink/s3_sink.go | 10 | ||||
| -rw-r--r-- | weed/replication/sink/s3sink/s3_write.go | 34 | ||||
| -rw-r--r-- | weed/replication/source/filer_source.go | 10 | ||||
| -rw-r--r-- | weed/replication/sub/notification_aws_sqs.go | 8 | ||||
| -rw-r--r-- | weed/replication/sub/notification_gocdk_pub_sub.go | 4 | ||||
| -rw-r--r-- | weed/replication/sub/notification_google_pub_sub.go | 18 | ||||
| -rw-r--r-- | weed/replication/sub/notification_kafka.go | 14 |
13 files changed, 84 insertions, 84 deletions
diff --git a/weed/replication/repl_util/replication_utli.go b/weed/replication/repl_util/replication_utli.go index 42777f4ad..cc4c5d806 100644 --- a/weed/replication/repl_util/replication_utli.go +++ b/weed/replication/repl_util/replication_utli.go @@ -2,7 +2,7 @@ package repl_util import ( "github.com/chrislusf/seaweedfs/weed/filer" - "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/util/log" "github.com/chrislusf/seaweedfs/weed/replication/source" "github.com/chrislusf/seaweedfs/weed/util" ) @@ -23,9 +23,9 @@ func CopyFromChunkViews(chunkViews []*filer.ChunkView, filerSource *source.Filer writeErr = writeFunc(data) }) if err != nil { - glog.V(1).Infof("read from %s: %v", fileUrl, err) + log.Debugf("read from %s: %v", fileUrl, err) } else if writeErr != nil { - glog.V(1).Infof("copy from %s: %v", fileUrl, writeErr) + log.Debugf("copy from %s: %v", fileUrl, writeErr) } else { break } diff --git a/weed/replication/replicator.go b/weed/replication/replicator.go index c4228434f..5e8d47d86 100644 --- a/weed/replication/replicator.go +++ b/weed/replication/replicator.go @@ -7,7 +7,7 @@ import ( "google.golang.org/grpc" "strings" - "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/util/log" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/replication/sink" "github.com/chrislusf/seaweedfs/weed/replication/source" @@ -37,28 +37,28 @@ func (r *Replicator) Replicate(ctx context.Context, key string, message *filer_p return nil } if !strings.HasPrefix(key, r.source.Dir) { - glog.V(4).Infof("skipping %v outside of %v", key, r.source.Dir) + log.Tracef("skipping %v outside of %v", key, r.source.Dir) return nil } newKey := util.Join(r.sink.GetSinkToDirectory(), key[len(r.source.Dir):]) - glog.V(3).Infof("replicate %s => %s", key, newKey) + log.Tracef("replicate %s => %s", key, newKey) key = newKey if message.OldEntry != nil && message.NewEntry == nil { - glog.V(4).Infof("deleting %v", key) + log.Tracef("deleting %v", key) return r.sink.DeleteEntry(key, message.OldEntry.IsDirectory, message.DeleteChunks, message.Signatures) } if message.OldEntry == nil && message.NewEntry != nil { - glog.V(4).Infof("creating %v", key) + log.Tracef("creating %v", key) return r.sink.CreateEntry(key, message.NewEntry, message.Signatures) } if message.OldEntry == nil && message.NewEntry == nil { - glog.V(0).Infof("weird message %+v", message) + log.Infof("weird message %+v", message) return nil } foundExisting, err := r.sink.UpdateEntry(key, message.OldEntry, message.NewParentPath, message.NewEntry, message.DeleteChunks, message.Signatures) if foundExisting { - glog.V(4).Infof("updated %v", key) + log.Tracef("updated %v", key) return err } @@ -67,7 +67,7 @@ func (r *Replicator) Replicate(ctx context.Context, key string, message *filer_p return fmt.Errorf("delete old entry %v: %v", key, err) } - glog.V(4).Infof("creating missing %v", key) + log.Tracef("creating missing %v", key) return r.sink.CreateEntry(key, message.NewEntry, message.Signatures) } diff --git a/weed/replication/sink/azuresink/azure_sink.go b/weed/replication/sink/azuresink/azure_sink.go index df70be64b..ff2325685 100644 --- a/weed/replication/sink/azuresink/azure_sink.go +++ b/weed/replication/sink/azuresink/azure_sink.go @@ -10,7 +10,7 @@ import ( "github.com/Azure/azure-storage-blob-go/azblob" "github.com/chrislusf/seaweedfs/weed/filer" - "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/util/log" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/replication/sink" "github.com/chrislusf/seaweedfs/weed/replication/source" @@ -56,7 +56,7 @@ func (g *AzureSink) initialize(accountName, accountKey, container, dir string) e // Use your Storage account's name and key to create a credential object. credential, err := azblob.NewSharedKeyCredential(accountName, accountKey) if err != nil { - glog.Fatalf("failed to create Azure credential with account name:%s key:%s", accountName, accountKey) + log.Fatalf("failed to create Azure credential with account name:%s key:%s", accountName, accountKey) } // Create a request pipeline that is used to process HTTP(S) requests and responses. diff --git a/weed/replication/sink/filersink/fetch_write.go b/weed/replication/sink/filersink/fetch_write.go index d193ff81c..f26b4876c 100644 --- a/weed/replication/sink/filersink/fetch_write.go +++ b/weed/replication/sink/filersink/fetch_write.go @@ -8,7 +8,7 @@ import ( "google.golang.org/grpc" - "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/util/log" "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" @@ -82,7 +82,7 @@ func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk, path string) resp, err := client.AssignVolume(context.Background(), request) if err != nil { - glog.V(0).Infof("assign volume failure %v: %v", request, err) + log.Infof("assign volume failure %v: %v", request, err) return err } if resp.Error != "" { @@ -98,16 +98,16 @@ func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk, path string) fileUrl := fmt.Sprintf("http://%s/%s", host, fileId) - glog.V(4).Infof("replicating %s to %s header:%+v", filename, fileUrl, header) + log.Tracef("replicating %s to %s header:%+v", filename, fileUrl, header) // fetch data as is, regardless whether it is encrypted or not uploadResult, err, _ := operation.Upload(fileUrl, filename, false, resp.Body, "gzip" == header.Get("Content-Encoding"), header.Get("Content-Type"), nil, auth) if err != nil { - glog.V(0).Infof("upload source data %v to %s: %v", sourceChunk.GetFileIdString(), fileUrl, err) + log.Infof("upload source data %v to %s: %v", sourceChunk.GetFileIdString(), fileUrl, err) return "", fmt.Errorf("upload data: %v", err) } if uploadResult.Error != "" { - glog.V(0).Infof("upload failure %v to %s: %v", filename, fileUrl, err) + log.Infof("upload failure %v to %s: %v", filename, fileUrl, err) return "", fmt.Errorf("upload result: %v", uploadResult.Error) } diff --git a/weed/replication/sink/filersink/filer_sink.go b/weed/replication/sink/filersink/filer_sink.go index 6f467ea58..948d5913c 100644 --- a/weed/replication/sink/filersink/filer_sink.go +++ b/weed/replication/sink/filersink/filer_sink.go @@ -9,7 +9,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/filer" - "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/util/log" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/replication/sink" "github.com/chrislusf/seaweedfs/weed/replication/source" @@ -68,10 +68,10 @@ func (fs *FilerSink) DeleteEntry(key string, isDirectory, deleteIncludeChunks bo dir, name := util.FullPath(key).DirAndName() - glog.V(4).Infof("delete entry: %v", key) + log.Tracef("delete entry: %v", key) err := filer_pb.Remove(fs, dir, name, deleteIncludeChunks, true, true, true, signatures) if err != nil { - glog.V(0).Infof("delete entry %s: %v", key, err) + log.Infof("delete entry %s: %v", key, err) return fmt.Errorf("delete entry %s: %v", key, err) } return nil @@ -88,10 +88,10 @@ func (fs *FilerSink) CreateEntry(key string, entry *filer_pb.Entry, signatures [ Directory: dir, Name: name, } - glog.V(1).Infof("lookup: %v", lookupRequest) + log.Debugf("lookup: %v", lookupRequest) if resp, err := filer_pb.LookupEntry(client, lookupRequest); err == nil { if filer.ETag(resp.Entry) == filer.ETag(entry) { - glog.V(3).Infof("already replicated %s", key) + log.Tracef("already replicated %s", key) return nil } } @@ -100,10 +100,10 @@ func (fs *FilerSink) CreateEntry(key string, entry *filer_pb.Entry, signatures [ if err != nil { // only warning here since the source chunk may have been deleted already - glog.Warningf("replicate entry chunks %s: %v", key, err) + log.Warnf("replicate entry chunks %s: %v", key, err) } - glog.V(4).Infof("replicated %s %+v ===> %+v", key, entry.Chunks, replicatedChunks) + log.Tracef("replicated %s %+v ===> %+v", key, entry.Chunks, replicatedChunks) request := &filer_pb.CreateEntryRequest{ Directory: dir, @@ -117,9 +117,9 @@ func (fs *FilerSink) CreateEntry(key string, entry *filer_pb.Entry, signatures [ Signatures: signatures, } - glog.V(3).Infof("create: %v", request) + log.Tracef("create: %v", request) if err := filer_pb.CreateEntry(client, request); err != nil { - glog.V(0).Infof("create entry %s: %v", key, err) + log.Infof("create entry %s: %v", key, err) return fmt.Errorf("create entry %s: %v", key, err) } @@ -140,10 +140,10 @@ func (fs *FilerSink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParent Name: name, } - glog.V(4).Infof("lookup entry: %v", request) + log.Tracef("lookup entry: %v", request) resp, err := filer_pb.LookupEntry(client, request) if err != nil { - glog.V(0).Infof("lookup %s: %v", key, err) + log.Infof("lookup %s: %v", key, err) return err } @@ -156,16 +156,16 @@ func (fs *FilerSink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParent return false, fmt.Errorf("lookup %s: %v", key, err) } - glog.V(4).Infof("oldEntry %+v, newEntry %+v, existingEntry: %+v", oldEntry, newEntry, existingEntry) + log.Tracef("oldEntry %+v, newEntry %+v, existingEntry: %+v", oldEntry, newEntry, existingEntry) if existingEntry.Attributes.Mtime > newEntry.Attributes.Mtime { // skip if already changed // this usually happens when the messages are not ordered - glog.V(2).Infof("late updates %s", key) + log.Debugf("late updates %s", key) } else if filer.ETag(newEntry) == filer.ETag(existingEntry) { // skip if no change // this usually happens when retrying the replication - glog.V(3).Infof("already replicated %s", key) + log.Tracef("already replicated %s", key) } else { // find out what changed deletedChunks, newChunks, err := compareChunks(filer.LookupFn(fs), oldEntry, newEntry) diff --git a/weed/replication/sink/gcssink/gcs_sink.go b/weed/replication/sink/gcssink/gcs_sink.go index badabc32c..80feb2cbb 100644 --- a/weed/replication/sink/gcssink/gcs_sink.go +++ b/weed/replication/sink/gcssink/gcs_sink.go @@ -10,7 +10,7 @@ import ( "google.golang.org/api/option" "github.com/chrislusf/seaweedfs/weed/filer" - "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/util/log" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/replication/sink" "github.com/chrislusf/seaweedfs/weed/replication/source" @@ -57,12 +57,12 @@ func (g *GcsSink) initialize(google_application_credentials, bucketName, dir str var found bool google_application_credentials, found = os.LookupEnv("GOOGLE_APPLICATION_CREDENTIALS") if !found { - glog.Fatalf("need to specific GOOGLE_APPLICATION_CREDENTIALS env variable or google_application_credentials in replication.toml") + log.Fatalf("need to specific GOOGLE_APPLICATION_CREDENTIALS env variable or google_application_credentials in replication.toml") } } client, err := storage.NewClient(context.Background(), option.WithCredentialsFile(google_application_credentials)) if err != nil { - glog.Fatalf("Failed to create client: %v", err) + log.Fatalf("Failed to create client: %v", err) } g.client = client diff --git a/weed/replication/sink/s3sink/s3_sink.go b/weed/replication/sink/s3sink/s3_sink.go index 58432ee6b..71da89df3 100644 --- a/weed/replication/sink/s3sink/s3_sink.go +++ b/weed/replication/sink/s3sink/s3_sink.go @@ -13,7 +13,7 @@ import ( "github.com/aws/aws-sdk-go/service/s3/s3iface" "github.com/chrislusf/seaweedfs/weed/filer" - "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/util/log" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/replication/sink" "github.com/chrislusf/seaweedfs/weed/replication/source" @@ -42,10 +42,10 @@ func (s3sink *S3Sink) GetSinkToDirectory() string { } func (s3sink *S3Sink) Initialize(configuration util.Configuration, prefix string) error { - glog.V(0).Infof("sink.s3.region: %v", configuration.GetString(prefix+"region")) - glog.V(0).Infof("sink.s3.bucket: %v", configuration.GetString(prefix+"bucket")) - glog.V(0).Infof("sink.s3.directory: %v", configuration.GetString(prefix+"directory")) - glog.V(0).Infof("sink.s3.endpoint: %v", configuration.GetString(prefix+"endpoint")) + log.Infof("sink.s3.region: %v", configuration.GetString(prefix+"region")) + log.Infof("sink.s3.bucket: %v", configuration.GetString(prefix+"bucket")) + log.Infof("sink.s3.directory: %v", configuration.GetString(prefix+"directory")) + log.Infof("sink.s3.endpoint: %v", configuration.GetString(prefix+"endpoint")) return s3sink.initialize( configuration.GetString(prefix+"aws_access_key_id"), configuration.GetString(prefix+"aws_secret_access_key"), diff --git a/weed/replication/sink/s3sink/s3_write.go b/weed/replication/sink/s3sink/s3_write.go index b172ea2c3..51d6ae1b5 100644 --- a/weed/replication/sink/s3sink/s3_write.go +++ b/weed/replication/sink/s3sink/s3_write.go @@ -10,7 +10,7 @@ import ( "github.com/aws/aws-sdk-go/aws/awserr" "github.com/aws/aws-sdk-go/service/s3" "github.com/chrislusf/seaweedfs/weed/filer" - "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/util/log" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/util" ) @@ -24,9 +24,9 @@ func (s3sink *S3Sink) deleteObject(key string) error { result, err := s3sink.conn.DeleteObject(input) if err == nil { - glog.V(0).Infof("[%s] delete %s: %v", s3sink.bucket, key, result) + log.Infof("[%s] delete %s: %v", s3sink.bucket, key, result) } else { - glog.Errorf("[%s] delete %s: %v", s3sink.bucket, key, err) + log.Errorf("[%s] delete %s: %v", s3sink.bucket, key, err) } return err @@ -43,9 +43,9 @@ func (s3sink *S3Sink) createMultipartUpload(key string, entry *filer_pb.Entry) ( result, err := s3sink.conn.CreateMultipartUpload(input) if err == nil { - glog.V(0).Infof("[%s] createMultipartUpload %s: %v", s3sink.bucket, key, result) + log.Infof("[%s] createMultipartUpload %s: %v", s3sink.bucket, key, result) } else { - glog.Errorf("[%s] createMultipartUpload %s: %v", s3sink.bucket, key, err) + log.Errorf("[%s] createMultipartUpload %s: %v", s3sink.bucket, key, err) return "", err } @@ -64,19 +64,19 @@ func (s3sink *S3Sink) abortMultipartUpload(key, uploadId string) error { if aerr, ok := err.(awserr.Error); ok { switch aerr.Code() { case s3.ErrCodeNoSuchUpload: - glog.Errorf("[%s] abortMultipartUpload %s: %v %v", s3sink.bucket, key, s3.ErrCodeNoSuchUpload, aerr.Error()) + log.Errorf("[%s] abortMultipartUpload %s: %v %v", s3sink.bucket, key, s3.ErrCodeNoSuchUpload, aerr.Error()) default: - glog.Errorf("[%s] abortMultipartUpload %s: %v", s3sink.bucket, key, aerr.Error()) + log.Errorf("[%s] abortMultipartUpload %s: %v", s3sink.bucket, key, aerr.Error()) } } else { // Print the error, cast err to awserr.Error to get the Code and // Message from an error. - glog.Errorf("[%s] abortMultipartUpload %s: %v", s3sink.bucket, key, aerr.Error()) + log.Errorf("[%s] abortMultipartUpload %s: %v", s3sink.bucket, key, aerr.Error()) } return err } - glog.V(0).Infof("[%s] abortMultipartUpload %s: %v", s3sink.bucket, key, result) + log.Infof("[%s] abortMultipartUpload %s: %v", s3sink.bucket, key, result) return nil } @@ -94,9 +94,9 @@ func (s3sink *S3Sink) completeMultipartUpload(ctx context.Context, key, uploadId result, err := s3sink.conn.CompleteMultipartUpload(input) if err == nil { - glog.V(0).Infof("[%s] completeMultipartUpload %s: %v", s3sink.bucket, key, result) + log.Infof("[%s] completeMultipartUpload %s: %v", s3sink.bucket, key, result) } else { - glog.Errorf("[%s] completeMultipartUpload %s: %v", s3sink.bucket, key, err) + log.Errorf("[%s] completeMultipartUpload %s: %v", s3sink.bucket, key, err) } return err @@ -108,7 +108,7 @@ func (s3sink *S3Sink) uploadPart(key, uploadId string, partId int, chunk *filer. readSeeker, err := s3sink.buildReadSeeker(chunk) if err != nil { - glog.Errorf("[%s] uploadPart %s %d read: %v", s3sink.bucket, key, partId, err) + log.Errorf("[%s] uploadPart %s %d read: %v", s3sink.bucket, key, partId, err) return nil, fmt.Errorf("[%s] uploadPart %s %d read: %v", s3sink.bucket, key, partId, err) } @@ -122,9 +122,9 @@ func (s3sink *S3Sink) uploadPart(key, uploadId string, partId int, chunk *filer. result, err := s3sink.conn.UploadPart(input) if err == nil { - glog.V(0).Infof("[%s] uploadPart %s %d upload: %v", s3sink.bucket, key, partId, result) + log.Infof("[%s] uploadPart %s %d upload: %v", s3sink.bucket, key, partId, result) } else { - glog.Errorf("[%s] uploadPart %s %d upload: %v", s3sink.bucket, key, partId, err) + log.Errorf("[%s] uploadPart %s %d upload: %v", s3sink.bucket, key, partId, err) } part := &s3.CompletedPart{ @@ -148,9 +148,9 @@ func (s3sink *S3Sink) uploadPartCopy(key, uploadId string, partId int64, copySou result, err := s3sink.conn.UploadPartCopy(input) if err == nil { - glog.V(0).Infof("[%s] uploadPartCopy %s %d: %v", s3sink.bucket, key, partId, result) + log.Infof("[%s] uploadPartCopy %s %d: %v", s3sink.bucket, key, partId, result) } else { - glog.Errorf("[%s] uploadPartCopy %s %d: %v", s3sink.bucket, key, partId, err) + log.Errorf("[%s] uploadPartCopy %s %d: %v", s3sink.bucket, key, partId, err) } return err @@ -165,7 +165,7 @@ func (s3sink *S3Sink) buildReadSeeker(chunk *filer.ChunkView) (io.ReadSeeker, er for _, fileUrl := range fileUrls { _, err = util.ReadUrl(fileUrl+"?readDeleted=true", nil, false, false, chunk.Offset, int(chunk.Size), buf) if err != nil { - glog.V(1).Infof("read from %s: %v", fileUrl, err) + log.Debugf("read from %s: %v", fileUrl, err) } else { break } diff --git a/weed/replication/source/filer_source.go b/weed/replication/source/filer_source.go index ff4f2eb26..a54bc99fd 100644 --- a/weed/replication/source/filer_source.go +++ b/weed/replication/source/filer_source.go @@ -12,7 +12,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/security" - "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/util/log" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/util" ) @@ -49,7 +49,7 @@ func (fs *FilerSource) LookupFileId(part string) (fileUrls []string, err error) err = fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { - glog.V(4).Infof("read lookup volume id locations: %v", vid) + log.Tracef("read lookup volume id locations: %v", vid) resp, err := client.LookupVolume(context.Background(), &filer_pb.LookupVolumeRequest{ VolumeIds: []string{vid}, }) @@ -63,14 +63,14 @@ func (fs *FilerSource) LookupFileId(part string) (fileUrls []string, err error) }) if err != nil { - glog.V(1).Infof("LookupFileId volume id %s: %v", vid, err) + log.Debugf("LookupFileId volume id %s: %v", vid, err) return nil, fmt.Errorf("LookupFileId volume id %s: %v", vid, err) } locations := vid2Locations[vid] if locations == nil || len(locations.Locations) == 0 { - glog.V(1).Infof("LookupFileId locate volume id %s: %v", vid, err) + log.Debugf("LookupFileId locate volume id %s: %v", vid, err) return nil, fmt.Errorf("LookupFileId locate volume id %s: %v", vid, err) } @@ -91,7 +91,7 @@ func (fs *FilerSource) ReadPart(part string) (filename string, header http.Heade for _, fileUrl := range fileUrls { filename, header, resp, err = util.DownloadFile(fileUrl) if err != nil { - glog.V(1).Infof("fail to read from %s: %v", fileUrl, err) + log.Debugf("fail to read from %s: %v", fileUrl, err) } else { break } diff --git a/weed/replication/sub/notification_aws_sqs.go b/weed/replication/sub/notification_aws_sqs.go index 1dd386ba7..fc114d3c2 100644 --- a/weed/replication/sub/notification_aws_sqs.go +++ b/weed/replication/sub/notification_aws_sqs.go @@ -8,7 +8,7 @@ import ( "github.com/aws/aws-sdk-go/aws/credentials" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/sqs" - "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/util/log" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/util" "github.com/golang/protobuf/proto" @@ -28,8 +28,8 @@ func (k *AwsSqsInput) GetName() string { } func (k *AwsSqsInput) Initialize(configuration util.Configuration, prefix string) error { - glog.V(0).Infof("replication.notification.aws_sqs.region: %v", configuration.GetString(prefix+"region")) - glog.V(0).Infof("replication.notification.aws_sqs.sqs_queue_name: %v", configuration.GetString(prefix+"sqs_queue_name")) + log.Infof("replication.notification.aws_sqs.region: %v", configuration.GetString(prefix+"region")) + log.Infof("replication.notification.aws_sqs.sqs_queue_name: %v", configuration.GetString(prefix+"sqs_queue_name")) return k.initialize( configuration.GetString(prefix+"aws_access_key_id"), configuration.GetString(prefix+"aws_secret_access_key"), @@ -106,7 +106,7 @@ func (k *AwsSqsInput) ReceiveMessage() (key string, message *filer_pb.EventNotif }) if err != nil { - glog.V(1).Infof("delete message from sqs %s: %v", k.queueUrl, err) + log.Debugf("delete message from sqs %s: %v", k.queueUrl, err) } return diff --git a/weed/replication/sub/notification_gocdk_pub_sub.go b/weed/replication/sub/notification_gocdk_pub_sub.go index 9726096e5..edbb11bc0 100644 --- a/weed/replication/sub/notification_gocdk_pub_sub.go +++ b/weed/replication/sub/notification_gocdk_pub_sub.go @@ -3,7 +3,7 @@ package sub import ( "context" - "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/util/log" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/util" "github.com/golang/protobuf/proto" @@ -29,7 +29,7 @@ func (k *GoCDKPubSubInput) GetName() string { func (k *GoCDKPubSubInput) Initialize(configuration util.Configuration, prefix string) error { subURL := configuration.GetString(prefix + "sub_url") - glog.V(0).Infof("notification.gocdk_pub_sub.sub_url: %v", subURL) + log.Infof("notification.gocdk_pub_sub.sub_url: %v", subURL) sub, err := pubsub.OpenSubscription(context.Background(), subURL) if err != nil { return err diff --git a/weed/replication/sub/notification_google_pub_sub.go b/weed/replication/sub/notification_google_pub_sub.go index a950bb42b..c21246606 100644 --- a/weed/replication/sub/notification_google_pub_sub.go +++ b/weed/replication/sub/notification_google_pub_sub.go @@ -6,7 +6,7 @@ import ( "os" "cloud.google.com/go/pubsub" - "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/util/log" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/util" "github.com/golang/protobuf/proto" @@ -28,8 +28,8 @@ func (k *GooglePubSubInput) GetName() string { } func (k *GooglePubSubInput) Initialize(configuration util.Configuration, prefix string) error { - glog.V(0).Infof("notification.google_pub_sub.project_id: %v", configuration.GetString(prefix+"project_id")) - glog.V(0).Infof("notification.google_pub_sub.topic: %v", configuration.GetString(prefix+"topic")) + log.Infof("notification.google_pub_sub.project_id: %v", configuration.GetString(prefix+"project_id")) + log.Infof("notification.google_pub_sub.topic: %v", configuration.GetString(prefix+"topic")) return k.initialize( configuration.GetString(prefix+"google_application_credentials"), configuration.GetString(prefix+"project_id"), @@ -45,13 +45,13 @@ func (k *GooglePubSubInput) initialize(google_application_credentials, projectId var found bool google_application_credentials, found = os.LookupEnv("GOOGLE_APPLICATION_CREDENTIALS") if !found { - glog.Fatalf("need to specific GOOGLE_APPLICATION_CREDENTIALS env variable or google_application_credentials in filer.toml") + log.Fatalf("need to specific GOOGLE_APPLICATION_CREDENTIALS env variable or google_application_credentials in filer.toml") } } client, err := pubsub.NewClient(ctx, projectId, option.WithCredentialsFile(google_application_credentials)) if err != nil { - glog.Fatalf("Failed to create client: %v", err) + log.Fatalf("Failed to create client: %v", err) } k.topicName = topicName @@ -60,11 +60,11 @@ func (k *GooglePubSubInput) initialize(google_application_credentials, projectId if !exists { topic, err = client.CreateTopic(ctx, topicName) if err != nil { - glog.Fatalf("Failed to create topic %s: %v", topicName, err) + log.Fatalf("Failed to create topic %s: %v", topicName, err) } } } else { - glog.Fatalf("Failed to check topic %s: %v", topicName, err) + log.Fatalf("Failed to check topic %s: %v", topicName, err) } subscriptionName := "seaweedfs_sub" @@ -74,11 +74,11 @@ func (k *GooglePubSubInput) initialize(google_application_credentials, projectId if !exists { k.sub, err = client.CreateSubscription(ctx, subscriptionName, pubsub.SubscriptionConfig{Topic: topic}) if err != nil { - glog.Fatalf("Failed to create subscription %s: %v", subscriptionName, err) + log.Fatalf("Failed to create subscription %s: %v", subscriptionName, err) } } } else { - glog.Fatalf("Failed to check subscription %s: %v", topicName, err) + log.Fatalf("Failed to check subscription %s: %v", topicName, err) } k.messageChan = make(chan *pubsub.Message, 1) diff --git a/weed/replication/sub/notification_kafka.go b/weed/replication/sub/notification_kafka.go index fa9cfad9b..2b570ab30 100644 --- a/weed/replication/sub/notification_kafka.go +++ b/weed/replication/sub/notification_kafka.go @@ -8,7 +8,7 @@ import ( "time" "github.com/Shopify/sarama" - "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/util/log" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/util" "github.com/golang/protobuf/proto" @@ -29,8 +29,8 @@ func (k *KafkaInput) GetName() string { } func (k *KafkaInput) Initialize(configuration util.Configuration, prefix string) error { - glog.V(0).Infof("replication.notification.kafka.hosts: %v\n", configuration.GetStringSlice(prefix+"hosts")) - glog.V(0).Infof("replication.notification.kafka.topic: %v\n", configuration.GetString(prefix+"topic")) + log.Infof("replication.notification.kafka.hosts: %v\n", configuration.GetStringSlice(prefix+"hosts")) + log.Infof("replication.notification.kafka.topic: %v\n", configuration.GetString(prefix+"topic")) return k.initialize( configuration.GetStringSlice(prefix+"hosts"), configuration.GetString(prefix+"topic"), @@ -46,7 +46,7 @@ func (k *KafkaInput) initialize(hosts []string, topic string, offsetFile string, if err != nil { panic(err) } else { - glog.V(0).Infof("connected to %v", hosts) + log.Infof("connected to %v", hosts) } k.topic = topic @@ -87,7 +87,7 @@ func (k *KafkaInput) initialize(hosts []string, topic string, offsetFile string, case msg := <-partitionConsumer.Messages(): k.messageChan <- msg if err := progress.setOffset(msg.Partition, msg.Offset); err != nil { - glog.Warningf("set kafka offset: %v", err) + log.Warnf("set kafka offset: %v", err) } } } @@ -121,12 +121,12 @@ func loadProgress(offsetFile string) *KafkaProgress { progress := &KafkaProgress{} data, err := ioutil.ReadFile(offsetFile) if err != nil { - glog.Warningf("failed to read kafka progress file: %s", offsetFile) + log.Warnf("failed to read kafka progress file: %s", offsetFile) return nil } err = json.Unmarshal(data, progress) if err != nil { - glog.Warningf("failed to read kafka progress message: %s", string(data)) + log.Warnf("failed to read kafka progress message: %s", string(data)) return nil } return progress |
