diff options
Diffstat (limited to 'weed/command')
| -rw-r--r-- | weed/command/benchmark.go | 47 | ||||
| -rw-r--r-- | weed/command/server.go | 1 | ||||
| -rw-r--r-- | weed/command/volume.go | 32 |
3 files changed, 77 insertions, 3 deletions
diff --git a/weed/command/benchmark.go b/weed/command/benchmark.go index 9adcb6f33..326e0090c 100644 --- a/weed/command/benchmark.go +++ b/weed/command/benchmark.go @@ -7,6 +7,7 @@ import ( "io" "math" "math/rand" + "net" "os" "runtime" "runtime/pprof" @@ -41,7 +42,8 @@ type BenchmarkOptions struct { maxCpu *int grpcDialOption grpc.DialOption masterClient *wdclient.MasterClient - grpcRead *bool + readByGrpc *bool + readByTcp *bool } var ( @@ -66,7 +68,8 @@ func init() { b.replication = cmdBenchmark.Flag.String("replication", "000", "replication type") b.cpuprofile = cmdBenchmark.Flag.String("cpuprofile", "", "cpu profile output file") b.maxCpu = cmdBenchmark.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs") - b.grpcRead = cmdBenchmark.Flag.Bool("grpcRead", false, "use grpc API to read") + b.readByGrpc = cmdBenchmark.Flag.Bool("read.grpc", false, "use grpc API to read") + b.readByTcp = cmdBenchmark.Flag.Bool("read.tcp", false, "use tcp API to read") sharedBytes = make([]byte, 1024) } @@ -283,7 +286,7 @@ func readFiles(fileIdLineChan chan string, s *stat) { start := time.Now() var bytesRead int var err error - if *b.grpcRead { + if *b.readByGrpc { volumeServer, err := b.masterClient.LookupVolumeServer(fid) if err != nil { s.failed++ @@ -291,6 +294,15 @@ func readFiles(fileIdLineChan chan string, s *stat) { continue } bytesRead, err = grpcFileGet(volumeServer, fid, b.grpcDialOption) + } else if *b.readByTcp { + volumeServer, err := b.masterClient.LookupVolumeServer(fid) + if err != nil { + s.failed++ + println("!!!! ", fid, " location not found!!!!!") + continue + } + bytesRead, err = tcpFileGet(volumeServer, fid) + } else { url, err := b.masterClient.LookupFileId(fid) if err != nil { @@ -336,6 +348,35 @@ func grpcFileGet(volumeServer, fid string, grpcDialOption grpc.DialOption) (byte return } +func tcpFileGet(volumeServer, fid string) (bytesRead int, err error) { + + err = operation.WithVolumeServerTcpConnection(volumeServer, func(conn net.Conn) error { + // println("requesting", fid, "...") + if err := util.WriteMessage(conn, &volume_server_pb.TcpRequestHeader{ + Get: &volume_server_pb.FileGetRequest{FileId: fid}, + }); err != nil { + return err + } + + for { + resp := &volume_server_pb.FileGetResponse{} + // println("reading...") + respErr := util.ReadMessage(conn, resp) + if respErr != nil { + if respErr == io.EOF { + return nil + } + // println("err:", respErr.Error()) + return respErr + } + // println("resp size", len(resp.Data)) + bytesRead += len(resp.Data) + } + }) + + return +} + func writeFileIds(fileName string, fileIdLineChan chan string, finishChan chan bool) { file, err := os.OpenFile(fileName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644) if err != nil { diff --git a/weed/command/server.go b/weed/command/server.go index 6aa68b6d2..3a3dd5426 100644 --- a/weed/command/server.go +++ b/weed/command/server.go @@ -91,6 +91,7 @@ func init() { serverOptions.v.compactionMBPerSecond = cmdServer.Flag.Int("volume.compactionMBps", 0, "limit compaction speed in mega bytes per second") serverOptions.v.fileSizeLimitMB = cmdServer.Flag.Int("volume.fileSizeLimitMB", 256, "limit file size to avoid out of memory") serverOptions.v.publicUrl = cmdServer.Flag.String("volume.publicUrl", "", "publicly accessible address") + serverOptions.v.enableTcp = cmdServer.Flag.Bool("volume.enableTcp", false, "[experimental] toggle tcp port, running on 20000 + port") s3Options.filerBucketsPath = cmdServer.Flag.String("s3.filer.dir.buckets", "/buckets", "folder on filer to store all buckets") s3Options.port = cmdServer.Flag.Int("s3.port", 8333, "s3 server http listen port") diff --git a/weed/command/volume.go b/weed/command/volume.go index 9d665d143..8caa8d92f 100644 --- a/weed/command/volume.go +++ b/weed/command/volume.go @@ -50,6 +50,7 @@ type VolumeServerOptions struct { memProfile *string compactionMBPerSecond *int fileSizeLimitMB *int + enableTcp *bool // temporary toggle } func init() { @@ -71,6 +72,7 @@ func init() { v.memProfile = cmdVolume.Flag.String("memprofile", "", "memory profile output file") v.compactionMBPerSecond = cmdVolume.Flag.Int("compactionMBps", 0, "limit background compaction or copying speed in mega bytes per second") v.fileSizeLimitMB = cmdVolume.Flag.Int("fileSizeLimitMB", 256, "limit file size to avoid out of memory") + v.enableTcp = cmdVolume.Flag.Bool("enableTcp", false, "[experimental] toggle tcp port, running on 20000 + port") } var cmdVolume = &Command{ @@ -168,6 +170,10 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v // starting grpc server grpcS := v.startGrpcService(volumeServer) + if v.enableTcp != nil && *v.enableTcp { + go v.startTcpServer(volumeServer) + } + // starting public http server var publicHttpDown httpdown.Server if v.isSeparatedPublicPort() { @@ -245,6 +251,32 @@ func (v VolumeServerOptions) startGrpcService(vs volume_server_pb.VolumeServerSe return grpcS } +func (v VolumeServerOptions) startTcpServer(vs *weed_server.VolumeServer) { + tcpPort := *v.port + 20000 + tcpL, err := util.NewListener(*v.bindIp+":"+strconv.Itoa(tcpPort), 0) + if err != nil { + glog.Fatalf("failed to listen on tcp port %d: %v", tcpPort, err) + } + defer tcpL.Close() + + for { + c, err := tcpL.Accept() + if err!= nil { + glog.V(0).Infof("accept tcp connection: %v", err) + continue + } + go func() { + for { + if err := vs.HandleTcpConnection(c); err != nil { + glog.V(0).Infof("handle tcp remote %s: %v", c.RemoteAddr(), err) + return + } + } + + }() + } +} + func (v VolumeServerOptions) startPublicHttpService(handler http.Handler) httpdown.Server { publicListeningAddress := *v.bindIp + ":" + strconv.Itoa(*v.publicPort) glog.V(0).Infoln("Start Seaweed volume server", util.VERSION, "public at", publicListeningAddress) |
