diff options
Diffstat (limited to 'weed/storage/volume_stream_write.go')
| -rw-r--r-- | weed/storage/volume_stream_write.go | 105 |
1 files changed, 0 insertions, 105 deletions
diff --git a/weed/storage/volume_stream_write.go b/weed/storage/volume_stream_write.go deleted file mode 100644 index 2496387ff..000000000 --- a/weed/storage/volume_stream_write.go +++ /dev/null @@ -1,105 +0,0 @@ -package storage - -import ( - "bufio" - "fmt" - "io" - "time" - - "github.com/seaweedfs/seaweedfs/weed/util" - - "github.com/seaweedfs/seaweedfs/weed/glog" - "github.com/seaweedfs/seaweedfs/weed/storage/backend" - "github.com/seaweedfs/seaweedfs/weed/storage/needle" - . "github.com/seaweedfs/seaweedfs/weed/storage/types" -) - -func (v *Volume) StreamWrite(n *needle.Needle, data io.Reader, dataSize uint32) (err error) { - - v.dataFileAccessLock.Lock() - defer v.dataFileAccessLock.Unlock() - - df, ok := v.DataBackend.(*backend.DiskFile) - if !ok { - return fmt.Errorf("unexpected volume backend") - } - offset, _, _ := v.DataBackend.GetStat() - - header := make([]byte, NeedleHeaderSize+TimestampSize) // adding timestamp to reuse it and avoid extra allocation - CookieToBytes(header[0:CookieSize], n.Cookie) - NeedleIdToBytes(header[CookieSize:CookieSize+NeedleIdSize], n.Id) - n.Size = 4 + Size(dataSize) + 1 - SizeToBytes(header[CookieSize+NeedleIdSize:CookieSize+NeedleIdSize+SizeSize], n.Size) - - n.DataSize = dataSize - - // needle header - df.Write(header[0:NeedleHeaderSize]) - - // data size and data - util.Uint32toBytes(header[0:4], n.DataSize) - df.Write(header[0:4]) - // write and calculate CRC - crcWriter := needle.NewCRCwriter(df) - io.Copy(crcWriter, io.LimitReader(data, int64(dataSize))) - - // flags - util.Uint8toBytes(header[0:1], n.Flags) - df.Write(header[0:1]) - - // data checksum - util.Uint32toBytes(header[0:needle.NeedleChecksumSize], crcWriter.Sum()) - // write timestamp, padding - n.AppendAtNs = uint64(time.Now().UnixNano()) - util.Uint64toBytes(header[needle.NeedleChecksumSize:needle.NeedleChecksumSize+TimestampSize], n.AppendAtNs) - padding := needle.PaddingLength(n.Size, needle.Version3) - df.Write(header[0 : needle.NeedleChecksumSize+TimestampSize+padding]) - - // add to needle map - if err = v.nm.Put(n.Id, ToOffset(int64(offset)), n.Size); err != nil { - glog.V(4).Infof("failed to save in needle map %d: %v", n.Id, err) - } - return -} - -func (v *Volume) StreamRead(n *needle.Needle, writer io.Writer) (err error) { - - v.dataFileAccessLock.Lock() - defer v.dataFileAccessLock.Unlock() - - nv, ok := v.nm.Get(n.Id) - if !ok || nv.Offset.IsZero() { - return ErrorNotFound - } - - sr := &StreamReader{ - readerAt: v.DataBackend, - offset: nv.Offset.ToActualOffset(), - } - bufReader := bufio.NewReader(sr) - bufReader.Discard(NeedleHeaderSize) - sizeBuf := make([]byte, 4) - bufReader.Read(sizeBuf) - if _, err = writer.Write(sizeBuf); err != nil { - return err - } - dataSize := util.BytesToUint32(sizeBuf) - - _, err = io.Copy(writer, io.LimitReader(bufReader, int64(dataSize))) - - return -} - -type StreamReader struct { - offset int64 - readerAt io.ReaderAt -} - -func (sr *StreamReader) Read(p []byte) (n int, err error) { - n, err = sr.readerAt.ReadAt(p, sr.offset) - if err != nil { - return - } - sr.offset += int64(n) - return -} |
