aboutsummaryrefslogtreecommitdiff
path: root/weed/command
diff options
context:
space:
mode:
Diffstat (limited to 'weed/command')
-rw-r--r--weed/command/benchmark.go47
-rw-r--r--weed/command/server.go1
-rw-r--r--weed/command/volume.go32
3 files changed, 77 insertions, 3 deletions
diff --git a/weed/command/benchmark.go b/weed/command/benchmark.go
index 9adcb6f33..326e0090c 100644
--- a/weed/command/benchmark.go
+++ b/weed/command/benchmark.go
@@ -7,6 +7,7 @@ import (
"io"
"math"
"math/rand"
+ "net"
"os"
"runtime"
"runtime/pprof"
@@ -41,7 +42,8 @@ type BenchmarkOptions struct {
maxCpu *int
grpcDialOption grpc.DialOption
masterClient *wdclient.MasterClient
- grpcRead *bool
+ readByGrpc *bool
+ readByTcp *bool
}
var (
@@ -66,7 +68,8 @@ func init() {
b.replication = cmdBenchmark.Flag.String("replication", "000", "replication type")
b.cpuprofile = cmdBenchmark.Flag.String("cpuprofile", "", "cpu profile output file")
b.maxCpu = cmdBenchmark.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs")
- b.grpcRead = cmdBenchmark.Flag.Bool("grpcRead", false, "use grpc API to read")
+ b.readByGrpc = cmdBenchmark.Flag.Bool("read.grpc", false, "use grpc API to read")
+ b.readByTcp = cmdBenchmark.Flag.Bool("read.tcp", false, "use tcp API to read")
sharedBytes = make([]byte, 1024)
}
@@ -283,7 +286,7 @@ func readFiles(fileIdLineChan chan string, s *stat) {
start := time.Now()
var bytesRead int
var err error
- if *b.grpcRead {
+ if *b.readByGrpc {
volumeServer, err := b.masterClient.LookupVolumeServer(fid)
if err != nil {
s.failed++
@@ -291,6 +294,15 @@ func readFiles(fileIdLineChan chan string, s *stat) {
continue
}
bytesRead, err = grpcFileGet(volumeServer, fid, b.grpcDialOption)
+ } else if *b.readByTcp {
+ volumeServer, err := b.masterClient.LookupVolumeServer(fid)
+ if err != nil {
+ s.failed++
+ println("!!!! ", fid, " location not found!!!!!")
+ continue
+ }
+ bytesRead, err = tcpFileGet(volumeServer, fid)
+
} else {
url, err := b.masterClient.LookupFileId(fid)
if err != nil {
@@ -336,6 +348,35 @@ func grpcFileGet(volumeServer, fid string, grpcDialOption grpc.DialOption) (byte
return
}
+func tcpFileGet(volumeServer, fid string) (bytesRead int, err error) {
+
+ err = operation.WithVolumeServerTcpConnection(volumeServer, func(conn net.Conn) error {
+ // println("requesting", fid, "...")
+ if err := util.WriteMessage(conn, &volume_server_pb.TcpRequestHeader{
+ Get: &volume_server_pb.FileGetRequest{FileId: fid},
+ }); err != nil {
+ return err
+ }
+
+ for {
+ resp := &volume_server_pb.FileGetResponse{}
+ // println("reading...")
+ respErr := util.ReadMessage(conn, resp)
+ if respErr != nil {
+ if respErr == io.EOF {
+ return nil
+ }
+ // println("err:", respErr.Error())
+ return respErr
+ }
+ // println("resp size", len(resp.Data))
+ bytesRead += len(resp.Data)
+ }
+ })
+
+ return
+}
+
func writeFileIds(fileName string, fileIdLineChan chan string, finishChan chan bool) {
file, err := os.OpenFile(fileName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
if err != nil {
diff --git a/weed/command/server.go b/weed/command/server.go
index 6aa68b6d2..3a3dd5426 100644
--- a/weed/command/server.go
+++ b/weed/command/server.go
@@ -91,6 +91,7 @@ func init() {
serverOptions.v.compactionMBPerSecond = cmdServer.Flag.Int("volume.compactionMBps", 0, "limit compaction speed in mega bytes per second")
serverOptions.v.fileSizeLimitMB = cmdServer.Flag.Int("volume.fileSizeLimitMB", 256, "limit file size to avoid out of memory")
serverOptions.v.publicUrl = cmdServer.Flag.String("volume.publicUrl", "", "publicly accessible address")
+ serverOptions.v.enableTcp = cmdServer.Flag.Bool("volume.enableTcp", false, "[experimental] toggle tcp port, running on 20000 + port")
s3Options.filerBucketsPath = cmdServer.Flag.String("s3.filer.dir.buckets", "/buckets", "folder on filer to store all buckets")
s3Options.port = cmdServer.Flag.Int("s3.port", 8333, "s3 server http listen port")
diff --git a/weed/command/volume.go b/weed/command/volume.go
index 9d665d143..8caa8d92f 100644
--- a/weed/command/volume.go
+++ b/weed/command/volume.go
@@ -50,6 +50,7 @@ type VolumeServerOptions struct {
memProfile *string
compactionMBPerSecond *int
fileSizeLimitMB *int
+ enableTcp *bool // temporary toggle
}
func init() {
@@ -71,6 +72,7 @@ func init() {
v.memProfile = cmdVolume.Flag.String("memprofile", "", "memory profile output file")
v.compactionMBPerSecond = cmdVolume.Flag.Int("compactionMBps", 0, "limit background compaction or copying speed in mega bytes per second")
v.fileSizeLimitMB = cmdVolume.Flag.Int("fileSizeLimitMB", 256, "limit file size to avoid out of memory")
+ v.enableTcp = cmdVolume.Flag.Bool("enableTcp", false, "[experimental] toggle tcp port, running on 20000 + port")
}
var cmdVolume = &Command{
@@ -168,6 +170,10 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v
// starting grpc server
grpcS := v.startGrpcService(volumeServer)
+ if v.enableTcp != nil && *v.enableTcp {
+ go v.startTcpServer(volumeServer)
+ }
+
// starting public http server
var publicHttpDown httpdown.Server
if v.isSeparatedPublicPort() {
@@ -245,6 +251,32 @@ func (v VolumeServerOptions) startGrpcService(vs volume_server_pb.VolumeServerSe
return grpcS
}
+func (v VolumeServerOptions) startTcpServer(vs *weed_server.VolumeServer) {
+ tcpPort := *v.port + 20000
+ tcpL, err := util.NewListener(*v.bindIp+":"+strconv.Itoa(tcpPort), 0)
+ if err != nil {
+ glog.Fatalf("failed to listen on tcp port %d: %v", tcpPort, err)
+ }
+ defer tcpL.Close()
+
+ for {
+ c, err := tcpL.Accept()
+ if err!= nil {
+ glog.V(0).Infof("accept tcp connection: %v", err)
+ continue
+ }
+ go func() {
+ for {
+ if err := vs.HandleTcpConnection(c); err != nil {
+ glog.V(0).Infof("handle tcp remote %s: %v", c.RemoteAddr(), err)
+ return
+ }
+ }
+
+ }()
+ }
+}
+
func (v VolumeServerOptions) startPublicHttpService(handler http.Handler) httpdown.Server {
publicListeningAddress := *v.bindIp + ":" + strconv.Itoa(*v.publicPort)
glog.V(0).Infoln("Start Seaweed volume server", util.VERSION, "public at", publicListeningAddress)