aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2021-03-13 10:16:32 -0800
committerChris Lu <chris.lu@gmail.com>2021-03-13 10:16:32 -0800
commitf9a7c45e9a223d8876d4baafb3d03ac974c39cfb (patch)
tree78e87f0cfb847104a4a38a60937e3f2034c22298
parent2a68ddb963d2ad3810b799487fa66059790fb105 (diff)
downloadseaweedfs-f9a7c45e9a223d8876d4baafb3d03ac974c39cfb.tar.xz
seaweedfs-f9a7c45e9a223d8876d4baafb3d03ac974c39cfb.zip
udp hangs
-rw-r--r--go.mod1
-rw-r--r--go.sum2
-rw-r--r--weed/command/benchmark.go24
-rw-r--r--weed/command/volume.go16
-rw-r--r--weed/server/volume_server_udp_handlers.go65
-rw-r--r--weed/wdclient/volume_udp_client.go42
6 files changed, 147 insertions, 3 deletions
diff --git a/go.mod b/go.mod
index 7969c4c89..08b41cb09 100644
--- a/go.mod
+++ b/go.mod
@@ -57,6 +57,7 @@ require (
github.com/olivere/elastic/v7 v7.0.19
github.com/peterh/liner v1.1.0
github.com/pierrec/lz4 v2.2.7+incompatible // indirect
+ github.com/pin/tftp v2.1.0+incompatible // indirect
github.com/prometheus/client_golang v1.3.0
github.com/rakyll/statik v0.1.7
github.com/rcrowley/go-metrics v0.0.0-20190826022208-cac0b30c2563 // indirect
diff --git a/go.sum b/go.sum
index 223d290af..e66b0ecb9 100644
--- a/go.sum
+++ b/go.sum
@@ -619,6 +619,8 @@ github.com/pierrec/lz4 v1.0.2-0.20190131084431-473cd7ce01a1/go.mod h1:3/3N9NVKO0
github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
github.com/pierrec/lz4 v2.2.7+incompatible h1:Eerk9aiqeZo2QzsbWOAsELUf9ddvAxEdMY9LYze/DEc=
github.com/pierrec/lz4 v2.2.7+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
+github.com/pin/tftp v2.1.0+incompatible h1:Yng4J7jv6lOc6IF4XoB5mnd3P7ZrF60XQq+my3FAMus=
+github.com/pin/tftp v2.1.0+incompatible/go.mod h1:xVpZOMCXTy+A5QMjEVN0Glwa1sUvaJhFXbr/aAxuxGY=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
diff --git a/weed/command/benchmark.go b/weed/command/benchmark.go
index af0793c70..d40a6cfe0 100644
--- a/weed/command/benchmark.go
+++ b/weed/command/benchmark.go
@@ -42,6 +42,7 @@ type BenchmarkOptions struct {
masterClient *wdclient.MasterClient
fsync *bool
useTcp *bool
+ useUdp *bool
}
var (
@@ -68,7 +69,8 @@ func init() {
b.cpuprofile = cmdBenchmark.Flag.String("cpuprofile", "", "cpu profile output file")
b.maxCpu = cmdBenchmark.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs")
b.fsync = cmdBenchmark.Flag.Bool("fsync", false, "flush data to disk after write")
- b.useTcp = cmdBenchmark.Flag.Bool("useTcp", false, "send data via tcp")
+ b.useTcp = cmdBenchmark.Flag.Bool("useTcp", false, "write data via tcp")
+ b.useUdp = cmdBenchmark.Flag.Bool("useUdp", false, "write data via udp")
sharedBytes = make([]byte, 1024)
}
@@ -226,6 +228,7 @@ func writeFiles(idChan chan int, fileIdLineChan chan string, s *stat) {
random := rand.New(rand.NewSource(time.Now().UnixNano()))
volumeTcpClient := wdclient.NewVolumeTcpClient()
+ volumeUdpClient := wdclient.NewVolumeUdpClient()
for id := range idChan {
start := time.Now()
@@ -255,6 +258,14 @@ func writeFiles(idChan chan int, fileIdLineChan chan string, s *stat) {
} else {
s.failed++
}
+ } else if *b.useUdp {
+ if uploadByUdp(volumeUdpClient, fp) {
+ fileIdLineChan <- fp.Fid
+ s.completed++
+ s.transferred += fileSize
+ } else {
+ s.failed++
+ }
} else if _, err := fp.Upload(0, b.masterClient.GetMaster, false, assignResult.Auth, b.grpcDialOption); err == nil {
if random.Intn(100) < *b.deletePercentage {
s.total++
@@ -352,6 +363,17 @@ func uploadByTcp(volumeTcpClient *wdclient.VolumeTcpClient, fp *operation.FilePa
return true
}
+func uploadByUdp(volumeUdpClient *wdclient.VolumeUdpClient, fp *operation.FilePart) bool {
+
+ err := volumeUdpClient.PutFileChunk(fp.Server, fp.Fid, uint32(fp.FileSize), fp.Reader)
+ if err != nil {
+ glog.Errorf("upload chunk err: %v", err)
+ return false
+ }
+
+ return true
+}
+
func readFileIds(fileName string, fileIdLineChan chan string) {
file, err := os.Open(fileName) // For read access.
if err != nil {
diff --git a/weed/command/volume.go b/weed/command/volume.go
index 0e8224dbc..002227a10 100644
--- a/weed/command/volume.go
+++ b/weed/command/volume.go
@@ -29,6 +29,7 @@ import (
stats_collect "github.com/chrislusf/seaweedfs/weed/stats"
"github.com/chrislusf/seaweedfs/weed/storage"
"github.com/chrislusf/seaweedfs/weed/util"
+ "github.com/pin/tftp"
)
var (
@@ -256,6 +257,7 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v
// starting tcp server
if *v.enableTcp {
go v.startTcpService(volumeServer)
+ go v.startUdpService(volumeServer)
}
// starting the cluster http server
@@ -378,10 +380,10 @@ func (v VolumeServerOptions) startClusterHttpService(handler http.Handler) httpd
func (v VolumeServerOptions) startTcpService(volumeServer *weed_server.VolumeServer) {
listeningAddress := *v.bindIp + ":" + strconv.Itoa(*v.port+20000)
- glog.V(0).Infoln("Start Seaweed volume server", util.Version(), "tcp at", listeningAddress)
+ glog.V(0).Infoln("Start Seaweed volume server", util.Version(), "TCP at", listeningAddress)
listener, e := util.NewListener(listeningAddress, 0)
if e != nil {
- glog.Fatalf("Volume server listener error on %s:%v", listeningAddress, e)
+ glog.Fatalf("Volume server TCP on %s:%v", listeningAddress, e)
}
defer listener.Close()
@@ -394,3 +396,13 @@ func (v VolumeServerOptions) startTcpService(volumeServer *weed_server.VolumeSer
go volumeServer.HandleTcpConnection(c)
}
}
+
+func (v VolumeServerOptions) startUdpService(volumeServer *weed_server.VolumeServer) {
+ tftpServer := tftp.NewServer(volumeServer.UdpReadHandler, volumeServer.UdpWriteHandler)
+ listeningAddress := *v.bindIp + ":" + strconv.Itoa(*v.port+20001)
+
+ glog.V(0).Infoln("Start Seaweed volume server", util.Version(), "UDP at", listeningAddress)
+ if e:= tftpServer.ListenAndServe(listeningAddress); e != nil {
+ glog.Fatalf("Volume server UDP on %s:%v", listeningAddress, e)
+ }
+}
diff --git a/weed/server/volume_server_udp_handlers.go b/weed/server/volume_server_udp_handlers.go
new file mode 100644
index 000000000..99c43e3f4
--- /dev/null
+++ b/weed/server/volume_server_udp_handlers.go
@@ -0,0 +1,65 @@
+package weed_server
+
+import (
+ "bytes"
+ "fmt"
+ "io"
+)
+
+func (vs *VolumeServer) UdpReadHandler(filename string, rf io.ReaderFrom) error {
+
+ volumeId, n, err := vs.parseFileId(filename)
+ if err != nil {
+ return err
+ }
+
+ hasVolume := vs.store.HasVolume(volumeId)
+ _, hasEcVolume := vs.store.FindEcVolume(volumeId)
+
+ if hasVolume {
+ if _, err = vs.store.ReadVolumeNeedle(volumeId, n, nil); err != nil {
+ return err
+ }
+ }
+ if hasEcVolume {
+ if _, err = vs.store.ReadEcShardNeedle(volumeId, n); err != nil {
+ return err
+ }
+ }
+
+ if _, err = rf.ReadFrom(bytes.NewBuffer(n.Data)); err != nil {
+ return err
+ }
+
+ return nil
+}
+
+func (vs *VolumeServer) UdpWriteHandler(filename string, wt io.WriterTo) error {
+
+ if filename[0] == '-' {
+ return vs.handleTcpDelete(filename[1:])
+ }
+
+ volumeId, n, err := vs.parseFileId(filename)
+ if err != nil {
+ return err
+ }
+
+ volume := vs.store.GetVolume(volumeId)
+ if volume == nil {
+ return fmt.Errorf("volume %d not found", volumeId)
+ }
+
+ var buf bytes.Buffer
+ written, err := wt.WriteTo(&buf)
+ if err != nil {
+ return err
+ }
+
+ err = volume.StreamWrite(n, &buf, uint32(written))
+ if err != nil {
+ return err
+ }
+
+ return nil
+}
diff --git a/weed/wdclient/volume_udp_client.go b/weed/wdclient/volume_udp_client.go
new file mode 100644
index 000000000..0b7e8cf6d
--- /dev/null
+++ b/weed/wdclient/volume_udp_client.go
@@ -0,0 +1,42 @@
+package wdclient
+
+import (
+ "github.com/chrislusf/seaweedfs/weed/pb"
+ "github.com/pin/tftp"
+ "io"
+)
+
+// VolumeTcpClient put/get/delete file chunks directly on volume servers without replication
+type VolumeUdpClient struct {
+ udpClient *tftp.Client
+}
+
+func NewVolumeUdpClient() *VolumeUdpClient {
+ return &VolumeUdpClient{
+ }
+}
+
+func (c *VolumeUdpClient) PutFileChunk(volumeServerAddress string, fileId string, fileSize uint32, fileReader io.Reader) (err error) {
+
+ udpAddress, parseErr := pb.ParseServerAddress(volumeServerAddress, 20001)
+ if parseErr != nil {
+ return parseErr
+ }
+
+ if c.udpClient == nil {
+ c.udpClient, err = tftp.NewClient(udpAddress)
+ if err != nil {
+ return
+ }
+ }
+ rf, err := c.udpClient.Send(fileId, "octet")
+ if err != nil {
+ return
+ }
+ _, err = rf.ReadFrom(fileReader)
+ if err != nil {
+ return
+ }
+
+ return
+}