aboutsummaryrefslogtreecommitdiff
path: root/weed/command
diff options
context:
space:
mode:
Diffstat (limited to 'weed/command')
-rw-r--r--weed/command/benchmark.go24
-rw-r--r--weed/command/volume.go16
2 files changed, 37 insertions, 3 deletions
diff --git a/weed/command/benchmark.go b/weed/command/benchmark.go
index af0793c70..d40a6cfe0 100644
--- a/weed/command/benchmark.go
+++ b/weed/command/benchmark.go
@@ -42,6 +42,7 @@ type BenchmarkOptions struct {
masterClient *wdclient.MasterClient
fsync *bool
useTcp *bool
+ useUdp *bool
}
var (
@@ -68,7 +69,8 @@ func init() {
b.cpuprofile = cmdBenchmark.Flag.String("cpuprofile", "", "cpu profile output file")
b.maxCpu = cmdBenchmark.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs")
b.fsync = cmdBenchmark.Flag.Bool("fsync", false, "flush data to disk after write")
- b.useTcp = cmdBenchmark.Flag.Bool("useTcp", false, "send data via tcp")
+ b.useTcp = cmdBenchmark.Flag.Bool("useTcp", false, "write data via tcp")
+ b.useUdp = cmdBenchmark.Flag.Bool("useUdp", false, "write data via udp")
sharedBytes = make([]byte, 1024)
}
@@ -226,6 +228,7 @@ func writeFiles(idChan chan int, fileIdLineChan chan string, s *stat) {
random := rand.New(rand.NewSource(time.Now().UnixNano()))
volumeTcpClient := wdclient.NewVolumeTcpClient()
+ volumeUdpClient := wdclient.NewVolumeUdpClient()
for id := range idChan {
start := time.Now()
@@ -255,6 +258,14 @@ func writeFiles(idChan chan int, fileIdLineChan chan string, s *stat) {
} else {
s.failed++
}
+ } else if *b.useUdp {
+ if uploadByUdp(volumeUdpClient, fp) {
+ fileIdLineChan <- fp.Fid
+ s.completed++
+ s.transferred += fileSize
+ } else {
+ s.failed++
+ }
} else if _, err := fp.Upload(0, b.masterClient.GetMaster, false, assignResult.Auth, b.grpcDialOption); err == nil {
if random.Intn(100) < *b.deletePercentage {
s.total++
@@ -352,6 +363,17 @@ func uploadByTcp(volumeTcpClient *wdclient.VolumeTcpClient, fp *operation.FilePa
return true
}
+func uploadByUdp(volumeUdpClient *wdclient.VolumeUdpClient, fp *operation.FilePart) bool {
+
+ err := volumeUdpClient.PutFileChunk(fp.Server, fp.Fid, uint32(fp.FileSize), fp.Reader)
+ if err != nil {
+ glog.Errorf("upload chunk err: %v", err)
+ return false
+ }
+
+ return true
+}
+
func readFileIds(fileName string, fileIdLineChan chan string) {
file, err := os.Open(fileName) // For read access.
if err != nil {
diff --git a/weed/command/volume.go b/weed/command/volume.go
index 0e8224dbc..002227a10 100644
--- a/weed/command/volume.go
+++ b/weed/command/volume.go
@@ -29,6 +29,7 @@ import (
stats_collect "github.com/chrislusf/seaweedfs/weed/stats"
"github.com/chrislusf/seaweedfs/weed/storage"
"github.com/chrislusf/seaweedfs/weed/util"
+ "github.com/pin/tftp"
)
var (
@@ -256,6 +257,7 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v
// starting tcp server
if *v.enableTcp {
go v.startTcpService(volumeServer)
+ go v.startUdpService(volumeServer)
}
// starting the cluster http server
@@ -378,10 +380,10 @@ func (v VolumeServerOptions) startClusterHttpService(handler http.Handler) httpd
func (v VolumeServerOptions) startTcpService(volumeServer *weed_server.VolumeServer) {
listeningAddress := *v.bindIp + ":" + strconv.Itoa(*v.port+20000)
- glog.V(0).Infoln("Start Seaweed volume server", util.Version(), "tcp at", listeningAddress)
+ glog.V(0).Infoln("Start Seaweed volume server", util.Version(), "TCP at", listeningAddress)
listener, e := util.NewListener(listeningAddress, 0)
if e != nil {
- glog.Fatalf("Volume server listener error on %s:%v", listeningAddress, e)
+ glog.Fatalf("Volume server TCP on %s:%v", listeningAddress, e)
}
defer listener.Close()
@@ -394,3 +396,13 @@ func (v VolumeServerOptions) startTcpService(volumeServer *weed_server.VolumeSer
go volumeServer.HandleTcpConnection(c)
}
}
+
+func (v VolumeServerOptions) startUdpService(volumeServer *weed_server.VolumeServer) {
+ tftpServer := tftp.NewServer(volumeServer.UdpReadHandler, volumeServer.UdpWriteHandler)
+ listeningAddress := *v.bindIp + ":" + strconv.Itoa(*v.port+20001)
+
+ glog.V(0).Infoln("Start Seaweed volume server", util.Version(), "UDP at", listeningAddress)
+ if e:= tftpServer.ListenAndServe(listeningAddress); e != nil {
+ glog.Fatalf("Volume server UDP on %s:%v", listeningAddress, e)
+ }
+}