aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chrislusf@users.noreply.github.com>2021-01-27 00:15:16 -0800
committerGitHub <noreply@github.com>2021-01-27 00:15:16 -0800
commit2266b5c5286fd4e70d05fc4c5c64cd617809982f (patch)
tree470124377fe37747b740fa6ff000018d62f122aa
parent5e07afb0f0c1eca20da0780c806973db40ea6a24 (diff)
parent36348114084e7ff583f52ef93a73e1cd8c8f58b5 (diff)
downloadseaweedfs-2266b5c5286fd4e70d05fc4c5c64cd617809982f.tar.xz
seaweedfs-2266b5c5286fd4e70d05fc4c5c64cd617809982f.zip
Merge pull request #1761 from kmlebedev/todaysDateFormat
replication to todays date directory
-rw-r--r--docker/notification.toml3
-rw-r--r--docker/replication.toml3
-rw-r--r--weed/command/scaffold.go5
-rw-r--r--weed/replication/sink/localsink/local_sink.go17
4 files changed, 20 insertions, 8 deletions
diff --git a/docker/notification.toml b/docker/notification.toml
index 4ed76825e..dcd5f2c6f 100644
--- a/docker/notification.toml
+++ b/docker/notification.toml
@@ -11,6 +11,7 @@ enabled = true
# This URL will Dial the RabbitMQ server at the URL in the environment
# variable RABBIT_SERVER_URL and open the exchange "myexchange".
# The exchange must have already been created by some other means, like
-# the RabbitMQ management plugin.
+# the RabbitMQ management plugin. Сreate myexchange of type fanout and myqueue then
+# create binding myexchange => myqueue
topic_url = "rabbit://swexchange"
sub_url = "rabbit://swqueue" \ No newline at end of file
diff --git a/docker/replication.toml b/docker/replication.toml
index 84c0bec28..2cee755a5 100644
--- a/docker/replication.toml
+++ b/docker/replication.toml
@@ -8,4 +8,5 @@ directory = "/buckets"
[sink.local]
enabled = true
-directory = "/data" \ No newline at end of file
+directory = "/data"
+todays_date_format = "2006-02-01" \ No newline at end of file
diff --git a/weed/command/scaffold.go b/weed/command/scaffold.go
index 415e0cba3..3b88e107d 100644
--- a/weed/command/scaffold.go
+++ b/weed/command/scaffold.go
@@ -329,7 +329,8 @@ enabled = false
# This URL will Dial the RabbitMQ server at the URL in the environment
# variable RABBIT_SERVER_URL and open the exchange "myexchange".
# The exchange must have already been created by some other means, like
-# the RabbitMQ management plugin.
+# the RabbitMQ management plugin. Сreate myexchange of type fanout and myqueue then
+# create binding myexchange => myqueue
topic_url = "rabbit://myexchange"
sub_url = "rabbit://myqueue"
`
@@ -353,6 +354,8 @@ directory = "/buckets"
[sink.local]
enabled = false
directory = "/backup"
+# all replicated files are under todays date directory tree
+todays_date_format = ""
[sink.filer]
enabled = false
diff --git a/weed/replication/sink/localsink/local_sink.go b/weed/replication/sink/localsink/local_sink.go
index 76e0384e5..5ca562ec8 100644
--- a/weed/replication/sink/localsink/local_sink.go
+++ b/weed/replication/sink/localsink/local_sink.go
@@ -12,11 +12,13 @@ import (
"os"
"path/filepath"
"strings"
+ "time"
)
type LocalSink struct {
- dir string
- filerSource *source.FilerSource
+ dir string
+ todaysDateFormat string
+ filerSource *source.FilerSource
}
func init() {
@@ -35,18 +37,23 @@ func (localsink *LocalSink) isMultiPartEntry(key string) bool {
return strings.HasSuffix(key, ".part") && strings.Contains(key, "/.uploads/")
}
-func (localsink *LocalSink) initialize(dir string) error {
+func (localsink *LocalSink) initialize(dir string, todaysDateFormat string) error {
localsink.dir = dir
+ localsink.todaysDateFormat = todaysDateFormat
return nil
}
func (localsink *LocalSink) Initialize(configuration util.Configuration, prefix string) error {
dir := configuration.GetString(prefix + "directory")
+ todaysDateFormat := configuration.GetString(prefix + "todays_date_format")
glog.V(4).Infof("sink.local.directory: %v", dir)
- return localsink.initialize(dir)
+ return localsink.initialize(dir, todaysDateFormat)
}
func (localsink *LocalSink) GetSinkToDirectory() string {
+ if localsink.todaysDateFormat != "" {
+ return filepath.Join(localsink.dir, time.Now().Format(localsink.todaysDateFormat))
+ }
return localsink.dir
}
@@ -56,7 +63,7 @@ func (localsink *LocalSink) DeleteEntry(key string, isDirectory, deleteIncludeCh
}
glog.V(4).Infof("Delete Entry key: %s", key)
if err := os.Remove(key); err != nil {
- return err
+ glog.V(0).Infof("remove entry key %s: %s", key, err)
}
return nil
}