Diffstat (limited to 'weed/storage/volume_stream_write.go')
-rw-r--r--  weed/storage/volume_stream_write.go  119
1 file changed, 119 insertions, 0 deletions
diff --git a/weed/storage/volume_stream_write.go b/weed/storage/volume_stream_write.go
new file mode 100644
index 000000000..f619de30b
--- /dev/null
+++ b/weed/storage/volume_stream_write.go
@@ -0,0 +1,119 @@
+package storage
+
+import (
+ "bufio"
+ "fmt"
+ "io"
+ "time"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/storage/backend"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
+ . "github.com/chrislusf/seaweedfs/weed/storage/types"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
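+// StreamWrite appends needle n to the volume file in the version 3 layout,
+// streaming the body straight from data instead of buffering it in memory:
+//
+// cookie(4) id(8) size(4) dataSize(4) data flags(1) crc(4) appendAtNs(8) padding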
+func (v *Volume) StreamWrite(n *needle.Needle, data io.Reader, dataSize uint32) (err error) {
+
+ v.dataFileAccessLock.Lock()
+ defer v.dataFileAccessLock.Unlock()
+
+ df, ok := v.DataBackend.(*backend.DiskFile)
+ if !ok {
+ return fmt.Errorf("unexpected volume backend %T", v.DataBackend)
+ }
+ offset, _, _ := v.DataBackend.GetStat() // current file size is where this needle starts; safe under the write lock
+
+ header := make([]byte, NeedleHeaderSize+TimestampSize) // adding timestamp to reuse it and avoid extra allocation
+ CookieToBytes(header[0:CookieSize], n.Cookie)
+ NeedleIdToBytes(header[CookieSize:CookieSize+NeedleIdSize], n.Id)
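+ // Size covers the 4-byte DataSize field, the data itself, and the 1-byte Flags field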
+ n.Size = 4 + Size(dataSize) + 1
+ SizeToBytes(header[CookieSize+NeedleIdSize:CookieSize+NeedleIdSize+SizeSize], n.Size)
+
+ n.DataSize = dataSize
+
+ // needle header
+ df.Write(header[0:NeedleHeaderSize])
+
+ // data size and data
+ util.Uint32toBytes(header[0:4], n.DataSize)
+ df.Write(header[0:4])
+ // write the data while computing its CRC; CopyN errors out on short input
+ crcWriter := needle.NewCRCwriter(df)
+ if _, err = io.CopyN(crcWriter, data, int64(dataSize)); err != nil {
+ return fmt.Errorf("writing needle data: %v", err)
+ }
+
+ // flags
+ util.Uint8toBytes(header[0:1], n.Flags)
+ df.Write(header[0:1])
+
+ // data checksum, then timestamp and padding; the crc in header[0:4]
+ // is flushed together with them in a single write
+ util.Uint32toBytes(header[0:needle.NeedleChecksumSize], crcWriter.Sum())
+ n.AppendAtNs = uint64(time.Now().UnixNano())
+ util.Uint64toBytes(header[needle.NeedleChecksumSize:needle.NeedleChecksumSize+TimestampSize], n.AppendAtNs)
+ padding := needle.PaddingLength(n.Size, needle.Version3)
+ df.Write(header[0 : needle.NeedleChecksumSize+TimestampSize+padding])
+
+ // add to needle map
+ if err = v.nm.Put(n.Id, ToOffset(int64(offset)), n.Size); err != nil {
+ glog.V(4).Infof("failed to save needle %d in needle map: %v", n.Id, err)
+ }
+ return
+}
+
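+// StreamRead copies the stored needle body (its 4-byte dataSize prefix followed
+// by the data) to writer without reading the whole needle into memory.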
+func (v *Volume) StreamRead(n *needle.Needle, writer io.Writer) (err error) {
+
+ v.dataFileAccessLock.Lock()
+ defer v.dataFileAccessLock.Unlock()
+
+ nv, ok := v.nm.Get(n.Id)
+ if !ok || nv.Offset.IsZero() {
+ return ErrorNotFound
+ }
+
+ sr := &StreamReader{
+ readerAt: v.DataBackend,
+ offset: nv.Offset.ToActualOffset(),
+ }
+ bufReader := bufio.NewReader(sr)
+ if _, err = bufReader.Discard(NeedleHeaderSize); err != nil {
+ return err
+ }
+ // ReadFull guards against a short read of the 4-byte size prefix
+ sizeBuf := make([]byte, 4)
+ if _, err = io.ReadFull(bufReader, sizeBuf); err != nil {
+ return err
+ }
+ if _, err = writer.Write(sizeBuf); err != nil {
+ return err
+ }
+ dataSize := util.BytesToUint32(sizeBuf)
+
+ _, err = io.Copy(writer, io.LimitReader(bufReader, int64(dataSize)))
+
+ return
+}
+
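+// StreamReader adapts an io.ReaderAt into a sequential io.Reader by keeping
+// track of the current offset, so it can back a bufio.Reader.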
+type StreamReader struct {
+ offset int64
+ readerAt io.ReaderAt
+}
+
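+// Read implements io.Reader by delegating to ReadAt at the tracked offset.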
+func (sr *StreamReader) Read(p []byte) (n int, err error) {
+ n, err = sr.readerAt.ReadAt(p, sr.offset)
+ // advance by n even when err != nil: ReadAt may return data alongside io.EOF
+ sr.offset += int64(n)
+ return
+}
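
A minimal round-trip sketch for the two functions above. The volume setup is
assumed (v is an already-loaded *Volume with a disk backend); only the
StreamWrite/StreamRead calls reflect the code in this diff, and streamRoundTrip
is a hypothetical helper.

	package storage_test

	import (
		"io"

		"github.com/chrislusf/seaweedfs/weed/storage"
		"github.com/chrislusf/seaweedfs/weed/storage/needle"
	)

	// streamRoundTrip stores size bytes from src as needle n, then streams the
	// stored body back to dst, never holding the whole blob in memory.
	func streamRoundTrip(v *storage.Volume, n *needle.Needle, src io.Reader, size uint32, dst io.Writer) error {
		if err := v.StreamWrite(n, src, size); err != nil {
			return err
		}
		return v.StreamRead(n, dst)
	}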