aboutsummaryrefslogtreecommitdiff
path: root/go/operation/chunked_file.go
blob: 9537651496139b1b9645d7a657bc3071260b8491 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
package operation

import (
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"net/http"
	"sort"

	"github.com/chrislusf/seaweedfs/go/util"
)

var ErrOutOfRange = errors.New("Out of Range")

type ChunkInfo struct {
	Fid    string `json:"fid,omitempty"`
	Offset int64  `json:"offset,omitempty"`
	Size   int64  `json:"size,omitempty"`
}

type ChunkList []*ChunkInfo

type ChunkedFile struct {
	Name   string    `json:"name,omitempty"`
	Mime   string    `json:"mime,omitempty"`
	Size   int64     `json:"size,omitempty"`
	Chunks ChunkList `json:"chunks,omitempty"`

	master string `json:"-"`
}

func (s ChunkList) Len() int           { return len(s) }
func (s ChunkList) Less(i, j int) bool { return s[i].Offset < s[j].Offset }
func (s ChunkList) Swap(i, j int)      { s[i], s[j] = s[j], s[i] }

func NewChunkedNeedle(buffer []byte, master string) (*ChunkedFile, error) {
	c := ChunkedFile{}

	if e := json.Unmarshal(buffer, c); e != nil {
		return nil, e
	}
	sort.Sort(c.Chunks)
	c.master = master
	return &c, nil
}

func (c *ChunkedFile) Marshal() ([]byte, error) {
	return json.Marshal(c)
}

func copyChunk(fileUrl string, w io.Writer, startOffset, size int64) (written int64, e error) {
	req, err := http.NewRequest("GET", fileUrl, nil)
	if err != nil {
		return written, err
	}
	if startOffset > 0 {
		req.Header.Set("Range", fmt.Sprintf("bytes=%d-", startOffset))
	}

	resp, err := util.Do(req)
	if err != nil {
		return written, err
	}
	defer resp.Body.Close()
	if startOffset > 0 && resp.StatusCode != 206 {
		return written, fmt.Errorf("Cannot Read Needle Position: %d [%s]", startOffset, fileUrl)
	}

	if size > 0 {
		return io.CopyN(w, resp.Body, size)
	} else {
		return io.Copy(w, resp.Body)
	}
}

func (c *ChunkedFile) WriteBuffer(w io.Writer, offset, size int64) (written int64, e error) {
	if offset >= c.Size || offset+size > c.Size {
		return written, ErrOutOfRange
	}
	chunkIndex := -1
	chunkStartOffset := int64(0)
	for i, ci := range c.Chunks {
		if offset >= ci.Offset && offset < ci.Offset+ci.Size {
			chunkIndex = i
			chunkStartOffset = offset - ci.Offset
			break
		}
	}
	if chunkIndex < 0 {
		return written, ErrOutOfRange
	}
	//preload next chunk?
	for ; chunkIndex < c.Chunks.Len(); chunkIndex++ {
		ci := c.Chunks[chunkIndex]
		fileUrl, lookupError := LookupFileId(c.master, ci.Fid)
		if lookupError != nil {
			return written, lookupError
		}
		rsize := int64(0)
		if size > 0 {
			rsize = size - written
		}
		if n, e := copyChunk(fileUrl, w, chunkStartOffset, rsize); e != nil {
			return written, e
		} else {
			written += n
		}

		if size > 0 && written >= size {
			break
		}
		chunkStartOffset = 0
	}

	return written, nil
}

func (c *ChunkedFile) DeleteHelper() error {
	//TODO Delete all chunks
	return nil
}

func (c *ChunkedFile) StoredHelper() error {
	//TODO
	return nil
}