aboutsummaryrefslogtreecommitdiff
path: root/weed/udptransfer/state.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/udptransfer/state.go')
-rw-r--r--weed/udptransfer/state.go516
1 files changed, 516 insertions, 0 deletions
diff --git a/weed/udptransfer/state.go b/weed/udptransfer/state.go
new file mode 100644
index 000000000..e0b4f1791
--- /dev/null
+++ b/weed/udptransfer/state.go
@@ -0,0 +1,516 @@
+package udptransfer
+
+import (
+ "errors"
+ "log"
+ "net"
+ "sync"
+ "sync/atomic"
+ "time"
+)
+
+// Short durations used for spin-wait backoff steps and close-handshake ticks.
+const (
+	_10ms  = time.Millisecond * 10
+	_100ms = time.Millisecond * 100
+)
+
+// Reserved sequence numbers outside the normal data range.
+const (
+	_FIN_ACK_SEQ uint32 = 0xffFF0000 // seq carried by the ACK that answers a peer's FIN
+	_INVALID_SEQ uint32 = 0xffFFffFF // sentinel for "no valid sequence" — not referenced in this file
+)
+
+// Package-level errors surfaced by connection setup, I/O and teardown.
+var (
+	ErrIOTimeout error = &TimeoutError{} // returned when a deadline/spin-wait expires
+	ErrUnknown         = errors.New("Unknown error")
+	ErrInexplicableData = errors.New("Inexplicable data")         // unexpected packet during handshake
+	ErrTooManyAttempts  = errors.New("Too many attempts to connect") // handshake retries exhausted
+)
+
+type TimeoutError struct{}
+
+func (e *TimeoutError) Error() string { return "i/o timeout" }
+func (e *TimeoutError) Timeout() bool { return true }
+func (e *TimeoutError) Temporary() bool { return true }
+
+// Conn is one reliable, ordered connection multiplexed over the Endpoint's
+// shared UDP socket. It holds the handshake/close state machine, RTT and
+// timeout estimators, congestion/send windows, and the in/out packet queues;
+// the internal*Loop goroutines started by initConnection drive the protocol.
+type Conn struct {
+	sock   *net.UDPConn // shared socket owned by the Endpoint
+	dest   *net.UDPAddr // peer address
+	edp    *Endpoint    // owning endpoint (registry for dispatch/removal)
+	connID connID       // 8 bytes
+	// events
+	evRecv  chan []byte // inbound datagrams from the endpoint; nil is the recv-loop stop signal
+	evRead  chan byte   // wakes user reads; closed on shutdown
+	evSend  chan byte   // sender wakeups; _CLOSE stops pending senders
+	evSWnd  chan byte   // send-window events for internalSendLoop; _CLOSE stops it
+	evAck   chan byte   // ack events for internalAckLoop; _CLOSE stops it
+	evClose chan byte   // close-handshake notifications (_S_FIN* values)
+	// protocol state
+	inlock       sync.Mutex // guards inQ / inQReady
+	outlock      sync.Mutex // guards outQ and outgoing seq/pending counters
+	state        int32      // _S_* lifecycle state; accessed atomically
+	mySeq        uint32     // our latest outgoing sequence number
+	swnd         int32      // send window; seeded from calSwnd(bandwidth, rtt)
+	cwnd         int32      // congestion window; starts at 8
+	missed       int32
+	outPending   int32  // sent-but-unacked packet count
+	lastAck      uint32 // last peer seq recorded via logAck
+	lastAckTime  int64
+	lastAckTime2 int64
+	lastShrink   int64
+	lastRstMis   int64
+	ato          int64 // ack delay timeout, derived from rtt (clamped to [_MIN_ATO,_MAX_ATO])
+	rto          int64 // retransmission timeout, derived from rtt
+	rtt          int64 // round-trip time sample from the handshake onward
+	srtt         int64 // smoothed rtt, scaled by 8 (see initConnection)
+	mdev         int64 // rtt deviation, scaled by 2 at init
+	rtmo         int64 // read timeout — presumably; TODO confirm against Read path
+	wtmo         int64 // write timeout — presumably; TODO confirm against Write path
+	tSlot        int64
+	tSlotT0      int64
+	lastSErr     int64
+	// queue
+	outQ        *linkedMap // outgoing packets awaiting ack
+	inQ         *linkedMap // incoming packets awaiting ordering
+	inQReady    []byte     // contiguous in-order bytes ready for user reads
+	inQDirty    bool
+	lastReadSeq uint32 // last user read seq
+	// params
+	bandwidth      int64 // configured bandwidth, used to size swnd
+	fastRetransmit bool
+	flatTraffic    bool
+	mss            int // max segment size; shrunk by 20 for ipv6 peers
+	// statistics
+	urgent    int
+	inPkCnt   int // inbound packet count
+	inDupCnt  int // inbound duplicate count
+	outPkCnt  int // outbound packet count
+	outDupCnt int // outbound duplicate count
+	fRCnt     int // fast-retransmit count — presumably; TODO confirm
+}
+
+func NewConn(e *Endpoint, dest *net.UDPAddr, id connID) *Conn {
+ c := &Conn{
+ sock: e.udpconn,
+ dest: dest,
+ edp: e,
+ connID: id,
+ evRecv: make(chan []byte, 128),
+ evRead: make(chan byte, 1),
+ evSWnd: make(chan byte, 2),
+ evSend: make(chan byte, 4),
+ evAck: make(chan byte, 1),
+ evClose: make(chan byte, 2),
+ outQ: newLinkedMap(_QModeOut),
+ inQ: newLinkedMap(_QModeIn),
+ }
+ p := e.params
+ c.bandwidth = p.Bandwidth
+ c.fastRetransmit = p.FastRetransmit
+ c.flatTraffic = p.FlatTraffic
+ c.mss = _MSS
+ if dest.IP.To4() == nil {
+ // typical ipv6 header length=40
+ c.mss -= 20
+ }
+ return c
+}
+
+func (c *Conn) initConnection(buf []byte) (err error) {
+ if buf == nil {
+ err = c.initDialing()
+ } else { //server
+ err = c.acceptConnection(buf[_TH_SIZE:])
+ }
+ if err != nil {
+ return
+ }
+ if c.state == _S_EST1 {
+ c.lastReadSeq = c.lastAck
+ c.inQ.maxCtnSeq = c.lastAck
+ c.rtt = maxI64(c.rtt, _MIN_RTT)
+ c.mdev = c.rtt << 1
+ c.srtt = c.rtt << 3
+ c.rto = maxI64(c.rtt*2, _MIN_RTO)
+ c.ato = maxI64(c.rtt>>4, _MIN_ATO)
+ c.ato = minI64(c.ato, _MAX_ATO)
+ // initial cwnd
+ c.swnd = calSwnd(c.bandwidth, c.rtt) >> 1
+ c.cwnd = 8
+ go c.internalRecvLoop()
+ go c.internalSendLoop()
+ go c.internalAckLoop()
+ if debug >= 0 {
+ go c.internal_state()
+ }
+ return nil
+ } else {
+ return ErrUnknown
+ }
+}
+
+// initDialing performs the client side of the 3-way handshake: send SYN
+// (retrying up to _MAX_RETRIES with a 1s timeout each), expect SYN+ACK,
+// then reply with the third ACK. The SYN round trip yields the first RTT
+// sample.
+func (c *Conn) initDialing() error {
+	// first syn
+	pk := &packet{
+		seq: c.mySeq,
+		flag: _F_SYN,
+	}
+	item := nodeOf(pk)
+	var buf []byte
+	c.state = _S_SYN0
+	t0 := Now()
+	for i := 0; i < _MAX_RETRIES && c.state == _S_SYN0; i++ {
+		// send syn
+		c.internalWrite(item)
+		select {
+		case buf = <-c.evRecv:
+			// reply received: take the RTT sample, adopt the remote
+			// connection id, and strip the transport header
+			c.rtt = Now() - t0
+			c.state = _S_SYN1
+			c.connID.setRid(buf)
+			buf = buf[_TH_SIZE:]
+		case <-time.After(time.Second):
+			// no reply within 1s; resend the syn
+			continue
+		}
+	}
+	if c.state == _S_SYN0 {
+		return ErrTooManyAttempts
+	}
+
+	unmarshall(pk, buf)
+	// expected syn+ack
+	if pk.flag == _F_SYN|_F_ACK && pk.ack == c.mySeq {
+		if scnt := pk.scnt - 1; scnt > 0 {
+			// the peer saw scnt+1 transmissions of our syn; each extra one
+			// cost a ~1s retry, so discount 1e3 per retry from the RTT
+			// (assumes Now() is millisecond-scale — TODO confirm)
+			c.rtt -= int64(scnt) * 1e3
+		}
+		log.Println("rtt", c.rtt)
+		c.state = _S_EST0
+		// build ack3 by reusing pk: ack the server's seq
+		pk.scnt = 0
+		pk.ack = pk.seq
+		pk.flag = _F_ACK
+		item := nodeOf(pk)
+		// send ack3
+		c.internalWrite(item)
+		// update lastAck
+		c.logAck(pk.ack)
+		c.state = _S_EST1
+		return nil
+	} else {
+		return ErrInexplicableData
+	}
+}
+
+// acceptConnection performs the server side of the handshake: validate the
+// peer's SYN, answer with SYN+ACK (retrying up to 5 times, 1s apart), then
+// wait for the final ACK. If that ACK was lost but in-sequence data is
+// already arriving, the connection is treated as established anyway.
+func (c *Conn) acceptConnection(buf []byte) error {
+	var pk = new(packet)
+	var item *qNode
+	unmarshall(pk, buf)
+	// expected syn
+	if pk.flag == _F_SYN {
+		c.state = _S_SYN1
+		// build syn+ack by reusing pk; the order matters: ack the peer's
+		// seq first, then overwrite seq with our own
+		pk.ack = pk.seq
+		pk.seq = c.mySeq
+		pk.flag |= _F_ACK
+		// update lastAck
+		c.logAck(pk.ack)
+		item = nodeOf(pk)
+		// seed the send count from the peer's extra syn transmissions —
+		// presumably so the client can discount its retries; TODO confirm
+		item.scnt = pk.scnt - 1
+	} else {
+		dumpb("Syn1 ?", buf)
+		return ErrInexplicableData
+	}
+	for i := 0; i < 5 && c.state == _S_SYN1; i++ {
+		t0 := Now()
+		// reply syn+ack
+		c.internalWrite(item)
+		// recv ack3
+		select {
+		case buf = <-c.evRecv:
+			c.state = _S_EST0
+			c.rtt = Now() - t0
+			buf = buf[_TH_SIZE:]
+			log.Println("rtt", c.rtt)
+		case <-time.After(time.Second):
+			continue
+		}
+	}
+	if c.state == _S_SYN1 {
+		return ErrTooManyAttempts
+	}
+
+	pk = new(packet)
+	unmarshall(pk, buf)
+	// expected ack3
+	if pk.flag == _F_ACK && pk.ack == c.mySeq {
+		c.state = _S_EST1
+	} else {
+		// if ack3 lost, resend syn+ack 3-times
+		// and drop these coming data
+		if pk.flag&_F_DATA != 0 && pk.seq > c.lastAck {
+			c.internalWrite(item)
+			c.state = _S_EST1
+		} else {
+			dumpb("Ack3 ?", buf)
+			return ErrInexplicableData
+		}
+	}
+	return nil
+}
+
+// 20,20,20,20, 100,100,100,100, 1s,1s,1s,1s
+func selfSpinWait(fn func() bool) error {
+ const _MAX_SPIN = 12
+ for i := 0; i < _MAX_SPIN; i++ {
+ if fn() {
+ return nil
+ } else if i <= 3 {
+ time.Sleep(_10ms * 2)
+ } else if i <= 7 {
+ time.Sleep(_100ms)
+ } else {
+ time.Sleep(time.Second)
+ }
+ }
+ return ErrIOTimeout
+}
+
+func (c *Conn) IsClosed() bool {
+ return atomic.LoadInt32(&c.state) <= _S_FIN1
+}
+
+/*
+active close:
+1 <- send fin-W: closeW()
+  before sending, ensure all outQ items have been sent out and all of them have been acked.
+2 -> wait to recv ack{fin-W}
+ then trigger closeR, including send fin-R and wait to recv ack{fin-R}
+
+passive close:
+-> fin:
+ if outQ is not empty then self-spin wait.
+ if outQ empty, send ack{fin-W} then goto closeW().
+*/
+// Close performs the active close handshake described above. The first
+// caller flips state _S_EST1 -> _S_FIN0 and drives the shutdown; concurrent
+// or repeated callers just spin-wait until the state reaches _S_FIN.
+func (c *Conn) Close() (err error) {
+	if !atomic.CompareAndSwapInt32(&c.state, _S_EST1, _S_FIN0) {
+		// someone else is closing (or already closed): wait for completion
+		return selfSpinWait(func() bool {
+			return atomic.LoadInt32(&c.state) == _S_FIN
+		})
+	}
+	var err0 error
+	err0 = c.closeW()
+	// waiting for fin-2 of peer
+	err = selfSpinWait(func() bool {
+		select {
+		case v := <-c.evClose:
+			if v == _S_FIN {
+				return true
+			} else {
+				// not the event we want; re-enqueue it after a delay so
+				// its intended consumer can still pick it up
+				time.AfterFunc(_100ms, func() { c.evClose <- v })
+			}
+		default:
+		}
+		return false
+	})
+	defer c.afterShutdown()
+	if err != nil {
+		// backup path for wait ack(finW) timeout
+		c.closeR(nil)
+	}
+	if err0 != nil {
+		return err0
+	} else {
+		return
+	}
+}
+
+func (c *Conn) beforeCloseW() (err error) {
+ // check outQ was empty and all has been acked.
+ // self-spin waiting
+ for i := 0; i < 2; i++ {
+ err = selfSpinWait(func() bool {
+ return atomic.LoadInt32(&c.outPending) <= 0
+ })
+ if err == nil {
+ break
+ }
+ }
+ // send fin, reliably
+ c.outlock.Lock()
+ c.mySeq++
+ c.outPending++
+ pk := &packet{seq: c.mySeq, flag: _F_FIN}
+ item := nodeOf(pk)
+ c.outQ.appendTail(item)
+ c.internalWrite(item)
+ c.outlock.Unlock()
+ c.evSWnd <- _VSWND_ACTIVE
+ return
+}
+
+// closeW sends the FIN and waits until it and any outstanding data have
+// been acked, bounded by max ticks of 100ms (scaled up for large RTTs).
+// Sending resources are torn down on exit regardless of the outcome.
+func (c *Conn) closeW() (err error) {
+	// close resource of sending
+	defer c.afterCloseW()
+	// send fin
+	err = c.beforeCloseW()
+	var closed bool
+	var max = 20
+	if c.rtt > 200 {
+		// large RTT: allow proportionally more 100ms waits
+		max = int(c.rtt) / 10
+	}
+	// waiting for outQ means:
+	// 1. all outQ has been acked, for passive
+	// 2. fin has been acked, for active
+	for i := 0; i < max && (atomic.LoadInt32(&c.outPending) > 0 || !closed); i++ {
+		select {
+		case v := <-c.evClose:
+			if v == _S_FIN0 {
+				// namely, last fin has been acked.
+				closed = true
+			} else {
+				// wrong event; re-enqueue it after a delay for its
+				// intended consumer
+				time.AfterFunc(_100ms, func() { c.evClose <- v })
+			}
+		case <-time.After(_100ms):
+		}
+	}
+	if closed || err != nil {
+		return
+	} else {
+		return ErrIOTimeout
+	}
+}
+
+// afterCloseW releases the sending side: wakes any pending sender with
+// _CLOSE (non-blocking) and stops internalSendLoop.
+func (c *Conn) afterCloseW() {
+	// can't close(c.evRecv), avoid endpoint dispatch exception
+	// stop pending inputAndSend
+	select {
+	case c.evSend <- _CLOSE:
+	default:
+	}
+	// stop internalSendLoop (blocking send: the loop must still be draining)
+	c.evSWnd <- _CLOSE
+}
+
+// afterShutdown finishes teardown once both directions are done:
+// stops internalRecvLoop and deregisters the connection from the endpoint.
+// called by active and passive close()
+func (c *Conn) afterShutdown() {
+	// stop internalRecvLoop; a nil buffer is its exit signal
+	c.evRecv <- nil
+	// remove registry
+	c.edp.removeConn(c.connID, c.dest)
+	log.Println("shutdown", c.state)
+}
+
+// forceShutdownWithLock runs forceShutdown while holding outlock,
+// serializing against concurrent senders.
+// trigger by reset
+func (c *Conn) forceShutdownWithLock() {
+	c.outlock.Lock()
+	defer c.outlock.Unlock()
+	c.forceShutdown()
+}
+
+// forceShutdown drops all queued data and tears the connection down
+// immediately, skipping the FIN handshake. Only the first caller that
+// moves state _S_EST1 -> _S_FIN does the work.
+// called by:
+// 1/ send exception
+// 2/ recv reset
+// drop outQ and force shutdown
+func (c *Conn) forceShutdown() {
+	if atomic.CompareAndSwapInt32(&c.state, _S_EST1, _S_FIN) {
+		defer c.afterShutdown()
+		// stop sender: drain stale signals first so the _CLOSE below
+		// cannot be crowded out of the buffered channel
+		for i := 0; i < cap(c.evSend); i++ {
+			select {
+			case <-c.evSend:
+			default:
+			}
+		}
+		select {
+		case c.evSend <- _CLOSE:
+		default:
+		}
+		c.outQ.reset()
+		// stop reader; closing evRead unblocks any blocked user read
+		close(c.evRead)
+		c.inQ.reset()
+		// stop internalLoops
+		c.evSWnd <- _CLOSE
+		c.evAck <- _CLOSE
+		//log.Println("force shutdown")
+	}
+}
+
+// fakeShutdown pretends the FIN was acked (non-blocking notify) so closeW
+// can finish even though the fin could not actually be sent.
+// for sending fin failed
+func (c *Conn) fakeShutdown() {
+	select {
+	case c.evClose <- _S_FIN0:
+	default:
+	}
+}
+
+// closeR closes the read side. For a passive close (peer's FIN arrived
+// first) it acks the FIN and then runs the write-side close as well; for
+// an active close (state already _S_FIN0) it only answers the FIN and
+// notifies the Close() goroutine. pk may be nil on the timeout backup path.
+func (c *Conn) closeR(pk *packet) {
+	var passive = true
+	for {
+		state := atomic.LoadInt32(&c.state)
+		switch state {
+		case _S_FIN:
+			return
+		case _S_FIN1: // multiple FIN, maybe lost
+			c.passiveCloseReply(pk, false)
+			return
+		case _S_FIN0: // active close performed
+			passive = false
+		}
+		if !atomic.CompareAndSwapInt32(&c.state, state, _S_FIN1) {
+			continue // state raced with another goroutine; re-evaluate
+		}
+		c.passiveCloseReply(pk, true)
+		break
+	}
+	// here, R is closed.
+	// ^^^^^^^^^^^^^^^^^^^^^
+	if passive {
+		// passive closing call closeW contains sending fin and recv ack
+		// may the ack of fin-2 was lost, then the closeW will timeout
+		c.closeW()
+	}
+	// here, R,W both were closed.
+	// ^^^^^^^^^^^^^^^^^^^^^
+	atomic.StoreInt32(&c.state, _S_FIN)
+	// stop internalAckLoop
+	c.evAck <- _CLOSE
+
+	if passive {
+		// close evRecv within here
+		c.afterShutdown()
+	} else {
+		// notify active close thread
+		select {
+		case c.evClose <- _S_FIN:
+		default:
+		}
+	}
+}
+
+func (c *Conn) passiveCloseReply(pk *packet, first bool) {
+ if pk != nil && pk.flag&_F_FIN != 0 {
+ if first {
+ c.checkInQ(pk)
+ close(c.evRead)
+ }
+ // ack the FIN
+ pk = &packet{seq: _FIN_ACK_SEQ, ack: pk.seq, flag: _F_ACK}
+ item := nodeOf(pk)
+ c.internalWrite(item)
+ }
+}
+
+// check inQ ends orderly, and copy queue data to user space
+func (c *Conn) checkInQ(pk *packet) {
+ if nil != selfSpinWait(func() bool {
+ return c.inQ.maxCtnSeq+1 == pk.seq
+ }) { // timeout for waiting inQ to finish
+ return
+ }
+ c.inlock.Lock()
+ defer c.inlock.Unlock()
+ if c.inQ.size() > 0 {
+ for i := c.inQ.head; i != nil; i = i.next {
+ c.inQReady = append(c.inQReady, i.payload...)
+ }
+ }
+}