proxy_manager.go 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146
  1. package proxy
  2. import (
  3. "context"
  4. "fmt"
  5. "net"
  6. "sync"
  7. "github.com/fatedier/frp/client/event"
  8. "github.com/fatedier/frp/pkg/config"
  9. "github.com/fatedier/frp/pkg/msg"
  10. "github.com/fatedier/frp/pkg/util/xlog"
  11. "github.com/fatedier/golib/errors"
  12. )
  13. type Manager struct {
  14. sendCh chan (msg.Message)
  15. proxies map[string]*Wrapper
  16. closed bool
  17. mu sync.RWMutex
  18. clientCfg config.ClientCommonConf
  19. // The UDP port that the server is listening on
  20. serverUDPPort int
  21. ctx context.Context
  22. }
  23. func NewManager(ctx context.Context, msgSendCh chan (msg.Message), clientCfg config.ClientCommonConf, serverUDPPort int) *Manager {
  24. return &Manager{
  25. sendCh: msgSendCh,
  26. proxies: make(map[string]*Wrapper),
  27. closed: false,
  28. clientCfg: clientCfg,
  29. serverUDPPort: serverUDPPort,
  30. ctx: ctx,
  31. }
  32. }
  33. func (pm *Manager) StartProxy(name string, remoteAddr string, serverRespErr string) error {
  34. pm.mu.RLock()
  35. pxy, ok := pm.proxies[name]
  36. pm.mu.RUnlock()
  37. if !ok {
  38. return fmt.Errorf("proxy [%s] not found", name)
  39. }
  40. err := pxy.SetRunningStatus(remoteAddr, serverRespErr)
  41. if err != nil {
  42. return err
  43. }
  44. return nil
  45. }
  46. func (pm *Manager) Close() {
  47. pm.mu.Lock()
  48. defer pm.mu.Unlock()
  49. for _, pxy := range pm.proxies {
  50. pxy.Stop()
  51. }
  52. pm.proxies = make(map[string]*Wrapper)
  53. }
  54. func (pm *Manager) HandleWorkConn(name string, workConn net.Conn, m *msg.StartWorkConn) {
  55. pm.mu.RLock()
  56. pw, ok := pm.proxies[name]
  57. pm.mu.RUnlock()
  58. if ok {
  59. pw.InWorkConn(workConn, m)
  60. } else {
  61. workConn.Close()
  62. }
  63. }
  64. func (pm *Manager) HandleEvent(evType event.Type, payload interface{}) error {
  65. var m msg.Message
  66. switch e := payload.(type) {
  67. case *event.StartProxyPayload:
  68. m = e.NewProxyMsg
  69. case *event.CloseProxyPayload:
  70. m = e.CloseProxyMsg
  71. default:
  72. return event.ErrPayloadType
  73. }
  74. err := errors.PanicToError(func() {
  75. pm.sendCh <- m
  76. })
  77. return err
  78. }
  79. func (pm *Manager) GetAllProxyStatus() []*WorkingStatus {
  80. ps := make([]*WorkingStatus, 0)
  81. pm.mu.RLock()
  82. defer pm.mu.RUnlock()
  83. for _, pxy := range pm.proxies {
  84. ps = append(ps, pxy.GetStatus())
  85. }
  86. return ps
  87. }
  88. func (pm *Manager) Reload(pxyCfgs map[string]config.ProxyConf) {
  89. xl := xlog.FromContextSafe(pm.ctx)
  90. pm.mu.Lock()
  91. defer pm.mu.Unlock()
  92. delPxyNames := make([]string, 0)
  93. for name, pxy := range pm.proxies {
  94. del := false
  95. cfg, ok := pxyCfgs[name]
  96. if !ok {
  97. del = true
  98. } else {
  99. if !pxy.Cfg.Compare(cfg) {
  100. del = true
  101. }
  102. }
  103. if del {
  104. delPxyNames = append(delPxyNames, name)
  105. delete(pm.proxies, name)
  106. pxy.Stop()
  107. }
  108. }
  109. if len(delPxyNames) > 0 {
  110. xl.Info("proxy removed: %v", delPxyNames)
  111. }
  112. addPxyNames := make([]string, 0)
  113. for name, cfg := range pxyCfgs {
  114. if _, ok := pm.proxies[name]; !ok {
  115. pxy := NewWrapper(pm.ctx, cfg, pm.clientCfg, pm.HandleEvent, pm.serverUDPPort)
  116. pm.proxies[name] = pxy
  117. addPxyNames = append(addPxyNames, name)
  118. pxy.Start()
  119. }
  120. }
  121. if len(addPxyNames) > 0 {
  122. xl.Info("proxy added: %v", addPxyNames)
  123. }
  124. }