aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2020-03-09 01:02:01 -0700
committerChris Lu <chris.lu@gmail.com>2020-03-09 01:02:01 -0700
commit89eb05b50f10b6ca74a374e5435df2f72019f635 (patch)
tree9544704b3dd90da8a9cb4ecc3647ad7398e97d3e
parent8a899992f2d3f1b248f91065f3d5c8db3e10f325 (diff)
downloadseaweedfs-89eb05b50f10b6ca74a374e5435df2f72019f635.tar.xz
seaweedfs-89eb05b50f10b6ca74a374e5435df2f72019f635.zip
filer: support TTL for all filer stores
-rw-r--r--weed/command/filer_copy.go17
-rw-r--r--weed/filer2/filer.go28
-rw-r--r--weed/filer2/filer_buckets.go2
-rw-r--r--weed/filer2/filer_delete_entry.go2
-rw-r--r--weed/filer2/leveldb/leveldb_store_test.go6
-rw-r--r--weed/filer2/leveldb2/leveldb2_store_test.go6
-rw-r--r--weed/server/filer_grpc_server.go4
-rw-r--r--weed/server/filer_grpc_server_rename.go2
-rw-r--r--weed/server/filer_server_handlers_read_dir.go4
-rw-r--r--weed/storage/volume.go4
10 files changed, 53 insertions, 22 deletions
diff --git a/weed/command/filer_copy.go b/weed/command/filer_copy.go
index 1162bb204..0aee8cd80 100644
--- a/weed/command/filer_copy.go
+++ b/weed/command/filer_copy.go
@@ -20,6 +20,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/security"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/chrislusf/seaweedfs/weed/wdclient"
)
@@ -41,6 +42,7 @@ type CopyOptions struct {
grpcDialOption grpc.DialOption
masters []string
cipher bool
+ ttlSec int32
}
func init() {
@@ -124,6 +126,13 @@ func runCopy(cmd *Command, args []string) bool {
copy.masters = masters
copy.cipher = cipher
+ ttl, err := needle.ReadTTL(*copy.ttl)
+ if err != nil {
+ fmt.Printf("parsing ttl %s: %v\n", *copy.ttl, err)
+ return false
+ }
+ copy.ttlSec = int32(ttl.Minutes()) * 60
+
if *cmdCopy.IsDebug {
util.SetupProfiling("filer.copy.cpu.pprof", "filer.copy.mem.pprof")
}
@@ -286,7 +295,7 @@ func (worker *FileCopyWorker) uploadFileAsOne(task FileCopyTask, f *os.File) err
Count: 1,
Replication: *worker.options.replication,
Collection: *worker.options.collection,
- TtlSec: int32(util.ParseInt(*worker.options.ttl, 0)),
+ TtlSec: worker.options.ttlSec,
ParentPath: task.destinationUrlPath,
}
@@ -342,7 +351,7 @@ func (worker *FileCopyWorker) uploadFileAsOne(task FileCopyTask, f *os.File) err
Mime: mimeType,
Replication: *worker.options.replication,
Collection: *worker.options.collection,
- TtlSec: int32(util.ParseInt(*worker.options.ttl, 0)),
+ TtlSec: worker.options.ttlSec,
},
Chunks: chunks,
},
@@ -388,7 +397,7 @@ func (worker *FileCopyWorker) uploadFileInChunks(task FileCopyTask, f *os.File,
Count: 1,
Replication: *worker.options.replication,
Collection: *worker.options.collection,
- TtlSec: int32(util.ParseInt(*worker.options.ttl, 0)),
+ TtlSec: worker.options.ttlSec,
ParentPath: task.destinationUrlPath,
}
@@ -469,7 +478,7 @@ func (worker *FileCopyWorker) uploadFileInChunks(task FileCopyTask, f *os.File,
Mime: mimeType,
Replication: replication,
Collection: collection,
- TtlSec: int32(util.ParseInt(*worker.options.ttl, 0)),
+ TtlSec: worker.options.ttlSec,
},
Chunks: chunks,
},
diff --git a/weed/filer2/filer.go b/weed/filer2/filer.go
index 0b6a5c96e..c3048b45d 100644
--- a/weed/filer2/filer.go
+++ b/weed/filer2/filer.go
@@ -223,14 +223,36 @@ func (f *Filer) FindEntry(ctx context.Context, p FullPath) (entry *Entry, err er
},
}, nil
}
- return f.store.FindEntry(ctx, p)
+ entry, err = f.store.FindEntry(ctx, p)
+ if entry != nil && entry.TtlSec > 0 {
+ if entry.Crtime.Add(time.Duration(entry.TtlSec) * time.Second).Before(time.Now()) {
+ f.store.DeleteEntry(ctx, p.Child(entry.Name()))
+ return nil, filer_pb.ErrNotFound
+ }
+ }
+ return
+
}
-func (f *Filer) ListDirectoryEntries(ctx context.Context, p FullPath, startFileName string, inclusive bool, limit int) ([]*Entry, error) {
+func (f *Filer) ListDirectoryEntries(ctx context.Context, p FullPath, startFileName string, inclusive bool, limit int) (entries []*Entry, expiredCount int, err error) {
if strings.HasSuffix(string(p), "/") && len(p) > 1 {
p = p[0 : len(p)-1]
}
- return f.store.ListDirectoryEntries(ctx, p, startFileName, inclusive, limit)
+ listedEntries, listErr := f.store.ListDirectoryEntries(ctx, p, startFileName, inclusive, limit)
+ if listErr != nil {
+ return listedEntries, expiredCount, err
+ }
+ for _, entry := range listedEntries {
+ if entry.TtlSec > 0 {
+ if entry.Crtime.Add(time.Duration(entry.TtlSec) * time.Second).Before(time.Now()) {
+ f.store.DeleteEntry(ctx, p.Child(entry.Name()))
+ expiredCount++
+ continue
+ }
+ }
+ entries = append(entries, entry)
+ }
+ return
}
func (f *Filer) cacheDelDirectory(dirpath string) {
diff --git a/weed/filer2/filer_buckets.go b/weed/filer2/filer_buckets.go
index 601b7dbf3..cb65fea14 100644
--- a/weed/filer2/filer_buckets.go
+++ b/weed/filer2/filer_buckets.go
@@ -28,7 +28,7 @@ func (f *Filer) LoadBuckets(dirBucketsPath string) {
limit := math.MaxInt32
- entries, err := f.ListDirectoryEntries(context.Background(), FullPath(dirBucketsPath), "", false, limit)
+ entries, _, err := f.ListDirectoryEntries(context.Background(), FullPath(dirBucketsPath), "", false, limit)
if err != nil {
glog.V(1).Infof("no buckets found: %v", err)
diff --git a/weed/filer2/filer_delete_entry.go b/weed/filer2/filer_delete_entry.go
index d0792ac66..b7ec805c5 100644
--- a/weed/filer2/filer_delete_entry.go
+++ b/weed/filer2/filer_delete_entry.go
@@ -57,7 +57,7 @@ func (f *Filer) doBatchDeleteFolderMetaAndData(ctx context.Context, entry *Entry
lastFileName := ""
includeLastFile := false
for {
- entries, err := f.ListDirectoryEntries(ctx, entry.FullPath, lastFileName, includeLastFile, PaginationSize)
+ entries, _, err := f.ListDirectoryEntries(ctx, entry.FullPath, lastFileName, includeLastFile, PaginationSize)
if err != nil {
glog.Errorf("list folder %s: %v", entry.FullPath, err)
return nil, fmt.Errorf("list folder %s: %v", entry.FullPath, err)
diff --git a/weed/filer2/leveldb/leveldb_store_test.go b/weed/filer2/leveldb/leveldb_store_test.go
index 497158420..dcb99a3bd 100644
--- a/weed/filer2/leveldb/leveldb_store_test.go
+++ b/weed/filer2/leveldb/leveldb_store_test.go
@@ -48,14 +48,14 @@ func TestCreateAndFind(t *testing.T) {
}
// checking one upper directory
- entries, _ := filer.ListDirectoryEntries(ctx, filer2.FullPath("/home/chris/this/is/one"), "", false, 100)
+ entries, _, _ := filer.ListDirectoryEntries(ctx, filer2.FullPath("/home/chris/this/is/one"), "", false, 100)
if len(entries) != 1 {
t.Errorf("list entries count: %v", len(entries))
return
}
// checking one upper directory
- entries, _ = filer.ListDirectoryEntries(ctx, filer2.FullPath("/"), "", false, 100)
+ entries, _, _ = filer.ListDirectoryEntries(ctx, filer2.FullPath("/"), "", false, 100)
if len(entries) != 1 {
t.Errorf("list entries count: %v", len(entries))
return
@@ -75,7 +75,7 @@ func TestEmptyRoot(t *testing.T) {
ctx := context.Background()
// checking one upper directory
- entries, err := filer.ListDirectoryEntries(ctx, filer2.FullPath("/"), "", false, 100)
+ entries, _, err := filer.ListDirectoryEntries(ctx, filer2.FullPath("/"), "", false, 100)
if err != nil {
t.Errorf("list entries: %v", err)
return
diff --git a/weed/filer2/leveldb2/leveldb2_store_test.go b/weed/filer2/leveldb2/leveldb2_store_test.go
index dc94f2ac7..c1f2d6a0c 100644
--- a/weed/filer2/leveldb2/leveldb2_store_test.go
+++ b/weed/filer2/leveldb2/leveldb2_store_test.go
@@ -48,14 +48,14 @@ func TestCreateAndFind(t *testing.T) {
}
// checking one upper directory
- entries, _ := filer.ListDirectoryEntries(ctx, filer2.FullPath("/home/chris/this/is/one"), "", false, 100)
+ entries, _, _ := filer.ListDirectoryEntries(ctx, filer2.FullPath("/home/chris/this/is/one"), "", false, 100)
if len(entries) != 1 {
t.Errorf("list entries count: %v", len(entries))
return
}
// checking one upper directory
- entries, _ = filer.ListDirectoryEntries(ctx, filer2.FullPath("/"), "", false, 100)
+ entries, _, _ = filer.ListDirectoryEntries(ctx, filer2.FullPath("/"), "", false, 100)
if len(entries) != 1 {
t.Errorf("list entries count: %v", len(entries))
return
@@ -75,7 +75,7 @@ func TestEmptyRoot(t *testing.T) {
ctx := context.Background()
// checking one upper directory
- entries, err := filer.ListDirectoryEntries(ctx, filer2.FullPath("/"), "", false, 100)
+ entries, _, err := filer.ListDirectoryEntries(ctx, filer2.FullPath("/"), "", false, 100)
if err != nil {
t.Errorf("list entries: %v", err)
return
diff --git a/weed/server/filer_grpc_server.go b/weed/server/filer_grpc_server.go
index b904c1393..488967ec2 100644
--- a/weed/server/filer_grpc_server.go
+++ b/weed/server/filer_grpc_server.go
@@ -53,7 +53,7 @@ func (fs *FilerServer) ListEntries(req *filer_pb.ListEntriesRequest, stream file
lastFileName := req.StartFromFileName
includeLastFile := req.InclusiveStartFrom
for limit > 0 {
- entries, err := fs.filer.ListDirectoryEntries(stream.Context(), filer2.FullPath(req.Directory), lastFileName, includeLastFile, paginationLimit)
+ entries, expiredCount, err := fs.filer.ListDirectoryEntries(stream.Context(), filer2.FullPath(req.Directory), lastFileName, includeLastFile, paginationLimit)
if err != nil {
return err
@@ -92,7 +92,7 @@ func (fs *FilerServer) ListEntries(req *filer_pb.ListEntriesRequest, stream file
}
}
- if len(entries) < paginationLimit {
+ if len(entries)+expiredCount < paginationLimit {
break
}
diff --git a/weed/server/filer_grpc_server_rename.go b/weed/server/filer_grpc_server_rename.go
index 0669a26f1..3b2655585 100644
--- a/weed/server/filer_grpc_server_rename.go
+++ b/weed/server/filer_grpc_server_rename.go
@@ -68,7 +68,7 @@ func (fs *FilerServer) moveFolderSubEntries(ctx context.Context, oldParent filer
includeLastFile := false
for {
- entries, err := fs.filer.ListDirectoryEntries(ctx, currentDirPath, lastFileName, includeLastFile, 1024)
+ entries, _, err := fs.filer.ListDirectoryEntries(ctx, currentDirPath, lastFileName, includeLastFile, 1024)
if err != nil {
return err
}
diff --git a/weed/server/filer_server_handlers_read_dir.go b/weed/server/filer_server_handlers_read_dir.go
index 87e864559..13f60eefe 100644
--- a/weed/server/filer_server_handlers_read_dir.go
+++ b/weed/server/filer_server_handlers_read_dir.go
@@ -32,7 +32,7 @@ func (fs *FilerServer) listDirectoryHandler(w http.ResponseWriter, r *http.Reque
lastFileName := r.FormValue("lastFileName")
- entries, err := fs.filer.ListDirectoryEntries(context.Background(), filer2.FullPath(path), lastFileName, false, limit)
+ entries, expiredCount, err := fs.filer.ListDirectoryEntries(context.Background(), filer2.FullPath(path), lastFileName, false, limit)
if err != nil {
glog.V(0).Infof("listDirectory %s %s %d: %s", path, lastFileName, limit, err)
@@ -40,7 +40,7 @@ func (fs *FilerServer) listDirectoryHandler(w http.ResponseWriter, r *http.Reque
return
}
- shouldDisplayLoadMore := len(entries) == limit
+ shouldDisplayLoadMore := len(entries)+expiredCount == limit
if path == "/" {
path = ""
}
diff --git a/weed/storage/volume.go b/weed/storage/volume.go
index 88a5db4c5..7da83de7a 100644
--- a/weed/storage/volume.go
+++ b/weed/storage/volume.go
@@ -180,9 +180,9 @@ func (v *Volume) expired(volumeSizeLimit uint64) bool {
if v.Ttl == nil || v.Ttl.Minutes() == 0 {
return false
}
- glog.V(1).Infof("now:%v lastModified:%v", time.Now().Unix(), v.lastModifiedTsSeconds)
+ glog.V(2).Infof("now:%v lastModified:%v", time.Now().Unix(), v.lastModifiedTsSeconds)
livedMinutes := (time.Now().Unix() - int64(v.lastModifiedTsSeconds)) / 60
- glog.V(1).Infof("ttl:%v lived:%v", v.Ttl, livedMinutes)
+ glog.V(2).Infof("ttl:%v lived:%v", v.Ttl, livedMinutes)
if int64(v.Ttl.Minutes()) < livedMinutes {
return true
}