path: root/weed/mount/filehandle.go

package mount

import (
	"os"
	"sync"

	"github.com/seaweedfs/seaweedfs/weed/filer"
	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	"github.com/seaweedfs/seaweedfs/weed/util"
)

type FileHandleId uint64

var IsDebugFileReadWrite = false

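// FileHandle represents one open instance of a file. It tracks the open
// reference count, the locked filer entry backing the file, dirty pages
// waiting to be flushed, and a cached table of cumulative chunk offsets
// used to locate chunks quickly during reads.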
type FileHandle struct {
	fh              FileHandleId
	counter         int64
	entry           *LockedEntry
	entryLock       sync.RWMutex
	entryChunkGroup *filer.ChunkGroup
	inode           uint64
	wfs             *WFS

	// set when the file has been written to and its metadata must be flushed
	dirtyMetadata bool
	dirtyPages    *PageWriter
	reader        *filer.ChunkReadAt
	contentType   string

	isDeleted bool

	// cached cumulative chunk offsets, used to speed up RDMA chunk lookups
	chunkOffsetCache []int64
	chunkCacheValid  bool
	chunkCacheLock   sync.RWMutex

	// for debugging
	mirrorFile *os.File
}

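// newFileHandle creates a FileHandle with a reference count of one and a
// PageWriter sized by the configured chunk size limit. In debug mode it
// also opens a mirror file under /tmp/sw for comparing read/write results.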
func newFileHandle(wfs *WFS, handleId FileHandleId, inode uint64, entry *filer_pb.Entry) *FileHandle {
	fh := &FileHandle{
		fh:      handleId,
		counter: 1,
		inode:   inode,
		wfs:     wfs,
	}
	// dirtyPages: newContinuousDirtyPages(file, writeOnly),
	fh.dirtyPages = newPageWriter(fh, wfs.option.ChunkSizeLimit)
	fh.entry = &LockedEntry{
		Entry: entry,
	}
	if entry != nil {
		fh.SetEntry(entry)
	}

	// entry can be nil here, so guard the debug mirror which needs entry.Name
	if IsDebugFileReadWrite && entry != nil {
		var err error
		fh.mirrorFile, err = os.OpenFile("/tmp/sw/"+entry.Name, os.O_RDWR|os.O_CREATE, 0600)
		if err != nil {
			glog.Errorf("failed to create debug mirror file: %v", err)
		}
	}

	return fh
}

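// FullPath resolves the handle's inode to its current full path.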
func (fh *FileHandle) FullPath() util.FullPath {
	fp, _ := fh.wfs.inodeToPath.GetPath(fh.inode)
	return fp
}

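// GetEntry returns the locked entry backing this file handle.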
func (fh *FileHandle) GetEntry() *LockedEntry {
	return fh.entry
}

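// SetEntry replaces the handle's entry, refreshing the cached file size,
// rebuilding the chunk group, and invalidating the chunk offset cache.
// Passing a nil entry is fatal.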
func (fh *FileHandle) SetEntry(entry *filer_pb.Entry) {
	if entry != nil {
		fileSize := filer.FileSize(entry)
		entry.Attributes.FileSize = fileSize
		var resolveManifestErr error
		fh.entryChunkGroup, resolveManifestErr = filer.NewChunkGroup(fh.wfs.LookupFn(), fh.wfs.chunkCache, entry.Chunks)
		if resolveManifestErr != nil {
			glog.Warningf("failed to resolve manifest chunks in %+v: %v", entry, resolveManifestErr)
		}
	} else {
		glog.Fatalf("setting file handle entry to nil")
	}
	fh.entry.SetEntry(entry)

	// Invalidate the chunk offset cache since the chunks may have changed
	fh.invalidateChunkCache()
}

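// UpdateEntry applies fn to the entry under lock and returns the updated entry.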
func (fh *FileHandle) UpdateEntry(fn func(entry *filer_pb.Entry)) *filer_pb.Entry {
	result := fh.entry.UpdateEntry(fn)

	// Invalidate the chunk offset cache since the entry may have been modified
	fh.invalidateChunkCache()

	return result
}

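// AddChunks appends newly written chunks to the entry.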
func (fh *FileHandle) AddChunks(chunks []*filer_pb.FileChunk) {
	fh.entry.AppendChunks(chunks)

	// Invalidate the chunk offset cache since new chunks were added
	fh.invalidateChunkCache()
}

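// ReleaseHandle releases the handle's resources under an exclusive lock,
// destroying its dirty pages and, in debug mode, closing the mirror file.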
func (fh *FileHandle) ReleaseHandle() {

	fhActiveLock := fh.wfs.fhLockTable.AcquireLock("ReleaseHandle", fh.fh, util.ExclusiveLock)
	defer fh.wfs.fhLockTable.ReleaseLock(fh.fh, fhActiveLock)

	fh.dirtyPages.Destroy()
	if IsDebugFileReadWrite {
		fh.mirrorFile.Close()
	}
}

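// lessThan orders chunks by modification time, breaking ties by file key.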
func lessThan(a, b *filer_pb.FileChunk) bool {
	if a.ModifiedTsNs == b.ModifiedTsNs {
		return a.Fid.FileKey < b.Fid.FileKey
	}
	return a.ModifiedTsNs < b.ModifiedTsNs
}

// getCumulativeOffsets returns cached cumulative offsets for chunks, computing them if necessary
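// The returned slice has len(chunks)+1 entries: element i+1 is the total
// size of chunks[0..i]. This lets callers locate the chunk covering a file
// offset with a binary search. An illustrative sketch (not part of this
// file; assumes chunks is the handle's chunk list and fileOffset is the
// desired read position):
//
//	offsets := fh.getCumulativeOffsets(chunks)
//	i := sort.Search(len(chunks), func(i int) bool {
//		return offsets[i+1] > fileOffset // first chunk ending past fileOffset
//	})
//	if i < len(chunks) {
//		// fileOffset falls inside chunks[i], at chunk-local offset fileOffset-offsets[i]
//	}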
func (fh *FileHandle) getCumulativeOffsets(chunks []*filer_pb.FileChunk) []int64 {
	fh.chunkCacheLock.RLock()
	if fh.chunkCacheValid && len(fh.chunkOffsetCache) == len(chunks)+1 {
		// Cache is valid and matches current chunk count
		result := make([]int64, len(fh.chunkOffsetCache))
		copy(result, fh.chunkOffsetCache)
		fh.chunkCacheLock.RUnlock()
		return result
	}
	fh.chunkCacheLock.RUnlock()

	// Need to compute/recompute cache
	fh.chunkCacheLock.Lock()
	defer fh.chunkCacheLock.Unlock()

	// Double-check in case another goroutine computed it while we waited for the lock
	if fh.chunkCacheValid && len(fh.chunkOffsetCache) == len(chunks)+1 {
		result := make([]int64, len(fh.chunkOffsetCache))
		copy(result, fh.chunkOffsetCache)
		return result
	}

	// Compute cumulative offsets
	cumulativeOffsets := make([]int64, len(chunks)+1)
	for i, chunk := range chunks {
		cumulativeOffsets[i+1] = cumulativeOffsets[i] + int64(chunk.Size)
	}

	// Cache the result
	fh.chunkOffsetCache = make([]int64, len(cumulativeOffsets))
	copy(fh.chunkOffsetCache, cumulativeOffsets)
	fh.chunkCacheValid = true

	return cumulativeOffsets
}

// invalidateChunkCache invalidates the chunk offset cache when chunks are modified
func (fh *FileHandle) invalidateChunkCache() {
	fh.chunkCacheLock.Lock()
	fh.chunkCacheValid = false
	fh.chunkOffsetCache = nil
	fh.chunkCacheLock.Unlock()
}