Diffstat (limited to 'weed/replication/sink')
-rw-r--r--   weed/replication/sink/azuresink/azure_sink.go     4
-rw-r--r--   weed/replication/sink/filersink/fetch_write.go   10
-rw-r--r--   weed/replication/sink/filersink/filer_sink.go    28
-rw-r--r--   weed/replication/sink/gcssink/gcs_sink.go          6
-rw-r--r--   weed/replication/sink/s3sink/s3_sink.go          10
-rw-r--r--   weed/replication/sink/s3sink/s3_write.go         34
6 files changed, 46 insertions, 46 deletions
diff --git a/weed/replication/sink/azuresink/azure_sink.go b/weed/replication/sink/azuresink/azure_sink.go
index df70be64b..ff2325685 100644
--- a/weed/replication/sink/azuresink/azure_sink.go
+++ b/weed/replication/sink/azuresink/azure_sink.go
@@ -10,7 +10,7 @@ import (
     "github.com/Azure/azure-storage-blob-go/azblob"

     "github.com/chrislusf/seaweedfs/weed/filer"
-    "github.com/chrislusf/seaweedfs/weed/glog"
+    "github.com/chrislusf/seaweedfs/weed/util/log"
     "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
     "github.com/chrislusf/seaweedfs/weed/replication/sink"
     "github.com/chrislusf/seaweedfs/weed/replication/source"
@@ -56,7 +56,7 @@ func (g *AzureSink) initialize(accountName, accountKey, container, dir string) e
     // Use your Storage account's name and key to create a credential object.
     credential, err := azblob.NewSharedKeyCredential(accountName, accountKey)
     if err != nil {
-        glog.Fatalf("failed to create Azure credential with account name:%s key:%s", accountName, accountKey)
+        log.Fatalf("failed to create Azure credential with account name:%s key:%s", accountName, accountKey)
     }

     // Create a request pipeline that is used to process HTTP(S) requests and responses.
diff --git a/weed/replication/sink/filersink/fetch_write.go b/weed/replication/sink/filersink/fetch_write.go
index d193ff81c..f26b4876c 100644
--- a/weed/replication/sink/filersink/fetch_write.go
+++ b/weed/replication/sink/filersink/fetch_write.go
@@ -8,7 +8,7 @@ import (

     "google.golang.org/grpc"

-    "github.com/chrislusf/seaweedfs/weed/glog"
+    "github.com/chrislusf/seaweedfs/weed/util/log"
     "github.com/chrislusf/seaweedfs/weed/operation"
     "github.com/chrislusf/seaweedfs/weed/pb"
     "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
@@ -82,7 +82,7 @@ func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk, path string)

         resp, err := client.AssignVolume(context.Background(), request)
         if err != nil {
-            glog.V(0).Infof("assign volume failure %v: %v", request, err)
+            log.Infof("assign volume failure %v: %v", request, err)
             return err
         }
         if resp.Error != "" {
@@ -98,16 +98,16 @@ func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk, path string)

     fileUrl := fmt.Sprintf("http://%s/%s", host, fileId)

-    glog.V(4).Infof("replicating %s to %s header:%+v", filename, fileUrl, header)
+    log.Tracef("replicating %s to %s header:%+v", filename, fileUrl, header)

     // fetch data as is, regardless whether it is encrypted or not
     uploadResult, err, _ := operation.Upload(fileUrl, filename, false, resp.Body, "gzip" == header.Get("Content-Encoding"), header.Get("Content-Type"), nil, auth)
     if err != nil {
-        glog.V(0).Infof("upload source data %v to %s: %v", sourceChunk.GetFileIdString(), fileUrl, err)
+        log.Infof("upload source data %v to %s: %v", sourceChunk.GetFileIdString(), fileUrl, err)
         return "", fmt.Errorf("upload data: %v", err)
     }
     if uploadResult.Error != "" {
-        glog.V(0).Infof("upload failure %v to %s: %v", filename, fileUrl, err)
+        log.Infof("upload failure %v to %s: %v", filename, fileUrl, err)
         return "", fmt.Errorf("upload result: %v", uploadResult.Error)
     }

diff --git a/weed/replication/sink/filersink/filer_sink.go b/weed/replication/sink/filersink/filer_sink.go
index 6f467ea58..948d5913c 100644
--- a/weed/replication/sink/filersink/filer_sink.go
+++ b/weed/replication/sink/filersink/filer_sink.go
@@ -9,7 +9,7 @@ import (
     "github.com/chrislusf/seaweedfs/weed/security"

     "github.com/chrislusf/seaweedfs/weed/filer"
-    "github.com/chrislusf/seaweedfs/weed/glog"
+    "github.com/chrislusf/seaweedfs/weed/util/log"
     "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
     "github.com/chrislusf/seaweedfs/weed/replication/sink"
     "github.com/chrislusf/seaweedfs/weed/replication/source"
@@ -68,10 +68,10 @@ func (fs *FilerSink) DeleteEntry(key string, isDirectory, deleteIncludeChunks bo

     dir, name := util.FullPath(key).DirAndName()

-    glog.V(4).Infof("delete entry: %v", key)
+    log.Tracef("delete entry: %v", key)
     err := filer_pb.Remove(fs, dir, name, deleteIncludeChunks, true, true, true, signatures)
     if err != nil {
-        glog.V(0).Infof("delete entry %s: %v", key, err)
+        log.Infof("delete entry %s: %v", key, err)
         return fmt.Errorf("delete entry %s: %v", key, err)
     }
     return nil
@@ -88,10 +88,10 @@ func (fs *FilerSink) CreateEntry(key string, entry *filer_pb.Entry, signatures [
             Directory: dir,
             Name:      name,
         }
-        glog.V(1).Infof("lookup: %v", lookupRequest)
+        log.Debugf("lookup: %v", lookupRequest)
         if resp, err := filer_pb.LookupEntry(client, lookupRequest); err == nil {
             if filer.ETag(resp.Entry) == filer.ETag(entry) {
-                glog.V(3).Infof("already replicated %s", key)
+                log.Tracef("already replicated %s", key)
                 return nil
             }
         }
@@ -100,10 +100,10 @@ func (fs *FilerSink) CreateEntry(key string, entry *filer_pb.Entry, signatures [

         if err != nil {
             // only warning here since the source chunk may have been deleted already
-            glog.Warningf("replicate entry chunks %s: %v", key, err)
+            log.Warnf("replicate entry chunks %s: %v", key, err)
         }

-        glog.V(4).Infof("replicated %s %+v ===> %+v", key, entry.Chunks, replicatedChunks)
+        log.Tracef("replicated %s %+v ===> %+v", key, entry.Chunks, replicatedChunks)

         request := &filer_pb.CreateEntryRequest{
             Directory: dir,
@@ -117,9 +117,9 @@ func (fs *FilerSink) CreateEntry(key string, entry *filer_pb.Entry, signatures [
             Signatures: signatures,
         }

-        glog.V(3).Infof("create: %v", request)
+        log.Tracef("create: %v", request)
         if err := filer_pb.CreateEntry(client, request); err != nil {
-            glog.V(0).Infof("create entry %s: %v", key, err)
+            log.Infof("create entry %s: %v", key, err)
             return fmt.Errorf("create entry %s: %v", key, err)
         }

@@ -140,10 +140,10 @@ func (fs *FilerSink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParent
             Name:      name,
         }

-        glog.V(4).Infof("lookup entry: %v", request)
+        log.Tracef("lookup entry: %v", request)
         resp, err := filer_pb.LookupEntry(client, request)
         if err != nil {
-            glog.V(0).Infof("lookup %s: %v", key, err)
+            log.Infof("lookup %s: %v", key, err)
             return err
         }

@@ -156,16 +156,16 @@ func (fs *FilerSink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParent
         return false, fmt.Errorf("lookup %s: %v", key, err)
     }

-    glog.V(4).Infof("oldEntry %+v, newEntry %+v, existingEntry: %+v", oldEntry, newEntry, existingEntry)
+    log.Tracef("oldEntry %+v, newEntry %+v, existingEntry: %+v", oldEntry, newEntry, existingEntry)

     if existingEntry.Attributes.Mtime > newEntry.Attributes.Mtime {
         // skip if already changed
         // this usually happens when the messages are not ordered
-        glog.V(2).Infof("late updates %s", key)
+        log.Debugf("late updates %s", key)
     } else if filer.ETag(newEntry) == filer.ETag(existingEntry) {
         // skip if no change
         // this usually happens when retrying the replication
-        glog.V(3).Infof("already replicated %s", key)
+        log.Tracef("already replicated %s", key)
     } else {
         // find out what changed
         deletedChunks, newChunks, err := compareChunks(filer.LookupFn(fs), oldEntry, newEntry)
diff --git a/weed/replication/sink/gcssink/gcs_sink.go b/weed/replication/sink/gcssink/gcs_sink.go
index badabc32c..80feb2cbb 100644
--- a/weed/replication/sink/gcssink/gcs_sink.go
+++ b/weed/replication/sink/gcssink/gcs_sink.go
@@ -10,7 +10,7 @@ import (
     "google.golang.org/api/option"

     "github.com/chrislusf/seaweedfs/weed/filer"
-    "github.com/chrislusf/seaweedfs/weed/glog"
+    "github.com/chrislusf/seaweedfs/weed/util/log"
     "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
     "github.com/chrislusf/seaweedfs/weed/replication/sink"
     "github.com/chrislusf/seaweedfs/weed/replication/source"
@@ -57,12 +57,12 @@ func (g *GcsSink) initialize(google_application_credentials, bucketName, dir str
         var found bool
         google_application_credentials, found = os.LookupEnv("GOOGLE_APPLICATION_CREDENTIALS")
         if !found {
-            glog.Fatalf("need to specific GOOGLE_APPLICATION_CREDENTIALS env variable or google_application_credentials in replication.toml")
+            log.Fatalf("need to specific GOOGLE_APPLICATION_CREDENTIALS env variable or google_application_credentials in replication.toml")
         }
     }

     client, err := storage.NewClient(context.Background(), option.WithCredentialsFile(google_application_credentials))
     if err != nil {
-        glog.Fatalf("Failed to create client: %v", err)
+        log.Fatalf("Failed to create client: %v", err)
     }

     g.client = client
diff --git a/weed/replication/sink/s3sink/s3_sink.go b/weed/replication/sink/s3sink/s3_sink.go
index 58432ee6b..71da89df3 100644
--- a/weed/replication/sink/s3sink/s3_sink.go
+++ b/weed/replication/sink/s3sink/s3_sink.go
@@ -13,7 +13,7 @@ import (
     "github.com/aws/aws-sdk-go/service/s3/s3iface"

     "github.com/chrislusf/seaweedfs/weed/filer"
-    "github.com/chrislusf/seaweedfs/weed/glog"
+    "github.com/chrislusf/seaweedfs/weed/util/log"
     "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
     "github.com/chrislusf/seaweedfs/weed/replication/sink"
     "github.com/chrislusf/seaweedfs/weed/replication/source"
@@ -42,10 +42,10 @@ func (s3sink *S3Sink) GetSinkToDirectory() string {
 }

 func (s3sink *S3Sink) Initialize(configuration util.Configuration, prefix string) error {
-    glog.V(0).Infof("sink.s3.region: %v", configuration.GetString(prefix+"region"))
-    glog.V(0).Infof("sink.s3.bucket: %v", configuration.GetString(prefix+"bucket"))
-    glog.V(0).Infof("sink.s3.directory: %v", configuration.GetString(prefix+"directory"))
-    glog.V(0).Infof("sink.s3.endpoint: %v", configuration.GetString(prefix+"endpoint"))
+    log.Infof("sink.s3.region: %v", configuration.GetString(prefix+"region"))
+    log.Infof("sink.s3.bucket: %v", configuration.GetString(prefix+"bucket"))
+    log.Infof("sink.s3.directory: %v", configuration.GetString(prefix+"directory"))
+    log.Infof("sink.s3.endpoint: %v", configuration.GetString(prefix+"endpoint"))
     return s3sink.initialize(
         configuration.GetString(prefix+"aws_access_key_id"),
         configuration.GetString(prefix+"aws_secret_access_key"),
diff --git a/weed/replication/sink/s3sink/s3_write.go b/weed/replication/sink/s3sink/s3_write.go
index b172ea2c3..51d6ae1b5 100644
--- a/weed/replication/sink/s3sink/s3_write.go
+++ b/weed/replication/sink/s3sink/s3_write.go
@@ -10,7 +10,7 @@ import (
     "github.com/aws/aws-sdk-go/aws/awserr"
     "github.com/aws/aws-sdk-go/service/s3"
     "github.com/chrislusf/seaweedfs/weed/filer"
-    "github.com/chrislusf/seaweedfs/weed/glog"
+    "github.com/chrislusf/seaweedfs/weed/util/log"
     "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
     "github.com/chrislusf/seaweedfs/weed/util"
 )
@@ -24,9 +24,9 @@ func (s3sink *S3Sink) deleteObject(key string) error {

     result, err := s3sink.conn.DeleteObject(input)

     if err == nil {
-        glog.V(0).Infof("[%s] delete %s: %v", s3sink.bucket, key, result)
+        log.Infof("[%s] delete %s: %v", s3sink.bucket, key, result)
     } else {
-        glog.Errorf("[%s] delete %s: %v", s3sink.bucket, key, err)
+        log.Errorf("[%s] delete %s: %v", s3sink.bucket, key, err)
     }

     return err
@@ -43,9 +43,9 @@ func (s3sink *S3Sink) createMultipartUpload(key string, entry *filer_pb.Entry) (

     result, err := s3sink.conn.CreateMultipartUpload(input)

     if err == nil {
-        glog.V(0).Infof("[%s] createMultipartUpload %s: %v", s3sink.bucket, key, result)
+        log.Infof("[%s] createMultipartUpload %s: %v", s3sink.bucket, key, result)
     } else {
-        glog.Errorf("[%s] createMultipartUpload %s: %v", s3sink.bucket, key, err)
+        log.Errorf("[%s] createMultipartUpload %s: %v", s3sink.bucket, key, err)
         return "", err
     }

@@ -64,19 +64,19 @@ func (s3sink *S3Sink) abortMultipartUpload(key, uploadId string) error {
         if aerr, ok := err.(awserr.Error); ok {
             switch aerr.Code() {
             case s3.ErrCodeNoSuchUpload:
-                glog.Errorf("[%s] abortMultipartUpload %s: %v %v", s3sink.bucket, key, s3.ErrCodeNoSuchUpload, aerr.Error())
+                log.Errorf("[%s] abortMultipartUpload %s: %v %v", s3sink.bucket, key, s3.ErrCodeNoSuchUpload, aerr.Error())
             default:
-                glog.Errorf("[%s] abortMultipartUpload %s: %v", s3sink.bucket, key, aerr.Error())
+                log.Errorf("[%s] abortMultipartUpload %s: %v", s3sink.bucket, key, aerr.Error())
             }
         } else {
             // Print the error, cast err to awserr.Error to get the Code and
             // Message from an error.
-            glog.Errorf("[%s] abortMultipartUpload %s: %v", s3sink.bucket, key, aerr.Error())
+            log.Errorf("[%s] abortMultipartUpload %s: %v", s3sink.bucket, key, aerr.Error())
         }
         return err
     }

-    glog.V(0).Infof("[%s] abortMultipartUpload %s: %v", s3sink.bucket, key, result)
+    log.Infof("[%s] abortMultipartUpload %s: %v", s3sink.bucket, key, result)

     return nil
 }
@@ -94,9 +94,9 @@ func (s3sink *S3Sink) completeMultipartUpload(ctx context.Context, key, uploadId

     result, err := s3sink.conn.CompleteMultipartUpload(input)

     if err == nil {
-        glog.V(0).Infof("[%s] completeMultipartUpload %s: %v", s3sink.bucket, key, result)
+        log.Infof("[%s] completeMultipartUpload %s: %v", s3sink.bucket, key, result)
     } else {
-        glog.Errorf("[%s] completeMultipartUpload %s: %v", s3sink.bucket, key, err)
+        log.Errorf("[%s] completeMultipartUpload %s: %v", s3sink.bucket, key, err)
     }

     return err
@@ -108,7 +108,7 @@ func (s3sink *S3Sink) uploadPart(key, uploadId string, partId int, chunk *filer.

     readSeeker, err := s3sink.buildReadSeeker(chunk)
     if err != nil {
-        glog.Errorf("[%s] uploadPart %s %d read: %v", s3sink.bucket, key, partId, err)
+        log.Errorf("[%s] uploadPart %s %d read: %v", s3sink.bucket, key, partId, err)
         return nil, fmt.Errorf("[%s] uploadPart %s %d read: %v", s3sink.bucket, key, partId, err)
     }

@@ -122,9 +122,9 @@ func (s3sink *S3Sink) uploadPart(key, uploadId string, partId int, chunk *filer.

     result, err := s3sink.conn.UploadPart(input)
     if err == nil {
-        glog.V(0).Infof("[%s] uploadPart %s %d upload: %v", s3sink.bucket, key, partId, result)
+        log.Infof("[%s] uploadPart %s %d upload: %v", s3sink.bucket, key, partId, result)
     } else {
-        glog.Errorf("[%s] uploadPart %s %d upload: %v", s3sink.bucket, key, partId, err)
+        log.Errorf("[%s] uploadPart %s %d upload: %v", s3sink.bucket, key, partId, err)
     }

     part := &s3.CompletedPart{
@@ -148,9 +148,9 @@ func (s3sink *S3Sink) uploadPartCopy(key, uploadId string, partId int64, copySou

     result, err := s3sink.conn.UploadPartCopy(input)
     if err == nil {
-        glog.V(0).Infof("[%s] uploadPartCopy %s %d: %v", s3sink.bucket, key, partId, result)
+        log.Infof("[%s] uploadPartCopy %s %d: %v", s3sink.bucket, key, partId, result)
     } else {
-        glog.Errorf("[%s] uploadPartCopy %s %d: %v", s3sink.bucket, key, partId, err)
+        log.Errorf("[%s] uploadPartCopy %s %d: %v", s3sink.bucket, key, partId, err)
     }

     return err
@@ -165,7 +165,7 @@ func (s3sink *S3Sink) buildReadSeeker(chunk *filer.ChunkView) (io.ReadSeeker, er
     for _, fileUrl := range fileUrls {
         _, err = util.ReadUrl(fileUrl+"?readDeleted=true", nil, false, false, chunk.Offset, int(chunk.Size), buf)
         if err != nil {
-            glog.V(1).Infof("read from %s: %v", fileUrl, err)
+            log.Debugf("read from %s: %v", fileUrl, err)
         } else {
             break
         }
     }
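The whole diff is a one-for-one swap of glog call sites for the util/log package. The sketch below illustrates the level mapping as it appears in the hunks above; the log function names are taken directly from the replacements shown here, so treat it as an illustration of this commit's convention, not as documentation of the util/log package itself.

package main

import (
	"errors"

	"github.com/chrislusf/seaweedfs/weed/util/log"
)

func main() {
	key, err := "/buckets/example", errors.New("example error")

	// glog.V(0).Infof(...)      becomes  log.Infof(...)
	log.Infof("delete entry %s: %v", key, err)
	// glog.V(1)/V(2).Infof(...) becomes  log.Debugf(...)
	log.Debugf("late updates %s", key)
	// glog.V(3)/V(4).Infof(...) becomes  log.Tracef(...)
	log.Tracef("already replicated %s", key)
	// glog.Warningf(...)        becomes  log.Warnf(...)
	log.Warnf("replicate entry chunks %s: %v", key, err)
	// glog.Errorf and glog.Fatalf keep their names under the new package.
	log.Errorf("lookup %s: %v", key, err)
}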
