diff options
Diffstat (limited to 'weed/storage/needle_read_write.go')
| -rw-r--r-- | weed/storage/needle_read_write.go | 238 |
1 files changed, 238 insertions, 0 deletions
diff --git a/weed/storage/needle_read_write.go b/weed/storage/needle_read_write.go new file mode 100644 index 000000000..1dac6d5c4 --- /dev/null +++ b/weed/storage/needle_read_write.go @@ -0,0 +1,238 @@ +package storage + +import ( + "errors" + "fmt" + "io" + "os" + "weed/util" +) + +const ( + FlagGzip = 0x01 + FlagHasName = 0x02 + FlagHasMime = 0x04 +) + +func (n *Needle) DiskSize() uint32 { + padding := NeedlePaddingSize - ((NeedleHeaderSize + n.Size + NeedleChecksumSize) % NeedlePaddingSize) + return NeedleHeaderSize + n.Size + padding + NeedleChecksumSize +} +func (n *Needle) Append(w io.Writer, version Version) (size uint32, err error) { + if s, ok := w.(io.Seeker); ok { + if end, e := s.Seek(0, 1); e == nil { + defer func(s io.Seeker, off int64) { + if err != nil { + if _, e = s.Seek(off, 0); e != nil { + fmt.Printf("Failed to seek back to %d with error: %s\n", w, off, e) + } + } + }(s, end) + } else { + err = fmt.Errorf("Cnnot Read Current Volume Position: %s", e) + return + } + } + switch version { + case Version1: + header := make([]byte, NeedleHeaderSize) + util.Uint32toBytes(header[0:4], n.Cookie) + util.Uint64toBytes(header[4:12], n.Id) + n.Size = uint32(len(n.Data)) + size = n.Size + util.Uint32toBytes(header[12:16], n.Size) + if _, err = w.Write(header); err != nil { + return + } + if _, err = w.Write(n.Data); err != nil { + return + } + padding := NeedlePaddingSize - ((NeedleHeaderSize + n.Size + NeedleChecksumSize) % NeedlePaddingSize) + util.Uint32toBytes(header[0:NeedleChecksumSize], n.Checksum.Value()) + _, err = w.Write(header[0 : NeedleChecksumSize+padding]) + return + case Version2: + header := make([]byte, NeedleHeaderSize) + util.Uint32toBytes(header[0:4], n.Cookie) + util.Uint64toBytes(header[4:12], n.Id) + n.DataSize, n.NameSize, n.MimeSize = uint32(len(n.Data)), uint8(len(n.Name)), uint8(len(n.Mime)) + if n.DataSize > 0 { + n.Size = 4 + n.DataSize + 1 + if n.HasName() { + n.Size = n.Size + 1 + uint32(n.NameSize) + } + if n.HasMime() { + n.Size = n.Size + 1 + uint32(n.MimeSize) + } + } + size = n.DataSize + util.Uint32toBytes(header[12:16], n.Size) + if _, err = w.Write(header); err != nil { + return + } + if n.DataSize > 0 { + util.Uint32toBytes(header[0:4], n.DataSize) + if _, err = w.Write(header[0:4]); err != nil { + return + } + if _, err = w.Write(n.Data); err != nil { + return + } + util.Uint8toBytes(header[0:1], n.Flags) + if _, err = w.Write(header[0:1]); err != nil { + return + } + } + if n.HasName() { + util.Uint8toBytes(header[0:1], n.NameSize) + if _, err = w.Write(header[0:1]); err != nil { + return + } + if _, err = w.Write(n.Name); err != nil { + return + } + } + if n.HasMime() { + util.Uint8toBytes(header[0:1], n.MimeSize) + if _, err = w.Write(header[0:1]); err != nil { + return + } + if _, err = w.Write(n.Mime); err != nil { + return + } + } + padding := NeedlePaddingSize - ((NeedleHeaderSize + n.Size + NeedleChecksumSize) % NeedlePaddingSize) + util.Uint32toBytes(header[0:NeedleChecksumSize], n.Checksum.Value()) + _, err = w.Write(header[0 : NeedleChecksumSize+padding]) + return n.DataSize, err + } + return 0, fmt.Errorf("Unsupported Version! (%d)", version) +} + +func (n *Needle) Read(r io.Reader, size uint32, version Version) (ret int, err error) { + switch version { + case Version1: + bytes := make([]byte, NeedleHeaderSize+size+NeedleChecksumSize) + if ret, err = r.Read(bytes); err != nil { + return + } + n.readNeedleHeader(bytes) + n.Data = bytes[NeedleHeaderSize : NeedleHeaderSize+size] + checksum := util.BytesToUint32(bytes[NeedleHeaderSize+size : NeedleHeaderSize+size+NeedleChecksumSize]) + if checksum != NewCRC(n.Data).Value() { + return 0, errors.New("CRC error! Data On Disk Corrupted!") + } + return + case Version2: + if size == 0 { + return 0, nil + } + bytes := make([]byte, NeedleHeaderSize+size+NeedleChecksumSize) + if ret, err = r.Read(bytes); err != nil { + return + } + if ret != int(NeedleHeaderSize+size+NeedleChecksumSize) { + return 0, errors.New("File Entry Not Found!") + } + n.readNeedleHeader(bytes) + if n.Size != size { + return 0, fmt.Errorf("File Entry Not Found! Needle %d Memory %d", n.Size, size) + } + n.readNeedleDataVersion2(bytes[NeedleHeaderSize : NeedleHeaderSize+int(n.Size)]) + checksum := util.BytesToUint32(bytes[NeedleHeaderSize+n.Size : NeedleHeaderSize+n.Size+NeedleChecksumSize]) + if checksum != NewCRC(n.Data).Value() { + return 0, errors.New("CRC error! Data On Disk Corrupted!") + } + return + } + return 0, fmt.Errorf("Unsupported Version! (%d)", version) +} +func (n *Needle) readNeedleHeader(bytes []byte) { + n.Cookie = util.BytesToUint32(bytes[0:4]) + n.Id = util.BytesToUint64(bytes[4:12]) + n.Size = util.BytesToUint32(bytes[12:NeedleHeaderSize]) +} +func (n *Needle) readNeedleDataVersion2(bytes []byte) { + index, lenBytes := 0, len(bytes) + if index < lenBytes { + n.DataSize = util.BytesToUint32(bytes[index : index+4]) + index = index + 4 + n.Data = bytes[index : index+int(n.DataSize)] + index = index + int(n.DataSize) + n.Flags = bytes[index] + index = index + 1 + } + if index < lenBytes && n.HasName() { + n.NameSize = uint8(bytes[index]) + index = index + 1 + n.Name = bytes[index : index+int(n.NameSize)] + index = index + int(n.NameSize) + } + if index < lenBytes && n.HasMime() { + n.MimeSize = uint8(bytes[index]) + index = index + 1 + n.Mime = bytes[index : index+int(n.MimeSize)] + } +} + +func ReadNeedleHeader(r *os.File, version Version) (n *Needle, bodyLength uint32, err error) { + n = new(Needle) + if version == Version1 || version == Version2 { + bytes := make([]byte, NeedleHeaderSize) + var count int + count, err = r.Read(bytes) + if count <= 0 || err != nil { + return nil, 0, err + } + n.readNeedleHeader(bytes) + padding := NeedlePaddingSize - ((n.Size + NeedleHeaderSize + NeedleChecksumSize) % NeedlePaddingSize) + bodyLength = n.Size + NeedleChecksumSize + padding + } + return +} + +//n should be a needle already read the header +//the input stream will read until next file entry +func (n *Needle) ReadNeedleBody(r *os.File, version Version, bodyLength uint32) (err error) { + if bodyLength <= 0 { + return nil + } + switch version { + case Version1: + bytes := make([]byte, bodyLength) + if _, err = r.Read(bytes); err != nil { + return + } + n.Data = bytes[:n.Size] + n.Checksum = NewCRC(n.Data) + case Version2: + bytes := make([]byte, bodyLength) + if _, err = r.Read(bytes); err != nil { + return + } + n.readNeedleDataVersion2(bytes[0:n.Size]) + n.Checksum = NewCRC(n.Data) + default: + err = fmt.Errorf("Unsupported Version! (%d)", version) + } + return +} + +func (n *Needle) IsGzipped() bool { + return n.Flags&FlagGzip > 0 +} +func (n *Needle) SetGzipped() { + n.Flags = n.Flags | FlagGzip +} +func (n *Needle) HasName() bool { + return n.Flags&FlagHasName > 0 +} +func (n *Needle) SetHasName() { + n.Flags = n.Flags | FlagHasName +} +func (n *Needle) HasMime() bool { + return n.Flags&FlagHasMime > 0 +} +func (n *Needle) SetHasMime() { + n.Flags = n.Flags | FlagHasMime +} |
