aboutsummaryrefslogtreecommitdiff
path: root/weed/sftpd/sftp_file_reader.go
blob: 791f2d581099a2e7491b69cde6ae31e03700e7ce (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
package sftpd

import (
	"fmt"
	"io"
	"sync"

	"github.com/seaweedfs/seaweedfs/weed/filer"
	filer_pb "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	"github.com/seaweedfs/seaweedfs/weed/sftpd/utils"
)

// SeaweedFileReaderAt serves positional reads (see ReadAt) of a single filer
// entry. Data is fetched from a lazily opened seekable reader in
// bufferSize-aligned buffers, and those buffers are kept in a utils.LruCache
// keyed by their starting byte offset.
type SeaweedFileReaderAt struct {
	fs         *SftpServer     // passed to filer.NewFileReader when the reader is first opened
	entry      *filer_pb.Entry // entry being read; also passed to filer.NewFileReader
	reader     io.ReadSeeker   // underlying reader; nil until the first cache miss in ReadAt
	mu         sync.Mutex      // held for the whole of ReadAt; guards reader and cache
	bufferSize int             // size in bytes of one cached buffer; cache keys are multiples of it
	cache      *utils.LruCache // buffers already read, keyed by aligned start offset
	fileSize   int64           // taken from entry.Attributes.FileSize at construction
}

// NewSeaweedFileReaderAt builds a SeaweedFileReaderAt for entry. The underlying
// file reader is not opened here; ReadAt opens it on its first cache miss,
// passing fs and entry to filer.NewFileReader.
//
// The returned reader uses 5MB buffers and a cache created with
// utils.NewLRUCache(10). The file size is taken from the entry's attributes.
func NewSeaweedFileReaderAt(fs *SftpServer, entry *filer_pb.Entry) *SeaweedFileReaderAt {
	// Generated protobuf getters tolerate nil receivers, so an entry without
	// attributes yields size 0 (every read then reports io.EOF) instead of a
	// nil pointer panic on entry.Attributes.
	size := int64(entry.GetAttributes().GetFileSize())

	return &SeaweedFileReaderAt{
		fs:         fs,
		entry:      entry,
		bufferSize: 5 * 1024 * 1024,       // 5MB
		cache:      utils.NewLRUCache(10), // Max 10 chunks = ~50MB
		fileSize:   size,
	}
}

// ReadAt implements io.ReaderAt on top of the buffer cache. It copies up to
// len(p) bytes starting at byte offset off into p, loading every
// bufferSize-aligned buffer that is not already cached from the underlying
// file reader.
//
// It returns the number of bytes copied into p. The error is io.EOF when off
// is at or past fileSize, or when fewer than len(p) bytes were available. It
// is a non-EOF error when off is negative or when opening, seeking or reading
// the underlying reader fails; bytes copied before such a failure are still
// reported in n. A zero-length p with off inside the file returns (0, nil).
//
// The mutex is held for the whole call, so concurrent calls run one at a time.
func (ra *SeaweedFileReaderAt) ReadAt(p []byte, off int64) (n int, err error) {
	// A negative offset would yield a negative index into the buffer below
	// and panic, so reject it up front.
	if off < 0 {
		return 0, fmt.Errorf("negative offset: %d", off)
	}

	ra.mu.Lock()
	defer ra.mu.Unlock()

	if off >= ra.fileSize {
		return 0, io.EOF
	}
	if len(p) == 0 {
		// Nothing was requested and we are not at end of file.
		return 0, nil
	}

	bufSize := int64(ra.bufferSize)
	for n < len(p) {
		pos := off + int64(n)
		if pos >= ra.fileSize {
			break
		}

		// Buffers are keyed by the aligned offset at which they start.
		bufferKey := pos - pos%bufSize

		buffer, cached := ra.cache.Get(bufferKey)
		if !cached {
			buffer, err = ra.loadBuffer(bufferKey)
			if err != nil {
				// Report what has already been copied alongside the error.
				return n, err
			}
			ra.cache.Put(bufferKey, buffer)
		}

		// A buffer shorter than expected (short read) may not reach pos;
		// there is nothing more to hand out in that case.
		bufferOffset := int(pos - bufferKey)
		if bufferOffset >= len(buffer) {
			break
		}

		n += copy(p[n:], buffer[bufferOffset:])
	}

	if n < len(p) {
		return n, io.EOF
	}
	return n, nil
}

// loadBuffer reads the buffer that starts at bufferKey from the underlying
// reader, opening that reader on first use. The result holds at most
// bufferSize bytes and is shorter for the last buffer of the file or when the
// reader ends early. The caller must hold ra.mu.
func (ra *SeaweedFileReaderAt) loadBuffer(bufferKey int64) ([]byte, error) {
	if ra.reader == nil {
		rs, ok := filer.NewFileReader(ra.fs, ra.entry).(io.ReadSeeker)
		if !ok {
			return nil, fmt.Errorf("reader is not seekable")
		}
		ra.reader = rs
	}

	if _, err := ra.reader.Seek(bufferKey, io.SeekStart); err != nil {
		return nil, fmt.Errorf("seek error: %w", err)
	}

	// Never ask for bytes past the recorded end of the file.
	readSize := int64(ra.bufferSize)
	if rest := ra.fileSize - bufferKey; rest < readSize {
		readSize = rest
	}

	buffer := make([]byte, readSize)
	readBytes, err := io.ReadFull(ra.reader, buffer)
	// io.ErrUnexpectedEOF only means the reader held fewer bytes than asked
	// for; keep what was read.
	if err != nil && err != io.ErrUnexpectedEOF {
		return nil, fmt.Errorf("read error: %w", err)
	}
	return buffer[:readBytes], nil
}