aboutsummaryrefslogtreecommitdiff
path: root/weed/shell/command_remote_uncache.go
blob: 86d992ef1e6d2701a1756d3d8f36428894695d9a (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
package shell

import (
	"context"
	"flag"
	"fmt"
	"io"
	"path/filepath"
	"strings"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/filer"
	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	"github.com/seaweedfs/seaweedfs/weed/util"
)

func init() {
	// Register this command so the shell dispatcher can find `remote.uncache`.
	Commands = append(Commands, &commandRemoteUncache{})
}

// commandRemoteUncache implements the `remote.uncache` shell command, which
// drops the locally cached content of remote-mounted files while keeping
// their filer metadata intact.
type commandRemoteUncache struct {
}

// Name returns the shell command name, "remote.uncache".
func (c *commandRemoteUncache) Name() string {
	return "remote.uncache"
}

// Help returns the usage text shown by the shell for `remote.uncache`.
//
// Fixes over the previous text: the summary said "remote cache the file
// content" (the opposite of what the command does), and the -minSize example
// described 1024000 bytes as "100K" when it is roughly 1MB.
func (c *commandRemoteUncache) Help() string {
	return `keep the metadata but remove the locally cached file content for mounted directories or files

	This is designed to run regularly. So you can add it to some cronjob.
	If a file is not synchronized with the remote copy, the file will be skipped to avoid loss of data.

	remote.uncache -dir=/xxx
	remote.uncache -dir=/xxx/some/sub/dir
	remote.uncache -dir=/xxx/some/sub/dir -include=*.pdf
	remote.uncache -dir=/xxx/some/sub/dir -exclude=*.txt
	remote.uncache -minSize=1024000    # uncache files larger than 1MB
	remote.uncache -minAge=3600        # uncache files older than 1 hour

`
}

// HasTag reports whether this command carries the given tag; this command
// declares no tags, so it always returns false.
func (c *commandRemoteUncache) HasTag(CommandTag) bool {
	return false
}

// Do parses the command-line flags and removes locally cached chunks for the
// requested mounted directory, or for every mounted directory when -dir is
// not given. File metadata is preserved; only cached content is dropped.
//
// Fixes: `for key, _ := range` trimmed to the idiomatic `for key := range`
// (go vet flags the redundant blank identifier), the `fileFiler` local
// renamed to fix the typo, the misleading "pull content from remote" comment
// corrected, and the error wrapped with %w so callers can unwrap it.
func (c *commandRemoteUncache) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {

	remoteUncacheCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)

	dir := remoteUncacheCommand.String("dir", "", "a directory in filer")
	fileFilter := newFileFilter(remoteUncacheCommand)

	if err = remoteUncacheCommand.Parse(args); err != nil {
		return nil
	}

	mappings, listErr := filer.ReadMountMappings(commandEnv.option.GrpcDialOption, commandEnv.option.FilerAddress)
	if listErr != nil {
		return listErr
	}
	if *dir != "" {
		// Find a mount point that is a path prefix of *dir; if none matches,
		// the directory is not remote-mounted and there is nothing to uncache.
		var localMountedDir string
		for k := range mappings.Mappings {
			if strings.HasPrefix(*dir, k) {
				localMountedDir = k
			}
		}
		if localMountedDir == "" {
			jsonPrintln(writer, mappings)
			fmt.Fprintf(writer, "%s is not mounted\n", *dir)
			return nil
		}

		// remove the locally cached content under this directory
		if err = c.uncacheContentData(commandEnv, writer, util.FullPath(*dir), fileFilter); err != nil {
			return fmt.Errorf("uncache content data: %w", err)
		}
		return nil
	}

	// No -dir given: uncache every mounted directory.
	for key := range mappings.Mappings {
		if err := c.uncacheContentData(commandEnv, writer, util.FullPath(key), fileFilter); err != nil {
			return err
		}
	}

	return nil
}

