aboutsummaryrefslogtreecommitdiff
path: root/weed/filer/filer_buckets.go
blob: 4d4f4abc30a563a0bb412bc7374db0100e101653 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
package filer

import (
	"context"
	"math"
	"sync"

	"github.com/chrislusf/seaweedfs/weed/glog"
	"github.com/chrislusf/seaweedfs/weed/util"
)

type BucketName string
type BucketOption struct {
	Name        BucketName
	Replication string
	fsync       bool
}
type FilerBuckets struct {
	dirBucketsPath string
	buckets        map[BucketName]*BucketOption
	sync.RWMutex
}

func (f *Filer) LoadBuckets() {

	f.buckets = &FilerBuckets{
		buckets: make(map[BucketName]*BucketOption),
	}

	limit := math.MaxInt32

	entries, err := f.ListDirectoryEntries(context.Background(), util.FullPath(f.DirBucketsPath), "", false, limit, "")

	if err != nil {
		glog.V(1).Infof("no buckets found: %v", err)
		return
	}

	shouldFsyncMap := make(map[string]bool)
	for _, bucket := range f.FsyncBuckets {
		shouldFsyncMap[bucket] = true
	}

	glog.V(1).Infof("buckets found: %d", len(entries))

	f.buckets.Lock()
	for _, entry := range entries {
		_, shouldFsnyc := shouldFsyncMap[entry.Name()]
		f.buckets.buckets[BucketName(entry.Name())] = &BucketOption{
			Name:        BucketName(entry.Name()),
			Replication: entry.Replication,
			fsync:       shouldFsnyc,
		}
	}
	f.buckets.Unlock()

}

func (f *Filer) ReadBucketOption(buketName string) (replication string, fsync bool) {

	f.buckets.RLock()
	defer f.buckets.RUnlock()

	option, found := f.buckets.buckets[BucketName(buketName)]

	if !found {
		return "", false
	}
	return option.Replication, option.fsync

}

func (f *Filer) isBucket(entry *Entry) bool {
	if !entry.IsDirectory() {
		return false
	}
	parent, dirName := entry.FullPath.DirAndName()
	if parent != f.DirBucketsPath {
		return false
	}

	f.buckets.RLock()
	defer f.buckets.RUnlock()

	_, found := f.buckets.buckets[BucketName(dirName)]

	return found

}

func (f *Filer) maybeAddBucket(entry *Entry) {
	if !entry.IsDirectory() {
		return
	}
	parent, dirName := entry.FullPath.DirAndName()
	if parent != f.DirBucketsPath {
		return
	}
	f.addBucket(dirName, &BucketOption{
		Name:        BucketName(dirName),
		Replication: entry.Replication,
	})
}

func (f *Filer) addBucket(buketName string, bucketOption *BucketOption) {

	f.buckets.Lock()
	defer f.buckets.Unlock()

	f.buckets.buckets[BucketName(buketName)] = bucketOption

}

func (f *Filer) deleteBucket(buketName string) {

	f.buckets.Lock()
	defer f.buckets.Unlock()

	delete(f.buckets.buckets, BucketName(buketName))

}