authorChris Lu <chris.lu@gmail.com>2021-03-13 14:14:30 -0800
committerChris Lu <chris.lu@gmail.com>2021-03-13 14:14:30 -0800
commit0059f4a201c33411f0911ffd08eae1aba511d10a (patch)
tree4b088c22bde95dcdb2c1019ee6b5957fa19308db
parent22ef015cfffb3217f25bf3901b6af53c07bb74da (diff)
downloadseaweedfs-0059f4a201c33411f0911ffd08eae1aba511d10a.tar.xz
seaweedfs-0059f4a201c33411f0911ffd08eae1aba511d10a.zip
trying https://github.com/spance/suft
something seems wrong with the timing
-rw-r--r--go.mod2
-rw-r--r--go.sum4
-rw-r--r--weed/command/volume.go37
-rw-r--r--weed/udptransfer/conn.go715
-rw-r--r--weed/udptransfer/conn_test.go86
-rw-r--r--weed/udptransfer/debug.go95
-rw-r--r--weed/udptransfer/endpoint.go339
-rw-r--r--weed/udptransfer/endpoint_test.go42
-rw-r--r--weed/udptransfer/linked.go333
-rw-r--r--weed/udptransfer/linked_test.go236
-rw-r--r--weed/udptransfer/packet.go142
-rw-r--r--weed/udptransfer/state.go516
-rw-r--r--weed/udptransfer/stopwatch.go36
-rw-r--r--weed/udptransfer/timer.go18
-rw-r--r--weed/udptransfer/timing_test.go32
-rw-r--r--weed/wdclient/volume_udp_client.go102
16 files changed, 2711 insertions, 24 deletions
diff --git a/go.mod b/go.mod
index c8a914ab0..e16b67e6e 100644
--- a/go.mod
+++ b/go.mod
@@ -15,6 +15,7 @@ require (
github.com/buraksezer/consistent v0.0.0-20191006190839-693edf70fd72
github.com/cespare/xxhash v1.1.0
github.com/chrislusf/raft v1.0.4
+ github.com/cloudflare/golibs v0.0.0-20201113145655-eb7a42c5e0be // indirect
github.com/coreos/go-semver v0.3.0 // indirect
github.com/dgrijalva/jwt-go v3.2.0+incompatible
github.com/disintegration/imaging v1.6.2
@@ -64,6 +65,7 @@ require (
github.com/seaweedfs/fuse v1.1.3
github.com/seaweedfs/goexif v1.0.2
github.com/skip2/go-qrcode v0.0.0-20200617195104-da1b6568686e
+ github.com/spance/suft v0.0.0-20161129124228-358fdb24d82d // indirect
github.com/spaolacci/murmur3 v1.1.0 // indirect
github.com/spf13/jwalterweatherman v1.1.0 // indirect
github.com/spf13/viper v1.4.0
diff --git a/go.sum b/go.sum
index ab1ef4739..a67a1bd39 100644
--- a/go.sum
+++ b/go.sum
@@ -158,6 +158,8 @@ github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5P
github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=
github.com/clbanning/x2j v0.0.0-20191024224557-825249438eec/go.mod h1:jMjuTZXRI4dUb/I5gc9Hdhagfvm9+RyrPryS/auMzxE=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
+github.com/cloudflare/golibs v0.0.0-20201113145655-eb7a42c5e0be h1:E+nLopD71RbvKLi2IGciQu/gtRmofNOJIBplVOaeKEw=
+github.com/cloudflare/golibs v0.0.0-20201113145655-eb7a42c5e0be/go.mod h1:HlgKKR8V5a1wroIDDIz3/A+T+9Janfq+7n1P5sEFdi0=
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
github.com/cockroachdb/datadriven v0.0.0-20190809214429-80d97fb3cbaa h1:OaNxuTZr7kxeODyLWsRMC+OD03aFUH+mW6r2d+MWa5Y=
github.com/cockroachdb/datadriven v0.0.0-20190809214429-80d97fb3cbaa/go.mod h1:zn76sxSg3SzpJ0PPJaLDCu+Bu0Lg3sKTORVIj19EIF8=
@@ -710,6 +712,8 @@ github.com/smartystreets/gunit v1.3.4/go.mod h1:ZjM1ozSIMJlAz/ay4SG8PeKF00ckUp+z
github.com/soheilhy/cmux v0.1.4 h1:0HKaf1o97UwFjHH9o5XsHUOF+tqmdA7KEzXLpiyaw0E=
github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4kGIyLM=
github.com/sony/gobreaker v0.4.1/go.mod h1:ZKptC7FHNvhBz7dN2LGjPVBz2sZJmc0/PkyDJOjmxWY=
+github.com/spance/suft v0.0.0-20161129124228-358fdb24d82d h1:izNZHQvaeiCisdxVrhrbQgigPIJt0RJPrpRNWMWdjaQ=
+github.com/spance/suft v0.0.0-20161129124228-358fdb24d82d/go.mod h1:S8IZv4Sq+amXiFLdengs5dVl9IdMjR/1yq35vjnYd54=
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI=
github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
diff --git a/weed/command/volume.go b/weed/command/volume.go
index 0f3dba361..1468aa2ee 100644
--- a/weed/command/volume.go
+++ b/weed/command/volume.go
@@ -3,6 +3,8 @@ package command
import (
"fmt"
"github.com/chrislusf/seaweedfs/weed/storage/types"
+ "github.com/chrislusf/seaweedfs/weed/udptransfer"
+ "net"
"net/http"
httppprof "net/http/pprof"
"os"
@@ -29,7 +31,6 @@ import (
stats_collect "github.com/chrislusf/seaweedfs/weed/stats"
"github.com/chrislusf/seaweedfs/weed/storage"
"github.com/chrislusf/seaweedfs/weed/util"
- "pack.ag/tftp"
)
var (
@@ -399,15 +400,37 @@ func (v VolumeServerOptions) startTcpService(volumeServer *weed_server.VolumeSer
func (v VolumeServerOptions) startUdpService(volumeServer *weed_server.VolumeServer) {
listeningAddress := *v.bindIp + ":" + strconv.Itoa(*v.port+20001)
- tftpServer, err := tftp.NewServer(listeningAddress)
+
+ listener, err := udptransfer.NewEndpoint(&udptransfer.Params{
+ LocalAddr: listeningAddress,
+ Bandwidth: 100,
+ FastRetransmit: true,
+ FlatTraffic: true,
+ IsServ: true,
+ })
if err != nil {
glog.Fatalf("Volume server listen on %s:%v", listeningAddress, err)
}
- tftpServer.WriteHandler(volumeServer)
- tftpServer.ReadHandler(volumeServer)
+ defer listener.Close()
- glog.V(0).Infoln("Start Seaweed volume server", util.Version(), "UDP at", listeningAddress)
- if e:= tftpServer.ListenAndServe(); e != nil {
- glog.Fatalf("Volume server UDP on %s:%v", listeningAddress, e)
+ for {
+ conn, err := listener.Accept()
+ if err == nil {
+ glog.V(0).Infof("Client from %s", conn.RemoteAddr())
+ go volumeServer.HandleTcpConnection(conn)
+ } else if isTemporaryError(err) {
+ continue
+ } else {
+ break
+ }
}
}
+
+func isTemporaryError(err error) bool {
+ if err != nil {
+ if ne, y := err.(net.Error); y {
+ return ne.Temporary()
+ }
+ }
+ return false
+} \ No newline at end of file
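For reference, the client side of this transport lives in weed/wdclient/volume_udp_client.go (listed in the diffstat above but not shown here). A minimal sketch of a client reaching this listener, assuming the same udptransfer API and the port+20001 offset used above (the server address is hypothetical):

    package main

    import (
        "log"

        "github.com/chrislusf/seaweedfs/weed/udptransfer"
    )

    func main() {
        // leaving IsServ false puts the endpoint into client mode
        e, err := udptransfer.NewEndpoint(&udptransfer.Params{
            LocalAddr:      ":0", // any free local UDP port
            Bandwidth:      100,  // Mbps; NewEndpoint rejects values outside (0,100]
            FastRetransmit: true,
            FlatTraffic:    true,
        })
        if err != nil {
            log.Fatal(err)
        }
        defer e.Close()
        // the volume server listens on its HTTP port + 20001
        conn, err := e.Dial("volume-server:20021")
        if err != nil {
            log.Fatal(err)
        }
        defer conn.Close()
        if _, err := conn.Write([]byte("hello")); err != nil {
            log.Fatal(err)
        }
    }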
diff --git a/weed/udptransfer/conn.go b/weed/udptransfer/conn.go
new file mode 100644
index 000000000..e2eca49da
--- /dev/null
+++ b/weed/udptransfer/conn.go
@@ -0,0 +1,715 @@
+package udptransfer
+
+import (
+ "encoding/binary"
+ "fmt"
+ "io"
+ "log"
+ "net"
+ "time"
+)
+
+const (
+ _MAX_RETRIES = 6
+ _MIN_RTT = 8
+ _MIN_RTO = 30
+ _MIN_ATO = 2
+ _MAX_ATO = 10
+ _MIN_SWND = 10
+ _MAX_SWND = 960
+)
+
+const (
+ _VACK_SCHED = iota + 1
+ _VACK_QUICK
+ _VACK_MUST
+ _VSWND_ACTIVE
+ _VRETR_IMMED
+)
+
+const (
+ _RETR_REST = -1
+ _CLOSE = 0xff
+)
+
+var debug int
+
+func nodeOf(pk *packet) *qNode {
+ return &qNode{packet: pk}
+}
+
+func (c *Conn) internalRecvLoop() {
+ defer func() {
+ // avoid sending on a closed channel when replayed
+ // data packets arrive during shutdown.
+ _ = recover()
+ }()
+ var buf, body []byte
+ for {
+ select {
+ case buf = <-c.evRecv:
+ if buf != nil {
+ body = buf[_TH_SIZE:]
+ } else { // shutdown
+ return
+ }
+ }
+ pk := new(packet)
+ // keep the original buffer so we can recycle it later
+ pk.buffer = buf
+ unmarshall(pk, body)
+ if pk.flag&_F_SACK != 0 {
+ c.processSAck(pk)
+ continue
+ }
+ if pk.flag&_F_ACK != 0 {
+ c.processAck(pk)
+ }
+ if pk.flag&_F_DATA != 0 {
+ c.insertData(pk)
+ } else if pk.flag&_F_FIN != 0 {
+ if pk.flag&_F_RESET != 0 {
+ go c.forceShutdownWithLock()
+ } else {
+ go c.closeR(pk)
+ }
+ }
+ }
+}
+
+func (c *Conn) internalSendLoop() {
+ var timer = time.NewTimer(time.Duration(c.rtt) * time.Millisecond)
+ for {
+ select {
+ case v := <-c.evSWnd:
+ switch v {
+ case _VRETR_IMMED:
+ c.outlock.Lock()
+ c.retransmit2()
+ c.outlock.Unlock()
+ case _VSWND_ACTIVE:
+ timer.Reset(time.Duration(c.rtt) * time.Millisecond)
+ case _CLOSE:
+ return
+ }
+ case <-timer.C: // timer expired
+ var notifySender bool
+ c.outlock.Lock()
+ rest, _ := c.retransmit()
+ switch rest {
+ case _RETR_REST, 0: // nothing to send
+ if c.outQ.size() > 0 {
+ timer.Reset(time.Duration(c.rtt) * time.Millisecond)
+ } else {
+ timer.Stop()
+ // avoid sender blocking
+ notifySender = true
+ }
+ default: // recent rto point
+ timer.Reset(time.Duration(minI64(rest, c.rtt)) * time.Millisecond)
+ }
+ c.outlock.Unlock()
+ if notifySender {
+ select {
+ case c.evSend <- 1:
+ default:
+ }
+ }
+ }
+ }
+}
+
+func (c *Conn) internalAckLoop() {
+ // var ackTimer = time.NewTicker(time.Duration(c.ato))
+ var ackTimer = time.NewTimer(time.Duration(c.ato) * time.Millisecond)
+ var lastAckState byte
+ for {
+ var v byte
+ select {
+ case <-ackTimer.C:
+ // may send a duplicate ack if ato > rtt
+ v = _VACK_QUICK
+ case v = <-c.evAck:
+ ackTimer.Reset(time.Duration(c.ato) * time.Millisecond)
+ state := lastAckState
+ lastAckState = v
+ if state != v {
+ if v == _CLOSE {
+ return
+ }
+ v = _VACK_MUST
+ }
+ }
+ c.inlock.Lock()
+ if pkAck := c.makeAck(v); pkAck != nil {
+ c.internalWrite(nodeOf(pkAck))
+ }
+ c.inlock.Unlock()
+ }
+}
+
+func (c *Conn) retransmit() (rest int64, count int32) {
+ var now, rto = Now(), c.rto
+ var limit = c.cwnd
+ for item := c.outQ.head; item != nil && limit > 0; item = item.next {
+ if item.scnt != _SENT_OK { // acked items are marked with scnt==_SENT_OK
+ diff := now - item.sent
+ if diff > rto { // already rto
+ c.internalWrite(item)
+ count++
+ } else {
+ // keep searching for the next minimum rto deadline
+ if rest > 0 {
+ rest = minI64(rest, rto-diff+1)
+ } else {
+ rest = rto - diff + 1
+ }
+ limit--
+ }
+ }
+ }
+ c.outDupCnt += int(count)
+ if count > 0 {
+ shrcond := (c.fastRetransmit && count > maxI32(c.cwnd>>5, 4)) || (!c.fastRetransmit && count > c.cwnd>>3)
+ if shrcond && now-c.lastShrink > c.rto {
+ log.Printf("shrink cwnd from=%d to=%d s/4=%d", c.cwnd, c.cwnd>>1, c.swnd>>2)
+ c.lastShrink = now
+ // shrink cwnd and ensure cwnd >= swnd/4
+ if c.cwnd > c.swnd>>1 {
+ c.cwnd >>= 1
+ }
+ }
+ }
+ if c.outQ.size() > 0 {
+ return
+ }
+ return _RETR_REST, 0
+}
+
+func (c *Conn) retransmit2() (count int32) {
+ var limit, now = minI32(c.outPending>>4, 8), Now()
+ var fRtt = c.rtt
+ if now-c.lastShrink > c.rto {
+ fRtt += maxI64(c.rtt>>4, 1)
+ } else {
+ fRtt += maxI64(c.rtt>>1, 2)
+ }
+ for item := c.outQ.head; item != nil && count < limit; item = item.next {
+ if item.scnt != _SENT_OK { // acked items are marked with scnt==_SENT_OK
+ if item.miss >= 3 && now-item.sent >= fRtt {
+ item.miss = 0
+ c.internalWrite(item)
+ count++
+ }
+ }
+ }
+ c.fRCnt += int(count)
+ c.outDupCnt += int(count)
+ return
+}
+
+func (c *Conn) inputAndSend(pk *packet) error {
+ item := &qNode{packet: pk}
+ if c.mySeq&3 == 1 {
+ c.tSlotT0 = NowNS()
+ }
+ c.outlock.Lock()
+ // block while in-flight packets exceed cwnd;
+ // in-flight includes both unacked and missed packets
+ for c.outPending >= c.cwnd+c.missed {
+ c.outlock.Unlock()
+ if c.wtmo > 0 {
+ var tmo int64
+ tmo, c.wtmo = c.wtmo, 0
+ select {
+ case v := <-c.evSend:
+ if v == _CLOSE {
+ return io.EOF
+ }
+ case <-NewTimerChan(tmo):
+ return ErrIOTimeout
+ }
+ } else {
+ if v := <-c.evSend; v == _CLOSE {
+ return io.EOF
+ }
+ }
+ c.outlock.Lock()
+ }
+ c.outPending++
+ c.outPkCnt++
+ c.mySeq++
+ pk.seq = c.mySeq
+ c.outQ.appendTail(item)
+ c.internalWrite(item)
+ c.outlock.Unlock()
+ // activate the resending timer; this send must block
+ c.evSWnd <- _VSWND_ACTIVE
+ if c.mySeq&3 == 0 && c.flatTraffic {
+ // calculate the timing error between tslot and actual usage,
+ // taking the previous sleep error into account
+ t1 := NowNS()
+ terr := c.tSlot<<2 - c.lastSErr - (t1 - c.tSlotT0)
+ // rest for terr/2 if actual usage undercut the slot budget by more than 100us
+ if terr > 1e5 { // 100us
+ time.Sleep(time.Duration(terr >> 1))
+ c.lastSErr = maxI64(NowNS()-t1-terr, 0)
+ } else {
+ c.lastSErr >>= 1
+ }
+ }
+ return nil
+}
+
+func (c *Conn) internalWrite(item *qNode) {
+ if item.scnt >= 20 {
+ // even a FIN is not exempt from the retry limit
+ if item.flag&_F_FIN != 0 {
+ c.fakeShutdown()
+ c.dest = nil
+ return
+ } else {
+ log.Println("Warn: too many retries", item)
+ if c.urgent > 0 { // abort
+ c.forceShutdown()
+ return
+ } else { // drop back to 10 and keep retrying
+ c.urgent++
+ item.scnt = 10
+ }
+ }
+ }
+ // update current sent time and prev sent time
+ item.sent, item.sent_1 = Now(), item.sent
+ item.scnt++
+ buf := item.marshall(c.connID)
+ if debug >= 3 {
+ var pkType = packetTypeNames[item.flag]
+ if item.flag&_F_SACK != 0 {
+ log.Printf("send %s trp=%d on=%d %x", pkType, item.seq, item.ack, buf[_AH_SIZE+4:])
+ } else {
+ log.Printf("send %s seq=%d ack=%d scnt=%d len=%d", pkType, item.seq, item.ack, item.scnt, len(buf)-_TH_SIZE)
+ }
+ }
+ c.sock.WriteToUDP(buf, c.dest)
+}
+
+func (c *Conn) logAck(ack uint32) {
+ c.lastAck = ack
+ c.lastAckTime = Now()
+}
+
+func (c *Conn) makeLastAck() (pk *packet) {
+ c.inlock.Lock()
+ defer c.inlock.Unlock()
+ if Now()-c.lastAckTime < c.rtt {
+ return nil
+ }
+ pk = &packet{
+ ack: maxU32(c.lastAck, c.inQ.maxCtnSeq),
+ flag: _F_ACK,
+ }
+ c.logAck(pk.ack)
+ return
+}
+
+func (c *Conn) makeAck(level byte) (pk *packet) {
+ now := Now()
+ if level < _VACK_MUST && now-c.lastAckTime < c.ato {
+ if level < _VACK_QUICK || now-c.lastAckTime < minI64(c.ato>>2, 1) {
+ return
+ }
+ }
+ // ready Q <-|
+ // |-> outQ start (or more right)
+ // |-> bitmap start
+ // [predecessor] [predecessor+1] [predecessor+2] .....
+ var fakeSAck bool
+ var predecessor = c.inQ.maxCtnSeq
+ bmap, tbl := c.inQ.makeHolesBitmap(predecessor)
+ if len(bmap) <= 0 { // fake sack
+ bmap = make([]uint64, 1)
+ bmap[0], tbl = 1, 1
+ fakeSAck = true
+ }
+ // head 4-byte: TBL:1 | SCNT:1 | DELAY:2
+ buf := make([]byte, len(bmap)*8+4)
+ pk = &packet{
+ ack: predecessor + 1,
+ flag: _F_SACK,
+ payload: buf,
+ }
+ if fakeSAck {
+ pk.ack--
+ }
+ buf[0] = byte(tbl)
+ // mark delayed time according to the time reference point
+ if trp := c.inQ.lastIns; trp != nil {
+ delayed := now - trp.sent
+ if delayed < c.rtt {
+ pk.seq = trp.seq
+ pk.flag |= _F_TIME
+ buf[1] = trp.scnt
+ if delayed <= 0 {
+ delayed = 1
+ }
+ binary.BigEndian.PutUint16(buf[2:], uint16(delayed))
+ }
+ }
+ buf1 := buf[4:]
+ for i, b := range bmap {
+ binary.BigEndian.PutUint64(buf1[i*8:], b)
+ }
+ c.logAck(predecessor)
+ return
+}
+
+func unmarshallSAck(data []byte) (bmap []uint64, tbl uint32, delayed uint16, scnt uint8) {
+ if len(data) > 0 {
+ bmap = make([]uint64, len(data)>>3)
+ } else {
+ return
+ }
+ tbl = uint32(data[0])
+ scnt = data[1]
+ delayed = binary.BigEndian.Uint16(data[2:])
+ data = data[4:]
+ for i := 0; i < len(bmap); i++ {
+ bmap[i] = binary.BigEndian.Uint64(data[i*8:])
+ }
+ return
+}
+
+func calSwnd(bandwidth, rtt int64) int32 {
+ w := int32(bandwidth * rtt / (8000 * _MSS))
+ if w <= _MAX_SWND {
+ if w >= _MIN_SWND {
+ return w
+ } else {
+ return _MIN_SWND
+ }
+ } else {
+ return _MAX_SWND
+ }
+}
+
+func (c *Conn) measure(seq uint32, delayed int64, scnt uint8) {
+ target := c.outQ.get(seq)
+ if target != nil {
+ var lastSent int64
+ switch target.scnt - scnt {
+ case 0:
+ // not sent again since this ack was sent out
+ lastSent = target.sent
+ case 1:
+ // sent again once since this ack was sent out
+ // then use prev sent time
+ lastSent = target.sent_1
+ default:
+ // can't measure here because the packet was sent too many times
+ return
+ }
+ // real-time rtt
+ rtt := Now() - lastSent - delayed
+ // reject abnormal measurements:
+ // 1. rtt smaller than an eighth of the smoothed rtt
+ // 2. the ack was delayed (backlogged) for over half the rtt
+ if rtt < maxI64(c.rtt>>3, 1) || delayed > c.rtt>>1 {
+ return
+ }
+ // srtt: update 1/8
+ err := rtt - (c.srtt >> 3)
+ c.srtt += err
+ c.rtt = c.srtt >> 3
+ if c.rtt < _MIN_RTT {
+ c.rtt = _MIN_RTT
+ }
+ // s-swnd: update 1/8
+ swnd := c.swnd<<3 - c.swnd + calSwnd(c.bandwidth, c.rtt)
+ c.swnd = swnd >> 3
+ c.tSlot = c.rtt * 1e6 / int64(c.swnd)
+ c.ato = c.rtt >> 4
+ if c.ato < _MIN_ATO {
+ c.ato = _MIN_ATO
+ } else if c.ato > _MAX_ATO {
+ c.ato = _MAX_ATO
+ }
+ if err < 0 {
+ err = -err
+ err -= c.mdev >> 2
+ if err > 0 {
+ err >>= 3
+ }
+ } else {
+ err -= c.mdev >> 2
+ }
+ // mdev: update 1/4
+ c.mdev += err
+ rto := c.rtt + maxI64(c.rtt<<1, c.mdev)
+ if rto >= c.rto {
+ c.rto = rto
+ } else {
+ c.rto = (c.rto + rto) >> 1
+ }
+ if c.rto < _MIN_RTO {
+ c.rto = _MIN_RTO
+ }
+ if debug >= 1 {
+ log.Printf("--- rtt=%d srtt=%d rto=%d swnd=%d", c.rtt, c.srtt, c.rto, c.swnd)
+ }
+ }
+}
+
+func (c *Conn) processSAck(pk *packet) {
+ c.outlock.Lock()
+ bmap, tbl, delayed, scnt := unmarshallSAck(pk.payload)
+ if bmap == nil { // bad packet
+ c.outlock.Unlock()
+ return
+ }
+ if pk.flag&_F_TIME != 0 {
+ c.measure(pk.seq, int64(delayed), scnt)
+ }
+ deleted, missed, continuous := c.outQ.deleteByBitmap(bmap, pk.ack, tbl)
+ if deleted > 0 {
+ c.ackHit(deleted, missed)
+ // lock is released
+ } else {
+ c.outlock.Unlock()
+ }
+ if c.fastRetransmit && !continuous {
+ // peer Q is discontinuous, so trigger fast retransmit
+ if deleted == 0 {
+ c.evSWnd <- _VRETR_IMMED
+ } else {
+ select {
+ case c.evSWnd <- _VRETR_IMMED:
+ default:
+ }
+ }
+ }
+ if debug >= 2 {
+ log.Printf("SACK qhead=%d deleted=%d outPending=%d on=%d %016x",
+ c.outQ.distanceOfHead(0), deleted, c.outPending, pk.ack, bmap)
+ }
+}
+
+func (c *Conn) processAck(pk *packet) {
+ c.outlock.Lock()
+ if end := c.outQ.get(pk.ack); end != nil { // ack hit
+ _, deleted := c.outQ.deleteBefore(end)
+ c.ackHit(deleted, 0) // lock is released
+ if debug >= 2 {
+ log.Printf("ACK hit on=%d", pk.ack)
+ }
+ // special case: ack the FIN
+ if pk.seq == _FIN_ACK_SEQ {
+ select {
+ case c.evClose <- _S_FIN0:
+ default:
+ }
+ }
+ } else { // duplicated ack
+ if debug >= 2 {
+ log.Printf("ACK miss on=%d", pk.ack)
+ }
+ if pk.flag&_F_SYN != 0 { // No.3 Ack lost
+ if pkAck := c.makeLastAck(); pkAck != nil {
+ c.internalWrite(nodeOf(pkAck))
+ }
+ }
+ c.outlock.Unlock()
+ }
+}
+
+func (c *Conn) ackHit(deleted, missed int32) {
+ // must in outlock
+ c.outPending -= deleted
+ now := Now()
+ if c.cwnd < c.swnd && now-c.lastShrink > c.rto {
+ if c.cwnd < c.swnd>>1 {
+ c.cwnd <<= 1
+ } else {
+ c.cwnd += deleted << 1
+ }
+ }
+ if c.cwnd > c.swnd {
+ c.cwnd = c.swnd
+ }
+ if now-c.lastRstMis > c.ato {
+ c.lastRstMis = now
+ c.missed = missed
+ } else {
+ c.missed = c.missed>>1 + missed
+ }
+ if qswnd := c.swnd >> 4; c.missed > qswnd {
+ c.missed = qswnd
+ }
+ c.outlock.Unlock()
+ select {
+ case c.evSend <- 1:
+ default:
+ }
+}
+
+func (c *Conn) insertData(pk *packet) {
+ c.inlock.Lock()
+ defer c.inlock.Unlock()
+ exists := c.inQ.contains(pk.seq)
+ // duplicate of a queued or already-consumed packet;
+ // means the last ACK was lost
+ if exists || pk.seq <= c.inQ.maxCtnSeq {
+ // then send ACK for dups
+ select {
+ case c.evAck <- _VACK_MUST:
+ default:
+ }
+ if debug >= 2 {
+ dumpQ(fmt.Sprint("duplicated ", pk.seq), c.inQ)
+ }
+ c.inDupCnt++
+ return
+ }
+ // record the current time in sent, treating it as the receive time
+ item := &qNode{packet: pk, sent: Now()}
+ dis := c.inQ.searchInsert(item, c.lastReadSeq)
+ if debug >= 3 {
+ log.Printf("\t\t\trecv DATA seq=%d dis=%d maxCtn=%d lastReadSeq=%d", item.seq, dis, c.inQ.maxCtnSeq, c.lastReadSeq)
+ }
+
+ var ackState byte = _VACK_MUST
+ var available bool
+ switch dis {
+ case 0: // impossible
+ return
+ case 1:
+ if c.inQDirty {
+ available = c.inQ.updateContinuous(item)
+ if c.inQ.isWholeContinuous() { // whole Q is ordered
+ c.inQDirty = false
+ } else { // some holes still exist
+ ackState = _VACK_QUICK
+ }
+ } else {
+ // here is an ideal situation
+ c.inQ.maxCtnSeq = pk.seq
+ available = true
+ ackState = _VACK_SCHED
+ }
+
+ default: // an out-of-order packet arrived; a hole opened here
+ if !c.inQDirty {
+ c.inQDirty = true
+ }
+ }
+
+ // write valid received count
+ c.inPkCnt++
+ c.inQ.lastIns = item
+ // try notify ack
+ select {
+ case c.evAck <- ackState:
+ default:
+ }
+ if available { // try notify reader
+ select {
+ case c.evRead <- 1:
+ default:
+ }
+ }
+}
+
+func (c *Conn) readInQ() bool {
+ c.inlock.Lock()
+ defer c.inlock.Unlock()
+ // read already <-|-> expected Q
+ // [lastReadSeq] | [lastReadSeq+1] [lastReadSeq+2] ......
+ if c.inQ.isEqualsHead(c.lastReadSeq+1) && c.lastReadSeq < c.inQ.maxCtnSeq {
+ c.lastReadSeq = c.inQ.maxCtnSeq
+ available := c.inQ.get(c.inQ.maxCtnSeq)
+ available, _ = c.inQ.deleteBefore(available)
+ for i := available; i != nil; i = i.next {
+ c.inQReady = append(c.inQReady, i.payload...)
+ // data was copied, then could recycle memory
+ bpool.Put(i.buffer)
+ i.payload = nil
+ i.buffer = nil
+ }
+ return true
+ }
+ return false
+}
+
+// Read must not be called concurrently.
+func (c *Conn) Read(buf []byte) (nr int, err error) {
+ for {
+ if len(c.inQReady) > 0 {
+ n := copy(buf, c.inQReady)
+ c.inQReady = c.inQReady[n:]
+ return n, nil
+ }
+ if !c.readInQ() {
+ if c.rtmo > 0 {
+ var tmo int64
+ tmo, c.rtmo = c.rtmo, 0
+ select {
+ case _, y := <-c.evRead:
+ if !y && len(c.inQReady) == 0 {
+ return 0, io.EOF
+ }
+ case <-NewTimerChan(tmo):
+ return 0, ErrIOTimeout
+ }
+ } else {
+ // only when evRead is closed and inQReady is empty
+ // can we report EOF
+ if _, y := <-c.evRead; !y && len(c.inQReady) == 0 {
+ return 0, io.EOF
+ }
+ }
+ }
+ }
+}
+
+// Write must not be called concurrently.
+func (c *Conn) Write(data []byte) (nr int, err error) {
+ for len(data) > 0 && err == nil {
+ //buf := make([]byte, _MSS+_AH_SIZE)
+ buf := bpool.Get(c.mss + _AH_SIZE)
+ body := buf[_TH_SIZE+_CH_SIZE:]
+ n := copy(body, data)
+ nr += n
+ data = data[n:]
+ pk := &packet{flag: _F_DATA, payload: body[:n], buffer: buf[:_AH_SIZE+n]}
+ err = c.inputAndSend(pk)
+ }
+ return
+}
+
+func (c *Conn) LocalAddr() net.Addr {
+ return c.sock.LocalAddr()
+}
+
+func (c *Conn) RemoteAddr() net.Addr {
+ return c.dest
+}
+
+func (c *Conn) SetDeadline(t time.Time) error {
+ c.SetReadDeadline(t)
+ c.SetWriteDeadline(t)
+ return nil
+}
+
+func (c *Conn) SetReadDeadline(t time.Time) error {
+ if d := t.UnixNano()/Millisecond - Now(); d > 0 {
+ c.rtmo = d
+ }
+ return nil
+}
+
+func (c *Conn) SetWriteDeadline(t time.Time) error {
+ if d := t.UnixNano()/Millisecond - Now(); d > 0 {
+ c.wtmo = d
+ }
+ return nil
+}
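The window sizing above (calSwnd, together with the smoothed updates in measure) is the bandwidth-delay product expressed in MSS-sized packets. A runnable sketch with a worked example, assuming the Mbps-to-bps shift NewEndpoint performs (Bandwidth << 20); the clamps mirror _MIN_SWND/_MAX_SWND:

    package main

    import "fmt"

    const mss = 1438 // mirrors _MSS in packet.go

    func calSwnd(bandwidth, rtt int64) int64 {
        // (bandwidth/8 bytes per second) * (rtt/1000 seconds) / mss packets
        w := bandwidth * rtt / (8000 * mss)
        if w < 10 { // _MIN_SWND
            return 10
        }
        if w > 960 { // _MAX_SWND
            return 960
        }
        return w
    }

    func main() {
        bw := int64(100 << 20)       // Bandwidth=100 Mbps after the shift
        fmt.Println(calSwnd(bw, 30)) // 273 packets in flight at 30 ms RTT
    }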
diff --git a/weed/udptransfer/conn_test.go b/weed/udptransfer/conn_test.go
new file mode 100644
index 000000000..080d778c0
--- /dev/null
+++ b/weed/udptransfer/conn_test.go
@@ -0,0 +1,86 @@
+package udptransfer
+
+import (
+ "math/rand"
+ "sort"
+ "testing"
+)
+
+var conn *Conn
+
+func init() {
+ conn = &Conn{
+ outQ: newLinkedMap(_QModeOut),
+ inQ: newLinkedMap(_QModeIn),
+ }
+}
+
+func assert(cond bool, t testing.TB, format string, args ...interface{}) {
+ if !cond {
+ t.Errorf(format, args...)
+ panic("last error")
+ }
+}
+
+func Test_ordered_insert(t *testing.T) {
+ data := []byte{1}
+ var pk *packet
+ for i := int32(1); i < 33; i++ {
+ pk = &packet{
+ seq: uint32(i),
+ payload: data,
+ }
+ conn.insertData(pk)
+ assert(conn.inQ.size() == i, t, "len inQ=%d", conn.inQ.size())
+ assert(conn.inQ.maxCtnSeq == pk.seq, t, "lastCtnIn")
+ assert(!conn.inQDirty, t, "dirty")
+ }
+}
+
+func Test_unordered_insert(t *testing.T) {
+ conn.inQ.reset()
+
+ data := []byte{1}
+ var pk *packet
+ var seqs = make([]int, 0xfff)
+ // unordered insert, and assert size
+ for i := 1; i < len(seqs); i++ {
+ var seq uint32
+ for conn.inQ.contains(seq) || seq == 0 {
+ seq = uint32(rand.Int31n(0xFFffff))
+ }
+ seqs[i] = int(seq)
+ pk = &packet{
+ seq: seq,
+ payload: data,
+ }
+ conn.insertData(pk)
+ assert(conn.inQ.size() == int32(i), t, "i=%d inQ.len=%d", i, conn.inQ.size())
+ }
+ // assert lastCtnSeq
+ sort.Ints(seqs)
+ var zero = 0
+ var last *int
+ for i := 0; i < len(seqs); i++ {
+ if i == 0 && seqs[0] != 0 {
+ last = &zero
+ break
+ }
+ if last != nil && seqs[i]-*last > 1 {
+ if i == 1 {
+ last = &zero
+ }
+ break
+ }
+ last = &seqs[i]
+ }
+ if *last != int(conn.inQ.maxCtnSeq) {
+ for i, j := range seqs {
+ if i < 10 {
+ t.Logf("seq %d", j)
+ }
+ }
+ }
+ assert(*last == int(conn.inQ.maxCtnSeq), t, "lastCtnSeq=%d but expected=%d", conn.inQ.maxCtnSeq, *last)
+ t.Logf("lastCtnSeq=%d dirty=%v", conn.inQ.maxCtnSeq, conn.inQDirty)
+}
diff --git a/weed/udptransfer/debug.go b/weed/udptransfer/debug.go
new file mode 100644
index 000000000..dc8dbb501
--- /dev/null
+++ b/weed/udptransfer/debug.go
@@ -0,0 +1,95 @@
+package udptransfer
+
+import (
+ "encoding/hex"
+ "fmt"
+ "log"
+ "os"
+ "os/signal"
+ "runtime"
+ "runtime/pprof"
+ "sync/atomic"
+ "syscall"
+)
+
+var enable_pprof bool
+var enable_stacktrace bool
+var debug_inited int32
+
+func set_debug_params(p *Params) {
+ if atomic.CompareAndSwapInt32(&debug_inited, 0, 1) {
+ debug = p.Debug
+ enable_pprof = p.EnablePprof
+ enable_stacktrace = p.Stacktrace
+ if enable_pprof {
+ f, err := os.Create("udptransfer.pprof")
+ if err != nil {
+ log.Fatalln(err)
+ }
+ pprof.StartCPUProfile(f)
+ }
+ }
+}
+
+func (c *Conn) PrintState() {
+ log.Printf("inQ=%d inQReady=%d outQ=%d", c.inQ.size(), len(c.inQReady), c.outQ.size())
+ log.Printf("inMaxCtnSeq=%d lastAck=%d lastReadSeq=%d", c.inQ.maxCtnSeq, c.lastAck, c.lastReadSeq)
+ if c.inPkCnt > 0 {
+ log.Printf("Rx pcnt=%d dups=%d %%d=%f%%", c.inPkCnt, c.inDupCnt, 100*float32(c.inDupCnt)/float32(c.inPkCnt))
+ }
+ if c.outPkCnt > 0 {
+ log.Printf("Tx pcnt=%d dups=%d %%d=%f%%", c.outPkCnt, c.outDupCnt, 100*float32(c.outDupCnt)/float32(c.outPkCnt))
+ }
+ log.Printf("current-rtt=%d FastRetransmit=%d", c.rtt, c.fRCnt)
+ if enable_stacktrace {
+ var buf = make([]byte, 6400)
+ for i := 0; i < 3; i++ {
+ n := runtime.Stack(buf, true)
+ if n >= len(buf) {
+ buf = make([]byte, len(buf)<<1)
+ } else {
+ buf = buf[:n]
+ break
+ }
+ }
+ fmt.Println(string(buf))
+ }
+ if enable_pprof {
+ pprof.StopCPUProfile()
+ }
+}
+
+func (c *Conn) internal_state() {
+ ev := make(chan os.Signal, 10)
+ signal.Notify(ev, syscall.Signal(12), syscall.SIGINT)
+ for v := range ev {
+ c.PrintState()
+ if v == syscall.SIGINT {
+ os.Exit(1)
+ }
+ }
+}
+
+func printBits(b uint64, j, s, d uint32) {
+ fmt.Printf("bits=%064b j=%d seq=%d dis=%d\n", b, j, s, d)
+}
+
+func dumpb(label string, buf []byte) {
+ log.Println(label, "\n", hex.Dump(buf))
+}
+
+func dumpQ(s string, q *linkedMap) {
+ var seqs = make([]uint32, 0, 20)
+ n := q.head
+ for i, m := int32(0), q.size(); i < m && n != nil; i++ {
+ seqs = append(seqs, n.seq)
+ n = n.next
+ if len(seqs) == 20 {
+ log.Printf("%s: Q=%d", s, seqs)
+ seqs = seqs[:0]
+ }
+ }
+ if len(seqs) > 0 {
+ log.Printf("%s: Q=%d", s, seqs)
+ }
+}
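A usage note: internal_state registers syscall.Signal(12), which is SIGUSR2 on Linux, so a running process can be asked to dump these per-connection counters at any time with kill -12 <pid>; SIGINT prints the same state and then exits. This assumes a platform where signal 12 maps to SIGUSR2.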
diff --git a/weed/udptransfer/endpoint.go b/weed/udptransfer/endpoint.go
new file mode 100644
index 000000000..d19d1a4f5
--- /dev/null
+++ b/weed/udptransfer/endpoint.go
@@ -0,0 +1,339 @@
+package udptransfer
+
+import (
+ "encoding/binary"
+ "fmt"
+ "io"
+ "log"
+ "math/rand"
+ "net"
+ "sort"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ "github.com/cloudflare/golibs/bytepool"
+)
+
+const (
+ _SO_BUF_SIZE = 8 << 20
+)
+
+var (
+ bpool bytepool.BytePool
+)
+
+type Params struct {
+ LocalAddr string
+ Bandwidth int64
+ Mtu int
+ IsServ bool
+ FastRetransmit bool
+ FlatTraffic bool
+ EnablePprof bool
+ Stacktrace bool
+ Debug int
+}
+
+type connID struct {
+ lid uint32
+ rid uint32
+}
+
+type Endpoint struct {
+ udpconn *net.UDPConn
+ state int32
+ idSeq uint32
+ isServ bool
+ listenChan chan *Conn
+ lRegistry map[uint32]*Conn
+ rRegistry map[string][]uint32
+ mlock sync.RWMutex
+ timeout *time.Timer
+ params Params
+}
+
+func (c *connID) setRid(b []byte) {
+ c.rid = binary.BigEndian.Uint32(b[_MAGIC_SIZE+6:])
+}
+
+func init() {
+ bpool.Init(0, 2000)
+ rand.Seed(NowNS())
+}
+
+func NewEndpoint(p *Params) (*Endpoint, error) {
+ set_debug_params(p)
+ if p.Bandwidth <= 0 || p.Bandwidth > 100 {
+ return nil, fmt.Errorf("bw->(0,100]")
+ }
+ conn, err := net.ListenPacket("udp", p.LocalAddr)
+ if err != nil {
+ return nil, err
+ }
+ e := &Endpoint{
+ udpconn: conn.(*net.UDPConn),
+ idSeq: 1,
+ isServ: p.IsServ,
+ listenChan: make(chan *Conn, 1),
+ lRegistry: make(map[uint32]*Conn),
+ rRegistry: make(map[string][]uint32),
+ timeout: time.NewTimer(0),
+ params: *p,
+ }
+ if e.isServ {
+ e.state = _S_EST0
+ } else { // client
+ e.state = _S_EST1
+ e.idSeq = uint32(rand.Int31())
+ }
+ e.params.Bandwidth = p.Bandwidth << 20 // mbps to bps
+ e.udpconn.SetReadBuffer(_SO_BUF_SIZE)
+ go e.internal_listen()
+ return e, nil
+}
+
+func (e *Endpoint) internal_listen() {
+ const rtmo = 30 * time.Second
+ var id connID
+ for {
+ //var buf = make([]byte, 1600)
+ var buf = bpool.Get(1600)
+ e.udpconn.SetReadDeadline(time.Now().Add(rtmo))
+ n, addr, err := e.udpconn.ReadFromUDP(buf)
+ if err == nil && n >= _AH_SIZE {
+ buf = buf[:n]
+ e.getConnID(&id, buf)
+
+ switch id.lid {
+ case 0: // new connection
+ if e.isServ {
+ go e.acceptNewConn(id, addr, buf)
+ } else {
+ dumpb("drop", buf)
+ }
+
+ case _INVALID_SEQ:
+ dumpb("drop invalid", buf)
+
+ default: // old connection
+ e.mlock.RLock()
+ conn := e.lRegistry[id.lid]
+ e.mlock.RUnlock()
+ if conn != nil {
+ e.dispatch(conn, buf)
+ } else {
+ e.resetPeer(addr, id)
+ dumpb("drop null", buf)
+ }
+ }
+
+ } else if err != nil {
+ // idle process
+ if nerr, y := err.(net.Error); y && nerr.Timeout() {
+ e.idleProcess()
+ continue
+ }
+ // other errors
+ if atomic.LoadInt32(&e.state) == _S_FIN {
+ return
+ } else {
+ log.Println("Error: read sock", err)
+ }
+ }
+ }
+}
+
+func (e *Endpoint) idleProcess() {
+ // recycle/shrink memory
+ bpool.Drain()
+ e.mlock.Lock()
+ defer e.mlock.Unlock()
+ // reset urgent
+ for _, c := range e.lRegistry {
+ c.outlock.Lock()
+ if c.outQ.size() == 0 && c.urgent != 0 {
+ c.urgent = 0
+ }
+ c.outlock.Unlock()
+ }
+}
+
+func (e *Endpoint) Dial(addr string) (*Conn, error) {
+ rAddr, err := net.ResolveUDPAddr("udp", addr)
+ if err != nil {
+ return nil, err
+ }
+ e.mlock.Lock()
+ e.idSeq++
+ id := connID{e.idSeq, 0}
+ conn := NewConn(e, rAddr, id)
+ e.lRegistry[id.lid] = conn
+ e.mlock.Unlock()
+ if atomic.LoadInt32(&e.state) != _S_FIN {
+ err = conn.initConnection(nil)
+ return conn, err
+ }
+ return nil, io.EOF
+}
+
+func (e *Endpoint) acceptNewConn(id connID, addr *net.UDPAddr, buf []byte) {
+ rKey := addr.String()
+ e.mlock.Lock()
+ // map: remoteAddr => remoteConnID
+ // filter duplicated syn packets
+ if newArr := insertRid(e.rRegistry[rKey], id.rid); newArr != nil {
+ e.rRegistry[rKey] = newArr
+ } else {
+ e.mlock.Unlock()
+ log.Println("Warn: duplicated connection", addr)
+ return
+ }
+ e.idSeq++
+ id.lid = e.idSeq
+ conn := NewConn(e, addr, id)
+ e.lRegistry[id.lid] = conn
+ e.mlock.Unlock()
+ err := conn.initConnection(buf)
+ if err == nil {
+ select {
+ case e.listenChan <- conn:
+ case <-time.After(_10ms):
+ log.Println("Warn: no listener")
+ }
+ } else {
+ e.removeConn(id, addr)
+ log.Println("Error: init_connection", addr, err)
+ }
+}
+
+func (e *Endpoint) removeConn(id connID, addr *net.UDPAddr) {
+ e.mlock.Lock()
+ delete(e.lRegistry, id.lid)
+ rKey := addr.String()
+ if newArr := deleteRid(e.rRegistry[rKey], id.rid); newArr != nil {
+ if len(newArr) > 0 {
+ e.rRegistry[rKey] = newArr
+ } else {
+ delete(e.rRegistry, rKey)
+ }
+ }
+ e.mlock.Unlock()
+}
+
+// net.Listener
+func (e *Endpoint) Close() error {
+ state := atomic.LoadInt32(&e.state)
+ if state > 0 && atomic.CompareAndSwapInt32(&e.state, state, _S_FIN) {
+ err := e.udpconn.Close()
+ e.lRegistry = nil
+ e.rRegistry = nil
+ select { // release listeners
+ case e.listenChan <- nil:
+ default:
+ }
+ return err
+ }
+ return nil
+}
+
+// net.Listener
+func (e *Endpoint) Addr() net.Addr {
+ return e.udpconn.LocalAddr()
+}
+
+// net.Listener
+func (e *Endpoint) Accept() (net.Conn, error) {
+ if atomic.LoadInt32(&e.state) == _S_EST0 {
+ return <-e.listenChan, nil
+ } else {
+ return nil, io.EOF
+ }
+}
+
+func (e *Endpoint) Listen() *Conn {
+ if atomic.LoadInt32(&e.state) == _S_EST0 {
+ return <-e.listenChan
+ } else {
+ return nil
+ }
+}
+
+// tmo in milliseconds
+func (e *Endpoint) ListenTimeout(tmo int64) *Conn {
+ if tmo <= 0 {
+ return e.Listen()
+ }
+ if atomic.LoadInt32(&e.state) == _S_EST0 {
+ select {
+ case c := <-e.listenChan:
+ return c
+ case <-NewTimerChan(tmo):
+ }
+ }
+ return nil
+}
+
+func (e *Endpoint) getConnID(idPtr *connID, buf []byte) {
+ // TODO determine magic header
+ magicAndLen := binary.BigEndian.Uint64(buf)
+ if int(magicAndLen&0xFFff) == len(buf) {
+ id := binary.BigEndian.Uint64(buf[_MAGIC_SIZE+2:])
+ idPtr.lid = uint32(id >> 32)
+ idPtr.rid = uint32(id)
+ } else {
+ idPtr.lid = _INVALID_SEQ
+ }
+}
+
+func (e *Endpoint) dispatch(c *Conn, buf []byte) {
+ e.timeout.Reset(30 * time.Millisecond)
+ select {
+ case c.evRecv <- buf:
+ case <-e.timeout.C:
+ log.Println("Warn: dispatch packet failed")
+ }
+}
+
+func (e *Endpoint) resetPeer(addr *net.UDPAddr, id connID) {
+ pk := &packet{flag: _F_FIN | _F_RESET}
+ buf := nodeOf(pk).marshall(id)
+ e.udpconn.WriteToUDP(buf, addr)
+}
+
+type u32Slice []uint32
+
+func (p u32Slice) Len() int { return len(p) }
+func (p u32Slice) Less(i, j int) bool { return p[i] < p[j] }
+func (p u32Slice) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
+
+// if rid is not present in the array, insert it and return the new array; otherwise return nil
+func insertRid(array []uint32, rid uint32) []uint32 {
+ if len(array) > 0 {
+ pos := sort.Search(len(array), func(n int) bool {
+ return array[n] >= rid
+ })
+ if pos < len(array) && array[pos] == rid {
+ return nil
+ }
+ }
+ array = append(array, rid)
+ sort.Sort(u32Slice(array))
+ return array
+}
+
+// if rid is present in the array, delete it and return the new array; otherwise return nil
+func deleteRid(array []uint32, rid uint32) []uint32 {
+ if len(array) > 0 {
+ pos := sort.Search(len(array), func(n int) bool {
+ return array[n] >= rid
+ })
+ if pos < len(array) && array[pos] == rid {
+ newArray := make([]uint32, len(array)-1)
+ n := copy(newArray, array[:pos])
+ copy(newArray[n:], array[pos+1:])
+ return newArray
+ }
+ }
+ return nil
+}
diff --git a/weed/udptransfer/endpoint_test.go b/weed/udptransfer/endpoint_test.go
new file mode 100644
index 000000000..01be073bc
--- /dev/null
+++ b/weed/udptransfer/endpoint_test.go
@@ -0,0 +1,42 @@
+package udptransfer
+
+import (
+ "fmt"
+ "math/rand"
+ "sort"
+ "testing"
+)
+
+func Test_insert_delete_rid(t *testing.T) {
+ var a []uint32
+ var b = make([]uint32, 0, 1e3)
+ var uniq = make(map[uint32]int)
+ // insert into a with random
+ for i := 0; i < cap(b); i++ {
+ n := uint32(rand.Int31())
+ if _, y := uniq[n]; !y {
+ b = append(b, n)
+ uniq[n] = 1
+ }
+ dups := 1
+ if i&0xf == 0xf {
+ dups = 3
+ }
+ for j := 0; j < dups; j++ {
+ if aa := insertRid(a, n); aa != nil {
+ a = aa
+ }
+ }
+ }
+ sort.Sort(u32Slice(b))
+ bStr := fmt.Sprintf("%d", b)
+ aStr := fmt.Sprintf("%d", a)
+ assert(aStr == bStr, t, "a!=b")
+
+ for i := 0; i < len(b); i++ {
+ if aa := deleteRid(a, b[i]); aa != nil {
+ a = aa
+ }
+ }
+ assert(len(a) == 0, t, "a!=0")
+}
diff --git a/weed/udptransfer/linked.go b/weed/udptransfer/linked.go
new file mode 100644
index 000000000..061cfacd2
--- /dev/null
+++ b/weed/udptransfer/linked.go
@@ -0,0 +1,333 @@
+package udptransfer
+
+type qNode struct {
+ *packet
+ prev *qNode
+ next *qNode
+ sent int64 // last sent time
+ sent_1 int64 // prev sent time
+ miss int // sack miss count
+}
+
+type linkedMap struct {
+ head *qNode
+ tail *qNode
+ qmap map[uint32]*qNode
+ lastIns *qNode
+ maxCtnSeq uint32
+ mode int
+}
+
+const (
+ _QModeIn = 1
+ _QModeOut = 2
+)
+
+func newLinkedMap(qmode int) *linkedMap {
+ return &linkedMap{
+ qmap: make(map[uint32]*qNode),
+ mode: qmode,
+ }
+}
+
+func (l *linkedMap) get(seq uint32) (i *qNode) {
+ i = l.qmap[seq]
+ return
+}
+
+func (l *linkedMap) contains(seq uint32) bool {
+ _, y := l.qmap[seq]
+ return y
+}
+
+func (l *linkedMap) size() int32 {
+ return int32(len(l.qmap))
+}
+
+func (l *linkedMap) reset() {
+ l.head = nil
+ l.tail = nil
+ l.lastIns = nil
+ l.maxCtnSeq = 0
+ l.qmap = make(map[uint32]*qNode)
+}
+
+func (l *linkedMap) isEqualsHead(seq uint32) bool {
+ return l.head != nil && seq == l.head.seq
+}
+
+func (l *linkedMap) distanceOfHead(seq uint32) int32 {
+ if l.head != nil {
+ return int32(seq - l.head.seq)
+ } else {
+ return -1
+ }
+}
+
+func (l *linkedMap) appendTail(one *qNode) {
+ if l.tail != nil {
+ l.tail.next = one
+ one.prev = l.tail
+ l.tail = one
+ } else {
+ l.head = one
+ l.tail = one
+ }
+ l.qmap[one.seq] = one
+}
+
+// xxx - n - yyy
+// xxx - yyy
+func (l *linkedMap) deleteAt(n *qNode) {
+ x, y := n.prev, n.next
+ if x != nil {
+ x.next = y
+ } else {
+ l.head = y
+ }
+ if y != nil {
+ y.prev = x
+ } else {
+ l.tail = x
+ }
+ n.prev, n.next = nil, nil
+ delete(l.qmap, n.seq)
+}
+
+// delete node n and all nodes before it
+func (l *linkedMap) deleteBefore(n *qNode) (left *qNode, deleted int32) {
+ for i := n; i != nil; i = i.prev {
+ delete(l.qmap, i.seq)
+ if i.scnt != _SENT_OK {
+ deleted++
+ // only the outQ recycles buffers here
+ if l.mode == _QModeOut {
+ bpool.Put(i.buffer)
+ i.buffer = nil
+ }
+ }
+ }
+ left = l.head
+ l.head = n.next
+ n.next = nil
+ if l.head != nil {
+ l.head.prev = nil
+ } else { // n was the tail, so the list is now empty
+ l.tail = nil
+ }
+ return
+}
+
+// xxx - ref
+// xxx - one - ref
+func (l *linkedMap) insertBefore(ref, one *qNode) {
+ x := ref.prev
+ if x != nil {
+ x.next = one
+ one.prev = x
+ } else {
+ l.head = one
+ }
+ ref.prev = one
+ one.next = ref
+ l.qmap[one.seq] = one
+}
+
+// ref - zzz
+// ref - one - zzz
+func (l *linkedMap) insertAfter(ref, one *qNode) {
+ z := ref.next
+ if z == nil { // append
+ ref.next = one
+ l.tail = one
+ } else { // insert mid
+ z.prev = one
+ ref.next = one
+ }
+ one.prev = ref
+ one.next = z
+ l.qmap[one.seq] = one
+}
+
+// baseHead: the left outside boundary.
+// if inserted, returns the distance from the predecessor (or from baseHead when inserted at the head)
+func (l *linkedMap) searchInsert(one *qNode, baseHead uint32) (dis int64) {
+ for i := l.tail; i != nil; i = i.prev {
+ dis = int64(one.seq) - int64(i.seq)
+ if dis > 0 {
+ l.insertAfter(i, one)
+ return
+ } else if dis == 0 {
+ // duplicated
+ return
+ }
+ }
+ if one.seq <= baseHead {
+ return 0
+ }
+ if l.head != nil {
+ l.insertBefore(l.head, one)
+ } else {
+ l.head = one
+ l.tail = one
+ l.qmap[one.seq] = one
+ }
+ dis = int64(one.seq) - int64(baseHead)
+ return
+}
+
+func (l *linkedMap) updateContinuous(i *qNode) bool {
+ var lastCtnSeq = l.maxCtnSeq
+ for ; i != nil; i = i.next {
+ if i.seq-lastCtnSeq == 1 {
+ lastCtnSeq = i.seq
+ } else {
+ break
+ }
+ }
+ if lastCtnSeq != l.maxCtnSeq {
+ l.maxCtnSeq = lastCtnSeq
+ return true
+ }
+ return false
+}
+
+func (l *linkedMap) isWholeContinuous() bool {
+ return l.tail != nil && l.maxCtnSeq == l.tail.seq
+}
+
+/*
+func (l *linkedMap) searchMaxContinued(baseHead uint32) (*qNode, bool) {
+ var last *qNode
+ for i := l.head; i != nil; i = i.next {
+ if last != nil {
+ if i.seq-last.seq > 1 {
+ return last, true
+ }
+ } else {
+ if i.seq != baseHead {
+ return nil, false
+ }
+ }
+ last = i
+ }
+ if last != nil {
+ return last, true
+ } else {
+ return nil, false
+ }
+}
+*/
+
+func (q *qNode) forward(n int) *qNode {
+ for ; n > 0 && q != nil; n-- {
+ q = q.next
+ }
+ return q
+}
+
+// prev: the seq immediately before the bitmap start point
+func (l *linkedMap) makeHolesBitmap(prev uint32) ([]uint64, uint32) {
+ var start *qNode
+ var bits uint64
+ var j uint32
+ for i := l.head; i != nil; i = i.next {
+ if i.seq >= prev+1 {
+ start = i
+ break
+ }
+ }
+ var bitmap []uint64
+ // start (found above) is the nearest successor of [prev]
+Iterator:
+ for i := start; i != nil; i = i.next {
+ dis := i.seq - prev
+ prev = i.seq
+ j += dis // j is next bit index
+ for j >= 65 {
+ if len(bitmap) >= 20 { // bitmap too long
+ break Iterator
+ }
+ bitmap = append(bitmap, bits)
+ bits = 0
+ j -= 64
+ }
+ bits |= 1 << (j - 1)
+ }
+ if j > 0 {
+ // j -> (0, 64]
+ bitmap = append(bitmap, bits)
+ }
+ return bitmap, j
+}
+
+// from: the bitmap start point
+func (l *linkedMap) deleteByBitmap(bmap []uint64, from uint32, tailBitsLen uint32) (deleted, missed int32, lastContinued bool) {
+ var start = l.qmap[from]
+ if start != nil {
+ // delete predecessors
+ if pred := start.prev; pred != nil {
+ _, n := l.deleteBefore(pred)
+ deleted += n
+ }
+ } else {
+ // [from] is out of bounds
+ return
+ }
+ var j, bitsLen uint32
+ var bits uint64
+ bits, bmap = bmap[0], bmap[1:]
+ if len(bmap) > 0 {
+ bitsLen = 64
+ } else {
+ // bmap.len==1, tail is here
+ bitsLen = tailBitsLen
+ }
+ // maxContinued tracks the last node of the continued run from [start],
+ // which can be deleted safely to keep the queue small
+ var maxContinued *qNode
+ lastContinued = true
+
+ for i := start; i != nil; j++ {
+ if j >= bitsLen {
+ if len(bmap) > 0 {
+ j = 0
+ bits, bmap = bmap[0], bmap[1:]
+ if len(bmap) > 0 {
+ bitsLen = 64
+ } else {
+ bitsLen = tailBitsLen
+ }
+ } else {
+ // no more pages
+ goto finished
+ }
+ }
+ if bits&1 == 1 {
+ if lastContinued {
+ maxContinued = i
+ }
+ if i.scnt != _SENT_OK {
+ // not yet marked, so count it as newly deleted
+ deleted++
+ }
+ // don't delete, just mark it
+ i.scnt = _SENT_OK
+ } else {
+ // now known to be possibly lost
+ if i.miss == 0 {
+ missed++
+ }
+ i.miss++
+ lastContinued = false
+ }
+ bits >>= 1
+ i = i.next
+ }
+
+finished:
+ if maxContinued != nil {
+ l.deleteBefore(maxContinued)
+ }
+ return
+}
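A worked example of the hole bitmap (values illustrative, not from this commit): suppose the receiver has maxCtnSeq=5 and queued seqs {6,7,9}, i.e. 8 is a hole.

    makeHolesBitmap(5):
      seq 6: dis=1 -> j=1, set bit 0
      seq 7: dis=1 -> j=2, set bit 1
      seq 9: dis=2 -> j=4, set bit 3
    => bitmap=[0b1011], tail bits j=4

    deleteByBitmap([0b1011], from=6, tailBitsLen=4) on the sender then marks
    6, 7 and 9 as _SENT_OK, counts 8 as missed, and physically deletes only
    the continued prefix 6-7 from outQ.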
diff --git a/weed/udptransfer/linked_test.go b/weed/udptransfer/linked_test.go
new file mode 100644
index 000000000..563d6264a
--- /dev/null
+++ b/weed/udptransfer/linked_test.go
@@ -0,0 +1,236 @@
+package udptransfer
+
+import (
+ "fmt"
+ "math/rand"
+ "testing"
+)
+
+var lmap *linkedMap
+
+func init() {
+ lmap = newLinkedMap(_QModeIn)
+}
+
+func node(seq int) *qNode {
+ return &qNode{packet: &packet{seq: uint32(seq)}}
+}
+
+func Test_get(t *testing.T) {
+ var i, n *qNode
+ assert(!lmap.contains(0), t, "0 nil")
+ n = node(0)
+ lmap.head = n
+ lmap.tail = n
+ lmap.qmap[0] = n
+ i = lmap.get(0)
+ assert(i == n, t, "0=n")
+}
+
+func Test_insert(t *testing.T) {
+ lmap.reset()
+ n := node(1)
+ // appendTail
+ lmap.appendTail(n)
+ assert(lmap.head == n, t, "head n")
+ assert(lmap.tail == n, t, "head n")
+ n = node(2)
+ lmap.appendTail(n)
+ assert(lmap.head != n, t, "head n")
+ assert(lmap.tail == n, t, "head n")
+ assert(lmap.size() == 2, t, "size")
+}
+
+func Test_insertAfter(t *testing.T) {
+ n1 := lmap.get(1)
+ n2 := n1.next
+ n3 := node(3)
+ lmap.insertAfter(n1, n3)
+ assert(n1.next == n3, t, "n3")
+ assert(n1 == n3.prev, t, "left n3")
+ assert(n2 == n3.next, t, "n3 right")
+}
+
+func Test_insertBefore(t *testing.T) {
+ n3 := lmap.get(3)
+ n2 := n3.next
+ n4 := node(4)
+ lmap.insertAfter(n3, n4)
+ assert(n3.next == n4, t, "n4")
+ assert(n3 == n4.prev, t, "left n4")
+ assert(n2 == n4.next, t, "n4 right")
+}
+
+func Test_deleteBefore(t *testing.T) {
+ lmap.reset()
+ for i := 1; i < 10; i++ {
+ n := node(i)
+ lmap.appendTail(n)
+ }
+
+ var assertRangeEquals = func(n *qNode, start, wantCount int) {
+ var last *qNode
+ var count int
+ for ; n != nil; n = n.next {
+ assert(int(n.seq) == start, t, "nseq=%d start=%d", n.seq, start)
+ last = n
+ start++
+ count++
+ }
+ assert(last.next == nil, t, "tail nil")
+ assert(count == wantCount, t, "count")
+ }
+ assertRangeEquals(lmap.head, 1, 9)
+ var n *qNode
+ n = lmap.get(3)
+ n, _ = lmap.deleteBefore(n)
+ assertRangeEquals(n, 1, 3)
+ assert(lmap.head.seq == 4, t, "head")
+
+ n = lmap.get(8)
+ n, _ = lmap.deleteBefore(n)
+ assertRangeEquals(n, 4, 5)
+ assert(lmap.head.seq == 9, t, "head")
+
+ n = lmap.get(9)
+ n, _ = lmap.deleteBefore(n)
+ assertRangeEquals(n, 9, 1)
+
+ assert(lmap.size() == 0, t, "size 0")
+ assert(lmap.head == nil, t, "head nil")
+ assert(lmap.tail == nil, t, "tail nil")
+}
+
+func testBitmap(t *testing.T, bmap []uint64, prev uint32) {
+ var j uint
+ var k int
+ bits := bmap[k]
+ t.Logf("test-%d %016x", k, bits)
+ var checkNextPage = func() {
+ if j >= 64 {
+ j = 0
+ k++
+ bits = bmap[k]
+ t.Logf("test-%d %016x", k, bits)
+ }
+ }
+ for i := lmap.head; i != nil && k < len(bmap); i = i.next {
+ checkNextPage()
+ dis := i.seq - prev
+ prev = i.seq
+ if dis == 1 {
+ bit := (bits >> j) & 1
+ assert(bit == 1, t, "1 bit=%d j=%d", bit, j)
+ j++
+ } else {
+ for ; dis > 0; dis-- {
+ checkNextPage()
+ bit := (bits >> j) & 1
+ want := uint64(0)
+ if dis == 1 {
+ want = 1
+ }
+ assert(bit == want, t, "?=%d bit=%d j=%d", want, bit, j)
+ j++
+ }
+ }
+ }
+ // remains bits should be 0
+ for i := j & 63; i > 0; i-- {
+ bit := (bits >> j) & 1
+ assert(bit == 0, t, "00 bit=%d j=%d", bit, j)
+ j++
+ }
+}
+
+func Test_bitmap(t *testing.T) {
+ var prev uint32
+ var head uint32 = prev + 1
+
+ lmap.reset()
+ // test 66-%3 and record holes
+ var holes = make([]uint32, 0, 50)
+ for i := head; i < 366; i++ {
+ if i%3 == 0 {
+ holes = append(holes, i)
+ continue
+ }
+ n := node(int(i))
+ lmap.appendTail(n)
+ }
+ bmap, tbl := lmap.makeHolesBitmap(prev)
+ testBitmap(t, bmap, prev)
+
+ lmap.reset()
+ // full 66, do deleteByBitmap then compare
+ for i := head; i < 366; i++ {
+ n := node(int(i))
+ lmap.appendTail(n)
+ }
+
+ lmap.deleteByBitmap(bmap, head, tbl)
+ var holesResult = make([]uint32, 0, 50)
+ for i := lmap.head; i != nil; i = i.next {
+ if i.scnt != _SENT_OK {
+ holesResult = append(holesResult, i.seq)
+ }
+ }
+ a := fmt.Sprintf("%x", holes)
+ b := fmt.Sprintf("%x", holesResult)
+ assert(a == b, t, "deleteByBitmap \na=%s \nb=%s", a, b)
+
+ lmap.reset()
+ // test stride across page 1
+ for i := head; i < 69; i++ {
+ if i >= 63 && i <= 65 {
+ continue
+ }
+ n := node(int(i))
+ lmap.appendTail(n)
+ }
+ bmap, _ = lmap.makeHolesBitmap(prev)
+ testBitmap(t, bmap, prev)
+
+ lmap.reset()
+ prev = 65
+ head = prev + 1
+ // test stride across page 0
+ for i := head; i < 68; i++ {
+ n := node(int(i))
+ lmap.appendTail(n)
+ }
+ bmap, _ = lmap.makeHolesBitmap(prev)
+ testBitmap(t, bmap, prev)
+}
+
+var ackbitmap []uint64
+
+func init_benchmark_map() {
+ if lmap.size() != 640 {
+ lmap.reset()
+ for i := 1; i <= 640; i++ {
+ lmap.appendTail(node(i))
+ }
+ ackbitmap = make([]uint64, 10)
+ for i := 0; i < len(ackbitmap); i++ {
+ n := rand.Int63()
+ ackbitmap[i] = uint64(n) << 1
+ }
+ }
+}
+
+func Benchmark_make_bitmap(b *testing.B) {
+ init_benchmark_map()
+
+ for i := 0; i < b.N; i++ {
+ lmap.makeHolesBitmap(0)
+ }
+}
+
+func Benchmark_apply_bitmap(b *testing.B) {
+ init_benchmark_map()
+
+ for i := 0; i < b.N; i++ {
+ lmap.deleteByBitmap(ackbitmap, 1, 64)
+ }
+}
diff --git a/weed/udptransfer/packet.go b/weed/udptransfer/packet.go
new file mode 100644
index 000000000..2413f1ccf
--- /dev/null
+++ b/weed/udptransfer/packet.go
@@ -0,0 +1,142 @@
+package udptransfer
+
+import (
+ "encoding/binary"
+ "fmt"
+)
+
+const (
+ _F_NIL = 0
+ _F_SYN = 1
+ _F_ACK = 1 << 1
+ _F_SACK = 1 << 2
+ _F_TIME = 1 << 3
+ _F_DATA = 1 << 4
+ // reserved = 1 << 5
+ _F_RESET = 1 << 6
+ _F_FIN = 1 << 7
+)
+
+var packetTypeNames = map[byte]string{
+ 0: "NOOP",
+ 1: "SYN",
+ 2: "ACK",
+ 3: "SYN+ACK",
+ 4: "SACK",
+ 8: "TIME",
+ 12: "SACK+TIME",
+ 16: "DATA",
+ 64: "RESET",
+ 128: "FIN",
+ 192: "FIN+RESET",
+}
+
+const (
+ _S_FIN = iota
+ _S_FIN0
+ _S_FIN1
+ _S_SYN0
+ _S_SYN1
+ _S_EST0
+ _S_EST1
+)
+
+// Magic-6 | TH-10 | CH-10 | payload
+const (
+ _MAGIC_SIZE = 6
+ _TH_SIZE = 10 + _MAGIC_SIZE
+ _CH_SIZE = 10
+ _AH_SIZE = _TH_SIZE + _CH_SIZE
+)
+
+const (
+ // Max UDP payload: 1500 MTU - 20 IP hdr - 8 UDP hdr = 1472 bytes
+ // Then: MSS = 1472 - 26 (_AH_SIZE) = 1446
+ // and for ADSL (8-byte PPPoE header): 1446 - 8 = 1438
+ _MSS = 1438
+)
+
+const (
+ _SENT_OK = 0xff
+)
+
+type packet struct {
+ seq uint32
+ ack uint32
+ flag uint8
+ scnt uint8
+ payload []byte
+ buffer []byte
+}
+
+func (p *packet) marshall(id connID) []byte {
+ buf := p.buffer
+ if buf == nil {
+ buf = make([]byte, _AH_SIZE+len(p.payload))
+ copy(buf[_TH_SIZE+10:], p.payload)
+ }
+ binary.BigEndian.PutUint16(buf[_MAGIC_SIZE:], uint16(len(buf)))
+ binary.BigEndian.PutUint32(buf[_MAGIC_SIZE+2:], id.rid)
+ binary.BigEndian.PutUint32(buf[_MAGIC_SIZE+6:], id.lid)
+ binary.BigEndian.PutUint32(buf[_TH_SIZE:], p.seq)
+ binary.BigEndian.PutUint32(buf[_TH_SIZE+4:], p.ack)
+ buf[_TH_SIZE+8] = p.flag
+ buf[_TH_SIZE+9] = p.scnt
+ return buf
+}
+
+func unmarshall(pk *packet, buf []byte) {
+ if len(buf) >= _CH_SIZE {
+ pk.seq = binary.BigEndian.Uint32(buf)
+ pk.ack = binary.BigEndian.Uint32(buf[4:])
+ pk.flag = buf[8]
+ pk.scnt = buf[9]
+ pk.payload = buf[10:]
+ }
+}
+
+func (n *qNode) String() string {
+ now := Now()
+ return fmt.Sprintf("type=%s seq=%d scnt=%d sndtime~=%d,%d miss=%d",
+ packetTypeNames[n.flag], n.seq, n.scnt, n.sent-now, n.sent_1-now, n.miss)
+}
+
+func maxI64(a, b int64) int64 {
+ if a >= b {
+ return a
+ } else {
+ return b
+ }
+}
+
+func maxU32(a, b uint32) uint32 {
+ if a >= b {
+ return a
+ } else {
+ return b
+ }
+}
+
+func minI64(a, b int64) int64 {
+ if a <= b {
+ return a
+ } else {
+ return b
+ }
+}
+
+func maxI32(a, b int32) int32 {
+ if a >= b {
+ return a
+ } else {
+ return b
+ }
+}
+
+func minI32(a, b int32) int32 {
+ if a <= b {
+ return a
+ } else {
+ return b
+ }
+}
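Putting the constants together (a summary of the definitions above, not new ones), a marshalled packet is laid out as:

    magic(6) | len(2) rid(4) lid(4) | seq(4) ack(4) flag(1) scnt(1) | payload
    |<-------- _TH_SIZE=16 -------->|<-------- _CH_SIZE=10 -------->|

and the payload budget works out to:

    1500 (MTU) - 20 (IP hdr) - 8 (UDP hdr) = 1472
    1472 - 26 (_AH_SIZE)                   = 1446
    1446 - 8  (PPPoE header, for ADSL)     = 1438 = _MSS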
diff --git a/weed/udptransfer/state.go b/weed/udptransfer/state.go
new file mode 100644
index 000000000..e0b4f1791
--- /dev/null
+++ b/weed/udptransfer/state.go
@@ -0,0 +1,516 @@
+package udptransfer
+
+import (
+ "errors"
+ "log"
+ "net"
+ "sync"
+ "sync/atomic"
+ "time"
+)
+
+const (
+ _10ms = time.Millisecond * 10
+ _100ms = time.Millisecond * 100
+)
+
+const (
+ _FIN_ACK_SEQ uint32 = 0xffFF0000
+ _INVALID_SEQ uint32 = 0xffFFffFF
+)
+
+var (
+ ErrIOTimeout error = &TimeoutError{}
+ ErrUnknown = errors.New("Unknown error")
+ ErrInexplicableData = errors.New("Inexplicable data")
+ ErrTooManyAttempts = errors.New("Too many attempts to connect")
+)
+
+type TimeoutError struct{}
+
+func (e *TimeoutError) Error() string { return "i/o timeout" }
+func (e *TimeoutError) Timeout() bool { return true }
+func (e *TimeoutError) Temporary() bool { return true }
+
+type Conn struct {
+ sock *net.UDPConn
+ dest *net.UDPAddr
+ edp *Endpoint
+ connID connID // 8 bytes
+ // events
+ evRecv chan []byte
+ evRead chan byte
+ evSend chan byte
+ evSWnd chan byte
+ evAck chan byte
+ evClose chan byte
+ // protocol state
+ inlock sync.Mutex
+ outlock sync.Mutex
+ state int32
+ mySeq uint32
+ swnd int32
+ cwnd int32
+ missed int32
+ outPending int32
+ lastAck uint32
+ lastAckTime int64
+ lastAckTime2 int64
+ lastShrink int64
+ lastRstMis int64
+ ato int64
+ rto int64
+ rtt int64
+ srtt int64
+ mdev int64
+ rtmo int64
+ wtmo int64
+ tSlot int64
+ tSlotT0 int64
+ lastSErr int64
+ // queue
+ outQ *linkedMap
+ inQ *linkedMap
+ inQReady []byte
+ inQDirty bool
+ lastReadSeq uint32 // last user read seq
+ // params
+ bandwidth int64
+ fastRetransmit bool
+ flatTraffic bool
+ mss int
+ // statistics
+ urgent int
+ inPkCnt int
+ inDupCnt int
+ outPkCnt int
+ outDupCnt int
+ fRCnt int
+}
+
+func NewConn(e *Endpoint, dest *net.UDPAddr, id connID) *Conn {
+ c := &Conn{
+ sock: e.udpconn,
+ dest: dest,
+ edp: e,
+ connID: id,
+ evRecv: make(chan []byte, 128),
+ evRead: make(chan byte, 1),
+ evSWnd: make(chan byte, 2),
+ evSend: make(chan byte, 4),
+ evAck: make(chan byte, 1),
+ evClose: make(chan byte, 2),
+ outQ: newLinkedMap(_QModeOut),
+ inQ: newLinkedMap(_QModeIn),
+ }
+ p := e.params
+ c.bandwidth = p.Bandwidth
+ c.fastRetransmit = p.FastRetransmit
+ c.flatTraffic = p.FlatTraffic
+ c.mss = _MSS
+ if dest.IP.To4() == nil {
+ // typical ipv6 header length=40
+ c.mss -= 20
+ }
+ return c
+}
+
+func (c *Conn) initConnection(buf []byte) (err error) {
+ if buf == nil {
+ err = c.initDialing()
+ } else { //server
+ err = c.acceptConnection(buf[_TH_SIZE:])
+ }
+ if err != nil {
+ return
+ }
+ if c.state == _S_EST1 {
+ c.lastReadSeq = c.lastAck
+ c.inQ.maxCtnSeq = c.lastAck
+ c.rtt = maxI64(c.rtt, _MIN_RTT)
+ c.mdev = c.rtt << 1
+ c.srtt = c.rtt << 3
+ c.rto = maxI64(c.rtt*2, _MIN_RTO)
+ c.ato = maxI64(c.rtt>>4, _MIN_ATO)
+ c.ato = minI64(c.ato, _MAX_ATO)
+ // initial cwnd
+ c.swnd = calSwnd(c.bandwidth, c.rtt) >> 1
+ c.cwnd = 8
+ go c.internalRecvLoop()
+ go c.internalSendLoop()
+ go c.internalAckLoop()
+ if debug >= 0 {
+ go c.internal_state()
+ }
+ return nil
+ } else {
+ return ErrUnknown
+ }
+}
+
+func (c *Conn) initDialing() error {
+ // first syn
+ pk := &packet{
+ seq: c.mySeq,
+ flag: _F_SYN,
+ }
+ item := nodeOf(pk)
+ var buf []byte
+ c.state = _S_SYN0
+ t0 := Now()
+ for i := 0; i < _MAX_RETRIES && c.state == _S_SYN0; i++ {
+ // send syn
+ c.internalWrite(item)
+ select {
+ case buf = <-c.evRecv:
+ c.rtt = Now() - t0
+ c.state = _S_SYN1
+ c.connID.setRid(buf)
+ buf = buf[_TH_SIZE:]
+ case <-time.After(time.Second):
+ continue
+ }
+ }
+ if c.state == _S_SYN0 {
+ return ErrTooManyAttempts
+ }
+
+ unmarshall(pk, buf)
+ // expected syn+ack
+ if pk.flag == _F_SYN|_F_ACK && pk.ack == c.mySeq {
+ if scnt := pk.scnt - 1; scnt > 0 {
+ c.rtt -= int64(scnt) * 1e3
+ }
+ log.Println("rtt", c.rtt)
+ c.state = _S_EST0
+ // build ack3
+ pk.scnt = 0
+ pk.ack = pk.seq
+ pk.flag = _F_ACK
+ item := nodeOf(pk)
+ // send ack3
+ c.internalWrite(item)
+ // update lastAck
+ c.logAck(pk.ack)
+ c.state = _S_EST1
+ return nil
+ } else {
+ return ErrInexplicableData
+ }
+}
+
+func (c *Conn) acceptConnection(buf []byte) error {
+ var pk = new(packet)
+ var item *qNode
+ unmarshall(pk, buf)
+ // expected syn
+ if pk.flag == _F_SYN {
+ c.state = _S_SYN1
+ // build syn+ack
+ pk.ack = pk.seq
+ pk.seq = c.mySeq
+ pk.flag |= _F_ACK
+ // update lastAck
+ c.logAck(pk.ack)
+ item = nodeOf(pk)
+ item.scnt = pk.scnt - 1
+ } else {
+ dumpb("Syn1 ?", buf)
+ return ErrInexplicableData
+ }
+ for i := 0; i < 5 && c.state == _S_SYN1; i++ {
+ t0 := Now()
+ // reply syn+ack
+ c.internalWrite(item)
+ // recv ack3
+ select {
+ case buf = <-c.evRecv:
+ c.state = _S_EST0
+ c.rtt = Now() - t0
+ buf = buf[_TH_SIZE:]
+ log.Println("rtt", c.rtt)
+ case <-time.After(time.Second):
+ continue
+ }
+ }
+ if c.state == _S_SYN1 {
+ return ErrTooManyAttempts
+ }
+
+ pk = new(packet)
+ unmarshall(pk, buf)
+ // expected ack3
+ if pk.flag == _F_ACK && pk.ack == c.mySeq {
+ c.state = _S_EST1
+ } else {
+ // if ack3 lost, resend syn+ack 3-times
+ // and drop these coming data
+ if pk.flag&_F_DATA != 0 && pk.seq > c.lastAck {
+ c.internalWrite(item)
+ c.state = _S_EST1
+ } else {
+ dumpb("Ack3 ?", buf)
+ return ErrInexplicableData
+ }
+ }
+ return nil
+}
+
+// spin-wait backoff: 4 x 20ms, then 4 x 100ms, then 4 x 1s
+func selfSpinWait(fn func() bool) error {
+ const _MAX_SPIN = 12
+ for i := 0; i < _MAX_SPIN; i++ {
+ if fn() {
+ return nil
+ } else if i <= 3 {
+ time.Sleep(_10ms * 2)
+ } else if i <= 7 {
+ time.Sleep(_100ms)
+ } else {
+ time.Sleep(time.Second)
+ }
+ }
+ return ErrIOTimeout
+}
+
+func (c *Conn) IsClosed() bool {
+ return atomic.LoadInt32(&c.state) <= _S_FIN1
+}
+
+/*
+active close:
+1 <- send fin-W: closeW()
+ before sending, ensure all outQ items have been sent out and acked.
+2 -> wait to recv ack{fin-W},
+ then trigger closeR, which sends fin-R and waits to recv ack{fin-R}
+
+passive close:
+-> fin:
+ if outQ is not empty then self-spin wait.
+ if outQ empty, send ack{fin-W} then goto closeW().
+*/
+func (c *Conn) Close() (err error) {
+ if !atomic.CompareAndSwapInt32(&c.state, _S_EST1, _S_FIN0) {
+ return selfSpinWait(func() bool {
+ return atomic.LoadInt32(&c.state) == _S_FIN
+ })
+ }
+ var err0 error
+ err0 = c.closeW()
+ // waiting for fin-2 of peer
+ err = selfSpinWait(func() bool {
+ select {
+ case v := <-c.evClose:
+ if v == _S_FIN {
+ return true
+ } else {
+ time.AfterFunc(_100ms, func() { c.evClose <- v })
+ }
+ default:
+ }
+ return false
+ })
+ defer c.afterShutdown()
+ if err != nil {
+ // backup path for wait ack(finW) timeout
+ c.closeR(nil)
+ }
+ if err0 != nil {
+ return err0
+ } else {
+ return
+ }
+}
+
+func (c *Conn) beforeCloseW() (err error) {
+ // check outQ was empty and all has been acked.
+ // self-spin waiting
+ for i := 0; i < 2; i++ {
+ err = selfSpinWait(func() bool {
+ return atomic.LoadInt32(&c.outPending) <= 0
+ })
+ if err == nil {
+ break
+ }
+ }
+ // send fin, reliably
+ c.outlock.Lock()
+ c.mySeq++
+ c.outPending++
+ pk := &packet{seq: c.mySeq, flag: _F_FIN}
+ item := nodeOf(pk)
+ c.outQ.appendTail(item)
+ c.internalWrite(item)
+ c.outlock.Unlock()
+ c.evSWnd <- _VSWND_ACTIVE
+ return
+}
+
+func (c *Conn) closeW() (err error) {
+ // close resource of sending
+ defer c.afterCloseW()
+ // send fin
+ err = c.beforeCloseW()
+ var closed bool
+ var max = 20
+ if c.rtt > 200 {
+ max = int(c.rtt) / 10
+ }
+ // waiting on outQ means:
+ // 1. for passive close, all outQ items have been acked
+ // 2. for active close, the fin has been acked
+ for i := 0; i < max && (atomic.LoadInt32(&c.outPending) > 0 || !closed); i++ {
+ select {
+ case v := <-c.evClose:
+ if v == _S_FIN0 {
+ // namely, last fin has been acked.
+ closed = true
+ } else {
+ time.AfterFunc(_100ms, func() { c.evClose <- v })
+ }
+ case <-time.After(_100ms):
+ }
+ }
+ if closed || err != nil {
+ return
+ } else {
+ return ErrIOTimeout
+ }
+}
+
+func (c *Conn) afterCloseW() {
+ // can't close(c.evRecv): the endpoint's dispatch might still send on it
+ // stop pending inputAndSend
+ select {
+ case c.evSend <- _CLOSE:
+ default:
+ }
+ // stop internalSendLoop
+ c.evSWnd <- _CLOSE
+}
+
+// called by active and passive close()
+func (c *Conn) afterShutdown() {
+ // stop internalRecvLoop
+ c.evRecv <- nil
+ // remove registry
+ c.edp.removeConn(c.connID, c.dest)
+ log.Println("shutdown", c.state)
+}
+
+// trigger by reset
+func (c *Conn) forceShutdownWithLock() {
+ c.outlock.Lock()
+ defer c.outlock.Unlock()
+ c.forceShutdown()
+}
+
+// called by:
+// 1/ send exception
+// 2/ recv reset
+// drop outQ and force shutdown
+func (c *Conn) forceShutdown() {
+ if atomic.CompareAndSwapInt32(&c.state, _S_EST1, _S_FIN) {
+ defer c.afterShutdown()
+ // stop sender
+ for i := 0; i < cap(c.evSend); i++ {
+ select {
+ case <-c.evSend:
+ default:
+ }
+ }
+ select {
+ case c.evSend <- _CLOSE:
+ default:
+ }
+ c.outQ.reset()
+ // stop reader
+ close(c.evRead)
+ c.inQ.reset()
+ // stop internalLoops
+ c.evSWnd <- _CLOSE
+ c.evAck <- _CLOSE
+ //log.Println("force shutdown")
+ }
+}
+
+// used when sending the fin failed
+func (c *Conn) fakeShutdown() {
+ select {
+ case c.evClose <- _S_FIN0:
+ default:
+ }
+}
+
+func (c *Conn) closeR(pk *packet) {
+ var passive = true
+ for {
+ state := atomic.LoadInt32(&c.state)
+ switch state {
+ case _S_FIN:
+ return
+ case _S_FIN1: // duplicate FIN; our earlier ack may have been lost
+ c.passiveCloseReply(pk, false)
+ return
+ case _S_FIN0: // active close performed
+ passive = false
+ }
+ if !atomic.CompareAndSwapInt32(&c.state, state, _S_FIN1) {
+ continue
+ }
+ c.passiveCloseReply(pk, true)
+ break
+ }
+ // here, R is closed.
+ // ^^^^^^^^^^^^^^^^^^^^^
+ if passive {
+ // a passive close calls closeW, which sends fin and waits for its ack;
+ // if the ack of fin-2 was lost, closeW will time out
+ c.closeW()
+ }
+ // here, R,W both were closed.
+ // ^^^^^^^^^^^^^^^^^^^^^
+ atomic.StoreInt32(&c.state, _S_FIN)
+ // stop internalAckLoop
+ c.evAck <- _CLOSE
+
+ if passive {
+ // close evRecv within here
+ c.afterShutdown()
+ } else {
+ // notify active close thread
+ select {
+ case c.evClose <- _S_FIN:
+ default:
+ }
+ }
+}
+
+func (c *Conn) passiveCloseReply(pk *packet, first bool) {
+ if pk != nil && pk.flag&_F_FIN != 0 {
+ if first {
+ c.checkInQ(pk)
+ close(c.evRead)
+ }
+ // ack the FIN
+ pk = &packet{seq: _FIN_ACK_SEQ, ack: pk.seq, flag: _F_ACK}
+ item := nodeOf(pk)
+ c.internalWrite(item)
+ }
+}
+
+// check that inQ has ended in order, then copy queued data to user space
+func (c *Conn) checkInQ(pk *packet) {
+ if nil != selfSpinWait(func() bool {
+ return c.inQ.maxCtnSeq+1 == pk.seq
+ }) { // timeout for waiting inQ to finish
+ return
+ }
+ c.inlock.Lock()
+ defer c.inlock.Unlock()
+ if c.inQ.size() > 0 {
+ for i := c.inQ.head; i != nil; i = i.next {
+ c.inQReady = append(c.inQReady, i.payload...)
+ }
+ }
+}
diff --git a/weed/udptransfer/stopwatch.go b/weed/udptransfer/stopwatch.go
new file mode 100644
index 000000000..2ee4feb57
--- /dev/null
+++ b/weed/udptransfer/stopwatch.go
@@ -0,0 +1,36 @@
+package udptransfer
+
+import (
+ "fmt"
+ "os"
+ "time"
+)
+
+type watch struct {
+ label string
+ t1 time.Time
+}
+
+func StartWatch(s string) *watch {
+ return &watch{
+ label: s,
+ t1: time.Now(),
+ }
+}
+
+func (w *watch) StopLoops(loop int, size int) {
+ tu := time.Since(w.t1).Nanoseconds()
+ timePerLoop := float64(tu) / float64(loop)
+ throughput := float64(loop*size) * 1e6 / float64(tu)
+ tuMs := float64(tu) / 1e6
+ fmt.Fprintf(os.Stderr, "%s tu=%.2f ms tpl=%.0f ns throughput=%.2f K/s\n", w.label, tuMs, timePerLoop, throughput)
+}
+
+var _kt = float64(1e9 / 1024) // converts bytes per nanosecond into KB/s
+
+func (w *watch) Stop(size int) {
+ tu := time.Since(w.t1).Nanoseconds()
+ throughput := float64(size) * _kt / float64(tu)
+ tuMs := float64(tu) / 1e6
+ fmt.Fprintf(os.Stderr, "%s tu=%.2f ms throughput=%.2f K/s\n", w.label, tuMs, throughput)
+}
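+
+// Usage sketch (hypothetical numbers): time a loop of sends and report
+// per-iteration latency and throughput to stderr.
+//
+//	w := StartWatch("send-loop")
+//	// ... run 1000 iterations of 8192-byte writes ...
+//	w.StopLoops(1000, 8192)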
diff --git a/weed/udptransfer/timer.go b/weed/udptransfer/timer.go
new file mode 100644
index 000000000..adcfe50c3
--- /dev/null
+++ b/weed/udptransfer/timer.go
@@ -0,0 +1,18 @@
+package udptransfer
+
+import "time"
+
+const Millisecond = 1e6 // one millisecond expressed in nanoseconds
+
+// Now returns the current time in milliseconds since the Unix epoch.
+func Now() int64 {
+ return time.Now().UnixNano() / Millisecond
+}
+
+func NowNS() int64 {
+ return time.Now().UnixNano()
+}
+
+// NewTimerChan returns a channel that fires once after d milliseconds.
+// The underlying Timer is not exposed, so the caller cannot stop it early.
+func NewTimerChan(d int64) <-chan time.Time {
+ timer := time.NewTimer(time.Duration(d) * time.Millisecond)
+ return timer.C
+}
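+
+// Usage sketch (assumed; events and handle are hypothetical): bound a
+// channel receive to 50 milliseconds.
+//
+//	select {
+//	case v := <-events:
+//		handle(v)
+//	case <-NewTimerChan(50):
+//		// timed out
+//	}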
diff --git a/weed/udptransfer/timing_test.go b/weed/udptransfer/timing_test.go
new file mode 100644
index 000000000..4f73bc33a
--- /dev/null
+++ b/weed/udptransfer/timing_test.go
@@ -0,0 +1,32 @@
+package udptransfer
+
+import (
+ "math"
+ "testing"
+ "time"
+)
+
+func Test_sleep(t *testing.T) {
+ const loops = 10
+ var intervals = [...]int64{1, 1e3, 1e4, 1e5, 1e6, 1e7}
+ var ret [len(intervals)][loops]int64
+ for j := 0; j < len(intervals); j++ {
+ v := time.Duration(intervals[j])
+ for i := 0; i < loops; i++ {
+ t0 := NowNS()
+ time.Sleep(v)
+ ret[j][i] = NowNS() - t0
+ }
+ }
+ for j := 0; j < len(intervals); j++ {
+ // stdev is the RMS deviation from the requested interval, not from the sample mean
+ var exp, sum, stdev float64
+ exp = float64(intervals[j])
+ for _, v := range ret[j] {
+ sum += float64(v)
+ stdev += math.Pow(float64(v)-exp, 2)
+ }
+ stdev /= float64(loops)
+ stdev = math.Sqrt(stdev)
+ t.Logf("interval=%s sleeping=%s stdev/k=%.2f", time.Duration(intervals[j]), time.Duration(sum/loops), stdev/1e3)
+ }
+}
diff --git a/weed/wdclient/volume_udp_client.go b/weed/wdclient/volume_udp_client.go
index b8bdcec90..93fd2b227 100644
--- a/weed/wdclient/volume_udp_client.go
+++ b/weed/wdclient/volume_udp_client.go
@@ -1,21 +1,68 @@
package wdclient
import (
+ "bufio"
+ "bytes"
+ "fmt"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb"
+ "github.com/chrislusf/seaweedfs/weed/udptransfer"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "github.com/chrislusf/seaweedfs/weed/wdclient/net2"
"io"
- "pack.ag/tftp"
+ "net"
+ "time"
)
-// VolumeTcpClient put/get/delete file chunks directly on volume servers without replication
+// VolumeUdpClient puts/gets/deletes file chunks directly on volume servers, without replication
type VolumeUdpClient struct {
+ cp net2.ConnectionPool
+}
+
+// VolumeUdpConn wraps a udptransfer connection with a buffered reader and writer
+type VolumeUdpConn struct {
+ net.Conn
+ bufWriter *bufio.Writer
+ bufReader *bufio.Reader
}
func NewVolumeUdpClient() *VolumeUdpClient {
+ MaxIdleTime := 10 * time.Second
return &VolumeUdpClient{
+ cp: net2.NewMultiConnectionPool(net2.ConnectionOptions{
+ MaxActiveConnections: 16,
+ MaxIdleConnections: 1,
+ MaxIdleTime: &MaxIdleTime,
+ DialMaxConcurrency: 0,
+ Dial: func(network string, address string) (net.Conn, error) {
+
+ endpoint, err := udptransfer.NewEndpoint(&udptransfer.Params{
+ LocalAddr: "",
+ Bandwidth: 100,
+ FastRetransmit: true,
+ FlatTraffic: true,
+ IsServ: false,
+ })
+ if err != nil {
+ return nil, err
+ }
+
+ conn, err := endpoint.Dial(address)
+ if err != nil {
+ return nil, err
+ }
+ return &VolumeUdpConn{
+ conn,
+ bufio.NewWriter(conn),
+ bufio.NewReader(conn),
+ }, err
+
+ },
+ NowFunc: nil,
+ ReadTimeout: 0,
+ WriteTimeout: 0,
+ }),
}
}
-
func (c *VolumeUdpClient) PutFileChunk(volumeServerAddress string, fileId string, fileSize uint32, fileReader io.Reader) (err error) {
udpAddress, parseErr := pb.ParseServerAddress(volumeServerAddress, 20001)
@@ -23,24 +70,45 @@ func (c *VolumeUdpClient) PutFileChunk(volumeServerAddress string, fileId string
return parseErr
}
- udpClient, _ := tftp.NewClient(
- tftp.ClientMode(tftp.ModeOctet),
- tftp.ClientBlocksize(9000),
- tftp.ClientWindowsize(16),
- tftp.ClientTimeout(1),
- tftp.ClientTransferSize(true),
- tftp.ClientRetransmit(3),
- )
+ c.cp.Register("udp", udpAddress)
+ udpConn, getErr := c.cp.Get("udp", udpAddress)
+ if getErr != nil {
+ return fmt.Errorf("get connection to %s: %v", udpAddress, getErr)
+ }
+ conn := udpConn.RawConn().(*VolumeUdpConn)
+ defer func() {
+ if err != nil {
+ udpConn.DiscardConnection()
+ } else {
+ udpConn.ReleaseConnection()
+ }
+ }()
- fileUrl := "tftp://" + udpAddress + "/" + fileId
+ buf := []byte("+" + fileId + "\n")
+ _, err = conn.bufWriter.Write(buf)
+ if err != nil {
+ return
+ }
+ // reuse the first 4 bytes of buf as scratch for the size header (fileId keeps buf at least 4 bytes long)
+ util.Uint32toBytes(buf[0:4], fileSize)
+ _, err = conn.bufWriter.Write(buf[0:4])
+ if err != nil {
+ return
+ }
+ _, err = io.Copy(conn.bufWriter, fileReader)
+ if err != nil {
+ return
+ }
+ conn.bufWriter.Write([]byte("!\n"))
+ conn.bufWriter.Flush()
- // println("put", fileUrl, "...")
- err = udpClient.Put(fileUrl, fileReader, int64(fileSize))
+ ret, _, err := conn.bufReader.ReadLine()
if err != nil {
- glog.Errorf("udp put %s: %v", fileUrl, err)
+ glog.V(0).Infof("upload by udp: %v", err)
return
}
- // println("sent", fileUrl)
+ if !bytes.HasPrefix(ret, []byte("+OK")) {
+ return fmt.Errorf("upload by udp: %v", string(ret))
+ }
- return
+ return nil
}
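+
+// Wire format written above: "+<fileId>\n", then 4 size bytes from
+// util.Uint32toBytes, then the raw payload, then "!\n"; the server is
+// expected to answer a line starting with "+OK". A hypothetical
+// server-side reader sketch (handleUpload, reply and dst are illustrative
+// names, and util.BytesToUint32 is assumed to be the inverse of
+// util.Uint32toBytes):
+//
+//	func handleUpload(r *bufio.Reader, reply io.Writer, dst io.Writer) error {
+//		if _, err := r.ReadString('\n'); err != nil { // "+<fileId>\n"
+//			return err
+//		}
+//		var sizeBuf [4]byte
+//		if _, err := io.ReadFull(r, sizeBuf[:]); err != nil {
+//			return err
+//		}
+//		size := util.BytesToUint32(sizeBuf[:])
+//		if _, err := io.CopyN(dst, r, int64(size)); err != nil {
+//			return err
+//		}
+//		if _, err := r.ReadString('\n'); err != nil { // trailing "!\n"
+//			return err
+//		}
+//		_, err := reply.Write([]byte("+OK\n"))
+//		return err
+//	}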