aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2021-03-05 02:29:38 -0800
committerChris Lu <chris.lu@gmail.com>2021-03-05 02:29:38 -0800
commit400de380f48c44c7700fdf7e8f247bf856662c10 (patch)
tree2675abb8d23a4b185b9c6279f66971f5605f1d84
parent2e89c8c9aee141e0ca471644639c970ebaf1dcaa (diff)
downloadseaweedfs-400de380f48c44c7700fdf7e8f247bf856662c10.tar.xz
seaweedfs-400de380f48c44c7700fdf7e8f247bf856662c10.zip
volume server: support tcp direct put/get/delete
-rw-r--r--weed/command/volume.go22
-rw-r--r--weed/server/volume_server_tcp_handlers_write.go133
-rw-r--r--weed/storage/backend/disk_file.go2
-rw-r--r--weed/storage/needle/crc.go24
-rw-r--r--weed/storage/volume_stream_write.go106
-rw-r--r--weed/storage/volume_vacuum.go2
6 files changed, 287 insertions, 2 deletions
diff --git a/weed/command/volume.go b/weed/command/volume.go
index 659c93d96..cf162a732 100644
--- a/weed/command/volume.go
+++ b/weed/command/volume.go
@@ -251,6 +251,9 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v
}
}
+ // starting tcp server
+ go v.startTcpService(volumeServer)
+
// starting the cluster http server
clusterHttpServer := v.startClusterHttpService(volumeMux)
@@ -368,3 +371,22 @@ func (v VolumeServerOptions) startClusterHttpService(handler http.Handler) httpd
}()
return clusterHttpServer
}
+
+func (v VolumeServerOptions) startTcpService(volumeServer *weed_server.VolumeServer) {
+ listeningAddress := *v.bindIp + ":" + strconv.Itoa(*v.port+20000)
+ glog.V(0).Infoln("Start Seaweed volume server", util.Version(), "tcp at", listeningAddress)
+ listener, e := util.NewListener(listeningAddress, 0)
+ if e != nil {
+ glog.Fatalf("Volume server listener error on %s:%v", listeningAddress, e)
+ }
+ defer listener.Close()
+
+ for {
+ c, err := listener.Accept()
+ if err != nil {
+ fmt.Println(err)
+ return
+ }
+ go volumeServer.HandleTcpConnection(c)
+ }
+}
diff --git a/weed/server/volume_server_tcp_handlers_write.go b/weed/server/volume_server_tcp_handlers_write.go
new file mode 100644
index 000000000..7c2f1a77f
--- /dev/null
+++ b/weed/server/volume_server_tcp_handlers_write.go
@@ -0,0 +1,133 @@
+package weed_server
+
+import (
+ "bufio"
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "net"
+ "strings"
+)
+
+func (vs *VolumeServer) HandleTcpConnection(c net.Conn) {
+ defer c.Close()
+
+ glog.V(0).Infof("Serving writes from %s", c.RemoteAddr().String())
+
+ bufReader := bufio.NewReaderSize(c, 1024*1024)
+ bufWriter := bufio.NewWriterSize(c, 1024*1024)
+
+ for {
+ cmd, err := bufReader.ReadString('\n')
+ if err != nil {
+ glog.Errorf("read command from %s: %v", c.RemoteAddr().String(), err)
+ return
+ }
+ switch cmd[0] {
+ case '+':
+ fileId := cmd[1:]
+ err = vs.handleTcpPut(fileId, bufReader)
+ if err == nil {
+ bufWriter.Write([]byte("+OK\n"))
+ } else {
+ bufWriter.Write([]byte("-ERR " + string(err.Error()) + "\n"))
+ }
+ case '-':
+ fileId := cmd[1:]
+ err = vs.handleTcpDelete(fileId)
+ if err == nil {
+ bufWriter.Write([]byte("+OK\n"))
+ } else {
+ bufWriter.Write([]byte("-ERR " + string(err.Error()) + "\n"))
+ }
+ case '?':
+ fileId := cmd[1:]
+ err = vs.handleTcpGet(fileId, bufWriter)
+ case '!':
+ bufWriter.Flush()
+ }
+
+ }
+
+}
+
+func (vs *VolumeServer) handleTcpGet(fileId string, writer *bufio.Writer) (err error) {
+
+ volumeId, n, err2 := vs.parseFileId(fileId)
+ if err2 != nil {
+ return err2
+ }
+
+ volume := vs.store.GetVolume(volumeId)
+ if volume == nil {
+ return fmt.Errorf("volume %d not found", volumeId)
+ }
+
+ err = volume.StreamRead(n, writer)
+ if err != nil {
+ return err
+ }
+
+ return nil
+}
+
+func (vs *VolumeServer) handleTcpPut(fileId string, bufReader *bufio.Reader) (err error) {
+
+ volumeId, n, err2 := vs.parseFileId(fileId)
+ if err2 != nil {
+ return err2
+ }
+
+ volume := vs.store.GetVolume(volumeId)
+ if volume == nil {
+ return fmt.Errorf("volume %d not found", volumeId)
+ }
+
+ sizeBuf := make([]byte, 4)
+ if _, err = bufReader.Read(sizeBuf); err != nil {
+ return err
+ }
+ dataSize := util.BytesToUint32(sizeBuf)
+
+ err = volume.StreamWrite(n, bufReader, dataSize)
+ if err != nil {
+ return err
+ }
+
+ return nil
+}
+
+func (vs *VolumeServer) handleTcpDelete(fileId string) (err error) {
+
+ volumeId, n, err2 := vs.parseFileId(fileId)
+ if err2 != nil {
+ return err2
+ }
+
+ _, err = vs.store.DeleteVolumeNeedle(volumeId, n)
+ if err != nil {
+ return err
+ }
+
+ return nil
+}
+
+func (vs *VolumeServer) parseFileId(fileId string) (needle.VolumeId, *needle.Needle, error) {
+
+ commaIndex := strings.LastIndex(fileId, ",")
+ if commaIndex <= 0 {
+ return 0, nil, fmt.Errorf("unknown fileId %s", fileId)
+ }
+
+ vid, fid := fileId[0:commaIndex], fileId[commaIndex+1:]
+
+ volumeId, ve := needle.NewVolumeId(vid)
+ if ve != nil {
+ return 0, nil, fmt.Errorf("unknown volume id in fileId %s", fileId)
+ }
+
+ n := new(needle.Needle)
+ n.ParsePath(fid)
+ return volumeId, n, nil
+}
diff --git a/weed/storage/backend/disk_file.go b/weed/storage/backend/disk_file.go
index 498963c31..3b42429cf 100644
--- a/weed/storage/backend/disk_file.go
+++ b/weed/storage/backend/disk_file.go
@@ -52,7 +52,7 @@ func (df *DiskFile) WriteAt(p []byte, off int64) (n int, err error) {
return
}
-func (df *DiskFile) Append(p []byte) (n int, err error) {
+func (df *DiskFile) Write(p []byte) (n int, err error) {
return df.WriteAt(p, df.fileSize)
}
diff --git a/weed/storage/needle/crc.go b/weed/storage/needle/crc.go
index 6fd910bb7..e1bac829a 100644
--- a/weed/storage/needle/crc.go
+++ b/weed/storage/needle/crc.go
@@ -2,6 +2,8 @@ package needle
import (
"fmt"
+ "hash"
+ "io"
"github.com/klauspost/crc32"
@@ -29,3 +31,25 @@ func (n *Needle) Etag() string {
util.Uint32toBytes(bits, uint32(n.Checksum))
return fmt.Sprintf("%x", bits)
}
+
+func NewCRCwriter(w io.Writer) *CRCwriter {
+
+ return &CRCwriter{
+ h: crc32.New(table),
+ w: w,
+ }
+
+}
+
+type CRCwriter struct {
+ h hash.Hash32
+ w io.Writer
+}
+
+func (c *CRCwriter) Write(p []byte) (n int, err error) {
+ n, err = c.w.Write(p) // with each write ...
+ c.h.Write(p) // ... update the hash
+ return
+}
+
+func (c *CRCwriter) Sum() uint32 { return c.h.Sum32() } // final hash
diff --git a/weed/storage/volume_stream_write.go b/weed/storage/volume_stream_write.go
new file mode 100644
index 000000000..f619de30b
--- /dev/null
+++ b/weed/storage/volume_stream_write.go
@@ -0,0 +1,106 @@
+package storage
+
+import (
+ "bufio"
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "io"
+ "time"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/storage/backend"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
+ . "github.com/chrislusf/seaweedfs/weed/storage/types"
+)
+
+func (v *Volume) StreamWrite(n *needle.Needle, data io.Reader, dataSize uint32) (err error) {
+
+ v.dataFileAccessLock.Lock()
+ defer v.dataFileAccessLock.Unlock()
+
+ df, ok := v.DataBackend.(*backend.DiskFile)
+ if !ok {
+ return fmt.Errorf("unexpected volume backend")
+ }
+ offset, _, _ := v.DataBackend.GetStat()
+
+ header := make([]byte, NeedleHeaderSize+TimestampSize) // adding timestamp to reuse it and avoid extra allocation
+ CookieToBytes(header[0:CookieSize], n.Cookie)
+ NeedleIdToBytes(header[CookieSize:CookieSize+NeedleIdSize], n.Id)
+ n.Size = 4 + Size(dataSize) + 1
+ SizeToBytes(header[CookieSize+NeedleIdSize:CookieSize+NeedleIdSize+SizeSize], n.Size)
+
+ n.DataSize = dataSize
+
+ // needle header
+ df.Write(header[0:NeedleHeaderSize])
+
+ // data size and data
+ util.Uint32toBytes(header[0:4], n.DataSize)
+ df.Write(header[0:4])
+ // write and calculate CRC
+ crcWriter := needle.NewCRCwriter(df)
+ io.Copy(crcWriter, io.LimitReader(data, int64(dataSize)))
+
+ // flags
+ util.Uint8toBytes(header[0:1], n.Flags)
+ df.Write(header[0:1])
+
+ // data checksum
+ util.Uint32toBytes(header[0:needle.NeedleChecksumSize], crcWriter.Sum())
+ df.Write(header[0:needle.NeedleChecksumSize])
+
+ // write timestamp, padding
+ n.AppendAtNs = uint64(time.Now().UnixNano())
+ util.Uint64toBytes(header[needle.NeedleChecksumSize:needle.NeedleChecksumSize+TimestampSize], n.AppendAtNs)
+ padding := needle.PaddingLength(n.Size, needle.Version3)
+ df.Write(header[0 : needle.NeedleChecksumSize+TimestampSize+padding])
+
+ // add to needle map
+ if err = v.nm.Put(n.Id, ToOffset(int64(offset)), n.Size); err != nil {
+ glog.V(4).Infof("failed to save in needle map %d: %v", n.Id, err)
+ }
+ return
+}
+
+func (v *Volume) StreamRead(n *needle.Needle, writer io.Writer) (err error) {
+
+ v.dataFileAccessLock.Lock()
+ defer v.dataFileAccessLock.Unlock()
+
+ nv, ok := v.nm.Get(n.Id)
+ if !ok || nv.Offset.IsZero() {
+ return ErrorNotFound
+ }
+
+ sr := &StreamReader{
+ readerAt: v.DataBackend,
+ offset: nv.Offset.ToActualOffset(),
+ }
+ bufReader := bufio.NewReader(sr)
+ bufReader.Discard(NeedleHeaderSize)
+ sizeBuf := make([]byte, 4)
+ bufReader.Read(sizeBuf)
+ if _, err = writer.Write(sizeBuf); err != nil {
+ return err
+ }
+ dataSize := util.BytesToUint32(sizeBuf)
+
+ _, err = io.Copy(writer, io.LimitReader(bufReader, int64(dataSize)))
+
+ return
+}
+
+type StreamReader struct {
+ offset int64
+ readerAt io.ReaderAt
+}
+
+func (sr *StreamReader) Read(p []byte) (n int, err error) {
+ n, err = sr.readerAt.ReadAt(p, sr.offset)
+ if err != nil {
+ return
+ }
+ sr.offset += int64(n)
+ return
+}
diff --git a/weed/storage/volume_vacuum.go b/weed/storage/volume_vacuum.go
index 0ee1e61c6..be84f8a13 100644
--- a/weed/storage/volume_vacuum.go
+++ b/weed/storage/volume_vacuum.go
@@ -286,7 +286,7 @@ func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldI
if err != nil {
return fmt.Errorf("ReadNeedleBlob %s key %d offset %d size %d failed: %v", oldDatFile.Name(), key, increIdxEntry.offset.ToActualOffset(), increIdxEntry.size, err)
}
- dstDatBackend.Append(needleBytes)
+ dstDatBackend.Write(needleBytes)
util.Uint32toBytes(idxEntryBytes[8:12], uint32(offset/NeedlePaddingSize))
} else { //deleted needle
//fakeDelNeedle 's default Data field is nil