// uncacheContentData walks dirToCache recursively and, for every entry that
// may hold locally cached content and passes fileFilter, clears its chunk
// list and sync marker so the content is no longer cached locally. Entries
// modified after their last local sync are skipped to avoid data loss.
func (c *commandRemoteUncache) uncacheContentData(commandEnv *CommandEnv, writer io.Writer, dirToCache util.FullPath, fileFilter *FileFilter) error {

	return recursivelyTraverseDirectory(commandEnv, dirToCache, func(dir util.FullPath, entry *filer_pb.Entry) bool {

		// Returning true tells the walker to keep traversing.
		if !mayHaveCachedToLocal(entry) {
			return true
		}
		if !fileFilter.matches(entry) {
			return true
		}
		// Local mtime is newer than the last sync to remote: uncaching now
		// would drop changes that have not reached the remote copy yet.
		if entry.RemoteEntry.LastLocalSyncTsNs/1e9 < entry.Attributes.Mtime {
			return true
		}

		// Drop the cached content: reset the sync marker and the chunk list.
		entry.RemoteEntry.LastLocalSyncTsNs = 0
		entry.Chunks = nil

		childPath := dir.Child(entry.Name)
		fmt.Fprintf(writer, "Uncache %+v ... ", childPath)

		updateErr := commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
			_, err := client.UpdateEntry(context.Background(), &filer_pb.UpdateEntryRequest{
				Directory: string(dir),
				Entry:     entry,
			})
			return err
		})
		if updateErr != nil {
			fmt.Fprintf(writer, "uncache %+v: %v\n", childPath, updateErr)
			return false // abort the traversal on update failure
		}
		fmt.Fprintf(writer, "Done\n")

		return true
	})
}

// FileFilter holds optional file-selection criteria bound to command-line
// flags (see newFileFilter). An empty pattern or a -1 numeric value means
// that criterion is disabled.
type FileFilter struct {
	include *string // glob pattern a file name must match, e.g. *.pdf
	exclude *string // glob pattern a file name must NOT match
	minSize *int64  // minimum file size in bytes; -1 disables
	maxSize *int64  // maximum file size in bytes; -1 disables
	minAge  *int64  // minimum file age in seconds since Crtime; -1 disables
	maxAge  *int64  // maximum file age in seconds since Crtime; -1 disables
}

// newFileFilter registers the shared file-selection flags (-include,
// -exclude, -minSize, -maxSize, -minAge, -maxAge) on the given flag set and
// returns a FileFilter bound to them. Numeric flags default to -1, which
// disables the corresponding criterion.
//
// Fix: the -include/-exclude usage strings misspelled "patterns" as
// "pattens"; that typo was user-visible in the flag help output.
func newFileFilter(remoteMountCommand *flag.FlagSet) (ff *FileFilter) {
	ff = &FileFilter{}
	ff.include = remoteMountCommand.String("include", "", "patterns of file names, e.g., *.pdf, *.html, ab?d.txt")
	ff.exclude = remoteMountCommand.String("exclude", "", "patterns of file names, e.g., *.pdf, *.html, ab?d.txt")
	ff.minSize = remoteMountCommand.Int64("minSize", -1, "minimum file size in bytes")
	ff.maxSize = remoteMountCommand.Int64("maxSize", -1, "maximum file size in bytes")
	ff.minAge = remoteMountCommand.Int64("minAge", -1, "minimum file age in seconds")
	ff.maxAge = remoteMountCommand.Int64("maxAge", -1, "maximum file age in seconds")
	return
}

// matches reports whether entry passes every configured criterion:
// include/exclude name globs, size bounds, and age bounds (measured from
// Crtime against the current wall clock). Disabled criteria (empty pattern
// or -1) are skipped.
func (ff *FileFilter) matches(entry *filer_pb.Entry) bool {
	name := entry.Name
	if pattern := *ff.include; pattern != "" {
		// filepath.Match errors (bad pattern) are treated as a non-match,
		// same as the ignored error in the original flag handling.
		if matched, _ := filepath.Match(pattern, name); !matched {
			return false
		}
	}
	if pattern := *ff.exclude; pattern != "" {
		if matched, _ := filepath.Match(pattern, name); matched {
			return false
		}
	}
	size := int64(entry.Attributes.FileSize)
	if lo := *ff.minSize; lo != -1 && size < lo {
		return false
	}
	if hi := *ff.maxSize; hi != -1 && size > hi {
		return false
	}
	if minAge := *ff.minAge; minAge != -1 && entry.Attributes.Crtime+minAge > time.Now().Unix() {
		return false
	}
	if maxAge := *ff.maxAge; maxAge != -1 && entry.Attributes.Crtime+maxAge < time.Now().Unix() {
		return false
	}
	return true
}