aboutsummaryrefslogtreecommitdiff
path: root/weed/replication
diff options
context:
space:
mode:
Diffstat (limited to 'weed/replication')
-rw-r--r--weed/replication/repl_util/replication_utli.go6
-rw-r--r--weed/replication/replicator.go16
-rw-r--r--weed/replication/sink/azuresink/azure_sink.go4
-rw-r--r--weed/replication/sink/filersink/fetch_write.go10
-rw-r--r--weed/replication/sink/filersink/filer_sink.go28
-rw-r--r--weed/replication/sink/gcssink/gcs_sink.go6
-rw-r--r--weed/replication/sink/s3sink/s3_sink.go10
-rw-r--r--weed/replication/sink/s3sink/s3_write.go34
-rw-r--r--weed/replication/source/filer_source.go10
-rw-r--r--weed/replication/sub/notification_aws_sqs.go8
-rw-r--r--weed/replication/sub/notification_gocdk_pub_sub.go4
-rw-r--r--weed/replication/sub/notification_google_pub_sub.go18
-rw-r--r--weed/replication/sub/notification_kafka.go14
13 files changed, 84 insertions, 84 deletions
diff --git a/weed/replication/repl_util/replication_utli.go b/weed/replication/repl_util/replication_utli.go
index 42777f4ad..cc4c5d806 100644
--- a/weed/replication/repl_util/replication_utli.go
+++ b/weed/replication/repl_util/replication_utli.go
@@ -2,7 +2,7 @@ package repl_util
import (
"github.com/chrislusf/seaweedfs/weed/filer"
- "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/util/log"
"github.com/chrislusf/seaweedfs/weed/replication/source"
"github.com/chrislusf/seaweedfs/weed/util"
)
@@ -23,9 +23,9 @@ func CopyFromChunkViews(chunkViews []*filer.ChunkView, filerSource *source.Filer
writeErr = writeFunc(data)
})
if err != nil {
- glog.V(1).Infof("read from %s: %v", fileUrl, err)
+ log.Debugf("read from %s: %v", fileUrl, err)
} else if writeErr != nil {
- glog.V(1).Infof("copy from %s: %v", fileUrl, writeErr)
+ log.Debugf("copy from %s: %v", fileUrl, writeErr)
} else {
break
}
diff --git a/weed/replication/replicator.go b/weed/replication/replicator.go
index c4228434f..5e8d47d86 100644
--- a/weed/replication/replicator.go
+++ b/weed/replication/replicator.go
@@ -7,7 +7,7 @@ import (
"google.golang.org/grpc"
"strings"
- "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/util/log"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/replication/sink"
"github.com/chrislusf/seaweedfs/weed/replication/source"
@@ -37,28 +37,28 @@ func (r *Replicator) Replicate(ctx context.Context, key string, message *filer_p
return nil
}
if !strings.HasPrefix(key, r.source.Dir) {
- glog.V(4).Infof("skipping %v outside of %v", key, r.source.Dir)
+ log.Tracef("skipping %v outside of %v", key, r.source.Dir)
return nil
}
newKey := util.Join(r.sink.GetSinkToDirectory(), key[len(r.source.Dir):])
- glog.V(3).Infof("replicate %s => %s", key, newKey)
+ log.Tracef("replicate %s => %s", key, newKey)
key = newKey
if message.OldEntry != nil && message.NewEntry == nil {
- glog.V(4).Infof("deleting %v", key)
+ log.Tracef("deleting %v", key)
return r.sink.DeleteEntry(key, message.OldEntry.IsDirectory, message.DeleteChunks, message.Signatures)
}
if message.OldEntry == nil && message.NewEntry != nil {
- glog.V(4).Infof("creating %v", key)
+ log.Tracef("creating %v", key)
return r.sink.CreateEntry(key, message.NewEntry, message.Signatures)
}
if message.OldEntry == nil && message.NewEntry == nil {
- glog.V(0).Infof("weird message %+v", message)
+ log.Infof("weird message %+v", message)
return nil
}
foundExisting, err := r.sink.UpdateEntry(key, message.OldEntry, message.NewParentPath, message.NewEntry, message.DeleteChunks, message.Signatures)
if foundExisting {
- glog.V(4).Infof("updated %v", key)
+ log.Tracef("updated %v", key)
return err
}
@@ -67,7 +67,7 @@ func (r *Replicator) Replicate(ctx context.Context, key string, message *filer_p
return fmt.Errorf("delete old entry %v: %v", key, err)
}
- glog.V(4).Infof("creating missing %v", key)
+ log.Tracef("creating missing %v", key)
return r.sink.CreateEntry(key, message.NewEntry, message.Signatures)
}
diff --git a/weed/replication/sink/azuresink/azure_sink.go b/weed/replication/sink/azuresink/azure_sink.go
index df70be64b..ff2325685 100644
--- a/weed/replication/sink/azuresink/azure_sink.go
+++ b/weed/replication/sink/azuresink/azure_sink.go
@@ -10,7 +10,7 @@ import (
"github.com/Azure/azure-storage-blob-go/azblob"
"github.com/chrislusf/seaweedfs/weed/filer"
- "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/util/log"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/replication/sink"
"github.com/chrislusf/seaweedfs/weed/replication/source"
@@ -56,7 +56,7 @@ func (g *AzureSink) initialize(accountName, accountKey, container, dir string) e
// Use your Storage account's name and key to create a credential object.
credential, err := azblob.NewSharedKeyCredential(accountName, accountKey)
if err != nil {
- glog.Fatalf("failed to create Azure credential with account name:%s key:%s", accountName, accountKey)
+ log.Fatalf("failed to create Azure credential with account name:%s key:%s", accountName, accountKey)
}
// Create a request pipeline that is used to process HTTP(S) requests and responses.
diff --git a/weed/replication/sink/filersink/fetch_write.go b/weed/replication/sink/filersink/fetch_write.go
index d193ff81c..f26b4876c 100644
--- a/weed/replication/sink/filersink/fetch_write.go
+++ b/weed/replication/sink/filersink/fetch_write.go
@@ -8,7 +8,7 @@ import (
"google.golang.org/grpc"
- "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/util/log"
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
@@ -82,7 +82,7 @@ func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk, path string)
resp, err := client.AssignVolume(context.Background(), request)
if err != nil {
- glog.V(0).Infof("assign volume failure %v: %v", request, err)
+ log.Infof("assign volume failure %v: %v", request, err)
return err
}
if resp.Error != "" {
@@ -98,16 +98,16 @@ func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk, path string)
fileUrl := fmt.Sprintf("http://%s/%s", host, fileId)
- glog.V(4).Infof("replicating %s to %s header:%+v", filename, fileUrl, header)
+ log.Tracef("replicating %s to %s header:%+v", filename, fileUrl, header)
// fetch data as is, regardless whether it is encrypted or not
uploadResult, err, _ := operation.Upload(fileUrl, filename, false, resp.Body, "gzip" == header.Get("Content-Encoding"), header.Get("Content-Type"), nil, auth)
if err != nil {
- glog.V(0).Infof("upload source data %v to %s: %v", sourceChunk.GetFileIdString(), fileUrl, err)
+ log.Infof("upload source data %v to %s: %v", sourceChunk.GetFileIdString(), fileUrl, err)
return "", fmt.Errorf("upload data: %v", err)
}
if uploadResult.Error != "" {
- glog.V(0).Infof("upload failure %v to %s: %v", filename, fileUrl, err)
+ log.Infof("upload failure %v to %s: %v", filename, fileUrl, err)
return "", fmt.Errorf("upload result: %v", uploadResult.Error)
}
diff --git a/weed/replication/sink/filersink/filer_sink.go b/weed/replication/sink/filersink/filer_sink.go
index 6f467ea58..948d5913c 100644
--- a/weed/replication/sink/filersink/filer_sink.go
+++ b/weed/replication/sink/filersink/filer_sink.go
@@ -9,7 +9,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/filer"
- "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/util/log"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/replication/sink"
"github.com/chrislusf/seaweedfs/weed/replication/source"
@@ -68,10 +68,10 @@ func (fs *FilerSink) DeleteEntry(key string, isDirectory, deleteIncludeChunks bo
dir, name := util.FullPath(key).DirAndName()
- glog.V(4).Infof("delete entry: %v", key)
+ log.Tracef("delete entry: %v", key)
err := filer_pb.Remove(fs, dir, name, deleteIncludeChunks, true, true, true, signatures)
if err != nil {
- glog.V(0).Infof("delete entry %s: %v", key, err)
+ log.Infof("delete entry %s: %v", key, err)
return fmt.Errorf("delete entry %s: %v", key, err)
}
return nil
@@ -88,10 +88,10 @@ func (fs *FilerSink) CreateEntry(key string, entry *filer_pb.Entry, signatures [
Directory: dir,
Name: name,
}
- glog.V(1).Infof("lookup: %v", lookupRequest)
+ log.Debugf("lookup: %v", lookupRequest)
if resp, err := filer_pb.LookupEntry(client, lookupRequest); err == nil {
if filer.ETag(resp.Entry) == filer.ETag(entry) {
- glog.V(3).Infof("already replicated %s", key)
+ log.Tracef("already replicated %s", key)
return nil
}
}
@@ -100,10 +100,10 @@ func (fs *FilerSink) CreateEntry(key string, entry *filer_pb.Entry, signatures [
if err != nil {
// only warning here since the source chunk may have been deleted already
- glog.Warningf("replicate entry chunks %s: %v", key, err)
+ log.Warnf("replicate entry chunks %s: %v", key, err)
}
- glog.V(4).Infof("replicated %s %+v ===> %+v", key, entry.Chunks, replicatedChunks)
+ log.Tracef("replicated %s %+v ===> %+v", key, entry.Chunks, replicatedChunks)
request := &filer_pb.CreateEntryRequest{
Directory: dir,
@@ -117,9 +117,9 @@ func (fs *FilerSink) CreateEntry(key string, entry *filer_pb.Entry, signatures [
Signatures: signatures,
}
- glog.V(3).Infof("create: %v", request)
+ log.Tracef("create: %v", request)
if err := filer_pb.CreateEntry(client, request); err != nil {
- glog.V(0).Infof("create entry %s: %v", key, err)
+ log.Infof("create entry %s: %v", key, err)
return fmt.Errorf("create entry %s: %v", key, err)
}
@@ -140,10 +140,10 @@ func (fs *FilerSink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParent
Name: name,
}
- glog.V(4).Infof("lookup entry: %v", request)
+ log.Tracef("lookup entry: %v", request)
resp, err := filer_pb.LookupEntry(client, request)
if err != nil {
- glog.V(0).Infof("lookup %s: %v", key, err)
+ log.Infof("lookup %s: %v", key, err)
return err
}
@@ -156,16 +156,16 @@ func (fs *FilerSink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParent
return false, fmt.Errorf("lookup %s: %v", key, err)
}
- glog.V(4).Infof("oldEntry %+v, newEntry %+v, existingEntry: %+v", oldEntry, newEntry, existingEntry)
+ log.Tracef("oldEntry %+v, newEntry %+v, existingEntry: %+v", oldEntry, newEntry, existingEntry)
if existingEntry.Attributes.Mtime > newEntry.Attributes.Mtime {
// skip if already changed
// this usually happens when the messages are not ordered
- glog.V(2).Infof("late updates %s", key)
+ log.Debugf("late updates %s", key)
} else if filer.ETag(newEntry) == filer.ETag(existingEntry) {
// skip if no change
// this usually happens when retrying the replication
- glog.V(3).Infof("already replicated %s", key)
+ log.Tracef("already replicated %s", key)
} else {
// find out what changed
deletedChunks, newChunks, err := compareChunks(filer.LookupFn(fs), oldEntry, newEntry)
diff --git a/weed/replication/sink/gcssink/gcs_sink.go b/weed/replication/sink/gcssink/gcs_sink.go
index badabc32c..80feb2cbb 100644
--- a/weed/replication/sink/gcssink/gcs_sink.go
+++ b/weed/replication/sink/gcssink/gcs_sink.go
@@ -10,7 +10,7 @@ import (
"google.golang.org/api/option"
"github.com/chrislusf/seaweedfs/weed/filer"
- "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/util/log"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/replication/sink"
"github.com/chrislusf/seaweedfs/weed/replication/source"
@@ -57,12 +57,12 @@ func (g *GcsSink) initialize(google_application_credentials, bucketName, dir str
var found bool
google_application_credentials, found = os.LookupEnv("GOOGLE_APPLICATION_CREDENTIALS")
if !found {
- glog.Fatalf("need to specific GOOGLE_APPLICATION_CREDENTIALS env variable or google_application_credentials in replication.toml")
+ log.Fatalf("need to specific GOOGLE_APPLICATION_CREDENTIALS env variable or google_application_credentials in replication.toml")
}
}
client, err := storage.NewClient(context.Background(), option.WithCredentialsFile(google_application_credentials))
if err != nil {
- glog.Fatalf("Failed to create client: %v", err)
+ log.Fatalf("Failed to create client: %v", err)
}
g.client = client
diff --git a/weed/replication/sink/s3sink/s3_sink.go b/weed/replication/sink/s3sink/s3_sink.go
index 58432ee6b..71da89df3 100644
--- a/weed/replication/sink/s3sink/s3_sink.go
+++ b/weed/replication/sink/s3sink/s3_sink.go
@@ -13,7 +13,7 @@ import (
"github.com/aws/aws-sdk-go/service/s3/s3iface"
"github.com/chrislusf/seaweedfs/weed/filer"
- "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/util/log"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/replication/sink"
"github.com/chrislusf/seaweedfs/weed/replication/source"
@@ -42,10 +42,10 @@ func (s3sink *S3Sink) GetSinkToDirectory() string {
}
func (s3sink *S3Sink) Initialize(configuration util.Configuration, prefix string) error {
- glog.V(0).Infof("sink.s3.region: %v", configuration.GetString(prefix+"region"))
- glog.V(0).Infof("sink.s3.bucket: %v", configuration.GetString(prefix+"bucket"))
- glog.V(0).Infof("sink.s3.directory: %v", configuration.GetString(prefix+"directory"))
- glog.V(0).Infof("sink.s3.endpoint: %v", configuration.GetString(prefix+"endpoint"))
+ log.Infof("sink.s3.region: %v", configuration.GetString(prefix+"region"))
+ log.Infof("sink.s3.bucket: %v", configuration.GetString(prefix+"bucket"))
+ log.Infof("sink.s3.directory: %v", configuration.GetString(prefix+"directory"))
+ log.Infof("sink.s3.endpoint: %v", configuration.GetString(prefix+"endpoint"))
return s3sink.initialize(
configuration.GetString(prefix+"aws_access_key_id"),
configuration.GetString(prefix+"aws_secret_access_key"),
diff --git a/weed/replication/sink/s3sink/s3_write.go b/weed/replication/sink/s3sink/s3_write.go
index b172ea2c3..51d6ae1b5 100644
--- a/weed/replication/sink/s3sink/s3_write.go
+++ b/weed/replication/sink/s3sink/s3_write.go
@@ -10,7 +10,7 @@ import (
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/chrislusf/seaweedfs/weed/filer"
- "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/util/log"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/util"
)
@@ -24,9 +24,9 @@ func (s3sink *S3Sink) deleteObject(key string) error {
result, err := s3sink.conn.DeleteObject(input)
if err == nil {
- glog.V(0).Infof("[%s] delete %s: %v", s3sink.bucket, key, result)
+ log.Infof("[%s] delete %s: %v", s3sink.bucket, key, result)
} else {
- glog.Errorf("[%s] delete %s: %v", s3sink.bucket, key, err)
+ log.Errorf("[%s] delete %s: %v", s3sink.bucket, key, err)
}
return err
@@ -43,9 +43,9 @@ func (s3sink *S3Sink) createMultipartUpload(key string, entry *filer_pb.Entry) (
result, err := s3sink.conn.CreateMultipartUpload(input)
if err == nil {
- glog.V(0).Infof("[%s] createMultipartUpload %s: %v", s3sink.bucket, key, result)
+ log.Infof("[%s] createMultipartUpload %s: %v", s3sink.bucket, key, result)
} else {
- glog.Errorf("[%s] createMultipartUpload %s: %v", s3sink.bucket, key, err)
+ log.Errorf("[%s] createMultipartUpload %s: %v", s3sink.bucket, key, err)
return "", err
}
@@ -64,19 +64,19 @@ func (s3sink *S3Sink) abortMultipartUpload(key, uploadId string) error {
if aerr, ok := err.(awserr.Error); ok {
switch aerr.Code() {
case s3.ErrCodeNoSuchUpload:
- glog.Errorf("[%s] abortMultipartUpload %s: %v %v", s3sink.bucket, key, s3.ErrCodeNoSuchUpload, aerr.Error())
+ log.Errorf("[%s] abortMultipartUpload %s: %v %v", s3sink.bucket, key, s3.ErrCodeNoSuchUpload, aerr.Error())
default:
- glog.Errorf("[%s] abortMultipartUpload %s: %v", s3sink.bucket, key, aerr.Error())
+ log.Errorf("[%s] abortMultipartUpload %s: %v", s3sink.bucket, key, aerr.Error())
}
} else {
// Print the error, cast err to awserr.Error to get the Code and
// Message from an error.
- glog.Errorf("[%s] abortMultipartUpload %s: %v", s3sink.bucket, key, aerr.Error())
+ log.Errorf("[%s] abortMultipartUpload %s: %v", s3sink.bucket, key, aerr.Error())
}
return err
}
- glog.V(0).Infof("[%s] abortMultipartUpload %s: %v", s3sink.bucket, key, result)
+ log.Infof("[%s] abortMultipartUpload %s: %v", s3sink.bucket, key, result)
return nil
}
@@ -94,9 +94,9 @@ func (s3sink *S3Sink) completeMultipartUpload(ctx context.Context, key, uploadId
result, err := s3sink.conn.CompleteMultipartUpload(input)
if err == nil {
- glog.V(0).Infof("[%s] completeMultipartUpload %s: %v", s3sink.bucket, key, result)
+ log.Infof("[%s] completeMultipartUpload %s: %v", s3sink.bucket, key, result)
} else {
- glog.Errorf("[%s] completeMultipartUpload %s: %v", s3sink.bucket, key, err)
+ log.Errorf("[%s] completeMultipartUpload %s: %v", s3sink.bucket, key, err)
}
return err
@@ -108,7 +108,7 @@ func (s3sink *S3Sink) uploadPart(key, uploadId string, partId int, chunk *filer.
readSeeker, err := s3sink.buildReadSeeker(chunk)
if err != nil {
- glog.Errorf("[%s] uploadPart %s %d read: %v", s3sink.bucket, key, partId, err)
+ log.Errorf("[%s] uploadPart %s %d read: %v", s3sink.bucket, key, partId, err)
return nil, fmt.Errorf("[%s] uploadPart %s %d read: %v", s3sink.bucket, key, partId, err)
}
@@ -122,9 +122,9 @@ func (s3sink *S3Sink) uploadPart(key, uploadId string, partId int, chunk *filer.
result, err := s3sink.conn.UploadPart(input)
if err == nil {
- glog.V(0).Infof("[%s] uploadPart %s %d upload: %v", s3sink.bucket, key, partId, result)
+ log.Infof("[%s] uploadPart %s %d upload: %v", s3sink.bucket, key, partId, result)
} else {
- glog.Errorf("[%s] uploadPart %s %d upload: %v", s3sink.bucket, key, partId, err)
+ log.Errorf("[%s] uploadPart %s %d upload: %v", s3sink.bucket, key, partId, err)
}
part := &s3.CompletedPart{
@@ -148,9 +148,9 @@ func (s3sink *S3Sink) uploadPartCopy(key, uploadId string, partId int64, copySou
result, err := s3sink.conn.UploadPartCopy(input)
if err == nil {
- glog.V(0).Infof("[%s] uploadPartCopy %s %d: %v", s3sink.bucket, key, partId, result)
+ log.Infof("[%s] uploadPartCopy %s %d: %v", s3sink.bucket, key, partId, result)
} else {
- glog.Errorf("[%s] uploadPartCopy %s %d: %v", s3sink.bucket, key, partId, err)
+ log.Errorf("[%s] uploadPartCopy %s %d: %v", s3sink.bucket, key, partId, err)
}
return err
@@ -165,7 +165,7 @@ func (s3sink *S3Sink) buildReadSeeker(chunk *filer.ChunkView) (io.ReadSeeker, er
for _, fileUrl := range fileUrls {
_, err = util.ReadUrl(fileUrl+"?readDeleted=true", nil, false, false, chunk.Offset, int(chunk.Size), buf)
if err != nil {
- glog.V(1).Infof("read from %s: %v", fileUrl, err)
+ log.Debugf("read from %s: %v", fileUrl, err)
} else {
break
}
diff --git a/weed/replication/source/filer_source.go b/weed/replication/source/filer_source.go
index ff4f2eb26..a54bc99fd 100644
--- a/weed/replication/source/filer_source.go
+++ b/weed/replication/source/filer_source.go
@@ -12,7 +12,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/security"
- "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/util/log"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/util"
)
@@ -49,7 +49,7 @@ func (fs *FilerSource) LookupFileId(part string) (fileUrls []string, err error)
err = fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
- glog.V(4).Infof("read lookup volume id locations: %v", vid)
+ log.Tracef("read lookup volume id locations: %v", vid)
resp, err := client.LookupVolume(context.Background(), &filer_pb.LookupVolumeRequest{
VolumeIds: []string{vid},
})
@@ -63,14 +63,14 @@ func (fs *FilerSource) LookupFileId(part string) (fileUrls []string, err error)
})
if err != nil {
- glog.V(1).Infof("LookupFileId volume id %s: %v", vid, err)
+ log.Debugf("LookupFileId volume id %s: %v", vid, err)
return nil, fmt.Errorf("LookupFileId volume id %s: %v", vid, err)
}
locations := vid2Locations[vid]
if locations == nil || len(locations.Locations) == 0 {
- glog.V(1).Infof("LookupFileId locate volume id %s: %v", vid, err)
+ log.Debugf("LookupFileId locate volume id %s: %v", vid, err)
return nil, fmt.Errorf("LookupFileId locate volume id %s: %v", vid, err)
}
@@ -91,7 +91,7 @@ func (fs *FilerSource) ReadPart(part string) (filename string, header http.Heade
for _, fileUrl := range fileUrls {
filename, header, resp, err = util.DownloadFile(fileUrl)
if err != nil {
- glog.V(1).Infof("fail to read from %s: %v", fileUrl, err)
+ log.Debugf("fail to read from %s: %v", fileUrl, err)
} else {
break
}
diff --git a/weed/replication/sub/notification_aws_sqs.go b/weed/replication/sub/notification_aws_sqs.go
index 1dd386ba7..fc114d3c2 100644
--- a/weed/replication/sub/notification_aws_sqs.go
+++ b/weed/replication/sub/notification_aws_sqs.go
@@ -8,7 +8,7 @@ import (
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/sqs"
- "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/util/log"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/golang/protobuf/proto"
@@ -28,8 +28,8 @@ func (k *AwsSqsInput) GetName() string {
}
func (k *AwsSqsInput) Initialize(configuration util.Configuration, prefix string) error {
- glog.V(0).Infof("replication.notification.aws_sqs.region: %v", configuration.GetString(prefix+"region"))
- glog.V(0).Infof("replication.notification.aws_sqs.sqs_queue_name: %v", configuration.GetString(prefix+"sqs_queue_name"))
+ log.Infof("replication.notification.aws_sqs.region: %v", configuration.GetString(prefix+"region"))
+ log.Infof("replication.notification.aws_sqs.sqs_queue_name: %v", configuration.GetString(prefix+"sqs_queue_name"))
return k.initialize(
configuration.GetString(prefix+"aws_access_key_id"),
configuration.GetString(prefix+"aws_secret_access_key"),
@@ -106,7 +106,7 @@ func (k *AwsSqsInput) ReceiveMessage() (key string, message *filer_pb.EventNotif
})
if err != nil {
- glog.V(1).Infof("delete message from sqs %s: %v", k.queueUrl, err)
+ log.Debugf("delete message from sqs %s: %v", k.queueUrl, err)
}
return
diff --git a/weed/replication/sub/notification_gocdk_pub_sub.go b/weed/replication/sub/notification_gocdk_pub_sub.go
index 9726096e5..edbb11bc0 100644
--- a/weed/replication/sub/notification_gocdk_pub_sub.go
+++ b/weed/replication/sub/notification_gocdk_pub_sub.go
@@ -3,7 +3,7 @@ package sub
import (
"context"
- "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/util/log"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/golang/protobuf/proto"
@@ -29,7 +29,7 @@ func (k *GoCDKPubSubInput) GetName() string {
func (k *GoCDKPubSubInput) Initialize(configuration util.Configuration, prefix string) error {
subURL := configuration.GetString(prefix + "sub_url")
- glog.V(0).Infof("notification.gocdk_pub_sub.sub_url: %v", subURL)
+ log.Infof("notification.gocdk_pub_sub.sub_url: %v", subURL)
sub, err := pubsub.OpenSubscription(context.Background(), subURL)
if err != nil {
return err
diff --git a/weed/replication/sub/notification_google_pub_sub.go b/weed/replication/sub/notification_google_pub_sub.go
index a950bb42b..c21246606 100644
--- a/weed/replication/sub/notification_google_pub_sub.go
+++ b/weed/replication/sub/notification_google_pub_sub.go
@@ -6,7 +6,7 @@ import (
"os"
"cloud.google.com/go/pubsub"
- "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/util/log"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/golang/protobuf/proto"
@@ -28,8 +28,8 @@ func (k *GooglePubSubInput) GetName() string {
}
func (k *GooglePubSubInput) Initialize(configuration util.Configuration, prefix string) error {
- glog.V(0).Infof("notification.google_pub_sub.project_id: %v", configuration.GetString(prefix+"project_id"))
- glog.V(0).Infof("notification.google_pub_sub.topic: %v", configuration.GetString(prefix+"topic"))
+ log.Infof("notification.google_pub_sub.project_id: %v", configuration.GetString(prefix+"project_id"))
+ log.Infof("notification.google_pub_sub.topic: %v", configuration.GetString(prefix+"topic"))
return k.initialize(
configuration.GetString(prefix+"google_application_credentials"),
configuration.GetString(prefix+"project_id"),
@@ -45,13 +45,13 @@ func (k *GooglePubSubInput) initialize(google_application_credentials, projectId
var found bool
google_application_credentials, found = os.LookupEnv("GOOGLE_APPLICATION_CREDENTIALS")
if !found {
- glog.Fatalf("need to specific GOOGLE_APPLICATION_CREDENTIALS env variable or google_application_credentials in filer.toml")
+ log.Fatalf("need to specific GOOGLE_APPLICATION_CREDENTIALS env variable or google_application_credentials in filer.toml")
}
}
client, err := pubsub.NewClient(ctx, projectId, option.WithCredentialsFile(google_application_credentials))
if err != nil {
- glog.Fatalf("Failed to create client: %v", err)
+ log.Fatalf("Failed to create client: %v", err)
}
k.topicName = topicName
@@ -60,11 +60,11 @@ func (k *GooglePubSubInput) initialize(google_application_credentials, projectId
if !exists {
topic, err = client.CreateTopic(ctx, topicName)
if err != nil {
- glog.Fatalf("Failed to create topic %s: %v", topicName, err)
+ log.Fatalf("Failed to create topic %s: %v", topicName, err)
}
}
} else {
- glog.Fatalf("Failed to check topic %s: %v", topicName, err)
+ log.Fatalf("Failed to check topic %s: %v", topicName, err)
}
subscriptionName := "seaweedfs_sub"
@@ -74,11 +74,11 @@ func (k *GooglePubSubInput) initialize(google_application_credentials, projectId
if !exists {
k.sub, err = client.CreateSubscription(ctx, subscriptionName, pubsub.SubscriptionConfig{Topic: topic})
if err != nil {
- glog.Fatalf("Failed to create subscription %s: %v", subscriptionName, err)
+ log.Fatalf("Failed to create subscription %s: %v", subscriptionName, err)
}
}
} else {
- glog.Fatalf("Failed to check subscription %s: %v", topicName, err)
+ log.Fatalf("Failed to check subscription %s: %v", topicName, err)
}
k.messageChan = make(chan *pubsub.Message, 1)
diff --git a/weed/replication/sub/notification_kafka.go b/weed/replication/sub/notification_kafka.go
index fa9cfad9b..2b570ab30 100644
--- a/weed/replication/sub/notification_kafka.go
+++ b/weed/replication/sub/notification_kafka.go
@@ -8,7 +8,7 @@ import (
"time"
"github.com/Shopify/sarama"
- "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/util/log"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/golang/protobuf/proto"
@@ -29,8 +29,8 @@ func (k *KafkaInput) GetName() string {
}
func (k *KafkaInput) Initialize(configuration util.Configuration, prefix string) error {
- glog.V(0).Infof("replication.notification.kafka.hosts: %v\n", configuration.GetStringSlice(prefix+"hosts"))
- glog.V(0).Infof("replication.notification.kafka.topic: %v\n", configuration.GetString(prefix+"topic"))
+ log.Infof("replication.notification.kafka.hosts: %v\n", configuration.GetStringSlice(prefix+"hosts"))
+ log.Infof("replication.notification.kafka.topic: %v\n", configuration.GetString(prefix+"topic"))
return k.initialize(
configuration.GetStringSlice(prefix+"hosts"),
configuration.GetString(prefix+"topic"),
@@ -46,7 +46,7 @@ func (k *KafkaInput) initialize(hosts []string, topic string, offsetFile string,
if err != nil {
panic(err)
} else {
- glog.V(0).Infof("connected to %v", hosts)
+ log.Infof("connected to %v", hosts)
}
k.topic = topic
@@ -87,7 +87,7 @@ func (k *KafkaInput) initialize(hosts []string, topic string, offsetFile string,
case msg := <-partitionConsumer.Messages():
k.messageChan <- msg
if err := progress.setOffset(msg.Partition, msg.Offset); err != nil {
- glog.Warningf("set kafka offset: %v", err)
+ log.Warnf("set kafka offset: %v", err)
}
}
}
@@ -121,12 +121,12 @@ func loadProgress(offsetFile string) *KafkaProgress {
progress := &KafkaProgress{}
data, err := ioutil.ReadFile(offsetFile)
if err != nil {
- glog.Warningf("failed to read kafka progress file: %s", offsetFile)
+ log.Warnf("failed to read kafka progress file: %s", offsetFile)
return nil
}
err = json.Unmarshal(data, progress)
if err != nil {
- glog.Warningf("failed to read kafka progress message: %s", string(data))
+ log.Warnf("failed to read kafka progress message: %s", string(data))
return nil
}
return progress