aboutsummaryrefslogtreecommitdiff
path: root/weed/mount/filehandle.go
blob: b7c0ca255d7ae9af71a1ad304ab0a854442a83fd (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
package mount

import (
	"github.com/seaweedfs/seaweedfs/weed/filer"
	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	"github.com/seaweedfs/seaweedfs/weed/util"
	"golang.org/x/exp/slices"
	"golang.org/x/sync/semaphore"
	"math"
	"sync"
)

// FileHandleId identifies one open file handle within the mount.
type FileHandleId uint64

// FileHandle tracks one open instance of a file in the mounted filesystem,
// bridging handle-level operations to the filer entry and the write-back
// page cache.
type FileHandle struct {
	// fh is the handle id handed back to the caller.
	fh FileHandleId
	// counter is the open reference count; starts at 1 in newFileHandle.
	counter int64
	// entry is the filer entry guarded by LockedEntry's own locking.
	entry *LockedEntry
	// entryLock guards entry mutation; NOTE(review): currently unused here —
	// the lock/unlock in AddChunks is commented out; confirm intended locking.
	entryLock sync.Mutex
	// inode is resolved to a path via wfs.inodeToPath (see FullPath).
	inode uint64
	wfs   *WFS

	// cache file has been written to
	dirtyMetadata  bool
	dirtyPages     *PageWriter
	entryViewCache []filer.VisibleInterval
	// reader is a lazily created chunk reader; closed and reset by CloseReader.
	reader      *filer.ChunkReadAt
	contentType string
	handle      uint64
	// orderedMutex is a weighted semaphore created with MaxInt64 capacity
	// (see newFileHandle).
	orderedMutex *semaphore.Weighted

	isDeleted bool
}

// newFileHandle builds a FileHandle for the given inode with an initial
// reference count of 1, wires up its write-back page writer, and wraps the
// (possibly nil) filer entry in a LockedEntry. When an entry is supplied,
// its FileSize attribute is refreshed from the chunk list.
func newFileHandle(wfs *WFS, handleId FileHandleId, inode uint64, entry *filer_pb.Entry) *FileHandle {
	handle := &FileHandle{
		fh:           handleId,
		counter:      1,
		inode:        inode,
		wfs:          wfs,
		orderedMutex: semaphore.NewWeighted(int64(math.MaxInt64)),
	}
	// dirtyPages: newContinuousDirtyPages(file, writeOnly),
	// the page writer is created before the entry is attached, matching the
	// construction order newPageWriter may rely on
	handle.dirtyPages = newPageWriter(handle, wfs.option.ChunkSizeLimit)
	if entry != nil {
		entry.Attributes.FileSize = filer.FileSize(entry)
	}
	handle.entry = &LockedEntry{Entry: entry}

	return handle
}

// FullPath resolves this handle's inode to its current full path via the
// mount's inode-to-path table; a lookup failure yields the zero FullPath.
func (fh *FileHandle) FullPath() util.FullPath {
	fullPath, _ := fh.wfs.inodeToPath.GetPath(fh.inode)
	return fullPath
}

// GetEntry returns the filer entry currently held by this handle.
func (fh *FileHandle) GetEntry() *filer_pb.Entry {
	currentEntry := fh.entry.GetEntry()
	return currentEntry
}

// SetEntry replaces the filer entry held by this handle.
// NOTE(review): unlike newFileHandle, this path does not refresh
// entry.Attributes.FileSize — confirm whether callers pre-compute it.
func (fh *FileHandle) SetEntry(entry *filer_pb.Entry) {
	fh.entry.SetEntry(entry)
}

// UpdateEntry applies fn to the held filer entry under LockedEntry's
// control and returns the resulting entry.
func (fh *FileHandle) UpdateEntry(fn func(entry *filer_pb.Entry)) *filer_pb.Entry {
	updated := fh.entry.UpdateEntry(fn)
	return updated
}

// AddChunks records newly written chunks on the handle's entry and
// invalidates the cached visible-interval view.
//
// To reason about ordering, it locates the earliest incoming chunk (per
// lessThan), gathers any existing chunks that sort after it (i.e. are
// out of order relative to this batch), and sorts that merged view.
func (fh *FileHandle) AddChunks(chunks []*filer_pb.FileChunk) {
	//fh.entryLock.Lock()
	//defer fh.entryLock.Unlock()

	if fh.entry == nil {
		return
	}

	// an empty batch is a no-op; this also guards the chunks[0] access below,
	// which previously panicked on empty input
	if len(chunks) == 0 {
		return
	}

	// find the earliest incoming chunk
	newChunks := chunks
	earliestChunk := newChunks[0]
	for i := 1; i < len(newChunks); i++ {
		// keep the smaller element; the previous comparison was inverted
		// and selected the LATEST chunk despite the variable's name
		if lessThan(newChunks[i], earliestChunk) {
			earliestChunk = newChunks[i]
		}
	}

	// pick out-of-order chunks from existing chunks
	for _, chunk := range fh.entry.GetChunks() {
		if lessThan(earliestChunk, chunk) {
			chunks = append(chunks, chunk)
		}
	}

	// sort incoming chunks
	slices.SortFunc(chunks, func(a, b *filer_pb.FileChunk) bool {
		return lessThan(a, b)
	})

	glog.V(4).Infof("%s existing %d chunks adds %d more", fh.FullPath(), len(fh.entry.GetChunks()), len(chunks))

	// NOTE(review): the sorted merged slice is only used for the log line
	// above; the entry receives the incoming batch as-is — confirm intent.
	fh.entry.AppendChunks(newChunks)
	fh.entryViewCache = nil
}

// CloseReader closes and clears the cached chunk reader, if one exists.
// The Close error is deliberately discarded, as in the original.
func (fh *FileHandle) CloseReader() {
	if fh.reader == nil {
		return
	}
	_ = fh.reader.Close()
	fh.reader = nil
}

// Release frees the resources owned by this handle: the write-back page
// cache is destroyed first, then the cached chunk reader is closed.
func (fh *FileHandle) Release() {
	fh.dirtyPages.Destroy()
	fh.CloseReader()
}

// lessThan orders two chunks by modification time (ns), breaking ties
// with the chunk's file key so the ordering is total and deterministic.
func lessThan(a, b *filer_pb.FileChunk) bool {
	if a.ModifiedTsNs != b.ModifiedTsNs {
		return a.ModifiedTsNs < b.ModifiedTsNs
	}
	return a.Fid.FileKey < b.Fid.FileKey
}