1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
|
package filer2
import (
"context"
"time"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/stats"
"github.com/chrislusf/seaweedfs/weed/util"
)
type FilerStore interface {
// GetName gets the name to locate the configuration in filer.toml file
GetName() string
// Initialize initializes the file store
Initialize(configuration util.Configuration, prefix string) error
InsertEntry(context.Context, *Entry) error
UpdateEntry(context.Context, *Entry) (err error)
// err == filer2.ErrNotFound if not found
FindEntry(context.Context, util.FullPath) (entry *Entry, err error)
DeleteEntry(context.Context, util.FullPath) (err error)
DeleteFolderChildren(context.Context, util.FullPath) (err error)
ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int) ([]*Entry, error)
BeginTransaction(ctx context.Context) (context.Context, error)
CommitTransaction(ctx context.Context) error
RollbackTransaction(ctx context.Context) error
Shutdown()
}
type FilerLocalStore interface {
UpdateOffset(filer string, lastTsNs int64) error
ReadOffset(filer string) (lastTsNs int64, err error)
}
type FilerStoreWrapper struct {
ActualStore FilerStore
}
func NewFilerStoreWrapper(store FilerStore) *FilerStoreWrapper {
if innerStore, ok := store.(*FilerStoreWrapper); ok {
return innerStore
}
return &FilerStoreWrapper{
ActualStore: store,
}
}
func (fsw *FilerStoreWrapper) GetName() string {
return fsw.ActualStore.GetName()
}
func (fsw *FilerStoreWrapper) Initialize(configuration util.Configuration, prefix string) error {
return fsw.ActualStore.Initialize(configuration, prefix)
}
func (fsw *FilerStoreWrapper) InsertEntry(ctx context.Context, entry *Entry) error {
stats.FilerStoreCounter.WithLabelValues(fsw.ActualStore.GetName(), "insert").Inc()
start := time.Now()
defer func() {
stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "insert").Observe(time.Since(start).Seconds())
}()
filer_pb.BeforeEntrySerialization(entry.Chunks)
return fsw.ActualStore.InsertEntry(ctx, entry)
}
func (fsw *FilerStoreWrapper) UpdateEntry(ctx context.Context, entry *Entry) error {
stats.FilerStoreCounter.WithLabelValues(fsw.ActualStore.GetName(), "update").Inc()
start := time.Now()
defer func() {
stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "update").Observe(time.Since(start).Seconds())
}()
filer_pb.BeforeEntrySerialization(entry.Chunks)
return fsw.ActualStore.UpdateEntry(ctx, entry)
}
func (fsw *FilerStoreWrapper) FindEntry(ctx context.Context, fp util.FullPath) (entry *Entry, err error) {
stats.FilerStoreCounter.WithLabelValues(fsw.ActualStore.GetName(), "find").Inc()
start := time.Now()
defer func() {
stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "find").Observe(time.Since(start).Seconds())
}()
entry, err = fsw.ActualStore.FindEntry(ctx, fp)
if err != nil {
return nil, err
}
filer_pb.AfterEntryDeserialization(entry.Chunks)
return
}
func (fsw *FilerStoreWrapper) DeleteEntry(ctx context.Context, fp util.FullPath) (err error) {
stats.FilerStoreCounter.WithLabelValues(fsw.ActualStore.GetName(), "delete").Inc()
start := time.Now()
defer func() {
stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "delete").Observe(time.Since(start).Seconds())
}()
return fsw.ActualStore.DeleteEntry(ctx, fp)
}
func (fsw *FilerStoreWrapper) DeleteFolderChildren(ctx context.Context, fp util.FullPath) (err error) {
stats.FilerStoreCounter.WithLabelValues(fsw.ActualStore.GetName(), "deleteFolderChildren").Inc()
start := time.Now()
defer func() {
stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "deleteFolderChildren").Observe(time.Since(start).Seconds())
}()
return fsw.ActualStore.DeleteFolderChildren(ctx, fp)
}
func (fsw *FilerStoreWrapper) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int) ([]*Entry, error) {
stats.FilerStoreCounter.WithLabelValues(fsw.ActualStore.GetName(), "list").Inc()
start := time.Now()
defer func() {
stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "list").Observe(time.Since(start).Seconds())
}()
entries, err := fsw.ActualStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit)
if err != nil {
return nil, err
}
for _, entry := range entries {
filer_pb.AfterEntryDeserialization(entry.Chunks)
}
return entries, err
}
func (fsw *FilerStoreWrapper) BeginTransaction(ctx context.Context) (context.Context, error) {
return fsw.ActualStore.BeginTransaction(ctx)
}
func (fsw *FilerStoreWrapper) CommitTransaction(ctx context.Context) error {
return fsw.ActualStore.CommitTransaction(ctx)
}
func (fsw *FilerStoreWrapper) RollbackTransaction(ctx context.Context) error {
return fsw.ActualStore.RollbackTransaction(ctx)
}
func (fsw *FilerStoreWrapper) Shutdown() {
fsw.ActualStore.Shutdown()
}
|