package shell

import (
	"flag"
	"fmt"
	"io"
	"sync"

	"github.com/seaweedfs/seaweedfs/weed/filer"
	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/remote_pb"
	"github.com/seaweedfs/seaweedfs/weed/util"
)
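
// init registers remote.cache in the shell command registry.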
func init() {
	Commands = append(Commands, &commandRemoteCache{})
}

type commandRemoteCache struct {
}

func (c *commandRemoteCache) Name() string {
	return "remote.cache"
}

func (c *commandRemoteCache) Help() string {
	return `cache the file content for mounted directories or files

	# assume a remote storage is configured with the name "cloud1"
	remote.configure -name=cloud1 -type=s3 -s3.access_key=xxx -s3.secret_key=yyy
	# mount and pull one bucket
	remote.mount -dir=/xxx -remote=cloud1/bucket

	# after mounting, run one of these commands to cache the file content
	remote.cache -dir=/xxx
	remote.cache -dir=/xxx/some/sub/dir
	remote.cache -dir=/xxx/some/sub/dir -include=*.pdf
	remote.cache -dir=/xxx/some/sub/dir -exclude=*.txt
	remote.cache -maxSize=1024000 # cache files smaller than about 1MB
	remote.cache -maxAge=3600     # cache files that are less than 1 hour old

	This command is designed to run regularly, so you can add it to a cron job.
	If a file is already synchronized with its remote copy, it is skipped to avoid an unnecessary copy.
	The actual data copying goes through the volume servers in parallel.
`
}

func (c *commandRemoteCache) HasTag(CommandTag) bool {
	return false
}
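
// Do caches the content of one mounted directory when -dir is given, or of
// every mounted directory listed in the filer's mount mappings otherwise.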
func (c *commandRemoteCache) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
	remoteMountCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
	dir := remoteMountCommand.String("dir", "", "a mounted directory or one of its sub-folders in the filer")
	concurrency := remoteMountCommand.Int("concurrent", 32, "number of concurrent file downloads")
	fileFilter := newFileFilter(remoteMountCommand)
	if err = remoteMountCommand.Parse(args); err != nil {
		return nil
	}

	if *dir != "" {
		if err := c.doCacheOneDirectory(commandEnv, writer, *dir, fileFilter, *concurrency); err != nil {
			return err
		}
		return nil
	}

	mappings, err := filer.ReadMountMappings(commandEnv.option.GrpcDialOption, commandEnv.option.FilerAddress)
	if err != nil {
		return err
	}

	for key := range mappings.Mappings {
		if err := c.doCacheOneDirectory(commandEnv, writer, key, fileFilter, *concurrency); err != nil {
			return err
		}
	}

	return nil
}
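
// doCacheOneDirectory resolves the mount information for dir and then pulls
// the matching remote file content into the local cluster.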
func (c *commandRemoteCache) doCacheOneDirectory(commandEnv *CommandEnv, writer io.Writer, dir string, fileFilter *FileFilter, concurrency int) error {
	mappings, localMountedDir, remoteStorageMountedLocation, remoteStorageConf, detectErr := detectMountInfo(commandEnv, writer, dir)
	if detectErr != nil {
		jsonPrintln(writer, mappings)
		return detectErr
	}

	// pull the file content from the remote storage
	if err := c.cacheContentData(commandEnv, writer, util.FullPath(localMountedDir), remoteStorageMountedLocation, util.FullPath(dir), fileFilter, remoteStorageConf, concurrency); err != nil {
		return fmt.Errorf("cache content data on %s: %v", localMountedDir, err)
	}

	return nil
}
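
// recursivelyTraverseDirectory walks dirPath depth-first, invoking visitEntry
// on every entry; when visitEntry returns false for a directory, its subtree
// is not descended into.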
func recursivelyTraverseDirectory(filerClient filer_pb.FilerClient, dirPath util.FullPath, visitEntry func(dir util.FullPath, entry *filer_pb.Entry) bool) (err error) {
	err = filer_pb.ReadDirAllEntries(filerClient, dirPath, "", func(entry *filer_pb.Entry, isLast bool) error {
		if entry.IsDirectory {
			if !visitEntry(dirPath, entry) {
				return nil
			}
			subDir := dirPath.Child(entry.Name)
			if err := recursivelyTraverseDirectory(filerClient, subDir, visitEntry); err != nil {
				return err
			}
		} else {
			if !visitEntry(dirPath, entry) {
				return nil
			}
		}
		return nil
	})
	return
}
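
// shouldCacheToLocal reports whether a file entry exists remotely, is
// non-empty, and has never been synchronized locally, i.e. it still needs caching.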
func shouldCacheToLocal(entry *filer_pb.Entry) bool {
	if entry.IsDirectory {
		return false
	}
	if entry.RemoteEntry == nil {
		return false
	}
	return entry.RemoteEntry.LastLocalSyncTsNs == 0 && entry.RemoteEntry.RemoteSize > 0
}
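
// mayHaveCachedToLocal reports whether a file entry has been synchronized
// locally at least once, so it may hold locally cached content.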
func mayHaveCachedToLocal(entry *filer_pb.Entry) bool {
	if entry.IsDirectory {
		return false
	}
	if entry.RemoteEntry == nil {
		return false // an entry that does not exist remotely has nothing to uncache
	}
	return entry.RemoteEntry.LastLocalSyncTsNs > 0
}
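
// cacheContentData traverses dirToCache and downloads every matching file
// that still needs caching, using up to `concurrency` parallel workers.
// The first worker error is recorded and returned after all workers finish.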
func (c *commandRemoteCache) cacheContentData(commandEnv *CommandEnv, writer io.Writer, localMountedDir util.FullPath, remoteMountedLocation *remote_pb.RemoteStorageLocation, dirToCache util.FullPath, fileFilter *FileFilter, remoteConf *remote_pb.RemoteConf, concurrency int) error {
	var wg sync.WaitGroup
	limitedConcurrentExecutor := util.NewLimitedConcurrentExecutor(concurrency)

	// executionErr is shared across worker goroutines, so guard it with a mutex.
	var executionErr error
	var executionErrLock sync.Mutex

	traverseErr := recursivelyTraverseDirectory(commandEnv, dirToCache, func(dir util.FullPath, entry *filer_pb.Entry) bool {
		if !shouldCacheToLocal(entry) {
			return true // true means the recursive traversal should continue
		}
		if !fileFilter.matches(entry) {
			return true
		}

		wg.Add(1)
		limitedConcurrentExecutor.Execute(func() {
			defer wg.Done()
			fmt.Fprintf(writer, "Cache %+v ...\n", dir.Child(entry.Name))

			remoteLocation := filer.MapFullPathToRemoteStorageLocation(localMountedDir, remoteMountedLocation, dir.Child(entry.Name))

			if err := filer.CacheRemoteObjectToLocalCluster(commandEnv, remoteConf, remoteLocation, dir, entry); err != nil {
				fmt.Fprintf(writer, "CacheRemoteObjectToLocalCluster %+v: %v\n", remoteLocation, err)
				executionErrLock.Lock()
				if executionErr == nil {
					executionErr = fmt.Errorf("CacheRemoteObjectToLocalCluster %+v: %v", remoteLocation, err)
				}
				executionErrLock.Unlock()
				return
			}
			fmt.Fprintf(writer, "Cache %+v Done\n", dir.Child(entry.Name))
		})

		return true
	})
	wg.Wait()

	if traverseErr != nil {
		return traverseErr
	}
	if executionErr != nil {
		return executionErr
	}
	return nil
}