Diffstat (limited to 'weed/replication')
-rw-r--r--  weed/replication/repl_util/replication_util.go      |  6
-rw-r--r--  weed/replication/replicator.go                       | 18
-rw-r--r--  weed/replication/sink/azuresink/azure_sink.go        |  6
-rw-r--r--  weed/replication/sink/filersink/fetch_write.go       | 10
-rw-r--r--  weed/replication/sink/filersink/filer_sink.go        | 30
-rw-r--r--  weed/replication/sink/gcssink/gcs_sink.go            |  6
-rw-r--r--  weed/replication/sink/localsink/local_sink.go        | 16
-rw-r--r--  weed/replication/sink/s3sink/s3_sink.go              | 34
-rw-r--r--  weed/replication/source/filer_source.go              |  8
-rw-r--r--  weed/replication/sub/notification_aws_sqs.go         |  8
-rw-r--r--  weed/replication/sub/notification_gocdk_pub_sub.go   | 26
-rw-r--r--  weed/replication/sub/notification_google_pub_sub.go  | 18
-rw-r--r--  weed/replication/sub/notification_kafka.go           | 14
13 files changed, 100 insertions, 100 deletions
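
Note: every hunk in this diff is the same mechanical substitution. The glog import becomes weed/util/log, Fatalf/Warningf/Error/Errorf keep their names, and verbosity levels are inverted: log.V(3-n) replaces glog.V(n), so glog.V(0) becomes log.V(3) and the chattiest glog.V(4) becomes log.V(-1). A minimal sketch of that assumed mapping, with a hypothetical helper name:

    package main

    import "fmt"

    // glogToLogLevel is a hypothetical helper illustrating the level
    // inversion applied throughout this diff: log.V(3-n) replaces
    // glog.V(n), so the always-on glog level 0 maps to 3 and the
    // most detailed glog level 4 maps to -1.
    func glogToLogLevel(glogLevel int) int {
    	return 3 - glogLevel
    }

    func main() {
    	for n := 0; n <= 4; n++ {
    		fmt.Printf("glog.V(%d) => log.V(%d)\n", n, glogToLogLevel(n))
    	}
    }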
diff --git a/weed/replication/repl_util/replication_util.go b/weed/replication/repl_util/replication_util.go
index 4a77fd04a..c3d970f69 100644
--- a/weed/replication/repl_util/replication_util.go
+++ b/weed/replication/repl_util/replication_util.go
@@ -2,7 +2,7 @@ package repl_util
import (
"github.com/seaweedfs/seaweedfs/weed/filer"
- "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/util/log"
"github.com/seaweedfs/seaweedfs/weed/replication/source"
util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
)
@@ -25,9 +25,9 @@ func CopyFromChunkViews(chunkViews *filer.IntervalList[*filer.ChunkView], filerS
writeErr = writeFunc(data)
})
if err != nil {
- glog.V(1).Infof("read from %s: %v", fileUrl, err)
+ log.V(2).Infof("read from %s: %v", fileUrl, err)
} else if writeErr != nil {
- glog.V(1).Infof("copy from %s: %v", fileUrl, writeErr)
+ log.V(2).Infof("copy from %s: %v", fileUrl, writeErr)
} else {
break
}
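
Note: the hunk above is the replica-failover loop in CopyFromChunkViews — each chunk view resolves to one or more volume URLs, read and copy failures are logged at the new V(2) level, and the loop breaks on the first URL that succeeds. A standalone sketch of the same pattern, where readAndWrite is a hypothetical stand-in for the fetch-plus-writeFunc pair:

    package main

    import (
    	"errors"
    	"fmt"
    )

    // tryReplicas is an illustrative sketch, not the SeaweedFS API: it
    // walks the candidate URLs for a chunk and stops at the first one
    // that succeeds, mirroring the loop in CopyFromChunkViews.
    func tryReplicas(fileUrls []string, readAndWrite func(string) error) error {
    	var lastErr error
    	for _, fileUrl := range fileUrls {
    		if err := readAndWrite(fileUrl); err != nil {
    			lastErr = err // logged at log.V(2) in the real code
    			continue
    		}
    		return nil
    	}
    	return fmt.Errorf("all replicas failed: %w", lastErr)
    }

    func main() {
    	err := tryReplicas([]string{"http://volume1/1,abc", "http://volume2/1,abc"},
    		func(u string) error { return errors.New("unreachable") })
    	fmt.Println(err)
    }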
diff --git a/weed/replication/replicator.go b/weed/replication/replicator.go
index 57aa63e5f..b8595098f 100644
--- a/weed/replication/replicator.go
+++ b/weed/replication/replicator.go
@@ -8,7 +8,7 @@ import (
"strings"
"time"
- "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/util/log"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/replication/sink"
"github.com/seaweedfs/seaweedfs/weed/replication/source"
@@ -40,12 +40,12 @@ func (r *Replicator) Replicate(ctx context.Context, key string, message *filer_p
return nil
}
if !strings.HasPrefix(key, r.source.Dir) {
- glog.V(4).Infof("skipping %v outside of %v", key, r.source.Dir)
+ log.V(-1).Infof("skipping %v outside of %v", key, r.source.Dir)
return nil
}
for _, excludeDir := range r.excludeDirs {
if strings.HasPrefix(key, excludeDir) {
- glog.V(4).Infof("skipping %v of exclude dir %v", key, excludeDir)
+ log.V(-1).Infof("skipping %v of exclude dir %v", key, excludeDir)
return nil
}
}
@@ -61,24 +61,24 @@ func (r *Replicator) Replicate(ctx context.Context, key string, message *filer_p
dateKey = time.Unix(mTime, 0).Format("2006-01-02")
}
newKey := util.Join(r.sink.GetSinkToDirectory(), dateKey, key[len(r.source.Dir):])
- glog.V(3).Infof("replicate %s => %s", key, newKey)
+ log.V(0).Infof("replicate %s => %s", key, newKey)
key = newKey
if message.OldEntry != nil && message.NewEntry == nil {
- glog.V(4).Infof("deleting %v", key)
+ log.V(-1).Infof("deleting %v", key)
return r.sink.DeleteEntry(key, message.OldEntry.IsDirectory, message.DeleteChunks, message.Signatures)
}
if message.OldEntry == nil && message.NewEntry != nil {
- glog.V(4).Infof("creating %v", key)
+ log.V(-1).Infof("creating %v", key)
return r.sink.CreateEntry(key, message.NewEntry, message.Signatures)
}
if message.OldEntry == nil && message.NewEntry == nil {
- glog.V(0).Infof("weird message %+v", message)
+ log.V(3).Infof("weird message %+v", message)
return nil
}
foundExisting, err := r.sink.UpdateEntry(key, message.OldEntry, message.NewParentPath, message.NewEntry, message.DeleteChunks, message.Signatures)
if foundExisting {
- glog.V(4).Infof("updated %v", key)
+ log.V(-1).Infof("updated %v", key)
return err
}
@@ -87,7 +87,7 @@ func (r *Replicator) Replicate(ctx context.Context, key string, message *filer_p
return fmt.Errorf("delete old entry %v: %v", key, err)
}
- glog.V(4).Infof("creating missing %v", key)
+ log.V(-1).Infof("creating missing %v", key)
return r.sink.CreateEntry(key, message.NewEntry, message.Signatures)
}
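
Note: Replicate above dispatches purely on which side of the event is present — old-only means delete, new-only means create, neither is an inconsistency logged at the new V(3), and both means update, falling back to delete-then-create when the sink has no existing entry. A compact sketch of that decision table, using placeholder types rather than the real filer_pb messages:

    package main

    import "fmt"

    // entry is a placeholder for *filer_pb.Entry, for illustration only.
    type entry struct{ name string }

    // action mirrors the branch order in Replicator.Replicate: the
    // old/new pair fully determines what the sink is asked to do.
    func action(oldEntry, newEntry *entry) string {
    	switch {
    	case oldEntry != nil && newEntry == nil:
    		return "DeleteEntry"
    	case oldEntry == nil && newEntry != nil:
    		return "CreateEntry"
    	case oldEntry == nil && newEntry == nil:
    		return "ignore (weird message)"
    	default:
    		return "UpdateEntry, or delete+create if nothing found to update"
    	}
    }

    func main() {
    	fmt.Println(action(&entry{"a"}, nil)) // DeleteEntry
    	fmt.Println(action(nil, &entry{"b"})) // CreateEntry
    }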
diff --git a/weed/replication/sink/azuresink/azure_sink.go b/weed/replication/sink/azuresink/azure_sink.go
index fb2f9ff82..26908804d 100644
--- a/weed/replication/sink/azuresink/azure_sink.go
+++ b/weed/replication/sink/azuresink/azure_sink.go
@@ -12,7 +12,7 @@ import (
"github.com/Azure/azure-storage-blob-go/azblob"
"github.com/seaweedfs/seaweedfs/weed/filer"
- "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/util/log"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/replication/sink"
"github.com/seaweedfs/seaweedfs/weed/replication/source"
@@ -64,7 +64,7 @@ func (g *AzureSink) initialize(accountName, accountKey, container, dir string) e
// Use your Storage account's name and key to create a credential object.
credential, err := azblob.NewSharedKeyCredential(accountName, accountKey)
if err != nil {
- glog.Fatalf("failed to create Azure credential with account name:%s: %v", accountName, err)
+ log.Fatalf("failed to create Azure credential with account name:%s: %v", accountName, err)
}
// Create a request pipeline that is used to process HTTP(S) requests and responses.
@@ -118,7 +118,7 @@ func (g *AzureSink) CreateEntry(key string, entry *filer_pb.Entry, signatures []
res, err := appendBlobURL.Create(context.Background(), azblob.BlobHTTPHeaders{}, azblob.Metadata{}, accessCondition, azblob.BlobTagsMap{}, azblob.ClientProvidedKeyOptions{}, azblob.ImmutabilityPolicyOptions{})
if res != nil && res.StatusCode() == http.StatusPreconditionFailed {
- glog.V(0).Infof("skip overwriting %s/%s: %v", g.container, key, err)
+ log.V(3).Infof("skip overwriting %s/%s: %v", g.container, key, err)
return nil
}
if err != nil {
diff --git a/weed/replication/sink/filersink/fetch_write.go b/weed/replication/sink/filersink/fetch_write.go
index 4bcbc7898..a9a422c9b 100644
--- a/weed/replication/sink/filersink/fetch_write.go
+++ b/weed/replication/sink/filersink/fetch_write.go
@@ -10,7 +10,7 @@ import (
"google.golang.org/grpc"
- "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/util/log"
"github.com/seaweedfs/seaweedfs/weed/operation"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
@@ -93,7 +93,7 @@ func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk, path string)
uploader, err := operation.NewUploader()
if err != nil {
- glog.V(0).Infof("upload source data %v: %v", sourceChunk.GetFileIdString(), err)
+ log.V(3).Infof("upload source data %v: %v", sourceChunk.GetFileIdString(), err)
return "", fmt.Errorf("upload data: %v", err)
}
@@ -120,18 +120,18 @@ func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk, path string)
if fs.writeChunkByFiler {
fileUrl = fmt.Sprintf("http://%s/?proxyChunkId=%s", fs.address, fileId)
}
- glog.V(4).Infof("replicating %s to %s header:%+v", filename, fileUrl, header)
+ log.V(-1).Infof("replicating %s to %s header:%+v", filename, fileUrl, header)
return fileUrl
},
resp.Body,
)
if err != nil {
- glog.V(0).Infof("upload source data %v: %v", sourceChunk.GetFileIdString(), err)
+ log.V(3).Infof("upload source data %v: %v", sourceChunk.GetFileIdString(), err)
return "", fmt.Errorf("upload data: %v", err)
}
if uploadResult.Error != "" {
- glog.V(0).Infof("upload failure %v: %v", filename, err)
+ log.V(3).Infof("upload failure %v: %v", filename, err)
return "", fmt.Errorf("upload result: %v", uploadResult.Error)
}
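
Note: in the hunk above, when writeChunkByFiler is set the replicated chunk is not pushed to a volume server directly but to the filer itself, which proxies the write via the proxyChunkId query parameter. A sketch of that URL selection; the direct-volume form is an assumption, since this hunk does not show how fileUrl was first assigned:

    package main

    import "fmt"

    // chunkUploadUrl sketches the switch in fetchAndWrite: direct volume
    // upload by default, filer-proxied upload when writeChunkByFiler is
    // set. The direct form below is assumed for illustration.
    func chunkUploadUrl(volumeUrl, filerAddress, fileId string, writeChunkByFiler bool) string {
    	fileUrl := fmt.Sprintf("http://%s/%s", volumeUrl, fileId) // assumed
    	if writeChunkByFiler {
    		fileUrl = fmt.Sprintf("http://%s/?proxyChunkId=%s", filerAddress, fileId)
    	}
    	return fileUrl
    }

    func main() {
    	fmt.Println(chunkUploadUrl("vol1:8080", "filer:8888", "3,01637037d6", true))
    }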
diff --git a/weed/replication/sink/filersink/filer_sink.go b/weed/replication/sink/filersink/filer_sink.go
index 49f6877a0..6e5d52327 100644
--- a/weed/replication/sink/filersink/filer_sink.go
+++ b/weed/replication/sink/filersink/filer_sink.go
@@ -12,7 +12,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/security"
"github.com/seaweedfs/seaweedfs/weed/filer"
- "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/util/log"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/replication/sink"
"github.com/seaweedfs/seaweedfs/weed/replication/source"
@@ -94,10 +94,10 @@ func (fs *FilerSink) DeleteEntry(key string, isDirectory, deleteIncludeChunks bo
dir, name := util.FullPath(key).DirAndName()
- glog.V(4).Infof("delete entry: %v", key)
+ log.V(-1).Infof("delete entry: %v", key)
err := filer_pb.Remove(fs, dir, name, deleteIncludeChunks, true, true, true, signatures)
if err != nil {
- glog.V(0).Infof("delete entry %s: %v", key, err)
+ log.V(3).Infof("delete entry %s: %v", key, err)
return fmt.Errorf("delete entry %s: %v", key, err)
}
return nil
@@ -114,14 +114,14 @@ func (fs *FilerSink) CreateEntry(key string, entry *filer_pb.Entry, signatures [
Directory: dir,
Name: name,
}
- // glog.V(1).Infof("lookup: %v", lookupRequest)
+ // log.V(2).Infof("lookup: %v", lookupRequest)
if resp, err := filer_pb.LookupEntry(client, lookupRequest); err == nil {
if filer.ETag(resp.Entry) == filer.ETag(entry) {
- glog.V(3).Infof("already replicated %s", key)
+ log.V(0).Infof("already replicated %s", key)
return nil
}
if resp.Entry.Attributes != nil && resp.Entry.Attributes.Mtime >= entry.Attributes.Mtime {
- glog.V(3).Infof("skip overwriting %s", key)
+ log.V(0).Infof("skip overwriting %s", key)
return nil
}
}
@@ -130,11 +130,11 @@ func (fs *FilerSink) CreateEntry(key string, entry *filer_pb.Entry, signatures [
if err != nil {
// only warning here since the source chunk may have been deleted already
- glog.Warningf("replicate entry chunks %s: %v", key, err)
+ log.Warningf("replicate entry chunks %s: %v", key, err)
return nil
}
- // glog.V(4).Infof("replicated %s %+v ===> %+v", key, entry.GetChunks(), replicatedChunks)
+ // log.V(-1).Infof("replicated %s %+v ===> %+v", key, entry.GetChunks(), replicatedChunks)
request := &filer_pb.CreateEntryRequest{
Directory: dir,
@@ -151,9 +151,9 @@ func (fs *FilerSink) CreateEntry(key string, entry *filer_pb.Entry, signatures [
Signatures: signatures,
}
- glog.V(3).Infof("create: %v", request)
+ log.V(0).Infof("create: %v", request)
if err := filer_pb.CreateEntry(client, request); err != nil {
- glog.V(0).Infof("create entry %s: %v", key, err)
+ log.V(3).Infof("create entry %s: %v", key, err)
return fmt.Errorf("create entry %s: %v", key, err)
}
@@ -174,10 +174,10 @@ func (fs *FilerSink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParent
Name: name,
}
- glog.V(4).Infof("lookup entry: %v", request)
+ log.V(-1).Infof("lookup entry: %v", request)
resp, err := filer_pb.LookupEntry(client, request)
if err != nil {
- glog.V(0).Infof("lookup %s: %v", key, err)
+ log.V(3).Infof("lookup %s: %v", key, err)
return err
}
@@ -190,12 +190,12 @@ func (fs *FilerSink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParent
return false, fmt.Errorf("lookup %s: %v", key, err)
}
- glog.V(4).Infof("oldEntry %+v, newEntry %+v, existingEntry: %+v", oldEntry, newEntry, existingEntry)
+ log.V(-1).Infof("oldEntry %+v, newEntry %+v, existingEntry: %+v", oldEntry, newEntry, existingEntry)
if existingEntry.Attributes.Mtime > newEntry.Attributes.Mtime {
// skip if already changed
// this usually happens when the messages are not ordered
- glog.V(2).Infof("late updates %s", key)
+ log.V(1).Infof("late updates %s", key)
} else {
// find out what changed
deletedChunks, newChunks, err := compareChunks(filer.LookupFn(fs), oldEntry, newEntry)
@@ -212,7 +212,7 @@ func (fs *FilerSink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParent
// replicate the chunks that are new in the source
replicatedChunks, err := fs.replicateChunks(newChunks, key)
if err != nil {
- glog.Warningf("replicate entry chunks %s: %v", key, err)
+ log.Warningf("replicate entry chunks %s: %v", key, err)
return true, nil
}
existingEntry.Chunks = append(existingEntry.GetChunks(), replicatedChunks...)
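
Note: the filer sink hunks above encode the conflict policy — CreateEntry skips when the ETag matches (already replicated) or the existing remote mtime is at least as new ("skip overwriting"), and UpdateEntry treats a newer existing mtime as a late, out-of-order event. A sketch condensing those guards, with a placeholder struct instead of the real entry types:

    package main

    import "fmt"

    // replica is a placeholder for the looked-up filer entry; only the
    // fields this sketch needs are modeled.
    type replica struct {
    	etag  string
    	mtime int64
    }

    // shouldWrite condenses the guards in FilerSink.CreateEntry: identical
    // content (same ETag) or a newer remote copy both mean the write is
    // skipped, so out-of-order events converge on the newest state.
    func shouldWrite(existing, incoming replica) bool {
    	if existing.etag == incoming.etag {
    		return false // already replicated
    	}
    	if existing.mtime >= incoming.mtime {
    		return false // skip overwriting a newer (or equal) copy
    	}
    	return true
    }

    func main() {
    	fmt.Println(shouldWrite(replica{"a", 100}, replica{"b", 90})) // false: late update
    }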
diff --git a/weed/replication/sink/gcssink/gcs_sink.go b/weed/replication/sink/gcssink/gcs_sink.go
index db6ea4aec..f820ae9a6 100644
--- a/weed/replication/sink/gcssink/gcs_sink.go
+++ b/weed/replication/sink/gcssink/gcs_sink.go
@@ -10,7 +10,7 @@ import (
"google.golang.org/api/option"
"github.com/seaweedfs/seaweedfs/weed/filer"
- "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/util/log"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/replication/sink"
"github.com/seaweedfs/seaweedfs/weed/replication/source"
@@ -63,12 +63,12 @@ func (g *GcsSink) initialize(google_application_credentials, bucketName, dir str
var found bool
google_application_credentials, found = os.LookupEnv("GOOGLE_APPLICATION_CREDENTIALS")
if !found {
- glog.Fatalf("need to specific GOOGLE_APPLICATION_CREDENTIALS env variable or google_application_credentials in replication.toml")
+ log.Fatalf("need to specific GOOGLE_APPLICATION_CREDENTIALS env variable or google_application_credentials in replication.toml")
}
}
client, err := storage.NewClient(context.Background(), option.WithCredentialsFile(google_application_credentials))
if err != nil {
- glog.Fatalf("Failed to create client: %v", err)
+ log.Fatalf("Failed to create client: %v", err)
}
g.client = client
diff --git a/weed/replication/sink/localsink/local_sink.go b/weed/replication/sink/localsink/local_sink.go
index c6dddb80a..fd6745b5d 100644
--- a/weed/replication/sink/localsink/local_sink.go
+++ b/weed/replication/sink/localsink/local_sink.go
@@ -2,7 +2,7 @@ package localsink
import (
"github.com/seaweedfs/seaweedfs/weed/filer"
- "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/util/log"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/replication/repl_util"
"github.com/seaweedfs/seaweedfs/weed/replication/sink"
@@ -45,7 +45,7 @@ func (localsink *LocalSink) initialize(dir string, isIncremental bool) error {
func (localsink *LocalSink) Initialize(configuration util.Configuration, prefix string) error {
dir := configuration.GetString(prefix + "directory")
isIncremental := configuration.GetBool(prefix + "is_incremental")
- glog.V(4).Infof("sink.local.directory: %v", dir)
+ log.V(-1).Infof("sink.local.directory: %v", dir)
return localsink.initialize(dir, isIncremental)
}
@@ -61,9 +61,9 @@ func (localsink *LocalSink) DeleteEntry(key string, isDirectory, deleteIncludeCh
if localsink.isMultiPartEntry(key) {
return nil
}
- glog.V(4).Infof("Delete Entry key: %s", key)
+ log.V(-1).Infof("Delete Entry key: %s", key)
if err := os.Remove(key); err != nil {
- glog.V(0).Infof("remove entry key %s: %s", key, err)
+ log.V(3).Infof("remove entry key %s: %s", key, err)
}
return nil
}
@@ -72,7 +72,7 @@ func (localsink *LocalSink) CreateEntry(key string, entry *filer_pb.Entry, signa
if entry.IsDirectory || localsink.isMultiPartEntry(key) {
return nil
}
- glog.V(4).Infof("Create Entry key: %s", key)
+ log.V(-1).Infof("Create Entry key: %s", key)
totalSize := filer.FileSize(entry)
chunkViews := filer.ViewFromChunks(localsink.filerSource.LookupFileId, entry.GetChunks(), 0, int64(totalSize))
@@ -80,7 +80,7 @@ func (localsink *LocalSink) CreateEntry(key string, entry *filer_pb.Entry, signa
dir := filepath.Dir(key)
if _, err := os.Stat(dir); os.IsNotExist(err) {
- glog.V(4).Infof("Create Directory key: %s", dir)
+ log.V(-1).Infof("Create Directory key: %s", dir)
if err = os.MkdirAll(dir, 0755); err != nil {
return err
}
@@ -102,7 +102,7 @@ func (localsink *LocalSink) CreateEntry(key string, entry *filer_pb.Entry, signa
return err
}
if fi.Mode() != mode {
- glog.V(4).Infof("Modify file mode: %o -> %o", fi.Mode(), mode)
+ log.V(-1).Infof("Modify file mode: %o -> %o", fi.Mode(), mode)
if err := dstFile.Chmod(mode); err != nil {
return err
}
@@ -128,7 +128,7 @@ func (localsink *LocalSink) UpdateEntry(key string, oldEntry *filer_pb.Entry, ne
if localsink.isMultiPartEntry(key) {
return true, nil
}
- glog.V(4).Infof("Update Entry key: %s", key)
+ log.V(-1).Infof("Update Entry key: %s", key)
// do delete and create
foundExistingEntry = util.FileExists(key)
err = localsink.CreateEntry(key, newEntry, signatures)
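
Note: the local sink above treats UpdateEntry as create-over-existing, ignores multipart temp entries throughout, and syncs the on-disk file mode only when it differs from the replicated entry's mode. A sketch of that mode-sync step:

    package main

    import (
    	"fmt"
    	"os"
    )

    // syncMode sketches the permission check in LocalSink.CreateEntry:
    // chmod is issued only when the current mode differs from the
    // replicated entry's mode, avoiding redundant metadata writes.
    func syncMode(f *os.File, mode os.FileMode) error {
    	fi, err := f.Stat()
    	if err != nil {
    		return err
    	}
    	if fi.Mode() != mode {
    		fmt.Printf("Modify file mode: %o -> %o\n", fi.Mode(), mode)
    		return f.Chmod(mode)
    	}
    	return nil
    }

    func main() {
    	f, _ := os.CreateTemp("", "sink")
    	defer os.Remove(f.Name())
    	_ = syncMode(f, 0755)
    }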
diff --git a/weed/replication/sink/s3sink/s3_sink.go b/weed/replication/sink/s3sink/s3_sink.go
index 279108e16..9e74cf9c1 100644
--- a/weed/replication/sink/s3sink/s3_sink.go
+++ b/weed/replication/sink/s3sink/s3_sink.go
@@ -14,7 +14,7 @@ import (
"strings"
"github.com/seaweedfs/seaweedfs/weed/filer"
- "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/util/log"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/replication/sink"
"github.com/seaweedfs/seaweedfs/weed/replication/source"
@@ -76,24 +76,24 @@ func (s3sink *S3Sink) Initialize(configuration util.Configuration, prefix string
s3sink.uploaderPartSizeMb = configuration.GetInt(prefix + "uploader_part_size")
s3sink.uploaderConcurrency = configuration.GetInt(prefix + "uploader_concurrency")
- glog.V(0).Infof("sink.s3.region: %v", s3sink.region)
- glog.V(0).Infof("sink.s3.bucket: %v", s3sink.bucket)
- glog.V(0).Infof("sink.s3.directory: %v", s3sink.dir)
- glog.V(0).Infof("sink.s3.endpoint: %v", s3sink.endpoint)
- glog.V(0).Infof("sink.s3.acl: %v", s3sink.acl)
- glog.V(0).Infof("sink.s3.is_incremental: %v", s3sink.isIncremental)
- glog.V(0).Infof("sink.s3.s3_disable_content_md5_validation: %v", s3sink.s3DisableContentMD5Validation)
- glog.V(0).Infof("sink.s3.s3_force_path_style: %v", s3sink.s3ForcePathStyle)
- glog.V(0).Infof("sink.s3.keep_part_size: %v", s3sink.keepPartSize)
+ log.V(3).Infof("sink.s3.region: %v", s3sink.region)
+ log.V(3).Infof("sink.s3.bucket: %v", s3sink.bucket)
+ log.V(3).Infof("sink.s3.directory: %v", s3sink.dir)
+ log.V(3).Infof("sink.s3.endpoint: %v", s3sink.endpoint)
+ log.V(3).Infof("sink.s3.acl: %v", s3sink.acl)
+ log.V(3).Infof("sink.s3.is_incremental: %v", s3sink.isIncremental)
+ log.V(3).Infof("sink.s3.s3_disable_content_md5_validation: %v", s3sink.s3DisableContentMD5Validation)
+ log.V(3).Infof("sink.s3.s3_force_path_style: %v", s3sink.s3ForcePathStyle)
+ log.V(3).Infof("sink.s3.keep_part_size: %v", s3sink.keepPartSize)
if s3sink.uploaderMaxUploadParts > s3manager.MaxUploadParts {
s3sink.uploaderMaxUploadParts = s3manager.MaxUploadParts
- glog.Warningf("uploader_max_upload_parts is greater than the maximum number of parts allowed when uploading multiple parts to Amazon S3")
- glog.V(0).Infof("sink.s3.uploader_max_upload_parts: %v => %v", s3sink.uploaderMaxUploadParts, s3manager.MaxUploadParts)
+ log.Warningf("uploader_max_upload_parts is greater than the maximum number of parts allowed when uploading multiple parts to Amazon S3")
+ log.V(3).Infof("sink.s3.uploader_max_upload_parts: %v => %v", s3sink.uploaderMaxUploadParts, s3manager.MaxUploadParts)
} else {
- glog.V(0).Infof("sink.s3.uploader_max_upload_parts: %v", s3sink.uploaderMaxUploadParts)
+ log.V(3).Infof("sink.s3.uploader_max_upload_parts: %v", s3sink.uploaderMaxUploadParts)
}
- glog.V(0).Infof("sink.s3.uploader_part_size_mb: %v", s3sink.uploaderPartSizeMb)
- glog.V(0).Infof("sink.s3.uploader_concurrency: %v", s3sink.uploaderConcurrency)
+ log.V(3).Infof("sink.s3.uploader_part_size_mb: %v", s3sink.uploaderPartSizeMb)
+ log.V(3).Infof("sink.s3.uploader_concurrency: %v", s3sink.uploaderConcurrency)
return s3sink.initialize(
configuration.GetString(prefix+"aws_access_key_id"),
@@ -141,9 +141,9 @@ func (s3sink *S3Sink) DeleteEntry(key string, isDirectory, deleteIncludeChunks b
result, err := s3sink.conn.DeleteObject(input)
if err == nil {
- glog.V(2).Infof("[%s] delete %s: %v", s3sink.bucket, key, result)
+ log.V(1).Infof("[%s] delete %s: %v", s3sink.bucket, key, result)
} else {
- glog.Errorf("[%s] delete %s: %v", s3sink.bucket, key, err)
+ log.Errorf("[%s] delete %s: %v", s3sink.bucket, key, err)
}
return err
diff --git a/weed/replication/source/filer_source.go b/weed/replication/source/filer_source.go
index 768e251a4..f06b56fec 100644
--- a/weed/replication/source/filer_source.go
+++ b/weed/replication/source/filer_source.go
@@ -12,7 +12,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/security"
- "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/util/log"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
@@ -76,14 +76,14 @@ func (fs *FilerSource) LookupFileId(part string) (fileUrls []string, err error)
})
if err != nil {
- glog.V(1).Infof("LookupFileId volume id %s: %v", vid, err)
+ log.V(2).Infof("LookupFileId volume id %s: %v", vid, err)
return nil, fmt.Errorf("LookupFileId volume id %s: %v", vid, err)
}
locations := vid2Locations[vid]
if locations == nil || len(locations.Locations) == 0 {
- glog.V(1).Infof("LookupFileId locate volume id %s: %v", vid, err)
+ log.V(2).Infof("LookupFileId locate volume id %s: %v", vid, err)
return nil, fmt.Errorf("LookupFileId locate volume id %s: %v", vid, err)
}
@@ -118,7 +118,7 @@ func (fs *FilerSource) ReadPart(fileId string) (filename string, header http.Hea
for _, fileUrl := range fileUrls {
filename, header, resp, err = util_http.DownloadFile(fileUrl, "")
if err != nil {
- glog.V(1).Infof("fail to read from %s: %v", fileUrl, err)
+ log.V(2).Infof("fail to read from %s: %v", fileUrl, err)
} else {
break
}
diff --git a/weed/replication/sub/notification_aws_sqs.go b/weed/replication/sub/notification_aws_sqs.go
index 7fc5c3f46..0456961ae 100644
--- a/weed/replication/sub/notification_aws_sqs.go
+++ b/weed/replication/sub/notification_aws_sqs.go
@@ -8,7 +8,7 @@ import (
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/sqs"
- "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/util/log"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
"google.golang.org/protobuf/proto"
@@ -28,8 +28,8 @@ func (k *AwsSqsInput) GetName() string {
}
func (k *AwsSqsInput) Initialize(configuration util.Configuration, prefix string) error {
- glog.V(0).Infof("replication.notification.aws_sqs.region: %v", configuration.GetString(prefix+"region"))
- glog.V(0).Infof("replication.notification.aws_sqs.sqs_queue_name: %v", configuration.GetString(prefix+"sqs_queue_name"))
+ log.V(3).Infof("replication.notification.aws_sqs.region: %v", configuration.GetString(prefix+"region"))
+ log.V(3).Infof("replication.notification.aws_sqs.sqs_queue_name: %v", configuration.GetString(prefix+"sqs_queue_name"))
return k.initialize(
configuration.GetString(prefix+"aws_access_key_id"),
configuration.GetString(prefix+"aws_secret_access_key"),
@@ -110,7 +110,7 @@ func (k *AwsSqsInput) ReceiveMessage() (key string, message *filer_pb.EventNotif
})
if err != nil {
- glog.V(1).Infof("delete message from sqs %s: %v", k.queueUrl, err)
+ log.V(2).Infof("delete message from sqs %s: %v", k.queueUrl, err)
}
return
diff --git a/weed/replication/sub/notification_gocdk_pub_sub.go b/weed/replication/sub/notification_gocdk_pub_sub.go
index 2e7640af4..bfeb0ebb8 100644
--- a/weed/replication/sub/notification_gocdk_pub_sub.go
+++ b/weed/replication/sub/notification_gocdk_pub_sub.go
@@ -6,7 +6,7 @@ package sub
import (
"context"
amqp "github.com/rabbitmq/amqp091-go"
- "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/util/log"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
"gocloud.dev/pubsub"
@@ -41,38 +41,38 @@ func QueueDeclareAndBind(conn *amqp.Connection, exchangeUrl string, queueUrl str
queueNameDLX := "DLX." + queueName
ch, err := conn.Channel()
if err != nil {
- glog.Error(err)
+ log.Error(err)
return err
}
defer ch.Close()
if err := ch.ExchangeDeclare(
exchangeNameDLX, "fanout", true, false, false, false, nil); err != nil {
- glog.Error(err)
+ log.Error(err)
return err
}
if err := ch.ExchangeDeclare(
exchangeName, "fanout", true, false, false, false, nil); err != nil {
- glog.Error(err)
+ log.Error(err)
return err
}
if _, err := ch.QueueDeclare(
queueName, true, false, false, false,
amqp.Table{"x-dead-letter-exchange": exchangeNameDLX}); err != nil {
- glog.Error(err)
+ log.Error(err)
return err
}
if err := ch.QueueBind(queueName, "", exchangeName, false, nil); err != nil {
- glog.Error(err)
+ log.Error(err)
return err
}
if _, err := ch.QueueDeclare(
queueNameDLX, true, false, false, false,
amqp.Table{"x-dead-letter-exchange": exchangeName, "x-message-ttl": 600000}); err != nil {
- glog.Error(err)
+ log.Error(err)
return err
}
if err := ch.QueueBind(queueNameDLX, "", exchangeNameDLX, false, nil); err != nil {
- glog.Error(err)
+ log.Error(err)
return err
}
return nil
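
Note: QueueDeclareAndBind above preserves a delayed-retry topology — the main queue dead-letters rejected messages into a DLX.<name> exchange, and the DLX queue dead-letters them back to the main exchange after a 600000 ms TTL, so a nacked message reappears roughly ten minutes later. A sketch of the two argument tables that wire up that cycle:

    package main

    import (
    	"fmt"

    	amqp "github.com/rabbitmq/amqp091-go"
    )

    // retryTables reproduces the queue arguments from QueueDeclareAndBind:
    // the main queue routes dead letters to the DLX exchange, and the DLX
    // queue routes them back after a 10-minute TTL, forming a retry loop.
    func retryTables(exchangeName string) (mainQ, dlxQ amqp.Table) {
    	mainQ = amqp.Table{"x-dead-letter-exchange": "DLX." + exchangeName}
    	dlxQ = amqp.Table{
    		"x-dead-letter-exchange": exchangeName,
    		"x-message-ttl":          600000, // milliseconds spent in the DLX queue
    	}
    	return mainQ, dlxQ
    }

    func main() {
    	m, d := retryTables("seaweed-events") // exchange name is illustrative
    	fmt.Println(m, d)
    }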
@@ -90,7 +90,7 @@ func (k *GoCDKPubSubInput) GetName() string {
func (k *GoCDKPubSubInput) Initialize(configuration util.Configuration, prefix string) error {
topicUrl := configuration.GetString(prefix + "topic_url")
k.subURL = configuration.GetString(prefix + "sub_url")
- glog.V(0).Infof("notification.gocdk_pub_sub.sub_url: %v", k.subURL)
+ log.V(3).Infof("notification.gocdk_pub_sub.sub_url: %v", k.subURL)
sub, err := pubsub.OpenSubscription(context.Background(), k.subURL)
if err != nil {
return err
@@ -127,7 +127,7 @@ func (k *GoCDKPubSubInput) ReceiveMessage() (key string, message *filer_pb.Event
k.sub.Shutdown(ctx)
conn, err = amqp.Dial(os.Getenv("RABBIT_SERVER_URL"))
if err != nil {
- glog.Error(err)
+ log.Error(err)
time.Sleep(time.Second)
return
}
@@ -135,7 +135,7 @@ func (k *GoCDKPubSubInput) ReceiveMessage() (key string, message *filer_pb.Event
return
}
// This is permanent cached sub err
- glog.Fatal(err)
+ log.Fatal(err)
}
onFailureFn = func() {
if msg.Nackable() {
@@ -143,11 +143,11 @@ func (k *GoCDKPubSubInput) ReceiveMessage() (key string, message *filer_pb.Event
var delivery amqp.Delivery
if msg.As(&delivery) {
isRedelivered = delivery.Redelivered
- glog.Warningf("onFailureFn() metadata: %+v, redelivered: %v", msg.Metadata, delivery.Redelivered)
+ log.Warningf("onFailureFn() metadata: %+v, redelivered: %v", msg.Metadata, delivery.Redelivered)
}
if isRedelivered {
if err := delivery.Nack(false, false); err != nil {
- glog.Error(err)
+ log.Error(err)
}
} else {
msg.Nack()
diff --git a/weed/replication/sub/notification_google_pub_sub.go b/weed/replication/sub/notification_google_pub_sub.go
index c7509abf2..f431a2c1d 100644
--- a/weed/replication/sub/notification_google_pub_sub.go
+++ b/weed/replication/sub/notification_google_pub_sub.go
@@ -6,7 +6,7 @@ import (
"os"
"cloud.google.com/go/pubsub"
- "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/util/log"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
"google.golang.org/api/option"
@@ -28,8 +28,8 @@ func (k *GooglePubSubInput) GetName() string {
}
func (k *GooglePubSubInput) Initialize(configuration util.Configuration, prefix string) error {
- glog.V(0).Infof("notification.google_pub_sub.project_id: %v", configuration.GetString(prefix+"project_id"))
- glog.V(0).Infof("notification.google_pub_sub.topic: %v", configuration.GetString(prefix+"topic"))
+ log.V(3).Infof("notification.google_pub_sub.project_id: %v", configuration.GetString(prefix+"project_id"))
+ log.V(3).Infof("notification.google_pub_sub.topic: %v", configuration.GetString(prefix+"topic"))
return k.initialize(
configuration.GetString(prefix+"google_application_credentials"),
configuration.GetString(prefix+"project_id"),
@@ -45,13 +45,13 @@ func (k *GooglePubSubInput) initialize(google_application_credentials, projectId
var found bool
google_application_credentials, found = os.LookupEnv("GOOGLE_APPLICATION_CREDENTIALS")
if !found {
- glog.Fatalf("need to specific GOOGLE_APPLICATION_CREDENTIALS env variable or google_application_credentials in filer.toml")
+ log.Fatalf("need to specific GOOGLE_APPLICATION_CREDENTIALS env variable or google_application_credentials in filer.toml")
}
}
client, err := pubsub.NewClient(ctx, projectId, option.WithCredentialsFile(google_application_credentials))
if err != nil {
- glog.Fatalf("Failed to create client: %v", err)
+ log.Fatalf("Failed to create client: %v", err)
}
k.topicName = topicName
@@ -60,11 +60,11 @@ func (k *GooglePubSubInput) initialize(google_application_credentials, projectId
if !exists {
topic, err = client.CreateTopic(ctx, topicName)
if err != nil {
- glog.Fatalf("Failed to create topic %s: %v", topicName, err)
+ log.Fatalf("Failed to create topic %s: %v", topicName, err)
}
}
} else {
- glog.Fatalf("Failed to check topic %s: %v", topicName, err)
+ log.Fatalf("Failed to check topic %s: %v", topicName, err)
}
subscriptionName := "seaweedfs_sub"
@@ -74,11 +74,11 @@ func (k *GooglePubSubInput) initialize(google_application_credentials, projectId
if !exists {
k.sub, err = client.CreateSubscription(ctx, subscriptionName, pubsub.SubscriptionConfig{Topic: topic})
if err != nil {
- glog.Fatalf("Failed to create subscription %s: %v", subscriptionName, err)
+ log.Fatalf("Failed to create subscription %s: %v", subscriptionName, err)
}
}
} else {
- glog.Fatalf("Failed to check subscription %s: %v", topicName, err)
+ log.Fatalf("Failed to check subscription %s: %v", topicName, err)
}
k.messageChan = make(chan *pubsub.Message, 1)
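
Note: initialize above keeps the ensure-exists pattern for both the topic and the seaweedfs_sub subscription — check Exists, create on false, and treat any lookup or create error as fatal. A condensed sketch of that pattern against the cloud.google.com/go/pubsub API; the project ID below is illustrative:

    package main

    import (
    	"context"
    	"log"

    	"cloud.google.com/go/pubsub"
    )

    // ensureTopic condenses the pattern in GooglePubSubInput.initialize:
    // a missing topic is created, while a failed existence check is
    // fatal, matching the Fatalf calls in the diff.
    func ensureTopic(ctx context.Context, client *pubsub.Client, topicName string) *pubsub.Topic {
    	topic := client.Topic(topicName)
    	exists, err := topic.Exists(ctx)
    	if err != nil {
    		log.Fatalf("Failed to check topic %s: %v", topicName, err)
    	}
    	if !exists {
    		if topic, err = client.CreateTopic(ctx, topicName); err != nil {
    			log.Fatalf("Failed to create topic %s: %v", topicName, err)
    		}
    	}
    	return topic
    }

    func main() {
    	client, err := pubsub.NewClient(context.Background(), "my-project") // illustrative project ID
    	if err != nil {
    		log.Fatalf("Failed to create client: %v", err)
    	}
    	_ = ensureTopic(context.Background(), client, "seaweedfs")
    }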
diff --git a/weed/replication/sub/notification_kafka.go b/weed/replication/sub/notification_kafka.go
index 92f7ce609..4738b5612 100644
--- a/weed/replication/sub/notification_kafka.go
+++ b/weed/replication/sub/notification_kafka.go
@@ -8,7 +8,7 @@ import (
"time"
"github.com/Shopify/sarama"
- "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/util/log"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
"google.golang.org/protobuf/proto"
@@ -29,8 +29,8 @@ func (k *KafkaInput) GetName() string {
}
func (k *KafkaInput) Initialize(configuration util.Configuration, prefix string) error {
- glog.V(0).Infof("replication.notification.kafka.hosts: %v\n", configuration.GetStringSlice(prefix+"hosts"))
- glog.V(0).Infof("replication.notification.kafka.topic: %v\n", configuration.GetString(prefix+"topic"))
+ log.V(3).Infof("replication.notification.kafka.hosts: %v\n", configuration.GetStringSlice(prefix+"hosts"))
+ log.V(3).Infof("replication.notification.kafka.topic: %v\n", configuration.GetString(prefix+"topic"))
return k.initialize(
configuration.GetStringSlice(prefix+"hosts"),
configuration.GetString(prefix+"topic"),
@@ -46,7 +46,7 @@ func (k *KafkaInput) initialize(hosts []string, topic string, offsetFile string,
if err != nil {
panic(err)
} else {
- glog.V(0).Infof("connected to %v", hosts)
+ log.V(3).Infof("connected to %v", hosts)
}
k.topic = topic
@@ -87,7 +87,7 @@ func (k *KafkaInput) initialize(hosts []string, topic string, offsetFile string,
case msg := <-partitionConsumer.Messages():
k.messageChan <- msg
if err := progress.setOffset(msg.Partition, msg.Offset); err != nil {
- glog.Warningf("set kafka offset: %v", err)
+ log.Warningf("set kafka offset: %v", err)
}
}
}
@@ -121,12 +121,12 @@ func loadProgress(offsetFile string) *KafkaProgress {
progress := &KafkaProgress{}
data, err := os.ReadFile(offsetFile)
if err != nil {
- glog.Warningf("failed to read kafka progress file: %s", offsetFile)
+ log.Warningf("failed to read kafka progress file: %s", offsetFile)
return nil
}
err = json.Unmarshal(data, progress)
if err != nil {
- glog.Warningf("failed to read kafka progress message: %s", string(data))
+ log.Warningf("failed to read kafka progress message: %s", string(data))
return nil
}
return progress
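
Note: the Kafka input above persists consumption progress as a JSON offset file and deliberately degrades on failure — a missing or corrupt file is only a warning, and returning nil restarts from the default offset. A sketch of that round trip; the KafkaProgress fields here are an assumed shape, since the hunk only shows the JSON load path:

    package main

    import (
    	"encoding/json"
    	"fmt"
    	"os"
    )

    // KafkaProgress here is an assumed shape for illustration; the hunk
    // above only shows that the real struct round-trips through JSON.
    type KafkaProgress struct {
    	Topic   string          `json:"topic"`
    	Offsets map[int32]int64 `json:"offsets"`
    }

    // loadProgress mirrors the error handling in the diff: any failure
    // is a warning and yields nil, so consumption falls back to the
    // default starting offset instead of aborting.
    func loadProgress(offsetFile string) *KafkaProgress {
    	progress := &KafkaProgress{}
    	data, err := os.ReadFile(offsetFile)
    	if err != nil {
    		fmt.Printf("failed to read kafka progress file: %s\n", offsetFile)
    		return nil
    	}
    	if err := json.Unmarshal(data, progress); err != nil {
    		fmt.Printf("failed to read kafka progress message: %s\n", string(data))
    		return nil
    	}
    	return progress
    }

    func main() {
    	fmt.Println(loadProgress("notification.offset")) // prints <nil> if absent
    }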