diff options
Diffstat (limited to 'weed/server/volume_server_tcp_handlers_write.go')
| -rw-r--r-- | weed/server/volume_server_tcp_handlers_write.go | 138 |
1 files changed, 0 insertions, 138 deletions
diff --git a/weed/server/volume_server_tcp_handlers_write.go b/weed/server/volume_server_tcp_handlers_write.go deleted file mode 100644 index fb2623a2c..000000000 --- a/weed/server/volume_server_tcp_handlers_write.go +++ /dev/null @@ -1,138 +0,0 @@ -package weed_server - -import ( - "bufio" - "fmt" - "io" - "net" - "strings" - - "github.com/seaweedfs/seaweedfs/weed/glog" - "github.com/seaweedfs/seaweedfs/weed/storage/needle" - "github.com/seaweedfs/seaweedfs/weed/util" -) - -func (vs *VolumeServer) HandleTcpConnection(c net.Conn) { - defer c.Close() - - glog.V(0).Infof("Serving writes from %s", c.RemoteAddr().String()) - - bufReader := bufio.NewReaderSize(c, 1024*1024) - bufWriter := bufio.NewWriterSize(c, 1024*1024) - - for { - cmd, err := bufReader.ReadString('\n') - if err != nil { - if err != io.EOF { - glog.Errorf("read command from %s: %v", c.RemoteAddr().String(), err) - } - return - } - cmd = cmd[:len(cmd)-1] - switch cmd[0] { - case '+': - fileId := cmd[1:] - err = vs.handleTcpPut(fileId, bufReader) - if err == nil { - bufWriter.Write([]byte("+OK\n")) - } else { - bufWriter.Write([]byte("-ERR " + string(err.Error()) + "\n")) - } - case '-': - fileId := cmd[1:] - err = vs.handleTcpDelete(fileId) - if err == nil { - bufWriter.Write([]byte("+OK\n")) - } else { - bufWriter.Write([]byte("-ERR " + string(err.Error()) + "\n")) - } - case '?': - fileId := cmd[1:] - err = vs.handleTcpGet(fileId, bufWriter) - case '!': - bufWriter.Flush() - } - - } - -} - -func (vs *VolumeServer) handleTcpGet(fileId string, writer *bufio.Writer) (err error) { - - volumeId, n, err2 := vs.parseFileId(fileId) - if err2 != nil { - return err2 - } - - volume := vs.store.GetVolume(volumeId) - if volume == nil { - return fmt.Errorf("volume %d not found", volumeId) - } - - err = volume.StreamRead(n, writer) - if err != nil { - return err - } - - return nil -} - -func (vs *VolumeServer) handleTcpPut(fileId string, bufReader *bufio.Reader) (err error) { - - volumeId, n, err2 := vs.parseFileId(fileId) - if err2 != nil { - return err2 - } - - volume := vs.store.GetVolume(volumeId) - if volume == nil { - return fmt.Errorf("volume %d not found", volumeId) - } - - sizeBuf := make([]byte, 4) - if _, err = bufReader.Read(sizeBuf); err != nil { - return err - } - dataSize := util.BytesToUint32(sizeBuf) - - err = volume.StreamWrite(n, bufReader, dataSize) - if err != nil { - return err - } - - return nil -} - -func (vs *VolumeServer) handleTcpDelete(fileId string) (err error) { - - volumeId, n, err2 := vs.parseFileId(fileId) - if err2 != nil { - return err2 - } - - _, err = vs.store.DeleteVolumeNeedle(volumeId, n) - if err != nil { - return err - } - - return nil -} - -func (vs *VolumeServer) parseFileId(fileId string) (needle.VolumeId, *needle.Needle, error) { - - commaIndex := strings.LastIndex(fileId, ",") - if commaIndex <= 0 { - return 0, nil, fmt.Errorf("unknown fileId %s", fileId) - } - - vid, fid := fileId[0:commaIndex], fileId[commaIndex+1:] - - volumeId, ve := needle.NewVolumeId(vid) - if ve != nil { - return 0, nil, fmt.Errorf("unknown volume id in fileId %s", fileId) - } - - n := new(needle.Needle) - n.ParsePath(fid) - return volumeId, n, nil -} |
