aboutsummaryrefslogtreecommitdiff
path: root/weed
diff options
context:
space:
mode:
Diffstat (limited to 'weed')
-rw-r--r--weed/command/server.go1
-rw-r--r--weed/command/volume.go26
-rw-r--r--weed/server/volume_server_tcp_handlers_write.go138
-rw-r--r--weed/storage/volume_stream_write.go105
4 files changed, 0 insertions, 270 deletions
diff --git a/weed/command/server.go b/weed/command/server.go
index c47b7fa5d..8dfa63e34 100644
--- a/weed/command/server.go
+++ b/weed/command/server.go
@@ -130,7 +130,6 @@ func init() {
serverOptions.v.preStopSeconds = cmdServer.Flag.Int("volume.preStopSeconds", 10, "number of seconds between stop send heartbeats and stop volume server")
serverOptions.v.pprof = cmdServer.Flag.Bool("volume.pprof", false, "enable pprof http handlers. precludes --memprofile and --cpuprofile")
serverOptions.v.idxFolder = cmdServer.Flag.String("volume.dir.idx", "", "directory to store .idx files")
- serverOptions.v.enableTcp = cmdServer.Flag.Bool("volume.tcp", false, "<exprimental> enable tcp port")
serverOptions.v.inflightUploadDataTimeout = cmdServer.Flag.Duration("volume.inflightUploadDataTimeout", 60*time.Second, "inflight upload data wait timeout of volume servers")
s3Options.port = cmdServer.Flag.Int("s3.port", 8333, "s3 server http listen port")
diff --git a/weed/command/volume.go b/weed/command/volume.go
index 3ad8ba1bb..aa300108a 100644
--- a/weed/command/volume.go
+++ b/weed/command/volume.go
@@ -65,7 +65,6 @@ type VolumeServerOptions struct {
preStopSeconds *int
metricsHttpPort *int
// pulseSeconds *int
- enableTcp *bool
inflightUploadDataTimeout *time.Duration
}
@@ -96,7 +95,6 @@ func init() {
v.pprof = cmdVolume.Flag.Bool("pprof", false, "enable pprof http handlers. precludes --memprofile and --cpuprofile")
v.metricsHttpPort = cmdVolume.Flag.Int("metricsPort", 0, "Prometheus metrics listen port")
v.idxFolder = cmdVolume.Flag.String("dir.idx", "", "directory to store .idx files")
- v.enableTcp = cmdVolume.Flag.Bool("tcp", false, "<experimental> enable tcp port")
v.inflightUploadDataTimeout = cmdVolume.Flag.Duration("inflightUploadDataTimeout", 60*time.Second, "inflight upload data wait timeout of volume servers")
}
@@ -258,11 +256,6 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v
}
}
- // starting tcp server
- if *v.enableTcp {
- go v.startTcpService(volumeServer)
- }
-
// starting the cluster http server
clusterHttpServer := v.startClusterHttpService(volumeMux)
@@ -388,22 +381,3 @@ func (v VolumeServerOptions) startClusterHttpService(handler http.Handler) httpd
}()
return clusterHttpServer
}
-
-func (v VolumeServerOptions) startTcpService(volumeServer *weed_server.VolumeServer) {
- listeningAddress := util.JoinHostPort(*v.bindIp, *v.port+20000)
- glog.V(0).Infoln("Start Seaweed volume server", util.Version(), "tcp at", listeningAddress)
- listener, e := util.NewListener(listeningAddress, 0)
- if e != nil {
- glog.Fatalf("Volume server listener error on %s:%v", listeningAddress, e)
- }
- defer listener.Close()
-
- for {
- c, err := listener.Accept()
- if err != nil {
- fmt.Println(err)
- return
- }
- go volumeServer.HandleTcpConnection(c)
- }
-}
diff --git a/weed/server/volume_server_tcp_handlers_write.go b/weed/server/volume_server_tcp_handlers_write.go
deleted file mode 100644
index fb2623a2c..000000000
--- a/weed/server/volume_server_tcp_handlers_write.go
+++ /dev/null
@@ -1,138 +0,0 @@
-package weed_server
-
-import (
- "bufio"
- "fmt"
- "io"
- "net"
- "strings"
-
- "github.com/seaweedfs/seaweedfs/weed/glog"
- "github.com/seaweedfs/seaweedfs/weed/storage/needle"
- "github.com/seaweedfs/seaweedfs/weed/util"
-)
-
-func (vs *VolumeServer) HandleTcpConnection(c net.Conn) {
- defer c.Close()
-
- glog.V(0).Infof("Serving writes from %s", c.RemoteAddr().String())
-
- bufReader := bufio.NewReaderSize(c, 1024*1024)
- bufWriter := bufio.NewWriterSize(c, 1024*1024)
-
- for {
- cmd, err := bufReader.ReadString('\n')
- if err != nil {
- if err != io.EOF {
- glog.Errorf("read command from %s: %v", c.RemoteAddr().String(), err)
- }
- return
- }
- cmd = cmd[:len(cmd)-1]
- switch cmd[0] {
- case '+':
- fileId := cmd[1:]
- err = vs.handleTcpPut(fileId, bufReader)
- if err == nil {
- bufWriter.Write([]byte("+OK\n"))
- } else {
- bufWriter.Write([]byte("-ERR " + string(err.Error()) + "\n"))
- }
- case '-':
- fileId := cmd[1:]
- err = vs.handleTcpDelete(fileId)
- if err == nil {
- bufWriter.Write([]byte("+OK\n"))
- } else {
- bufWriter.Write([]byte("-ERR " + string(err.Error()) + "\n"))
- }
- case '?':
- fileId := cmd[1:]
- err = vs.handleTcpGet(fileId, bufWriter)
- case '!':
- bufWriter.Flush()
- }
-
- }
-
-}
-
-func (vs *VolumeServer) handleTcpGet(fileId string, writer *bufio.Writer) (err error) {
-
- volumeId, n, err2 := vs.parseFileId(fileId)
- if err2 != nil {
- return err2
- }
-
- volume := vs.store.GetVolume(volumeId)
- if volume == nil {
- return fmt.Errorf("volume %d not found", volumeId)
- }
-
- err = volume.StreamRead(n, writer)
- if err != nil {
- return err
- }
-
- return nil
-}
-
-func (vs *VolumeServer) handleTcpPut(fileId string, bufReader *bufio.Reader) (err error) {
-
- volumeId, n, err2 := vs.parseFileId(fileId)
- if err2 != nil {
- return err2
- }
-
- volume := vs.store.GetVolume(volumeId)
- if volume == nil {
- return fmt.Errorf("volume %d not found", volumeId)
- }
-
- sizeBuf := make([]byte, 4)
- if _, err = bufReader.Read(sizeBuf); err != nil {
- return err
- }
- dataSize := util.BytesToUint32(sizeBuf)
-
- err = volume.StreamWrite(n, bufReader, dataSize)
- if err != nil {
- return err
- }
-
- return nil
-}
-
-func (vs *VolumeServer) handleTcpDelete(fileId string) (err error) {
-
- volumeId, n, err2 := vs.parseFileId(fileId)
- if err2 != nil {
- return err2
- }
-
- _, err = vs.store.DeleteVolumeNeedle(volumeId, n)
- if err != nil {
- return err
- }
-
- return nil
-}
-
-func (vs *VolumeServer) parseFileId(fileId string) (needle.VolumeId, *needle.Needle, error) {
-
- commaIndex := strings.LastIndex(fileId, ",")
- if commaIndex <= 0 {
- return 0, nil, fmt.Errorf("unknown fileId %s", fileId)
- }
-
- vid, fid := fileId[0:commaIndex], fileId[commaIndex+1:]
-
- volumeId, ve := needle.NewVolumeId(vid)
- if ve != nil {
- return 0, nil, fmt.Errorf("unknown volume id in fileId %s", fileId)
- }
-
- n := new(needle.Needle)
- n.ParsePath(fid)
- return volumeId, n, nil
-}
diff --git a/weed/storage/volume_stream_write.go b/weed/storage/volume_stream_write.go
deleted file mode 100644
index 2496387ff..000000000
--- a/weed/storage/volume_stream_write.go
+++ /dev/null
@@ -1,105 +0,0 @@
-package storage
-
-import (
- "bufio"
- "fmt"
- "io"
- "time"
-
- "github.com/seaweedfs/seaweedfs/weed/util"
-
- "github.com/seaweedfs/seaweedfs/weed/glog"
- "github.com/seaweedfs/seaweedfs/weed/storage/backend"
- "github.com/seaweedfs/seaweedfs/weed/storage/needle"
- . "github.com/seaweedfs/seaweedfs/weed/storage/types"
-)
-
-func (v *Volume) StreamWrite(n *needle.Needle, data io.Reader, dataSize uint32) (err error) {
-
- v.dataFileAccessLock.Lock()
- defer v.dataFileAccessLock.Unlock()
-
- df, ok := v.DataBackend.(*backend.DiskFile)
- if !ok {
- return fmt.Errorf("unexpected volume backend")
- }
- offset, _, _ := v.DataBackend.GetStat()
-
- header := make([]byte, NeedleHeaderSize+TimestampSize) // adding timestamp to reuse it and avoid extra allocation
- CookieToBytes(header[0:CookieSize], n.Cookie)
- NeedleIdToBytes(header[CookieSize:CookieSize+NeedleIdSize], n.Id)
- n.Size = 4 + Size(dataSize) + 1
- SizeToBytes(header[CookieSize+NeedleIdSize:CookieSize+NeedleIdSize+SizeSize], n.Size)
-
- n.DataSize = dataSize
-
- // needle header
- df.Write(header[0:NeedleHeaderSize])
-
- // data size and data
- util.Uint32toBytes(header[0:4], n.DataSize)
- df.Write(header[0:4])
- // write and calculate CRC
- crcWriter := needle.NewCRCwriter(df)
- io.Copy(crcWriter, io.LimitReader(data, int64(dataSize)))
-
- // flags
- util.Uint8toBytes(header[0:1], n.Flags)
- df.Write(header[0:1])
-
- // data checksum
- util.Uint32toBytes(header[0:needle.NeedleChecksumSize], crcWriter.Sum())
- // write timestamp, padding
- n.AppendAtNs = uint64(time.Now().UnixNano())
- util.Uint64toBytes(header[needle.NeedleChecksumSize:needle.NeedleChecksumSize+TimestampSize], n.AppendAtNs)
- padding := needle.PaddingLength(n.Size, needle.Version3)
- df.Write(header[0 : needle.NeedleChecksumSize+TimestampSize+padding])
-
- // add to needle map
- if err = v.nm.Put(n.Id, ToOffset(int64(offset)), n.Size); err != nil {
- glog.V(4).Infof("failed to save in needle map %d: %v", n.Id, err)
- }
- return
-}
-
-func (v *Volume) StreamRead(n *needle.Needle, writer io.Writer) (err error) {
-
- v.dataFileAccessLock.Lock()
- defer v.dataFileAccessLock.Unlock()
-
- nv, ok := v.nm.Get(n.Id)
- if !ok || nv.Offset.IsZero() {
- return ErrorNotFound
- }
-
- sr := &StreamReader{
- readerAt: v.DataBackend,
- offset: nv.Offset.ToActualOffset(),
- }
- bufReader := bufio.NewReader(sr)
- bufReader.Discard(NeedleHeaderSize)
- sizeBuf := make([]byte, 4)
- bufReader.Read(sizeBuf)
- if _, err = writer.Write(sizeBuf); err != nil {
- return err
- }
- dataSize := util.BytesToUint32(sizeBuf)
-
- _, err = io.Copy(writer, io.LimitReader(bufReader, int64(dataSize)))
-
- return
-}
-
-type StreamReader struct {
- offset int64
- readerAt io.ReaderAt
-}
-
-func (sr *StreamReader) Read(p []byte) (n int, err error) {
- n, err = sr.readerAt.ReadAt(p, sr.offset)
- if err != nil {
- return
- }
- sr.offset += int64(n)
- return
-}