1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
|
package mount
import (
"os"
"sync"
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
)
// FileHandleId is the numeric identifier carried by a FileHandle (its fh
// field) and passed to the handle lock table in ReleaseHandle.
type FileHandleId uint64
// IsDebugFileReadWrite switches on the debugging mirror file: when true,
// newFileHandle opens a per-handle file under /tmp/sw/ named after the entry.
var IsDebugFileReadWrite = false
// FileHandle holds the state kept for one open file: its handle id and inode,
// the locked filer entry with its chunk group, the pending page writer, and a
// cache of cumulative chunk offsets.
type FileHandle struct {
	// fh is the identifier of this handle.
	fh FileHandleId
	// counter is initialised to 1 by newFileHandle.
	// NOTE(review): presumably a reference count maintained by callers — confirm.
	counter int64
	// entry wraps the filer entry behind this handle; SetEntry, UpdateEntry
	// and AddChunks all go through it.
	entry *LockedEntry
	// NOTE(review): entryLock is not used in the code visible here — confirm
	// what it guards.
	entryLock sync.RWMutex
	// entryChunkGroup is rebuilt from the entry's chunks on every SetEntry.
	entryChunkGroup *filer.ChunkGroup
	// inode is what FullPath resolves to a path.
	inode uint64
	// wfs is the owning mount; it supplies lookup, chunk cache, lock table
	// and the inode-to-path mapping.
	wfs *WFS

	// cache file has been written to
	dirtyMetadata bool
	// dirtyPages is created in newFileHandle and destroyed in ReleaseHandle.
	dirtyPages *PageWriter
	reader *filer.ChunkReadAt
	contentType string
	isDeleted bool

	// RDMA chunk offset cache for performance optimization.
	// chunkOffsetCache holds len(chunks)+1 cumulative sizes; it is only
	// meaningful while chunkCacheValid is true. Both fields are guarded by
	// chunkCacheLock.
	chunkOffsetCache []int64
	chunkCacheValid bool
	chunkCacheLock sync.RWMutex

	// for debugging
	// mirrorFile is opened by newFileHandle only when IsDebugFileReadWrite
	// is set; otherwise it stays nil.
	mirrorFile *os.File
}
// newFileHandle builds a FileHandle for inode with an initial counter of 1
// and a page writer sized by wfs.option.ChunkSizeLimit.
//
// entry may be nil: the handle then starts with an empty LockedEntry and
// SetEntry is not called. When IsDebugFileReadWrite is set and an entry is
// supplied, a mirror file named after the entry is opened under /tmp/sw/.
// Failing to open it is printed but is not fatal, and mirrorFile stays nil.
func newFileHandle(wfs *WFS, handleId FileHandleId, inode uint64, entry *filer_pb.Entry) *FileHandle {
	fh := &FileHandle{
		fh:      handleId,
		counter: 1,
		inode:   inode,
		wfs:     wfs,
	}
	fh.dirtyPages = newPageWriter(fh, wfs.option.ChunkSizeLimit)
	fh.entry = &LockedEntry{
		Entry: entry,
	}
	if entry != nil {
		fh.SetEntry(entry)
	}

	// The mirror file is named after the entry, so it can only be created
	// when an entry was supplied; reading entry.Name on a nil entry panics.
	if IsDebugFileReadWrite && entry != nil {
		var err error
		fh.mirrorFile, err = os.OpenFile("/tmp/sw/"+entry.Name, os.O_RDWR|os.O_CREATE, 0600)
		if err != nil {
			println("failed to create mirror:", err.Error())
		}
	}
	return fh
}
// FullPath resolves the handle's inode to a path through the mount's
// inode-to-path mapping. The second result of GetPath is discarded, so the
// caller receives whatever path value GetPath produced.
func (fh *FileHandle) FullPath() util.FullPath {
	path, _ := fh.wfs.inodeToPath.GetPath(fh.inode)
	return path
}
// GetEntry returns the LockedEntry wrapper held by this handle. The pointer
// itself is returned, not a copy.
func (fh *FileHandle) GetEntry() *LockedEntry {
	return fh.entry
}
// SetEntry replaces the entry behind this handle.
//
// For a non-nil entry it overwrites entry.Attributes.FileSize with the value
// computed by filer.FileSize and rebuilds entryChunkGroup from entry.Chunks.
// A failure to build the chunk group is logged as a warning, including the
// underlying error, and does not abort the update. The entry is then stored
// in the LockedEntry and the cumulative chunk offset cache is invalidated.
//
// Passing nil is treated as a programming error and is reported through
// glog.Fatalf.
func (fh *FileHandle) SetEntry(entry *filer_pb.Entry) {
	if entry != nil {
		fileSize := filer.FileSize(entry)
		entry.Attributes.FileSize = fileSize
		var resolveManifestErr error
		fh.entryChunkGroup, resolveManifestErr = filer.NewChunkGroup(fh.wfs.LookupFn(), fh.wfs.chunkCache, entry.Chunks)
		if resolveManifestErr != nil {
			// Include the error itself; without it the warning gives no
			// hint as to why manifest resolution failed.
			glog.Warningf("failed to resolve manifest chunks in %+v: %v", entry, resolveManifestErr)
		}
	} else {
		glog.Fatalf("setting file handle entry to nil")
	}
	fh.entry.SetEntry(entry)

	// Invalidate chunk offset cache since chunks may have changed
	fh.invalidateChunkCache()
}
// UpdateEntry hands fn to the LockedEntry's UpdateEntry, then drops the
// cumulative chunk offset cache because fn may have altered the chunk list.
// It returns whatever the LockedEntry's UpdateEntry returned.
func (fh *FileHandle) UpdateEntry(fn func(entry *filer_pb.Entry)) *filer_pb.Entry {
	updated := fh.entry.UpdateEntry(fn)
	fh.invalidateChunkCache()
	return updated
}
// AddChunks appends chunks to the handle's entry and then invalidates the
// cumulative chunk offset cache, which no longer matches the chunk list.
func (fh *FileHandle) AddChunks(chunks []*filer_pb.FileChunk) {
	fh.entry.AppendChunks(chunks)
	fh.invalidateChunkCache()
}
// ReleaseHandle tears down the handle's resources while holding the exclusive
// per-handle lock from the mount's lock table: the dirty page writer is
// destroyed and, if a debugging mirror file was opened, it is closed.
func (fh *FileHandle) ReleaseHandle() {
	fhActiveLock := fh.wfs.fhLockTable.AcquireLock("ReleaseHandle", fh.fh, util.ExclusiveLock)
	defer fh.wfs.fhLockTable.ReleaseLock(fh.fh, fhActiveLock)

	fh.dirtyPages.Destroy()

	// Close based on whether a mirror file exists rather than on the current
	// value of IsDebugFileReadWrite: the flag is a mutable global, and a file
	// opened while it was on must still be closed if it has since been
	// turned off.
	if fh.mirrorFile != nil {
		// Best effort: the mirror is a debugging aid, so a close error is
		// deliberately ignored.
		_ = fh.mirrorFile.Close()
	}
}
// lessThan orders two chunks by ModifiedTsNs, falling back to Fid.FileKey
// when the timestamps are equal.
func lessThan(a, b *filer_pb.FileChunk) bool {
	if a.ModifiedTsNs != b.ModifiedTsNs {
		return a.ModifiedTsNs < b.ModifiedTsNs
	}
	return a.Fid.FileKey < b.Fid.FileKey
}
// getCumulativeOffsets returns a slice of len(chunks)+1 running totals of the
// chunk sizes: element 0 is 0 and element i is the summed Size of the first i
// chunks. The result comes from the handle's cache when the cache is marked
// valid and has the expected length; otherwise it is computed and cached.
// A cache hit returns a fresh copy of the cached slice.
func (fh *FileHandle) getCumulativeOffsets(chunks []*filer_pb.FileChunk) []int64 {
	wantLen := len(chunks) + 1

	// snapshot copies the cache out if it is usable for this chunk count and
	// returns nil otherwise. The caller must hold chunkCacheLock.
	snapshot := func() []int64 {
		if !fh.chunkCacheValid || len(fh.chunkOffsetCache) != wantLen {
			return nil
		}
		out := make([]int64, len(fh.chunkOffsetCache))
		copy(out, fh.chunkOffsetCache)
		return out
	}

	// Fast path: shared lock only.
	fh.chunkCacheLock.RLock()
	hit := snapshot()
	fh.chunkCacheLock.RUnlock()
	if hit != nil {
		return hit
	}

	// Slow path: take the write lock and look again, since another goroutine
	// may have filled the cache between the two lock acquisitions.
	fh.chunkCacheLock.Lock()
	defer fh.chunkCacheLock.Unlock()
	if hit = snapshot(); hit != nil {
		return hit
	}

	// Build the running totals.
	offsets := make([]int64, wantLen)
	for idx := range chunks {
		offsets[idx+1] = offsets[idx] + int64(chunks[idx].Size)
	}

	// Store a separate copy so the returned slice does not share its backing
	// array with the cache.
	cached := make([]int64, len(offsets))
	copy(cached, offsets)
	fh.chunkOffsetCache = cached
	fh.chunkCacheValid = true

	return offsets
}
// invalidateChunkCache marks the cumulative chunk offset cache as stale and
// releases its backing slice, under the cache's write lock. It is called
// whenever the handle's chunk list may have changed.
func (fh *FileHandle) invalidateChunkCache() {
	fh.chunkCacheLock.Lock()
	defer fh.chunkCacheLock.Unlock()

	fh.chunkOffsetCache = nil
	fh.chunkCacheValid = false
}
|