aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2021-03-18 22:02:40 -0700
committerChris Lu <chris.lu@gmail.com>2021-03-18 22:02:40 -0700
commitc012902c16878613c30707f9b83c66890c753eb0 (patch)
treeaf782d7b520783804bccab091f8dc578186315be
parent5495497876a29f26fcc212393d938593a4572ca9 (diff)
downloadseaweedfs-c012902c16878613c30707f9b83c66890c753eb0.tar.xz
seaweedfs-c012902c16878613c30707f9b83c66890c753eb0.zip
testing udporigin/test_udp
This reverts commit 69694a17be1ef8817d9afb3a1265e605298e7ab3.
-rw-r--r--weed/command/volume.go11
-rw-r--r--weed/server/volume_server_udp_handlers.go4
-rw-r--r--weed/wdclient/penet/penet.go1048
-rw-r--r--weed/wdclient/volume_udp_client.go40
4 files changed, 1067 insertions, 36 deletions
diff --git a/weed/command/volume.go b/weed/command/volume.go
index ab3c63a9a..eb89027ab 100644
--- a/weed/command/volume.go
+++ b/weed/command/volume.go
@@ -3,7 +3,7 @@ package command
import (
"fmt"
"github.com/chrislusf/seaweedfs/weed/storage/types"
- "github.com/chrislusf/seaweedfs/weed/udptransfer"
+ "github.com/chrislusf/seaweedfs/weed/wdclient/penet"
"net"
"net/http"
httppprof "net/http/pprof"
@@ -401,13 +401,7 @@ func (v VolumeServerOptions) startTcpService(volumeServer *weed_server.VolumeSer
func (v VolumeServerOptions) startUdpService(volumeServer *weed_server.VolumeServer) {
listeningAddress := *v.bindIp + ":" + strconv.Itoa(*v.port+20001)
- listener, err := udptransfer.NewEndpoint(&udptransfer.Params{
- LocalAddr: listeningAddress,
- Bandwidth: 100,
- FastRetransmit: true,
- FlatTraffic: true,
- IsServ: true,
- })
+ listener, err := penet.Listen("", listeningAddress)
if err != nil {
glog.Fatalf("Volume server listen on %s:%v", listeningAddress, err)
}
@@ -416,7 +410,6 @@ func (v VolumeServerOptions) startUdpService(volumeServer *weed_server.VolumeSer
for {
conn, err := listener.Accept()
if err == nil {
- glog.V(0).Infof("Client from %s", conn.RemoteAddr())
go volumeServer.HandleUdpConnection(conn)
} else if isTemporaryError(err) {
continue
diff --git a/weed/server/volume_server_udp_handlers.go b/weed/server/volume_server_udp_handlers.go
index 974ec78d2..cd6cb5e6e 100644
--- a/weed/server/volume_server_udp_handlers.go
+++ b/weed/server/volume_server_udp_handlers.go
@@ -10,8 +10,6 @@ import (
func (vs *VolumeServer) HandleUdpConnection(c net.Conn) {
defer c.Close()
- glog.V(0).Infof("Serving writes from %s", c.RemoteAddr().String())
-
bufReader := bufio.NewReaderSize(c, 1024*1024)
bufWriter := bufio.NewWriterSize(c, 1024*1024)
@@ -19,7 +17,7 @@ func (vs *VolumeServer) HandleUdpConnection(c net.Conn) {
cmd, err := bufReader.ReadString('\n')
if err != nil {
if err != io.EOF {
- glog.Errorf("read command from %s: %v", c.RemoteAddr().String(), err)
+ // glog.Errorf("read command from: %v", err)
}
return
}
diff --git a/weed/wdclient/penet/penet.go b/weed/wdclient/penet/penet.go
new file mode 100644
index 000000000..9aa00cc7d
--- /dev/null
+++ b/weed/wdclient/penet/penet.go
@@ -0,0 +1,1048 @@
+package penet
+
+import (
+ "container/list"
+ crand "crypto/rand"
+ "encoding/binary"
+ "errors"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ mrand "math/rand"
+ "net"
+ "runtime"
+ "runtime/debug"
+ "sync"
+ "time"
+)
+
+type DataSend struct {
+ Seq uint32
+ Acked bool
+ Resend byte
+ Fast bool
+ Code byte
+ Time uint32
+ Data []byte
+}
+
+type DataRecv struct {
+ Seq uint32
+ AckCnt byte
+ Code byte
+ Data []byte
+}
+
+type UdpSend struct {
+ Id uint64
+ sock *net.UDPConn
+ remote *net.UDPAddr
+ seq uint32
+ rtt float64
+ rttMax float64
+ rttMin float64
+ rate uint32
+ mss uint32
+ interval uint32
+ data []DataSend
+ dataBegin int
+ dataLen int
+ sendRnd int
+ sendList *list.List
+ sendListLock sync.Mutex
+ writable chan bool
+ writeMax int
+ isClose bool
+ closing bool
+ opening byte
+ conn *Conn
+ resendCnt int
+ name string
+ acked uint32
+ recvWnd uint32
+}
+
+const (
+ TypeData uint8 = 1
+ TypeAck uint8 = 2
+ TypeClose uint8 = 3
+ TypeSYN uint8 = 8
+)
+
+var (
+ ErrClose = errors.New("conn close")
+ ErrTimeout = errors.New("conn timeout")
+
+ mss uint32 = 1200
+ defaultRate = mss * 3000
+ dropRate = 0.0
+ writeMaxSep = 5
+ resendLimit = true
+)
+
+func NewUdpSend(conn *Conn, id uint64, sock *net.UDPConn, remote *net.UDPAddr, name string) *UdpSend {
+ u := &UdpSend{
+ conn: conn,
+ Id: id,
+ sock: sock,
+ remote: remote,
+ mss: mss,
+ interval: 20,
+ data: make([]DataSend, 4),
+ seq: 1,
+ rate: defaultRate, // 修复bug: 太高导致压测的时候内存爆炸,速度慢
+ rtt: 200,
+ rttMax: 200,
+ rttMin: 200,
+ sendList: list.New(),
+ writable: make(chan bool, 1),
+ name: name,
+ }
+ u.writeMax = int(u.rate/u.mss) / writeMaxSep // fixed bug: 初始化 修复bug: 写入太多,应该一点点写
+ u.recvWnd = uint32(u.writeMax)
+ return u
+}
+
+func (u *UdpSend) send(nowTime time.Time, buf []byte) {
+ var sendMax = int(u.rate / u.mss)
+ u.writeMax = sendMax / writeMaxSep
+ // glog.V(4).Info("send", u.dataLen, u.name, sendMax, uint32(u.rtt))
+ var sendCount = uint32(u.rate / u.mss / (1000 / u.interval))
+ var sendTotal = sendCount
+ if sendCount > u.recvWnd/5 {
+ sendCount = u.recvWnd / 5
+ }
+ if sendCount <= 0 {
+ sendCount = 1
+ }
+
+ now := uint32(nowTime.UnixNano() / int64(time.Millisecond))
+ resendCnt := 0
+ for i := 0; i < u.dataLen; i++ {
+ if sendCount <= 0 {
+ break
+ }
+ index := (i + u.dataBegin) % len(u.data)
+ d := &u.data[index]
+ if d.Acked == false && now-d.Time >= uint32(u.rtt*2.0) {
+ glog.V(4).Infof("resend, seq:%v id:%v rtt:%v name:%v", d.Seq, u.Id, uint32(u.rtt), u.name)
+ // data包: type0 flag1 id2 len3 seq4 tm5 sndwnd6
+ // 发送窗口就是配置值,其实发送窗口不需要发送出去
+ // 接收端能发送了,要告诉对面开始发送。这个逻辑可以通过接受端的发送通道来发送。
+ headLen := structPack(buf, "BBQHIII", uint8(TypeData),
+ d.Code, uint64(u.Id), uint16(len(d.Data)+4+4+4), d.Seq, uint32(now), uint32(sendMax))
+ copy(buf[headLen:], d.Data)
+ u.sock.WriteToUDP(buf[:headLen+len(d.Data)], u.remote)
+ d.Time = now
+ d.Resend++
+ d.Fast = false
+ sendCount--
+ resendCnt++
+ u.resendCnt++
+ }
+
+ }
+
+ u.sendRnd++
+ if u.sendRnd > 100 && u.dataLen > 0 {
+ u.sendRnd = 0
+
+ var resendMax = uint32(float64(u.dataLen) * 0.6)
+ if u.resendCnt > 2 {
+ glog.V(0).Infof("resendCnt:%v resendMax:%v remain:%v rtt:%v id:%v", u.resendCnt,
+ resendMax, u.dataLen, uint32(u.rtt), u.Id)
+ u.resendCnt = 0
+ }
+ // 修复bug: 之前使用lastrecvtime,存在新增发送数据数据那瞬间,出现超时情况
+ if u.data[u.dataBegin].Resend > 10 {
+ glog.V(0).Infof("resend too much, close, id:%v name:%v drop seq:%v remain:%v",
+ u.Id, u.name, u.data[u.dataBegin].Seq, u.dataLen)
+ u.conn.close(true, true)
+ }
+ }
+
+ var resendToomuch bool
+ if uint32(resendCnt) > sendTotal/3 && resendLimit { // 修复bug: 限速
+ resendToomuch = true
+ glog.V(4).Infof("resend too much, slow, %v %v", resendCnt, sendTotal)
+ }
+
+ u.sendListLock.Lock()
+ for ; sendCount > 0; sendCount-- {
+ sendData := u.sendList.Front()
+ if sendData == nil {
+ break
+ }
+
+ if sendMax-u.dataLen <= 0 || resendToomuch {
+ break
+ }
+ if u.dataLen >= len(u.data) {
+ newData := make([]DataSend, 2*len(u.data))
+ copy(newData, u.data[u.dataBegin:])
+ copy(newData[len(u.data)-u.dataBegin:], u.data[:u.dataBegin])
+ u.dataBegin = 0
+ u.data = newData
+ }
+
+ u.sendList.Remove(sendData)
+ hType := uint8(0)
+ sdata, ok := sendData.Value.([]byte)
+ if !ok {
+ // 发送控制消息
+ mesCode, _ := sendData.Value.(byte)
+ hType = mesCode
+ }
+ headLen := structPack(buf, "BBQHIII", uint8(TypeData), hType,
+ uint64(u.Id), uint16(len(sdata)+4+4+4), u.seq, uint32(now), uint32(sendMax))
+ copy(buf[headLen:], sdata)
+ u.sock.WriteToUDP(buf[:headLen+len(sdata)], u.remote)
+ glog.V(4).Infof("send, seq:%v id:%v %v", u.seq, u.Id, u.name)
+
+ index := (u.dataBegin + u.dataLen) % len(u.data)
+ u.data[index] = DataSend{
+ Seq: u.seq,
+ Time: now,
+ Code: hType,
+ Data: sdata,
+ }
+ u.seq++
+ u.dataLen++
+ }
+ listRemain := u.writeMax - u.sendList.Len()
+ u.sendListLock.Unlock()
+
+ if listRemain > 0 {
+ select {
+ case u.writable <- !u.isClose:
+ default:
+ }
+ }
+
+ if u.isClose && u.dataLen == 0 && u.closing == false { // 修复bug: 之前5秒删除,实际发送没完成
+ // 修复bug: 接受端write端close为true, 导致5秒后呗删除,来不及接受数据。
+ glog.V(1).Infof("close and emtpy, rm conn, id:%v name:%v seq:%v listlen:%v",
+ u.Id, u.name, u.seq, u.sendList.Len())
+ u.closing = true
+ u.conn.close(false, true)
+ }
+}
+
+func testDrop() bool {
+ if dropRate < 0.01 {
+ return false
+ }
+ var v uint32
+ var b [4]byte
+ if _, err := crand.Read(b[:]); err != nil {
+ v = mrand.Uint32()
+ } else {
+ v = binary.BigEndian.Uint32(b[:])
+ }
+ if v%1000 < uint32(dropRate*1000) {
+ return true
+ }
+ return false
+}
+
+func (u *UdpSend) recv(buf []byte) {
+
+ if testDrop() {
+ return
+ }
+
+ now := uint32(time.Now().UnixNano() / int64(time.Millisecond))
+
+ // ack包: type0 flag1 id2 len3 tm4 rcvwnd5 acked6
+ head, headLen := structUnPack(buf, "BBQHIII")
+ if head[0] == uint64(TypeAck) && u.dataLen > 0 {
+
+ firstSeq := u.data[u.dataBegin].Seq
+ acked := uint32(head[6])
+ offset := int(int32(acked - firstSeq))
+ glog.V(4).Infof("id:%v recv ack: %v offset:%v databegin:%v dataLen:%v firstSeq:%v name:%v",
+ u.Id, acked, offset, u.dataBegin, u.dataLen, firstSeq, u.name)
+ if offset >= 0 && offset < u.dataLen {
+ offset++
+ for i := 0; i < offset; i++ { // 修复bug: 清除引用等
+ index := (u.dataBegin + i) % len(u.data) // 修复bug,i写成offset
+ d := &u.data[index]
+ d.Data = nil
+ d.Acked = true
+ }
+ u.acked += uint32(offset)
+ u.dataBegin += offset
+ u.dataBegin = u.dataBegin % len(u.data)
+ u.dataLen -= offset
+ glog.V(4).Infof("2 acked ok:%v, id:%v %v", u.acked, u.Id, u.name)
+ }
+ if u.dataLen > 0 {
+ curSeq := u.data[u.dataBegin].Seq
+ // var ackedSeq = []uint32{}
+ for i := headLen; i < len(buf); i += 4 {
+ seq := binary.BigEndian.Uint32(buf[i:])
+ offset := int(int32(seq - curSeq))
+ if offset < 0 || offset >= u.dataLen {
+ continue
+ }
+ index := (u.dataBegin + offset) % len(u.data)
+ d := &u.data[index]
+ if d.Seq == seq {
+ d.Acked = true
+ d.Data = nil // 修复bug: memory leak
+ // ackedSeq = append(ackedSeq, seq)
+ } else {
+ panic("index not correct")
+ }
+ }
+ // if len(ackedSeq) > 0 {
+ // glog.V(0).Info("seq:", ackedSeq)
+ // }
+ }
+
+ var i = 0
+ for ; i < u.dataLen; i++ {
+ index := (i + u.dataBegin) % len(u.data)
+ d := &u.data[index]
+ if d.Acked == false {
+ break
+ }
+ // fmt.Println("acked->", d.Seq)
+ // if d.Seq == 7 {
+ // fmt.Println("data:", u.data[u.dataBegin:u.dataBegin+5])
+ // }
+ }
+ if i > 0 {
+ u.acked += uint32(i)
+ u.dataBegin += i
+ u.dataBegin = u.dataBegin % len(u.data)
+ u.dataLen -= i
+ glog.V(4).Infof("3 acked ok:%v, id:%v %v %v", u.acked, u.Id, u.dataBegin, u.name)
+ }
+
+ sendTime := uint32(head[4])
+ rtt := now - sendTime
+ if rtt > 0 {
+ if firstSeq < 3 {
+ u.rtt = float64(rtt) // 初始值
+ } else {
+ u.rtt = u.rtt*0.8 + float64(rtt)*0.2
+ }
+ if u.rtt < 50.0 {
+ u.rtt = 50
+ }
+ // glog.V(4).Infof("rtt:%v u.rtt:%v id:%v", rtt, u.rtt, u.Id)
+ }
+
+ u.recvWnd = uint32(head[5])
+ }
+
+}
+
+func structPack(b []byte, format string, param ...interface{}) int {
+ j := 0
+ for i, s := range format {
+ switch s {
+ case 'I':
+ p, _ := param[i].(uint32)
+ binary.BigEndian.PutUint32(b[j:], p)
+ j += 4
+ case 'B':
+ p, _ := param[i].(uint8)
+ b[j] = p
+ j++
+ case 'H':
+ p, _ := param[i].(uint16)
+ binary.BigEndian.PutUint16(b[j:], p)
+ j += 2
+ case 'Q':
+ p, _ := param[i].(uint64)
+ binary.BigEndian.PutUint64(b[j:], p)
+ j += 8
+ default:
+ panic("structPack not found")
+ }
+ }
+ return j
+}
+
+func structUnPack(b []byte, format string) ([]uint64, int) {
+ var re = make([]uint64, 0, len(format))
+ defer func() {
+ if err := recover(); err != nil {
+ // log.Error(err)
+ re[0] = 0
+ }
+ }()
+ j := 0
+ for _, s := range format {
+ switch s {
+ case 'I':
+ re = append(re, uint64(binary.BigEndian.Uint32(b[j:])))
+ j += 4
+ case 'B':
+ re = append(re, uint64(b[j]))
+ j++
+ case 'H':
+ re = append(re, uint64(binary.BigEndian.Uint16(b[j:])))
+ j += 2
+ case 'Q':
+ re = append(re, uint64(binary.BigEndian.Uint64(b[j:])))
+ j += 8
+ default:
+ panic("structUnPack not found")
+ }
+ }
+ return re, j
+}
+
+type UdpRecv struct {
+ conn *Conn
+ Id uint64
+ sock *net.UDPConn
+ remote *net.UDPAddr
+ acked uint32
+ lastTm uint32
+ sndWnd uint32
+ recvCnt uint32
+ isClose bool
+ isNew byte
+ isRecved bool
+ recvList *list.List
+ recvListLock sync.Mutex
+ readable chan byte
+ readDeadline *time.Time
+ seqData map[uint32]*DataRecv
+ dataList *list.List
+ name string
+}
+
+func NewUdpRecv(conn *Conn, id uint64, sock *net.UDPConn, remote *net.UDPAddr, name string) *UdpRecv {
+ return &UdpRecv{
+ Id: id,
+ conn: conn,
+ sock: sock,
+ remote: remote,
+ acked: 0,
+ recvList: list.New(),
+ dataList: list.New(),
+ seqData: make(map[uint32]*DataRecv),
+ readable: make(chan byte, 1),
+ isNew: 50,
+ name: name,
+ sndWnd: 1000,
+ }
+}
+
+func (u *UdpRecv) SetReadDeadline(t time.Time) {
+ u.readDeadline = &t
+}
+
+func (u *UdpRecv) sendAck(nowTime time.Time, buf []byte) {
+
+ u.recvListLock.Lock()
+ for {
+ if d, ok := u.seqData[u.acked+1]; ok {
+ u.acked++
+ if d.Code == TypeClose {
+ if u.isClose == false {
+ glog.V(1).Info("recv close:", u.Id)
+ u.conn.close(false, true)
+ u.isClose = true
+ }
+ } else {
+ u.recvList.PushBack(d.Data)
+ }
+ d.Data = nil
+ delete(u.seqData, u.acked)
+ } else {
+ break
+ }
+ }
+ recvListLen := u.recvList.Len()
+ u.recvListLock.Unlock()
+
+ // glog.V(4).Info("acked:", u.acked, u.Id, recvListLen, u.name)
+ if recvListLen > 0 { // 修复bug,用标志可能会有读不到的数据 修复bug: 去掉 && u.isClose == false,导致读取延迟
+ select {
+ case u.readable <- 1:
+ default:
+ }
+ } else if u.isClose == true { // 修复bug:快速close
+ select {
+ case u.readable <- 0:
+ default:
+ }
+ }
+ if u.readDeadline != nil && !u.readDeadline.IsZero() {
+ if u.readDeadline.Before(nowTime) { // 修复bug after
+ select {
+ case u.readable <- 2:
+ glog.V(0).Info("read dealline: ", u.Id)
+ default:
+ }
+ }
+ }
+
+ var b = buf[:mss]
+ buf = buf[mss:]
+ var n = 0
+ for i := u.dataList.Front(); i != nil; {
+ d := i.Value.(*DataRecv)
+
+ next := i.Next()
+ if d.AckCnt > 6 || before(d.Seq, u.acked+1) { // 发几次够了,不反复发
+ // d.Removed = true
+ u.dataList.Remove(i) // 修复bug,删除逻辑不对,需要保存next
+ // delete(u.seqData, d.Seq) // 修复bug: 对面已经确认,这里删了,没有重发,也没有了数据。已经收到的数据并且发了ack的数据,不要删了!
+ i = next
+ continue
+ }
+ i = next
+
+ if d.AckCnt%3 == 0 {
+ binary.BigEndian.PutUint32(b[n:], d.Seq)
+ n += 4
+ if n >= len(b) {
+ wnd := int(u.sndWnd) - recvListLen - len(u.seqData)
+ if wnd < 0 {
+ wnd = 0
+ }
+ headLen := structPack(buf, "BBQHIII", uint8(TypeAck), uint8(0), uint64(u.Id), uint16(4+4+4+n),
+ u.lastTm, uint32(wnd), u.acked)
+ copy(buf[headLen:], b[:n])
+ u.sock.WriteToUDP(buf[:headLen+n], u.remote)
+ glog.V(4).Infof("id:%v send ack n:%v", u.Id, n)
+ n = 0 // 修复bug,没有置零
+ u.isRecved = false // 修复bug: 有时候发多一条数据
+ }
+ }
+ d.AckCnt++
+ }
+ if n > 0 || u.isRecved { // 修复bug: 一直发数据 修复bug: 有时候发多一条数据
+ wnd := int(u.sndWnd) - recvListLen - len(u.seqData)
+ if wnd < 0 {
+ wnd = 0
+ }
+ headLen := structPack(buf, "BBQHIII", uint8(TypeAck), uint8(0), uint64(u.Id), uint16(4+4+4+n),
+ u.lastTm, uint32(wnd), u.acked)
+ copy(buf[headLen:], b[:n])
+ u.sock.WriteToUDP(buf[:headLen+n], u.remote)
+ glog.V(4).Infof("id:%v send ack n:%v datalen:%v", u.Id, n, u.dataList.Len())
+ }
+ u.isRecved = false
+
+ // 修复bug: 如果自己只是发数据,那么自己的acked通道会没用到。所以要判断自己是否在发送数据。
+ if u.isNew > 0 { // 完成的优化:超时不确认第一包,就删除链接。防止旧链接不断发包。
+ u.isNew--
+ if u.isNew == 0 && u.acked == 0 {
+ glog.V(0).Infof("not recv first packet!!! close, id:%v name:%v", u.Id, u.name)
+ u.conn.Close()
+ }
+ if u.acked >= 1 || u.conn.s.seq > 1 {
+ u.isNew = 0
+ }
+ }
+
+}
+
+// before seq1比seq2小
+func before(seq1, seq2 uint32) bool {
+ return (int32)(seq1-seq2) < 0
+}
+
+func after(seq1, seq2 uint32) bool {
+ return (int32)(seq2-seq1) < 0
+}
+
+func (u *UdpRecv) recv(buf []byte) {
+
+ if testDrop() {
+ return
+ }
+
+ u.isRecved = true
+ u.recvCnt++
+ // data包: type0 flag1 id2 len3 seq4 tm5 sndwnd6
+ head, headLen := structUnPack(buf, "BBQHIII")
+ if head[0] == uint64(TypeData) {
+
+ seq := uint32(head[4])
+ u.lastTm = uint32(head[5])
+ u.sndWnd = uint32(head[6])
+
+ glog.V(4).Info(u.Id, " recv seq: ", seq, " len: ", u.dataList.Len())
+
+ if before(seq, u.acked+1) { // 修复bug: 收到before的数据,seqData不会回收。
+ // glog.V(0).Info("seq before u.acked:", seq, u.acked, u.Id)
+ return
+ }
+
+ // 修复bug: 修复没重发问题。之前由于一直会重发,这个逻辑有意义。现在只重发几次。
+ // if d, ok := u.seqData[seq]; ok {
+ // d.AckCnt = 0
+ // if d.Removed == false { // 修复bug:可能没ack导致一直重发,要直到acked全部覆盖。
+ // return
+ // }
+ // }
+
+ // glog.V(4).Info(u.Id, " recv 2 seq: ", seq, " len: ", u.dataList.Len())
+
+ d := &DataRecv{
+ Seq: seq,
+ Data: buf[headLen:],
+ Code: byte(head[1]),
+ }
+ u.dataList.PushBack(d)
+ u.seqData[seq] = d
+
+ }
+
+}
+
+func SetRate(rate uint32) {
+ defaultRate = rate
+}
+
+func SetDropRate(rate float64) {
+ if rate > 0.001 {
+ dropRate = rate
+ }
+}
+
+type Conn struct {
+ Id uint64
+ s *UdpSend
+ r *UdpRecv
+ responsed chan bool
+ isClose bool
+ isSendClose bool
+ isRmConn bool
+ conns *Conns
+}
+
+func NewConn(conns *Conns, Id uint64, localConn *net.UDPConn, remote *net.UDPAddr, name string) *Conn {
+ conn := &Conn{
+ Id: Id,
+ conns: conns,
+ responsed: make(chan bool, 1),
+ }
+ conn.s = NewUdpSend(conn, conn.Id, localConn, remote, name)
+ conn.r = NewUdpRecv(conn, conn.Id, localConn, remote, name)
+ return conn
+}
+
+func (c *Conn) Write(bb []byte) (n int, err error) {
+
+ if c.isClose {
+ return 0, ErrClose
+ }
+
+ // 1.对方告诉你满了,是要叫你不要发数据了,而不是还发数据。
+ // 2.没有窗口就没法快速地确定对面有没有满,如果你要等到自己的的buffer满,那么可能会比较慢感知到
+ // 固定buffer + 停止通知
+ b := make([]byte, len(bb)) // 修复bug1:没有拷贝
+ copy(b, bb)
+ for {
+ c.s.sendListLock.Lock()
+ remain := c.s.writeMax - c.s.sendList.Len()
+ for {
+ if len(b) <= 0 {
+ break
+ }
+ if remain <= 0 {
+ break
+ }
+ remain--
+ var sendLen = int(mss)
+ if len(b) < sendLen {
+ // 实际发送值
+ sendLen = len(b)
+ }
+ n += sendLen
+ c.s.sendList.PushBack(b[:sendLen])
+ // fmt.Println("write:", b[:10])
+ b = b[sendLen:]
+ }
+ c.s.sendListLock.Unlock()
+ if len(b) <= 0 {
+ break
+ }
+ // glog.V(0).Info("wait write: ", c.Id)
+ w := <-c.s.writable
+ if w == false {
+ return n, ErrClose
+ }
+ }
+
+ return n, nil
+}
+
+func (c *Conn) Read(b []byte) (n int, err error) {
+ for {
+ c.r.recvListLock.Lock()
+ for {
+ f := c.r.recvList.Front()
+ if f != nil {
+ data := f.Value.([]byte)
+ copy(b[n:], data)
+ maxCap := len(b[n:])
+ if maxCap < len(data) {
+ // b已满
+ f.Value = data[maxCap:]
+ n += maxCap
+ break
+ } else {
+ // b未满
+ c.r.recvList.Remove(f)
+ n += len(data)
+ }
+ } else {
+ // 读完数据了
+ break
+ }
+ }
+ c.r.recvListLock.Unlock()
+ if n <= 0 {
+ // glog.V(4).Info("wait read", c.Id)
+ // wait for chan
+ r := <-c.r.readable
+ if r == 0 { // close之后总是返回初始值
+ c.r.recvListLock.Lock()
+ rlen := c.r.recvList.Len()
+ c.r.recvListLock.Unlock()
+ if rlen <= 0 { // 修复bug: 等到read完所有数据才让read返回错误
+ return n, ErrClose
+ }
+ }
+ if r == 2 {
+ return n, ErrTimeout
+ }
+ } else {
+ break
+ }
+ }
+
+ return
+}
+
+func (c *Conn) LocalAddr() net.Addr {
+ return c.conns.sock.LocalAddr()
+}
+
+func (c *Conn) RemoteAddr() net.Addr {
+ return c.conns.sock.RemoteAddr()
+}
+
+func (c *Conn) SetDeadline(t time.Time) error {
+ c.r.SetReadDeadline(t)
+ return nil
+}
+
+func (c *Conn) SetReadDeadline(t time.Time) error {
+ c.r.SetReadDeadline(t)
+ return nil
+}
+
+func (c *Conn) SetWriteDeadline(t time.Time) error {
+ return nil
+}
+
+func (c *Conn) close(sendClose, rmConn bool) {
+ if c.isClose == false {
+ c.isClose = true
+ c.r.isClose = true
+ }
+ if sendClose && c.isSendClose == false { // 修复bug:
+ c.isSendClose = true
+ c.s.sendListLock.Lock()
+ c.s.sendList.PushBack(byte(TypeClose))
+ c.s.sendListLock.Unlock()
+ }
+ if rmConn && c.isRmConn == false {
+ c.isRmConn = true
+ time.AfterFunc(time.Second*5, func() {
+ select {
+ case c.conns.input <- Input{
+ typ: ActRmConn,
+ param: c,
+ }:
+ default:
+ }
+ })
+ }
+}
+
+func (c *Conn) Close() error {
+ if c.isClose == false {
+ c.isClose = true
+ c.s.isClose = true
+ // bug: 接收不能主动关闭
+ c.isSendClose = true
+ c.s.sendListLock.Lock()
+ c.s.sendList.PushBack(byte(TypeClose))
+ c.s.sendListLock.Unlock()
+ // bug: close之后,5秒数据可能无法完成发送
+ }
+ return nil
+}
+
+type Conns struct {
+ conns map[uint64]*Conn
+ sock *net.UDPConn
+ accept chan *Conn
+ isClose bool
+ isDial bool
+ timerRnd uint32
+ input chan Input
+}
+
+func NewConns() *Conns {
+ return &Conns{
+ conns: make(map[uint64]*Conn),
+ accept: make(chan *Conn, 256),
+ input: make(chan Input, 2048),
+ }
+}
+
+func Listen(network, address string) (net.Listener, error) {
+ addr, err := net.ResolveUDPAddr("udp", address)
+ if err != nil {
+ return nil, err
+ }
+ conn, err := net.ListenUDP("udp", addr)
+ if err != nil {
+ return nil, err
+ }
+ listener := NewConns()
+ listener.sock = conn
+ go listener.loop()
+ return listener, nil
+}
+
+var dialConns *Conns
+var dialConnsLock sync.Mutex
+
+func Dial(network, address string) (net.Conn, error) {
+ return DialTimeout(network, address, time.Second*3)
+}
+
+func DialTimeout(network, address string, timeout time.Duration) (net.Conn, error) {
+ addr, err := net.ResolveUDPAddr("udp", address)
+ if err != nil {
+ return nil, err
+ }
+ var b [8]byte
+ if _, err := crand.Read(b[:]); err != nil {
+ return nil, err
+ }
+ id := binary.LittleEndian.Uint64(b[:])
+
+ glog.V(0).Info("dial new 3:", id)
+
+ dialConnsLock.Lock()
+ if dialConns == nil {
+ dialConns = NewConns()
+ }
+ if dialConns.sock == nil {
+ s, err := net.ListenUDP("udp", &net.UDPAddr{})
+ if err != nil {
+ dialConnsLock.Unlock()
+ return nil, err
+ }
+ dialConns.sock = s
+ dialConns.isDial = true
+ go dialConns.loop()
+ }
+ dialConnsLock.Unlock()
+
+ conn := NewConn(dialConns, id, dialConns.sock, addr, "reqer")
+ dialConns.input <- Input{
+ typ: ActAddConn,
+ param: conn,
+ }
+
+ // conn.s.sendListLock.Lock()
+ // conn.s.sendList.PushBack(byte(TypeSYN))
+ // conn.s.sendListLock.Unlock()
+
+ return conn, nil
+}
+
+func (c *Conns) Accept() (net.Conn, error) {
+ for {
+ if c.isClose {
+ return nil, errors.New("listener close")
+ }
+ conn := <-c.accept
+ if conn == nil {
+ return nil, errors.New("listener close")
+ }
+ return conn, nil
+ }
+}
+
+func (c *Conns) Close() error {
+ c.isClose = true
+ c.sock.Close()
+ c.input <- Input{
+ typ: ActEnd,
+ }
+ return nil
+}
+
+func (c *Conns) Addr() net.Addr {
+ return c.sock.LocalAddr()
+}
+
+const (
+ ActData = 1
+ ActTimer = 2
+ ActAddConn = 3
+ ActRmConn = 4
+ ActEnd = 5
+)
+
+type Input struct {
+ typ uint8
+ data []byte
+ param interface{}
+}
+
+func (c *Conns) loop() {
+ // 这里输入时间和数据
+ // 只起一个timer,给所有conn发
+ // 之前用setreaddeadline这个不太好,容易出现长时间没超时
+
+ runtime.LockOSThread()
+
+ glog.V(0).Info("loop: ", c.isDial)
+
+ go func() {
+ var buf = make([]byte, 2048)
+ for {
+ n, remote, err := c.sock.ReadFromUDP(buf)
+ if n <= 0 || err != nil {
+ c.Close()
+ return
+ }
+ b := make([]byte, n)
+ copy(b, buf[:n])
+ c.input <- Input{
+ typ: ActData,
+ data: b,
+ param: remote,
+ }
+ if c.isClose {
+ return
+ }
+ }
+ }()
+
+ var timerRunning bool
+ var releaseMemory uint32
+ var buf = make([]byte, mss*3)
+
+ for {
+ data := <-c.input
+ switch data.typ {
+ case ActData:
+ head, _ := structUnPack(data.data, "BBQ")
+ // fmt.Println(head[0], head[2])
+ var dataType = head[0]
+ if conn, ok := c.conns[head[2]]; ok {
+ // TODO: 给dial中的链接发送成功 -> 暂时不需要,现在dial不判断这些,默认成功
+ if dataType == uint64(TypeData) {
+ conn.r.recv(data.data)
+ } else if dataType == uint64(TypeAck) {
+ conn.s.recv(data.data)
+ }
+ } else {
+ if c.isDial == false && dataType == uint64(TypeData) && c.isClose == false { // 不需要TypeSYN
+ // glog.V(0).Info("create new:", head[2])
+ // 只有主动listen的,才有新链接,而dial自己就会创建新连接,不用创建
+ conn := NewConn(c, head[2], c.sock, data.param.(*net.UDPAddr), "rsper")
+ c.conns[conn.Id] = conn
+ conn.r.recv(data.data)
+ select {
+ case c.accept <- conn:
+ default:
+ }
+ if timerRunning == false {
+ timerRunning = true
+ go c.runTimer(c.timerRnd)
+ glog.V(0).Info("start timer, round:", c.timerRnd, c.isDial)
+ }
+ }
+ }
+ case ActTimer:
+ now := time.Now()
+ for _, conn := range c.conns {
+ conn.s.send(now, buf)
+ conn.r.sendAck(now, buf)
+ }
+ if len(c.conns) == 0 && timerRunning {
+ glog.V(0).Info("no conn, stop timer, round:", c.timerRnd, c.isDial)
+ c.timerRnd++
+ timerRunning = false
+ debug.FreeOSMemory()
+ }
+ releaseMemory++
+ if releaseMemory > 50*60 {
+ releaseMemory = 0
+ go func() {
+ debug.FreeOSMemory() // 修复bug: go在windows不回收内存
+ }()
+ }
+ case ActAddConn:
+ conn := data.param.(*Conn)
+ c.conns[conn.Id] = conn
+ if timerRunning == false {
+ timerRunning = true
+ go c.runTimer(c.timerRnd)
+ glog.V(0).Info("start timer, round:", c.timerRnd, c.isDial)
+ }
+ case ActRmConn:
+ conn := data.param.(*Conn)
+ // fmt.Println("rm conn:", conn.Id)
+ conn.isClose = true
+ conn.s.isClose = true
+ conn.r.isClose = true
+ if _, ok := c.conns[conn.Id]; ok {
+ glog.V(0).Info("rm conn ok:", conn.Id, c.isDial, len(c.conns))
+ close(conn.r.readable)
+ close(conn.s.writable) // 修复bug: 没有close
+ delete(c.conns, conn.Id)
+ }
+ case ActEnd:
+ c.timerRnd++
+ timerRunning = false
+ c.isClose = true
+ for _, conn := range c.conns {
+ conn.isClose = true
+ conn.s.isClose = true
+ conn.r.isClose = true
+ close(conn.r.readable)
+ close(conn.s.writable)
+ }
+ close(c.accept)
+ c.conns = make(map[uint64]*Conn)
+ return
+ }
+
+ }
+
+}
+
+func (c *Conns) runTimer(rnd uint32) {
+ // runtime.LockOSThread()
+ for {
+ time.Sleep(20 * time.Millisecond)
+ // C.usleep(20 * 1000)
+ if rnd != c.timerRnd {
+ glog.V(0).Info("timer stop, round:", rnd, c.isDial)
+ }
+ if c.isClose || rnd != c.timerRnd {
+ return
+ }
+ c.input <- Input{typ: ActTimer}
+ }
+}
diff --git a/weed/wdclient/volume_udp_client.go b/weed/wdclient/volume_udp_client.go
index d0365f110..e8a5fa34d 100644
--- a/weed/wdclient/volume_udp_client.go
+++ b/weed/wdclient/volume_udp_client.go
@@ -3,18 +3,22 @@ package wdclient
import (
"bufio"
"github.com/chrislusf/seaweedfs/weed/pb"
- "github.com/chrislusf/seaweedfs/weed/udptransfer"
"github.com/chrislusf/seaweedfs/weed/util"
+ "github.com/chrislusf/seaweedfs/weed/wdclient/penet"
"io"
"net"
+ "time"
)
// VolumeUdpClient put/get/delete file chunks directly on volume servers without replication
type VolumeUdpClient struct {
+ Conn net.Conn
+ bufWriter *bufio.Writer
+ bufReader *bufio.Reader
}
type VolumeUdpConn struct {
- net.Conn
+ Conn net.Conn
bufWriter *bufio.Writer
bufReader *bufio.Reader
}
@@ -30,41 +34,29 @@ func (c *VolumeUdpClient) PutFileChunk(volumeServerAddress string, fileId string
return parseErr
}
- listener, err := udptransfer.NewEndpoint(&udptransfer.Params{
- LocalAddr: "",
- Bandwidth: 100,
- FastRetransmit: true,
- FlatTraffic: true,
- IsServ: false,
- })
- if err != nil {
- return err
+ if c.Conn == nil {
+ c.Conn, err = penet.DialTimeout("", udpAddress, 500*time.Millisecond)
+ if err != nil {
+ return err
+ }
+ c.bufWriter = bufio.NewWriter(c.Conn)
}
- defer listener.Close()
-
- conn, err := listener.Dial(udpAddress)
- if err != nil {
- return err
- }
- defer conn.Close()
-
- bufWriter := bufio.NewWriter(conn)
buf := []byte("+" + fileId + "\n")
- _, err = bufWriter.Write([]byte(buf))
+ _, err = c.bufWriter.Write([]byte(buf))
if err != nil {
return
}
util.Uint32toBytes(buf[0:4], fileSize)
- _, err = bufWriter.Write(buf[0:4])
+ _, err = c.bufWriter.Write(buf[0:4])
if err != nil {
return
}
- _, err = io.Copy(bufWriter, fileReader)
+ _, err = io.Copy(c.bufWriter, fileReader)
if err != nil {
return
}
- bufWriter.Flush()
+ c.bufWriter.Flush()
return nil
}