nathole.go 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212
  1. package nathole
  2. import (
  3. "bytes"
  4. "fmt"
  5. "net"
  6. "sync"
  7. "time"
  8. "github.com/fatedier/frp/pkg/msg"
  9. "github.com/fatedier/frp/pkg/util/log"
  10. "github.com/fatedier/frp/pkg/util/util"
  11. "github.com/fatedier/golib/errors"
  12. "github.com/fatedier/golib/pool"
  13. )
  // NatHoleTimeout is the number of seconds a visitor request waits for the
  // client side to complete the exchange before the request is abandoned.
  var NatHoleTimeout = int64(10)
  // SidRequest is the value handed to a registered client when a visitor asks
  // to reach it.
  type SidRequest struct {
  	// Sid is the session id generated for this visitor request.
  	Sid string

  	// NotifyCh is the session's notification channel; the visitor handler
  	// waits on it before answering the visitor.
  	NotifyCh chan struct{}
  }
  20. type Controller struct {
  21. listener *net.UDPConn
  22. clientCfgs map[string]*ClientCfg
  23. sessions map[string]*Session
  24. mu sync.RWMutex
  25. }
  26. func NewController(udpBindAddr string) (nc *Controller, err error) {
  27. addr, err := net.ResolveUDPAddr("udp", udpBindAddr)
  28. if err != nil {
  29. return nil, err
  30. }
  31. lconn, err := net.ListenUDP("udp", addr)
  32. if err != nil {
  33. return nil, err
  34. }
  35. nc = &Controller{
  36. listener: lconn,
  37. clientCfgs: make(map[string]*ClientCfg),
  38. sessions: make(map[string]*Session),
  39. }
  40. return nc, nil
  41. }
  42. func (nc *Controller) ListenClient(name string, sk string) (sidCh chan *SidRequest) {
  43. clientCfg := &ClientCfg{
  44. Name: name,
  45. Sk: sk,
  46. SidCh: make(chan *SidRequest),
  47. }
  48. nc.mu.Lock()
  49. nc.clientCfgs[name] = clientCfg
  50. nc.mu.Unlock()
  51. return clientCfg.SidCh
  52. }
  53. func (nc *Controller) CloseClient(name string) {
  54. nc.mu.Lock()
  55. defer nc.mu.Unlock()
  56. delete(nc.clientCfgs, name)
  57. }
  58. func (nc *Controller) Run() {
  59. for {
  60. buf := pool.GetBuf(1024)
  61. n, raddr, err := nc.listener.ReadFromUDP(buf)
  62. if err != nil {
  63. log.Trace("nat hole listener read from udp error: %v", err)
  64. return
  65. }
  66. rd := bytes.NewReader(buf[:n])
  67. rawMsg, err := msg.ReadMsg(rd)
  68. if err != nil {
  69. log.Trace("read nat hole message error: %v", err)
  70. continue
  71. }
  72. switch m := rawMsg.(type) {
  73. case *msg.NatHoleVisitor:
  74. go nc.HandleVisitor(m, raddr)
  75. case *msg.NatHoleClient:
  76. go nc.HandleClient(m, raddr)
  77. default:
  78. log.Trace("error nat hole message type")
  79. continue
  80. }
  81. pool.PutBuf(buf)
  82. }
  83. }
  84. func (nc *Controller) GenSid() string {
  85. t := time.Now().Unix()
  86. id, _ := util.RandID()
  87. return fmt.Sprintf("%d%s", t, id)
  88. }
  89. func (nc *Controller) HandleVisitor(m *msg.NatHoleVisitor, raddr *net.UDPAddr) {
  90. sid := nc.GenSid()
  91. session := &Session{
  92. Sid: sid,
  93. VisitorAddr: raddr,
  94. NotifyCh: make(chan struct{}, 0),
  95. }
  96. nc.mu.Lock()
  97. clientCfg, ok := nc.clientCfgs[m.ProxyName]
  98. if !ok {
  99. nc.mu.Unlock()
  100. errInfo := fmt.Sprintf("xtcp server for [%s] doesn't exist", m.ProxyName)
  101. log.Debug(errInfo)
  102. nc.listener.WriteToUDP(nc.GenNatHoleResponse(nil, errInfo), raddr)
  103. return
  104. }
  105. if m.SignKey != util.GetAuthKey(clientCfg.Sk, m.Timestamp) {
  106. nc.mu.Unlock()
  107. errInfo := fmt.Sprintf("xtcp connection of [%s] auth failed", m.ProxyName)
  108. log.Debug(errInfo)
  109. nc.listener.WriteToUDP(nc.GenNatHoleResponse(nil, errInfo), raddr)
  110. return
  111. }
  112. nc.sessions[sid] = session
  113. nc.mu.Unlock()
  114. log.Trace("handle visitor message, sid [%s]", sid)
  115. defer func() {
  116. nc.mu.Lock()
  117. delete(nc.sessions, sid)
  118. nc.mu.Unlock()
  119. }()
  120. err := errors.PanicToError(func() {
  121. clientCfg.SidCh <- &SidRequest{
  122. Sid: sid,
  123. NotifyCh: session.NotifyCh,
  124. }
  125. })
  126. if err != nil {
  127. return
  128. }
  129. // Wait client connections.
  130. select {
  131. case <-session.NotifyCh:
  132. resp := nc.GenNatHoleResponse(session, "")
  133. log.Trace("send nat hole response to visitor")
  134. nc.listener.WriteToUDP(resp, raddr)
  135. case <-time.After(time.Duration(NatHoleTimeout) * time.Second):
  136. return
  137. }
  138. }
  139. func (nc *Controller) HandleClient(m *msg.NatHoleClient, raddr *net.UDPAddr) {
  140. nc.mu.RLock()
  141. session, ok := nc.sessions[m.Sid]
  142. nc.mu.RUnlock()
  143. if !ok {
  144. return
  145. }
  146. log.Trace("handle client message, sid [%s]", session.Sid)
  147. session.ClientAddr = raddr
  148. resp := nc.GenNatHoleResponse(session, "")
  149. log.Trace("send nat hole response to client")
  150. nc.listener.WriteToUDP(resp, raddr)
  151. }
  152. func (nc *Controller) GenNatHoleResponse(session *Session, errInfo string) []byte {
  153. var (
  154. sid string
  155. visitorAddr string
  156. clientAddr string
  157. )
  158. if session != nil {
  159. sid = session.Sid
  160. visitorAddr = session.VisitorAddr.String()
  161. clientAddr = session.ClientAddr.String()
  162. }
  163. m := &msg.NatHoleResp{
  164. Sid: sid,
  165. VisitorAddr: visitorAddr,
  166. ClientAddr: clientAddr,
  167. Error: errInfo,
  168. }
  169. b := bytes.NewBuffer(nil)
  170. err := msg.WriteMsg(b, m)
  171. if err != nil {
  172. return []byte("")
  173. }
  174. return b.Bytes()
  175. }
  176. type Session struct {
  177. Sid string
  178. VisitorAddr *net.UDPAddr
  179. ClientAddr *net.UDPAddr
  180. NotifyCh chan struct{}
  181. }
  182. type ClientCfg struct {
  183. Name string
  184. Sk string
  185. SidCh chan *SidRequest
  186. }