aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2021-03-13 14:48:28 -0800
committerChris Lu <chris.lu@gmail.com>2021-03-13 14:48:28 -0800
commite12600cc8eea9a36115d465d2a7511b1b30aad07 (patch)
tree46111ea00a7465193192a8e23101511afa449b67
parent0059f4a201c33411f0911ffd08eae1aba511d10a (diff)
downloadseaweedfs-e12600cc8eea9a36115d465d2a7511b1b30aad07.tar.xz
seaweedfs-e12600cc8eea9a36115d465d2a7511b1b30aad07.zip
still not working
-rw-r--r--weed/command/volume.go2
-rw-r--r--weed/server/volume_server_udp_handlers.go97
-rw-r--r--weed/wdclient/volume_udp_client.go51
3 files changed, 55 insertions, 95 deletions
diff --git a/weed/command/volume.go b/weed/command/volume.go
index 1468aa2ee..ab3c63a9a 100644
--- a/weed/command/volume.go
+++ b/weed/command/volume.go
@@ -417,7 +417,7 @@ func (v VolumeServerOptions) startUdpService(volumeServer *weed_server.VolumeSer
conn, err := listener.Accept()
if err == nil {
glog.V(0).Infof("Client from %s", conn.RemoteAddr())
- go volumeServer.HandleTcpConnection(conn)
+ go volumeServer.HandleUdpConnection(conn)
} else if isTemporaryError(err) {
continue
} else {
diff --git a/weed/server/volume_server_udp_handlers.go b/weed/server/volume_server_udp_handlers.go
index 2f7563f4c..b1f95446c 100644
--- a/weed/server/volume_server_udp_handlers.go
+++ b/weed/server/volume_server_udp_handlers.go
@@ -1,81 +1,48 @@
package weed_server
import (
+ "bufio"
"github.com/chrislusf/seaweedfs/weed/glog"
- "pack.ag/tftp"
+ "io"
+ "net"
)
-func (vs *VolumeServer) ServeTFTP(r tftp.ReadRequest) {
+func (vs *VolumeServer) HandleUdpConnection(c net.Conn) {
+ defer c.Close()
- filename := r.Name()
+ glog.V(0).Infof("Serving writes from %s", c.RemoteAddr().String())
- volumeId, n, err := vs.parseFileId(filename)
- if err != nil {
- glog.Errorf("parse file id %s: %v", filename, err)
- return
- }
-
- hasVolume := vs.store.HasVolume(volumeId)
- _, hasEcVolume := vs.store.FindEcVolume(volumeId)
-
- if hasVolume {
- if _, err = vs.store.ReadVolumeNeedle(volumeId, n, nil); err != nil {
- glog.Errorf("ReadVolumeNeedle %s: %v", filename, err)
- return
- }
- }
- if hasEcVolume {
- if _, err = vs.store.ReadEcShardNeedle(volumeId, n); err != nil {
- glog.Errorf("ReadEcShardNeedle %s: %v", filename, err)
- return
- }
- }
-
- if _, err = r.Write(n.Data); err != nil {
- glog.Errorf("UDP Write data %s: %v", filename, err)
- return
- }
-
-}
-
-func (vs *VolumeServer) ReceiveTFTP(w tftp.WriteRequest) {
-
- filename := w.Name()
- println("+ ", filename)
+ bufReader := bufio.NewReaderSize(c, 1024*1024)
+ bufWriter := bufio.NewWriterSize(c, 1024*1024)
- // Get the file size
- size, err := w.Size()
-
- // Note: The size value is sent by the client, the client could send more data than
- // it indicated in the size option. To be safe we'd want to allocate a buffer
- // with the size we're expecting and use w.Read(buf) rather than ioutil.ReadAll.
-
- if filename[0] == '-' {
- err = vs.handleTcpDelete(filename[1:])
+ for {
+ cmd, err := bufReader.ReadString('\n')
if err != nil {
- glog.Errorf("handleTcpDelete %s: %v", filename, err)
+ if err != io.EOF {
+ glog.Errorf("read command from %s: %v", c.RemoteAddr().String(), err)
+ }
return
}
- }
-
- volumeId, n, err := vs.parseFileId(filename)
- if err != nil {
- glog.Errorf("parse file id %s: %v", filename, err)
- return
- }
-
- volume := vs.store.GetVolume(volumeId)
- if volume == nil {
- glog.Errorf("volume %d not found", volumeId)
- return
- }
+ cmd = cmd[:len(cmd)-1]
+ switch cmd[0] {
+ case '+':
+ fileId := cmd[1:]
+ err = vs.handleTcpPut(fileId, bufReader)
+ if err != nil {
+ glog.Errorf("put %s: %v", fileId, err)
+ }
+ case '-':
+ fileId := cmd[1:]
+ err = vs.handleTcpDelete(fileId)
+ if err != nil {
+ glog.Errorf("del %s: %v", fileId, err)
+ }
+ case '?':
+ fileId := cmd[1:]
+ err = vs.handleTcpGet(fileId, bufWriter)
+ case '!':
+ }
- err = volume.StreamWrite(n, w, uint32(size))
- if err != nil {
- glog.Errorf("StreamWrite %s: %v", filename, err)
- return
}
- println("- ", filename)
-
}
diff --git a/weed/wdclient/volume_udp_client.go b/weed/wdclient/volume_udp_client.go
index 93fd2b227..470d7a82d 100644
--- a/weed/wdclient/volume_udp_client.go
+++ b/weed/wdclient/volume_udp_client.go
@@ -2,9 +2,6 @@ package wdclient
import (
"bufio"
- "bytes"
- "fmt"
- "github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/udptransfer"
"github.com/chrislusf/seaweedfs/weed/util"
@@ -70,45 +67,41 @@ func (c *VolumeUdpClient) PutFileChunk(volumeServerAddress string, fileId string
return parseErr
}
- c.cp.Register("udp", udpAddress)
- udpConn, getErr := c.cp.Get("udp", udpAddress)
- if getErr != nil {
- return fmt.Errorf("get connection to %s: %v", udpAddress, getErr)
+ listener, err := udptransfer.NewEndpoint(&udptransfer.Params{
+ LocalAddr: "",
+ Bandwidth: 100,
+ FastRetransmit: true,
+ FlatTraffic: true,
+ IsServ: false,
+ })
+ if err != nil {
+ return err
}
- conn := udpConn.RawConn().(*VolumeUdpConn)
- defer func() {
- if err != nil {
- udpConn.DiscardConnection()
- } else {
- udpConn.ReleaseConnection()
- }
- }()
- buf := []byte("+" + fileId + "\n")
- _, err = conn.bufWriter.Write([]byte(buf))
+ conn, err := listener.Dial(udpAddress)
if err != nil {
- return
+ return err
}
- util.Uint32toBytes(buf[0:4], fileSize)
- _, err = conn.bufWriter.Write(buf[0:4])
+ defer conn.Close()
+
+ bufWriter := bufio.NewWriter(conn)
+
+ buf := []byte("+" + fileId + "\n")
+ _, err = bufWriter.Write([]byte(buf))
if err != nil {
return
}
- _, err = io.Copy(conn.bufWriter, fileReader)
+ util.Uint32toBytes(buf[0:4], fileSize)
+ _, err = bufWriter.Write(buf[0:4])
if err != nil {
return
}
- conn.bufWriter.Write([]byte("!\n"))
- conn.bufWriter.Flush()
-
- ret, _, err := conn.bufReader.ReadLine()
+ _, err = io.Copy(bufWriter, fileReader)
if err != nil {
- glog.V(0).Infof("upload by udp: %v", err)
return
}
- if !bytes.HasPrefix(ret, []byte("+OK")) {
- glog.V(0).Infof("upload by udp: %v", string(ret))
- }
+ bufWriter.Write([]byte("!\n"))
+ bufWriter.Flush()
return nil
}