aboutsummaryrefslogtreecommitdiff
path: root/weed/replication/sink
diff options
context:
space:
mode:
Diffstat (limited to 'weed/replication/sink')
-rw-r--r--weed/replication/sink/azuresink/azure_sink.go4
-rw-r--r--weed/replication/sink/filersink/fetch_write.go10
-rw-r--r--weed/replication/sink/filersink/filer_sink.go28
-rw-r--r--weed/replication/sink/gcssink/gcs_sink.go6
-rw-r--r--weed/replication/sink/s3sink/s3_sink.go10
-rw-r--r--weed/replication/sink/s3sink/s3_write.go34
6 files changed, 46 insertions, 46 deletions
diff --git a/weed/replication/sink/azuresink/azure_sink.go b/weed/replication/sink/azuresink/azure_sink.go
index df70be64b..ff2325685 100644
--- a/weed/replication/sink/azuresink/azure_sink.go
+++ b/weed/replication/sink/azuresink/azure_sink.go
@@ -10,7 +10,7 @@ import (
"github.com/Azure/azure-storage-blob-go/azblob"
"github.com/chrislusf/seaweedfs/weed/filer"
- "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/util/log"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/replication/sink"
"github.com/chrislusf/seaweedfs/weed/replication/source"
@@ -56,7 +56,7 @@ func (g *AzureSink) initialize(accountName, accountKey, container, dir string) e
// Use your Storage account's name and key to create a credential object.
credential, err := azblob.NewSharedKeyCredential(accountName, accountKey)
if err != nil {
- glog.Fatalf("failed to create Azure credential with account name:%s key:%s", accountName, accountKey)
+ log.Fatalf("failed to create Azure credential with account name:%s key:%s", accountName, accountKey)
}
// Create a request pipeline that is used to process HTTP(S) requests and responses.
diff --git a/weed/replication/sink/filersink/fetch_write.go b/weed/replication/sink/filersink/fetch_write.go
index d193ff81c..f26b4876c 100644
--- a/weed/replication/sink/filersink/fetch_write.go
+++ b/weed/replication/sink/filersink/fetch_write.go
@@ -8,7 +8,7 @@ import (
"google.golang.org/grpc"
- "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/util/log"
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
@@ -82,7 +82,7 @@ func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk, path string)
resp, err := client.AssignVolume(context.Background(), request)
if err != nil {
- glog.V(0).Infof("assign volume failure %v: %v", request, err)
+ log.Infof("assign volume failure %v: %v", request, err)
return err
}
if resp.Error != "" {
@@ -98,16 +98,16 @@ func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk, path string)
fileUrl := fmt.Sprintf("http://%s/%s", host, fileId)
- glog.V(4).Infof("replicating %s to %s header:%+v", filename, fileUrl, header)
+ log.Tracef("replicating %s to %s header:%+v", filename, fileUrl, header)
// fetch data as is, regardless whether it is encrypted or not
uploadResult, err, _ := operation.Upload(fileUrl, filename, false, resp.Body, "gzip" == header.Get("Content-Encoding"), header.Get("Content-Type"), nil, auth)
if err != nil {
- glog.V(0).Infof("upload source data %v to %s: %v", sourceChunk.GetFileIdString(), fileUrl, err)
+ log.Infof("upload source data %v to %s: %v", sourceChunk.GetFileIdString(), fileUrl, err)
return "", fmt.Errorf("upload data: %v", err)
}
if uploadResult.Error != "" {
- glog.V(0).Infof("upload failure %v to %s: %v", filename, fileUrl, err)
+ log.Infof("upload failure %v to %s: %v", filename, fileUrl, err)
return "", fmt.Errorf("upload result: %v", uploadResult.Error)
}
diff --git a/weed/replication/sink/filersink/filer_sink.go b/weed/replication/sink/filersink/filer_sink.go
index 6f467ea58..948d5913c 100644
--- a/weed/replication/sink/filersink/filer_sink.go
+++ b/weed/replication/sink/filersink/filer_sink.go
@@ -9,7 +9,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/filer"
- "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/util/log"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/replication/sink"
"github.com/chrislusf/seaweedfs/weed/replication/source"
@@ -68,10 +68,10 @@ func (fs *FilerSink) DeleteEntry(key string, isDirectory, deleteIncludeChunks bo
dir, name := util.FullPath(key).DirAndName()
- glog.V(4).Infof("delete entry: %v", key)
+ log.Tracef("delete entry: %v", key)
err := filer_pb.Remove(fs, dir, name, deleteIncludeChunks, true, true, true, signatures)
if err != nil {
- glog.V(0).Infof("delete entry %s: %v", key, err)
+ log.Infof("delete entry %s: %v", key, err)
return fmt.Errorf("delete entry %s: %v", key, err)
}
return nil
@@ -88,10 +88,10 @@ func (fs *FilerSink) CreateEntry(key string, entry *filer_pb.Entry, signatures [
Directory: dir,
Name: name,
}
- glog.V(1).Infof("lookup: %v", lookupRequest)
+ log.Debugf("lookup: %v", lookupRequest)
if resp, err := filer_pb.LookupEntry(client, lookupRequest); err == nil {
if filer.ETag(resp.Entry) == filer.ETag(entry) {
- glog.V(3).Infof("already replicated %s", key)
+ log.Tracef("already replicated %s", key)
return nil
}
}
@@ -100,10 +100,10 @@ func (fs *FilerSink) CreateEntry(key string, entry *filer_pb.Entry, signatures [
if err != nil {
// only warning here since the source chunk may have been deleted already
- glog.Warningf("replicate entry chunks %s: %v", key, err)
+ log.Warnf("replicate entry chunks %s: %v", key, err)
}
- glog.V(4).Infof("replicated %s %+v ===> %+v", key, entry.Chunks, replicatedChunks)
+ log.Tracef("replicated %s %+v ===> %+v", key, entry.Chunks, replicatedChunks)
request := &filer_pb.CreateEntryRequest{
Directory: dir,
@@ -117,9 +117,9 @@ func (fs *FilerSink) CreateEntry(key string, entry *filer_pb.Entry, signatures [
Signatures: signatures,
}
- glog.V(3).Infof("create: %v", request)
+ log.Tracef("create: %v", request)
if err := filer_pb.CreateEntry(client, request); err != nil {
- glog.V(0).Infof("create entry %s: %v", key, err)
+ log.Infof("create entry %s: %v", key, err)
return fmt.Errorf("create entry %s: %v", key, err)
}
@@ -140,10 +140,10 @@ func (fs *FilerSink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParent
Name: name,
}
- glog.V(4).Infof("lookup entry: %v", request)
+ log.Tracef("lookup entry: %v", request)
resp, err := filer_pb.LookupEntry(client, request)
if err != nil {
- glog.V(0).Infof("lookup %s: %v", key, err)
+ log.Infof("lookup %s: %v", key, err)
return err
}
@@ -156,16 +156,16 @@ func (fs *FilerSink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParent
return false, fmt.Errorf("lookup %s: %v", key, err)
}
- glog.V(4).Infof("oldEntry %+v, newEntry %+v, existingEntry: %+v", oldEntry, newEntry, existingEntry)
+ log.Tracef("oldEntry %+v, newEntry %+v, existingEntry: %+v", oldEntry, newEntry, existingEntry)
if existingEntry.Attributes.Mtime > newEntry.Attributes.Mtime {
// skip if already changed
// this usually happens when the messages are not ordered
- glog.V(2).Infof("late updates %s", key)
+ log.Debugf("late updates %s", key)
} else if filer.ETag(newEntry) == filer.ETag(existingEntry) {
// skip if no change
// this usually happens when retrying the replication
- glog.V(3).Infof("already replicated %s", key)
+ log.Tracef("already replicated %s", key)
} else {
// find out what changed
deletedChunks, newChunks, err := compareChunks(filer.LookupFn(fs), oldEntry, newEntry)
diff --git a/weed/replication/sink/gcssink/gcs_sink.go b/weed/replication/sink/gcssink/gcs_sink.go
index badabc32c..80feb2cbb 100644
--- a/weed/replication/sink/gcssink/gcs_sink.go
+++ b/weed/replication/sink/gcssink/gcs_sink.go
@@ -10,7 +10,7 @@ import (
"google.golang.org/api/option"
"github.com/chrislusf/seaweedfs/weed/filer"
- "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/util/log"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/replication/sink"
"github.com/chrislusf/seaweedfs/weed/replication/source"
@@ -57,12 +57,12 @@ func (g *GcsSink) initialize(google_application_credentials, bucketName, dir str
var found bool
google_application_credentials, found = os.LookupEnv("GOOGLE_APPLICATION_CREDENTIALS")
if !found {
- glog.Fatalf("need to specific GOOGLE_APPLICATION_CREDENTIALS env variable or google_application_credentials in replication.toml")
+ log.Fatalf("need to specific GOOGLE_APPLICATION_CREDENTIALS env variable or google_application_credentials in replication.toml")
}
}
client, err := storage.NewClient(context.Background(), option.WithCredentialsFile(google_application_credentials))
if err != nil {
- glog.Fatalf("Failed to create client: %v", err)
+ log.Fatalf("Failed to create client: %v", err)
}
g.client = client
diff --git a/weed/replication/sink/s3sink/s3_sink.go b/weed/replication/sink/s3sink/s3_sink.go
index 58432ee6b..71da89df3 100644
--- a/weed/replication/sink/s3sink/s3_sink.go
+++ b/weed/replication/sink/s3sink/s3_sink.go
@@ -13,7 +13,7 @@ import (
"github.com/aws/aws-sdk-go/service/s3/s3iface"
"github.com/chrislusf/seaweedfs/weed/filer"
- "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/util/log"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/replication/sink"
"github.com/chrislusf/seaweedfs/weed/replication/source"
@@ -42,10 +42,10 @@ func (s3sink *S3Sink) GetSinkToDirectory() string {
}
func (s3sink *S3Sink) Initialize(configuration util.Configuration, prefix string) error {
- glog.V(0).Infof("sink.s3.region: %v", configuration.GetString(prefix+"region"))
- glog.V(0).Infof("sink.s3.bucket: %v", configuration.GetString(prefix+"bucket"))
- glog.V(0).Infof("sink.s3.directory: %v", configuration.GetString(prefix+"directory"))
- glog.V(0).Infof("sink.s3.endpoint: %v", configuration.GetString(prefix+"endpoint"))
+ log.Infof("sink.s3.region: %v", configuration.GetString(prefix+"region"))
+ log.Infof("sink.s3.bucket: %v", configuration.GetString(prefix+"bucket"))
+ log.Infof("sink.s3.directory: %v", configuration.GetString(prefix+"directory"))
+ log.Infof("sink.s3.endpoint: %v", configuration.GetString(prefix+"endpoint"))
return s3sink.initialize(
configuration.GetString(prefix+"aws_access_key_id"),
configuration.GetString(prefix+"aws_secret_access_key"),
diff --git a/weed/replication/sink/s3sink/s3_write.go b/weed/replication/sink/s3sink/s3_write.go
index b172ea2c3..51d6ae1b5 100644
--- a/weed/replication/sink/s3sink/s3_write.go
+++ b/weed/replication/sink/s3sink/s3_write.go
@@ -10,7 +10,7 @@ import (
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/chrislusf/seaweedfs/weed/filer"
- "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/util/log"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/util"
)
@@ -24,9 +24,9 @@ func (s3sink *S3Sink) deleteObject(key string) error {
result, err := s3sink.conn.DeleteObject(input)
if err == nil {
- glog.V(0).Infof("[%s] delete %s: %v", s3sink.bucket, key, result)
+ log.Infof("[%s] delete %s: %v", s3sink.bucket, key, result)
} else {
- glog.Errorf("[%s] delete %s: %v", s3sink.bucket, key, err)
+ log.Errorf("[%s] delete %s: %v", s3sink.bucket, key, err)
}
return err
@@ -43,9 +43,9 @@ func (s3sink *S3Sink) createMultipartUpload(key string, entry *filer_pb.Entry) (
result, err := s3sink.conn.CreateMultipartUpload(input)
if err == nil {
- glog.V(0).Infof("[%s] createMultipartUpload %s: %v", s3sink.bucket, key, result)
+ log.Infof("[%s] createMultipartUpload %s: %v", s3sink.bucket, key, result)
} else {
- glog.Errorf("[%s] createMultipartUpload %s: %v", s3sink.bucket, key, err)
+ log.Errorf("[%s] createMultipartUpload %s: %v", s3sink.bucket, key, err)
return "", err
}
@@ -64,19 +64,19 @@ func (s3sink *S3Sink) abortMultipartUpload(key, uploadId string) error {
if aerr, ok := err.(awserr.Error); ok {
switch aerr.Code() {
case s3.ErrCodeNoSuchUpload:
- glog.Errorf("[%s] abortMultipartUpload %s: %v %v", s3sink.bucket, key, s3.ErrCodeNoSuchUpload, aerr.Error())
+ log.Errorf("[%s] abortMultipartUpload %s: %v %v", s3sink.bucket, key, s3.ErrCodeNoSuchUpload, aerr.Error())
default:
- glog.Errorf("[%s] abortMultipartUpload %s: %v", s3sink.bucket, key, aerr.Error())
+ log.Errorf("[%s] abortMultipartUpload %s: %v", s3sink.bucket, key, aerr.Error())
}
} else {
// Print the error, cast err to awserr.Error to get the Code and
// Message from an error.
- glog.Errorf("[%s] abortMultipartUpload %s: %v", s3sink.bucket, key, aerr.Error())
+ log.Errorf("[%s] abortMultipartUpload %s: %v", s3sink.bucket, key, aerr.Error())
}
return err
}
- glog.V(0).Infof("[%s] abortMultipartUpload %s: %v", s3sink.bucket, key, result)
+ log.Infof("[%s] abortMultipartUpload %s: %v", s3sink.bucket, key, result)
return nil
}
@@ -94,9 +94,9 @@ func (s3sink *S3Sink) completeMultipartUpload(ctx context.Context, key, uploadId
result, err := s3sink.conn.CompleteMultipartUpload(input)
if err == nil {
- glog.V(0).Infof("[%s] completeMultipartUpload %s: %v", s3sink.bucket, key, result)
+ log.Infof("[%s] completeMultipartUpload %s: %v", s3sink.bucket, key, result)
} else {
- glog.Errorf("[%s] completeMultipartUpload %s: %v", s3sink.bucket, key, err)
+ log.Errorf("[%s] completeMultipartUpload %s: %v", s3sink.bucket, key, err)
}
return err
@@ -108,7 +108,7 @@ func (s3sink *S3Sink) uploadPart(key, uploadId string, partId int, chunk *filer.
readSeeker, err := s3sink.buildReadSeeker(chunk)
if err != nil {
- glog.Errorf("[%s] uploadPart %s %d read: %v", s3sink.bucket, key, partId, err)
+ log.Errorf("[%s] uploadPart %s %d read: %v", s3sink.bucket, key, partId, err)
return nil, fmt.Errorf("[%s] uploadPart %s %d read: %v", s3sink.bucket, key, partId, err)
}
@@ -122,9 +122,9 @@ func (s3sink *S3Sink) uploadPart(key, uploadId string, partId int, chunk *filer.
result, err := s3sink.conn.UploadPart(input)
if err == nil {
- glog.V(0).Infof("[%s] uploadPart %s %d upload: %v", s3sink.bucket, key, partId, result)
+ log.Infof("[%s] uploadPart %s %d upload: %v", s3sink.bucket, key, partId, result)
} else {
- glog.Errorf("[%s] uploadPart %s %d upload: %v", s3sink.bucket, key, partId, err)
+ log.Errorf("[%s] uploadPart %s %d upload: %v", s3sink.bucket, key, partId, err)
}
part := &s3.CompletedPart{
@@ -148,9 +148,9 @@ func (s3sink *S3Sink) uploadPartCopy(key, uploadId string, partId int64, copySou
result, err := s3sink.conn.UploadPartCopy(input)
if err == nil {
- glog.V(0).Infof("[%s] uploadPartCopy %s %d: %v", s3sink.bucket, key, partId, result)
+ log.Infof("[%s] uploadPartCopy %s %d: %v", s3sink.bucket, key, partId, result)
} else {
- glog.Errorf("[%s] uploadPartCopy %s %d: %v", s3sink.bucket, key, partId, err)
+ log.Errorf("[%s] uploadPartCopy %s %d: %v", s3sink.bucket, key, partId, err)
}
return err
@@ -165,7 +165,7 @@ func (s3sink *S3Sink) buildReadSeeker(chunk *filer.ChunkView) (io.ReadSeeker, er
for _, fileUrl := range fileUrls {
_, err = util.ReadUrl(fileUrl+"?readDeleted=true", nil, false, false, chunk.Offset, int(chunk.Size), buf)
if err != nil {
- glog.V(1).Infof("read from %s: %v", fileUrl, err)
+ log.Debugf("read from %s: %v", fileUrl, err)
} else {
break
}