aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2021-03-13 10:59:29 -0800
committerChris Lu <chris.lu@gmail.com>2021-03-13 10:59:29 -0800
commit56953b207d771a0bf8b799576d8a6e50bc2cd5af (patch)
treeed3769b59391cb273282f132649b32201a1f2fc5
parentf9a7c45e9a223d8876d4baafb3d03ac974c39cfb (diff)
downloadseaweedfs-56953b207d771a0bf8b799576d8a6e50bc2cd5af.tar.xz
seaweedfs-56953b207d771a0bf8b799576d8a6e50bc2cd5af.zip
compiles, but fail to send
-rw-r--r--go.mod1
-rw-r--r--go.sum2
-rw-r--r--weed/command/volume.go11
-rw-r--r--weed/server/volume_server_udp_handlers.go59
-rw-r--r--weed/wdclient/volume_udp_client.go21
5 files changed, 55 insertions, 39 deletions
diff --git a/go.mod b/go.mod
index 08b41cb09..c8a914ab0 100644
--- a/go.mod
+++ b/go.mod
@@ -99,6 +99,7 @@ require (
gopkg.in/jcmturner/goidentity.v3 v3.0.0 // indirect
gopkg.in/jcmturner/gokrb5.v7 v7.3.0 // indirect
gopkg.in/karlseguin/expect.v1 v1.0.1 // indirect
+ pack.ag/tftp v1.0.0 // indirect
)
// replace github.com/seaweedfs/fuse => /Users/chris/go/src/github.com/seaweedfs/fuse
diff --git a/go.sum b/go.sum
index e66b0ecb9..ab1ef4739 100644
--- a/go.sum
+++ b/go.sum
@@ -1243,6 +1243,8 @@ modernc.org/mathutil v1.1.1/go.mod h1:mZW8CKdRPY1v87qxC/wUdX5O1qDzXMP5TH3wjfpga6
modernc.org/strutil v1.1.0 h1:+1/yCzZxY2pZwwrsbH+4T7BQMoLQ9QiBshRC9eicYsc=
modernc.org/strutil v1.1.0/go.mod h1:lstksw84oURvj9y3tn8lGvRxyRC1S2+g5uuIzNfIOBs=
pack.ag/amqp v0.11.2/go.mod h1:4/cbmt4EJXSKlG6LCfWHoqmN0uFdy5i/+YFz+fTfhV4=
+pack.ag/tftp v1.0.0 h1:q7iP8mKRtqTAWfxbQ4XY5/flZ5JmuThvrEmHPn8cN9A=
+pack.ag/tftp v1.0.0/go.mod h1:N1Pyo5YG+K90XHoR2vfLPhpRuE8ziqbgMn/r/SghZas=
rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8=
rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0=
rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA=
diff --git a/weed/command/volume.go b/weed/command/volume.go
index 002227a10..0f3dba361 100644
--- a/weed/command/volume.go
+++ b/weed/command/volume.go
@@ -29,7 +29,7 @@ import (
stats_collect "github.com/chrislusf/seaweedfs/weed/stats"
"github.com/chrislusf/seaweedfs/weed/storage"
"github.com/chrislusf/seaweedfs/weed/util"
- "github.com/pin/tftp"
+ "pack.ag/tftp"
)
var (
@@ -398,11 +398,16 @@ func (v VolumeServerOptions) startTcpService(volumeServer *weed_server.VolumeSer
}
func (v VolumeServerOptions) startUdpService(volumeServer *weed_server.VolumeServer) {
- tftpServer := tftp.NewServer(volumeServer.UdpReadHandler, volumeServer.UdpWriteHandler)
listeningAddress := *v.bindIp + ":" + strconv.Itoa(*v.port+20001)
+ tftpServer, err := tftp.NewServer(listeningAddress)
+ if err != nil {
+ glog.Fatalf("Volume server listen on %s:%v", listeningAddress, err)
+ }
+ tftpServer.WriteHandler(volumeServer)
+ tftpServer.ReadHandler(volumeServer)
glog.V(0).Infoln("Start Seaweed volume server", util.Version(), "UDP at", listeningAddress)
- if e:= tftpServer.ListenAndServe(listeningAddress); e != nil {
+ if e:= tftpServer.ListenAndServe(); e != nil {
glog.Fatalf("Volume server UDP on %s:%v", listeningAddress, e)
}
}
diff --git a/weed/server/volume_server_udp_handlers.go b/weed/server/volume_server_udp_handlers.go
index 99c43e3f4..e0dc94310 100644
--- a/weed/server/volume_server_udp_handlers.go
+++ b/weed/server/volume_server_udp_handlers.go
@@ -1,16 +1,18 @@
package weed_server
import (
- "bytes"
- "fmt"
- "io"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "pack.ag/tftp"
)
-func (vs *VolumeServer) UdpReadHandler(filename string, rf io.ReaderFrom) error {
+func (vs *VolumeServer) ServeTFTP(r tftp.ReadRequest) {
+
+ filename := r.Name()
volumeId, n, err := vs.parseFileId(filename)
if err != nil {
- return err
+ glog.Errorf("parse file id %s: %v", filename, err)
+ return
}
hasVolume := vs.store.HasVolume(volumeId)
@@ -18,48 +20,59 @@ func (vs *VolumeServer) UdpReadHandler(filename string, rf io.ReaderFrom) error
if hasVolume {
if _, err = vs.store.ReadVolumeNeedle(volumeId, n, nil); err != nil {
- return err
+ glog.Errorf("ReadVolumeNeedle %s: %v", filename, err)
+ return
}
}
if hasEcVolume {
if _, err = vs.store.ReadEcShardNeedle(volumeId, n); err != nil {
- return err
+ glog.Errorf("ReadEcShardNeedle %s: %v", filename, err)
+ return
}
}
- if _, err = rf.ReadFrom(bytes.NewBuffer(n.Data)); err != nil {
- return err
+ if _, err = r.Write(n.Data); err != nil {
+ glog.Errorf("UDP Write data %s: %v", filename, err)
+ return
}
- return nil
}
-func (vs *VolumeServer) UdpWriteHandler(filename string, wt io.WriterTo) error {
+func (vs *VolumeServer) ReceiveTFTP(w tftp.WriteRequest) {
+
+ filename := w.Name()
+
+ // Get the file size
+ size, err := w.Size()
+
+ // Note: The size value is sent by the client, the client could send more data than
+ // it indicated in the size option. To be safe we'd want to allocate a buffer
+ // with the size we're expecting and use w.Read(buf) rather than ioutil.ReadAll.
if filename[0] == '-' {
- return vs.handleTcpDelete(filename[1:])
+ err = vs.handleTcpDelete(filename[1:])
+ if err != nil {
+ glog.Errorf("handleTcpDelete %s: %v", filename, err)
+ return
+ }
}
volumeId, n, err := vs.parseFileId(filename)
if err != nil {
- return err
+ glog.Errorf("parse file id %s: %v", filename, err)
+ return
}
volume := vs.store.GetVolume(volumeId)
if volume == nil {
- return fmt.Errorf("volume %d not found", volumeId)
- }
-
- var buf bytes.Buffer
- written, err := wt.WriteTo(&buf)
- if err != nil {
- return err
+ glog.Errorf("volume %d not found", volumeId)
+ return
}
- err = volume.StreamWrite(n, &buf, uint32(written))
+ err = volume.StreamWrite(n, w, uint32(size))
if err != nil {
- return err
+ glog.Errorf("StreamWrite %s: %v", filename, err)
+ return
}
- return nil
}
diff --git a/weed/wdclient/volume_udp_client.go b/weed/wdclient/volume_udp_client.go
index 0b7e8cf6d..2daf37bfc 100644
--- a/weed/wdclient/volume_udp_client.go
+++ b/weed/wdclient/volume_udp_client.go
@@ -1,14 +1,14 @@
package wdclient
import (
+ "github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb"
- "github.com/pin/tftp"
+ "pack.ag/tftp"
"io"
)
// VolumeTcpClient put/get/delete file chunks directly on volume servers without replication
type VolumeUdpClient struct {
- udpClient *tftp.Client
}
func NewVolumeUdpClient() *VolumeUdpClient {
@@ -23,18 +23,13 @@ func (c *VolumeUdpClient) PutFileChunk(volumeServerAddress string, fileId string
return parseErr
}
- if c.udpClient == nil {
- c.udpClient, err = tftp.NewClient(udpAddress)
- if err != nil {
- return
- }
- }
- rf, err := c.udpClient.Send(fileId, "octet")
- if err != nil {
- return
- }
- _, err = rf.ReadFrom(fileReader)
+ udpClient, _ := tftp.NewClient()
+
+ fileUrl := "tftp://"+udpAddress+"/"+fileId
+
+ err = udpClient.Put(fileUrl, fileReader, int64(fileSize))
if err != nil {
+ glog.Errorf("udp put %s: %v", fileUrl, err)
return
}