Diffstat (limited to 'weed/replication/sink')
-rw-r--r--  weed/replication/sink/azuresink/azure_sink.go   |  6 +++---
-rw-r--r--  weed/replication/sink/filersink/fetch_write.go  | 10 +++++-----
-rw-r--r--  weed/replication/sink/filersink/filer_sink.go   | 30 +++++++++++++++---------------
-rw-r--r--  weed/replication/sink/gcssink/gcs_sink.go       |  6 +++---
-rw-r--r--  weed/replication/sink/localsink/local_sink.go   | 16 ++++++++--------
-rw-r--r--  weed/replication/sink/s3sink/s3_sink.go         | 34 +++++++++++++++++-----------------
6 files changed, 51 insertions, 51 deletions
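
Every hunk below makes the same mechanical change: the weed/glog import becomes weed/util/log, and each call's verbosity level is remapped as new = 3 - old (glog V(0) becomes log.V(3), V(1) becomes V(2), V(2) becomes V(1), V(3) becomes V(0), V(4) becomes V(-1)), while Warningf, Errorf, and Fatalf keep their names. A minimal sketch of that remapping, assuming only what the hunks show; the helper name mapGlogLevel is hypothetical and not part of the patch:

package main

import "fmt"

// mapGlogLevel is a hypothetical helper illustrating the rewrite this
// patch applies by hand: glog's scale (higher V = chattier) is flipped
// onto util/log's scale, where V(3) is the most important level used
// here and V(-1) the trace-like one.
func mapGlogLevel(glogV int) int {
	return 3 - glogV // V(0)->V(3), V(1)->V(2), V(2)->V(1), V(3)->V(0), V(4)->V(-1)
}

func main() {
	for v := 0; v <= 4; v++ {
		fmt.Printf("glog.V(%d).Infof => log.V(%d).Infof\n", v, mapGlogLevel(v))
	}
}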
diff --git a/weed/replication/sink/azuresink/azure_sink.go b/weed/replication/sink/azuresink/azure_sink.go
index fb2f9ff82..26908804d 100644
--- a/weed/replication/sink/azuresink/azure_sink.go
+++ b/weed/replication/sink/azuresink/azure_sink.go
@@ -12,7 +12,7 @@ import (
 	"github.com/Azure/azure-storage-blob-go/azblob"
 
 	"github.com/seaweedfs/seaweedfs/weed/filer"
-	"github.com/seaweedfs/seaweedfs/weed/glog"
+	"github.com/seaweedfs/seaweedfs/weed/util/log"
 	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
 	"github.com/seaweedfs/seaweedfs/weed/replication/sink"
 	"github.com/seaweedfs/seaweedfs/weed/replication/source"
@@ -64,7 +64,7 @@ func (g *AzureSink) initialize(accountName, accountKey, container, dir string) e
 	// Use your Storage account's name and key to create a credential object.
 	credential, err := azblob.NewSharedKeyCredential(accountName, accountKey)
 	if err != nil {
-		glog.Fatalf("failed to create Azure credential with account name:%s: %v", accountName, err)
+		log.Fatalf("failed to create Azure credential with account name:%s: %v", accountName, err)
 	}
 
 	// Create a request pipeline that is used to process HTTP(S) requests and responses.
@@ -118,7 +118,7 @@ func (g *AzureSink) CreateEntry(key string, entry *filer_pb.Entry, signatures []
 	res, err := appendBlobURL.Create(context.Background(), azblob.BlobHTTPHeaders{}, azblob.Metadata{}, accessCondition, azblob.BlobTagsMap{}, azblob.ClientProvidedKeyOptions{}, azblob.ImmutabilityPolicyOptions{})
 
 	if res != nil && res.StatusCode() == http.StatusPreconditionFailed {
-		glog.V(0).Infof("skip overwriting %s/%s: %v", g.container, key, err)
+		log.V(3).Infof("skip overwriting %s/%s: %v", g.container, key, err)
 		return nil
 	}
 	if err != nil {
diff --git a/weed/replication/sink/filersink/fetch_write.go b/weed/replication/sink/filersink/fetch_write.go
index 4bcbc7898..a9a422c9b 100644
--- a/weed/replication/sink/filersink/fetch_write.go
+++ b/weed/replication/sink/filersink/fetch_write.go
@@ -10,7 +10,7 @@ import (
 
 	"google.golang.org/grpc"
 
-	"github.com/seaweedfs/seaweedfs/weed/glog"
+	"github.com/seaweedfs/seaweedfs/weed/util/log"
 	"github.com/seaweedfs/seaweedfs/weed/operation"
 	"github.com/seaweedfs/seaweedfs/weed/pb"
 	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
@@ -93,7 +93,7 @@ func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk, path string)
 
 	uploader, err := operation.NewUploader()
 	if err != nil {
-		glog.V(0).Infof("upload source data %v: %v", sourceChunk.GetFileIdString(), err)
+		log.V(3).Infof("upload source data %v: %v", sourceChunk.GetFileIdString(), err)
 		return "", fmt.Errorf("upload data: %v", err)
 	}
 
@@ -120,18 +120,18 @@ func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk, path string)
 			if fs.writeChunkByFiler {
 				fileUrl = fmt.Sprintf("http://%s/?proxyChunkId=%s", fs.address, fileId)
 			}
-			glog.V(4).Infof("replicating %s to %s header:%+v", filename, fileUrl, header)
+			log.V(-1).Infof("replicating %s to %s header:%+v", filename, fileUrl, header)
 			return fileUrl
 		},
 		resp.Body,
 	)
 	if err != nil {
-		glog.V(0).Infof("upload source data %v: %v", sourceChunk.GetFileIdString(), err)
+		log.V(3).Infof("upload source data %v: %v", sourceChunk.GetFileIdString(), err)
 		return "", fmt.Errorf("upload data: %v", err)
 	}
 	if uploadResult.Error != "" {
-		glog.V(0).Infof("upload failure %v: %v", filename, err)
+		log.V(3).Infof("upload failure %v: %v", filename, err)
 		return "", fmt.Errorf("upload result: %v", uploadResult.Error)
 	}
diff --git a/weed/replication/sink/filersink/filer_sink.go b/weed/replication/sink/filersink/filer_sink.go
index 49f6877a0..6e5d52327 100644
--- a/weed/replication/sink/filersink/filer_sink.go
+++ b/weed/replication/sink/filersink/filer_sink.go
@@ -12,7 +12,7 @@ import (
 	"github.com/seaweedfs/seaweedfs/weed/security"
 
 	"github.com/seaweedfs/seaweedfs/weed/filer"
-	"github.com/seaweedfs/seaweedfs/weed/glog"
+	"github.com/seaweedfs/seaweedfs/weed/util/log"
 	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
 	"github.com/seaweedfs/seaweedfs/weed/replication/sink"
 	"github.com/seaweedfs/seaweedfs/weed/replication/source"
@@ -94,10 +94,10 @@ func (fs *FilerSink) DeleteEntry(key string, isDirectory, deleteIncludeChunks bo
 
 	dir, name := util.FullPath(key).DirAndName()
 
-	glog.V(4).Infof("delete entry: %v", key)
+	log.V(-1).Infof("delete entry: %v", key)
 	err := filer_pb.Remove(fs, dir, name, deleteIncludeChunks, true, true, true, signatures)
 	if err != nil {
-		glog.V(0).Infof("delete entry %s: %v", key, err)
+		log.V(3).Infof("delete entry %s: %v", key, err)
 		return fmt.Errorf("delete entry %s: %v", key, err)
 	}
 	return nil
@@ -114,14 +114,14 @@ func (fs *FilerSink) CreateEntry(key string, entry *filer_pb.Entry, signatures [
 			Directory: dir,
 			Name:      name,
 		}
-		// glog.V(1).Infof("lookup: %v", lookupRequest)
+		// log.V(2).Infof("lookup: %v", lookupRequest)
 		if resp, err := filer_pb.LookupEntry(client, lookupRequest); err == nil {
 			if filer.ETag(resp.Entry) == filer.ETag(entry) {
-				glog.V(3).Infof("already replicated %s", key)
+				log.V(0).Infof("already replicated %s", key)
 				return nil
 			}
 			if resp.Entry.Attributes != nil && resp.Entry.Attributes.Mtime >= entry.Attributes.Mtime {
-				glog.V(3).Infof("skip overwriting %s", key)
+				log.V(0).Infof("skip overwriting %s", key)
 				return nil
 			}
 		}
@@ -130,11 +130,11 @@ func (fs *FilerSink) CreateEntry(key string, entry *filer_pb.Entry, signatures [
 		if err != nil {
 			// only warning here since the source chunk may have been deleted already
-			glog.Warningf("replicate entry chunks %s: %v", key, err)
+			log.Warningf("replicate entry chunks %s: %v", key, err)
 			return nil
 		}
 
-		// glog.V(4).Infof("replicated %s %+v ===> %+v", key, entry.GetChunks(), replicatedChunks)
+		// log.V(-1).Infof("replicated %s %+v ===> %+v", key, entry.GetChunks(), replicatedChunks)
 
 		request := &filer_pb.CreateEntryRequest{
 			Directory:  dir,
@@ -151,9 +151,9 @@ func (fs *FilerSink) CreateEntry(key string, entry *filer_pb.Entry, signatures [
 			Signatures: signatures,
 		}
 
-		glog.V(3).Infof("create: %v", request)
+		log.V(0).Infof("create: %v", request)
 		if err := filer_pb.CreateEntry(client, request); err != nil {
-			glog.V(0).Infof("create entry %s: %v", key, err)
+			log.V(3).Infof("create entry %s: %v", key, err)
 			return fmt.Errorf("create entry %s: %v", key, err)
 		}
 
@@ -174,10 +174,10 @@ func (fs *FilerSink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParent
 			Name:      name,
 		}
 
-		glog.V(4).Infof("lookup entry: %v", request)
+		log.V(-1).Infof("lookup entry: %v", request)
 		resp, err := filer_pb.LookupEntry(client, request)
 		if err != nil {
-			glog.V(0).Infof("lookup %s: %v", key, err)
+			log.V(3).Infof("lookup %s: %v", key, err)
 			return err
 		}
 
@@ -190,12 +190,12 @@ func (fs *FilerSink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParent
 		return false, fmt.Errorf("lookup %s: %v", key, err)
 	}
 
-	glog.V(4).Infof("oldEntry %+v, newEntry %+v, existingEntry: %+v", oldEntry, newEntry, existingEntry)
+	log.V(-1).Infof("oldEntry %+v, newEntry %+v, existingEntry: %+v", oldEntry, newEntry, existingEntry)
 
 	if existingEntry.Attributes.Mtime > newEntry.Attributes.Mtime {
 		// skip if already changed
 		// this usually happens when the messages are not ordered
-		glog.V(2).Infof("late updates %s", key)
+		log.V(1).Infof("late updates %s", key)
 	} else {
 		// find out what changed
 		deletedChunks, newChunks, err := compareChunks(filer.LookupFn(fs), oldEntry, newEntry)
@@ -212,7 +212,7 @@ func (fs *FilerSink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParent
 			// replicate the chunks that are new in the source
 			replicatedChunks, err := fs.replicateChunks(newChunks, key)
 			if err != nil {
-				glog.Warningf("replicate entry chunks %s: %v", key, err)
+				log.Warningf("replicate entry chunks %s: %v", key, err)
 				return true, nil
 			}
 			existingEntry.Chunks = append(existingEntry.GetChunks(), replicatedChunks...)
diff --git a/weed/replication/sink/gcssink/gcs_sink.go b/weed/replication/sink/gcssink/gcs_sink.go
index db6ea4aec..f820ae9a6 100644
--- a/weed/replication/sink/gcssink/gcs_sink.go
+++ b/weed/replication/sink/gcssink/gcs_sink.go
@@ -10,7 +10,7 @@ import (
 	"google.golang.org/api/option"
 
 	"github.com/seaweedfs/seaweedfs/weed/filer"
-	"github.com/seaweedfs/seaweedfs/weed/glog"
+	"github.com/seaweedfs/seaweedfs/weed/util/log"
 	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
 	"github.com/seaweedfs/seaweedfs/weed/replication/sink"
 	"github.com/seaweedfs/seaweedfs/weed/replication/source"
@@ -63,12 +63,12 @@ func (g *GcsSink) initialize(google_application_credentials, bucketName, dir str
 		var found bool
 		google_application_credentials, found = os.LookupEnv("GOOGLE_APPLICATION_CREDENTIALS")
 		if !found {
-			glog.Fatalf("need to specific GOOGLE_APPLICATION_CREDENTIALS env variable or google_application_credentials in replication.toml")
+			log.Fatalf("need to specific GOOGLE_APPLICATION_CREDENTIALS env variable or google_application_credentials in replication.toml")
 		}
 	}
 
 	client, err := storage.NewClient(context.Background(), option.WithCredentialsFile(google_application_credentials))
 	if err != nil {
-		glog.Fatalf("Failed to create client: %v", err)
+		log.Fatalf("Failed to create client: %v", err)
 	}
 	g.client = client
diff --git a/weed/replication/sink/localsink/local_sink.go b/weed/replication/sink/localsink/local_sink.go
index c6dddb80a..fd6745b5d 100644
--- a/weed/replication/sink/localsink/local_sink.go
+++ b/weed/replication/sink/localsink/local_sink.go
@@ -2,7 +2,7 @@ package localsink
 
 import (
 	"github.com/seaweedfs/seaweedfs/weed/filer"
-	"github.com/seaweedfs/seaweedfs/weed/glog"
+	"github.com/seaweedfs/seaweedfs/weed/util/log"
 	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
 	"github.com/seaweedfs/seaweedfs/weed/replication/repl_util"
 	"github.com/seaweedfs/seaweedfs/weed/replication/sink"
@@ -45,7 +45,7 @@ func (localsink *LocalSink) initialize(dir string, isIncremental bool) error {
 func (localsink *LocalSink) Initialize(configuration util.Configuration, prefix string) error {
 	dir := configuration.GetString(prefix + "directory")
 	isIncremental := configuration.GetBool(prefix + "is_incremental")
-	glog.V(4).Infof("sink.local.directory: %v", dir)
+	log.V(-1).Infof("sink.local.directory: %v", dir)
 	return localsink.initialize(dir, isIncremental)
 }
@@ -61,9 +61,9 @@ func (localsink *LocalSink) DeleteEntry(key string, isDirectory, deleteIncludeCh
 	if localsink.isMultiPartEntry(key) {
 		return nil
 	}
-	glog.V(4).Infof("Delete Entry key: %s", key)
+	log.V(-1).Infof("Delete Entry key: %s", key)
 	if err := os.Remove(key); err != nil {
-		glog.V(0).Infof("remove entry key %s: %s", key, err)
+		log.V(3).Infof("remove entry key %s: %s", key, err)
 	}
 	return nil
 }
@@ -72,7 +72,7 @@ func (localsink *LocalSink) CreateEntry(key string, entry *filer_pb.Entry, signa
 	if entry.IsDirectory || localsink.isMultiPartEntry(key) {
 		return nil
 	}
-	glog.V(4).Infof("Create Entry key: %s", key)
+	log.V(-1).Infof("Create Entry key: %s", key)
 
 	totalSize := filer.FileSize(entry)
 	chunkViews := filer.ViewFromChunks(localsink.filerSource.LookupFileId, entry.GetChunks(), 0, int64(totalSize))
@@ -80,7 +80,7 @@ func (localsink *LocalSink) CreateEntry(key string, entry *filer_pb.Entry, signa
 	dir := filepath.Dir(key)
 
 	if _, err := os.Stat(dir); os.IsNotExist(err) {
-		glog.V(4).Infof("Create Directory key: %s", dir)
+		log.V(-1).Infof("Create Directory key: %s", dir)
 		if err = os.MkdirAll(dir, 0755); err != nil {
 			return err
 		}
@@ -102,7 +102,7 @@ func (localsink *LocalSink) CreateEntry(key string, entry *filer_pb.Entry, signa
 		return err
 	}
 	if fi.Mode() != mode {
-		glog.V(4).Infof("Modify file mode: %o -> %o", fi.Mode(), mode)
+		log.V(-1).Infof("Modify file mode: %o -> %o", fi.Mode(), mode)
 		if err := dstFile.Chmod(mode); err != nil {
 			return err
 		}
@@ -128,7 +128,7 @@ func (localsink *LocalSink) UpdateEntry(key string, oldEntry *filer_pb.Entry, ne
 	if localsink.isMultiPartEntry(key) {
 		return true, nil
 	}
-	glog.V(4).Infof("Update Entry key: %s", key)
+	log.V(-1).Infof("Update Entry key: %s", key)
 	// do delete and create
 	foundExistingEntry = util.FileExists(key)
 	err = localsink.CreateEntry(key, newEntry, signatures)
diff --git a/weed/replication/sink/s3sink/s3_sink.go b/weed/replication/sink/s3sink/s3_sink.go
index 279108e16..9e74cf9c1 100644
--- a/weed/replication/sink/s3sink/s3_sink.go
+++ b/weed/replication/sink/s3sink/s3_sink.go
@@ -14,7 +14,7 @@ import (
 	"strings"
 
 	"github.com/seaweedfs/seaweedfs/weed/filer"
-	"github.com/seaweedfs/seaweedfs/weed/glog"
+	"github.com/seaweedfs/seaweedfs/weed/util/log"
 	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
 	"github.com/seaweedfs/seaweedfs/weed/replication/sink"
 	"github.com/seaweedfs/seaweedfs/weed/replication/source"
@@ -76,24 +76,24 @@ func (s3sink *S3Sink) Initialize(configuration util.Configuration, prefix string
 	s3sink.uploaderPartSizeMb = configuration.GetInt(prefix + "uploader_part_size")
 	s3sink.uploaderConcurrency = configuration.GetInt(prefix + "uploader_concurrency")
 
-	glog.V(0).Infof("sink.s3.region: %v", s3sink.region)
-	glog.V(0).Infof("sink.s3.bucket: %v", s3sink.bucket)
-	glog.V(0).Infof("sink.s3.directory: %v", s3sink.dir)
-	glog.V(0).Infof("sink.s3.endpoint: %v", s3sink.endpoint)
-	glog.V(0).Infof("sink.s3.acl: %v", s3sink.acl)
-	glog.V(0).Infof("sink.s3.is_incremental: %v", s3sink.isIncremental)
-	glog.V(0).Infof("sink.s3.s3_disable_content_md5_validation: %v", s3sink.s3DisableContentMD5Validation)
-	glog.V(0).Infof("sink.s3.s3_force_path_style: %v", s3sink.s3ForcePathStyle)
-	glog.V(0).Infof("sink.s3.keep_part_size: %v", s3sink.keepPartSize)
+	log.V(3).Infof("sink.s3.region: %v", s3sink.region)
+	log.V(3).Infof("sink.s3.bucket: %v", s3sink.bucket)
+	log.V(3).Infof("sink.s3.directory: %v", s3sink.dir)
+	log.V(3).Infof("sink.s3.endpoint: %v", s3sink.endpoint)
+	log.V(3).Infof("sink.s3.acl: %v", s3sink.acl)
+	log.V(3).Infof("sink.s3.is_incremental: %v", s3sink.isIncremental)
+	log.V(3).Infof("sink.s3.s3_disable_content_md5_validation: %v", s3sink.s3DisableContentMD5Validation)
+	log.V(3).Infof("sink.s3.s3_force_path_style: %v", s3sink.s3ForcePathStyle)
+	log.V(3).Infof("sink.s3.keep_part_size: %v", s3sink.keepPartSize)
 	if s3sink.uploaderMaxUploadParts > s3manager.MaxUploadParts {
 		s3sink.uploaderMaxUploadParts = s3manager.MaxUploadParts
-		glog.Warningf("uploader_max_upload_parts is greater than the maximum number of parts allowed when uploading multiple parts to Amazon S3")
-		glog.V(0).Infof("sink.s3.uploader_max_upload_parts: %v => %v", s3sink.uploaderMaxUploadParts, s3manager.MaxUploadParts)
+		log.Warningf("uploader_max_upload_parts is greater than the maximum number of parts allowed when uploading multiple parts to Amazon S3")
+		log.V(3).Infof("sink.s3.uploader_max_upload_parts: %v => %v", s3sink.uploaderMaxUploadParts, s3manager.MaxUploadParts)
 	} else {
-		glog.V(0).Infof("sink.s3.uploader_max_upload_parts: %v", s3sink.uploaderMaxUploadParts)
+		log.V(3).Infof("sink.s3.uploader_max_upload_parts: %v", s3sink.uploaderMaxUploadParts)
 	}
-	glog.V(0).Infof("sink.s3.uploader_part_size_mb: %v", s3sink.uploaderPartSizeMb)
-	glog.V(0).Infof("sink.s3.uploader_concurrency: %v", s3sink.uploaderConcurrency)
+	log.V(3).Infof("sink.s3.uploader_part_size_mb: %v", s3sink.uploaderPartSizeMb)
+	log.V(3).Infof("sink.s3.uploader_concurrency: %v", s3sink.uploaderConcurrency)
 
 	return s3sink.initialize(
 		configuration.GetString(prefix+"aws_access_key_id"),
@@ -141,9 +141,9 @@ func (s3sink *S3Sink) DeleteEntry(key string, isDirectory, deleteIncludeChunks b
 	result, err := s3sink.conn.DeleteObject(input)
 
 	if err == nil {
-		glog.V(2).Infof("[%s] delete %s: %v", s3sink.bucket, key, result)
+		log.V(1).Infof("[%s] delete %s: %v", s3sink.bucket, key, result)
 	} else {
-		glog.Errorf("[%s] delete %s: %v", s3sink.bucket, key, err)
+		log.Errorf("[%s] delete %s: %v", s3sink.bucket, key, err)
 	}
 
 	return err
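
The call sites touched above rely only on a small surface from the new package: log.V(n).Infof, log.Warningf, log.Errorf, and log.Fatalf. The sketch below is an assumption about that surface for readers following the diff; the real weed/util/log implementation is not part of this change, and the minLevel variable and message prefixes are illustrative only.

// Package log - a minimal sketch of the API the sinks above call,
// assuming glog-style semantics; NOT the actual weed/util/log source.
package log

import (
	stdlog "log"
	"os"
)

// minLevel is an assumed verbosity cutoff: a message logged at V(n) is
// emitted only when n >= minLevel, so V(3) info survives almost any
// setting while V(-1) trace output requires minLevel <= -1.
var minLevel = 0

// Verbose reports whether a level passed the cutoff, like glog.Verbose.
type Verbose bool

// V gates Infof behind the level check, mirroring glog.V(n).Infof.
func V(level int) Verbose { return Verbose(level >= minLevel) }

func (v Verbose) Infof(format string, args ...interface{}) {
	if v {
		stdlog.Printf("INFO "+format, args...)
	}
}

func Warningf(format string, args ...interface{}) { stdlog.Printf("WARN "+format, args...) }

func Errorf(format string, args ...interface{}) { stdlog.Printf("ERROR "+format, args...) }

// Fatalf logs and then exits, matching what the sinks depend on for
// unrecoverable configuration errors (bad credentials, missing env vars).
func Fatalf(format string, args ...interface{}) {
	stdlog.Printf("FATAL "+format, args...)
	os.Exit(1)
}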
