udp.go 4.8 KB


  1. // Copyright 2017 fatedier, fatedier@gmail.com
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package net
  15. import (
  16. "fmt"
  17. "io"
  18. "net"
  19. "strconv"
  20. "sync"
  21. "time"
  22. "github.com/fatedier/golib/pool"
  23. )
  // UDPPacket is one outgoing datagram queued on a UDPListener's write channel.
  type UDPPacket struct {
  	// Buf is the datagram payload.
  	Buf []byte

  	// LocalAddr is filled in by FakeUDPConn.Write and left nil by
  	// UDPListener.WriteMsg. RemoteAddr is the destination; the listener's
  	// writer only sends packets whose RemoteAddr is a *net.UDPAddr.
  	LocalAddr, RemoteAddr net.Addr
  }
  29. type FakeUDPConn struct {
  30. l *UDPListener
  31. localAddr net.Addr
  32. remoteAddr net.Addr
  33. packets chan []byte
  34. closeFlag bool
  35. lastActive time.Time
  36. mu sync.RWMutex
  37. }
  38. func NewFakeUDPConn(l *UDPListener, laddr, raddr net.Addr) *FakeUDPConn {
  39. fc := &FakeUDPConn{
  40. l: l,
  41. localAddr: laddr,
  42. remoteAddr: raddr,
  43. packets: make(chan []byte, 20),
  44. }
  45. go func() {
  46. for {
  47. time.Sleep(5 * time.Second)
  48. fc.mu.RLock()
  49. if time.Now().Sub(fc.lastActive) > 10*time.Second {
  50. fc.mu.RUnlock()
  51. fc.Close()
  52. break
  53. }
  54. fc.mu.RUnlock()
  55. }
  56. }()
  57. return fc
  58. }
  59. func (c *FakeUDPConn) putPacket(content []byte) {
  60. defer func() {
  61. if err := recover(); err != nil {
  62. }
  63. }()
  64. select {
  65. case c.packets <- content:
  66. default:
  67. }
  68. }
  69. func (c *FakeUDPConn) Read(b []byte) (n int, err error) {
  70. content, ok := <-c.packets
  71. if !ok {
  72. return 0, io.EOF
  73. }
  74. c.mu.Lock()
  75. c.lastActive = time.Now()
  76. c.mu.Unlock()
  77. if len(b) < len(content) {
  78. n = len(b)
  79. } else {
  80. n = len(content)
  81. }
  82. copy(b, content)
  83. return n, nil
  84. }
  85. func (c *FakeUDPConn) Write(b []byte) (n int, err error) {
  86. c.mu.RLock()
  87. if c.closeFlag {
  88. c.mu.RUnlock()
  89. return 0, io.ErrClosedPipe
  90. }
  91. c.mu.RUnlock()
  92. packet := &UDPPacket{
  93. Buf: b,
  94. LocalAddr: c.localAddr,
  95. RemoteAddr: c.remoteAddr,
  96. }
  97. c.l.writeUDPPacket(packet)
  98. c.mu.Lock()
  99. c.lastActive = time.Now()
  100. c.mu.Unlock()
  101. return len(b), nil
  102. }
  103. func (c *FakeUDPConn) Close() error {
  104. c.mu.Lock()
  105. defer c.mu.Unlock()
  106. if !c.closeFlag {
  107. c.closeFlag = true
  108. close(c.packets)
  109. }
  110. return nil
  111. }
  112. func (c *FakeUDPConn) IsClosed() bool {
  113. c.mu.RLock()
  114. defer c.mu.RUnlock()
  115. return c.closeFlag
  116. }
  117. func (c *FakeUDPConn) LocalAddr() net.Addr {
  118. return c.localAddr
  119. }
  120. func (c *FakeUDPConn) RemoteAddr() net.Addr {
  121. return c.remoteAddr
  122. }
  123. func (c *FakeUDPConn) SetDeadline(t time.Time) error {
  124. return nil
  125. }
  126. func (c *FakeUDPConn) SetReadDeadline(t time.Time) error {
  127. return nil
  128. }
  129. func (c *FakeUDPConn) SetWriteDeadline(t time.Time) error {
  130. return nil
  131. }
  132. type UDPListener struct {
  133. addr net.Addr
  134. acceptCh chan net.Conn
  135. writeCh chan *UDPPacket
  136. readConn net.Conn
  137. closeFlag bool
  138. fakeConns map[string]*FakeUDPConn
  139. }
  140. func ListenUDP(bindAddr string, bindPort int) (l *UDPListener, err error) {
  141. udpAddr, err := net.ResolveUDPAddr("udp", net.JoinHostPort(bindAddr, strconv.Itoa(bindPort)))
  142. if err != nil {
  143. return l, err
  144. }
  145. readConn, err := net.ListenUDP("udp", udpAddr)
  146. l = &UDPListener{
  147. addr: udpAddr,
  148. acceptCh: make(chan net.Conn),
  149. writeCh: make(chan *UDPPacket, 1000),
  150. fakeConns: make(map[string]*FakeUDPConn),
  151. }
  152. // for reading
  153. go func() {
  154. for {
  155. buf := pool.GetBuf(1450)
  156. n, remoteAddr, err := readConn.ReadFromUDP(buf)
  157. if err != nil {
  158. close(l.acceptCh)
  159. close(l.writeCh)
  160. return
  161. }
  162. fakeConn, exist := l.fakeConns[remoteAddr.String()]
  163. if !exist || fakeConn.IsClosed() {
  164. fakeConn = NewFakeUDPConn(l, l.Addr(), remoteAddr)
  165. l.fakeConns[remoteAddr.String()] = fakeConn
  166. }
  167. fakeConn.putPacket(buf[:n])
  168. l.acceptCh <- fakeConn
  169. }
  170. }()
  171. // for writing
  172. go func() {
  173. for {
  174. packet, ok := <-l.writeCh
  175. if !ok {
  176. return
  177. }
  178. if addr, ok := packet.RemoteAddr.(*net.UDPAddr); ok {
  179. readConn.WriteToUDP(packet.Buf, addr)
  180. }
  181. }
  182. }()
  183. return
  184. }
  185. func (l *UDPListener) writeUDPPacket(packet *UDPPacket) (err error) {
  186. defer func() {
  187. if errRet := recover(); errRet != nil {
  188. err = fmt.Errorf("udp write closed listener")
  189. }
  190. }()
  191. l.writeCh <- packet
  192. return
  193. }
  194. func (l *UDPListener) WriteMsg(buf []byte, remoteAddr *net.UDPAddr) (err error) {
  195. // only set remote addr here
  196. packet := &UDPPacket{
  197. Buf: buf,
  198. RemoteAddr: remoteAddr,
  199. }
  200. err = l.writeUDPPacket(packet)
  201. return
  202. }
  203. func (l *UDPListener) Accept() (net.Conn, error) {
  204. conn, ok := <-l.acceptCh
  205. if !ok {
  206. return conn, fmt.Errorf("channel for udp listener closed")
  207. }
  208. return conn, nil
  209. }
  210. func (l *UDPListener) Close() error {
  211. if !l.closeFlag {
  212. l.closeFlag = true
  213. if l.readConn != nil {
  214. l.readConn.Close()
  215. }
  216. }
  217. return nil
  218. }
  219. func (l *UDPListener) Addr() net.Addr {
  220. return l.addr
  221. }