123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250 |
- package proxy
- import (
- "context"
- "fmt"
- "net"
- "sync"
- "sync/atomic"
- "time"
- "github.com/fatedier/frp/client/event"
- "github.com/fatedier/frp/client/health"
- "github.com/fatedier/frp/pkg/config"
- "github.com/fatedier/frp/pkg/msg"
- "github.com/fatedier/frp/pkg/util/xlog"
- "github.com/fatedier/golib/errors"
- )
// Phases of a proxy Wrapper. The current phase is stored in
// WorkingStatus.Phase and serialized under the JSON key "status".
const (
	// ProxyPhaseNew is the phase NewWrapper assigns to a fresh wrapper.
	ProxyPhaseNew = "new"
	// ProxyPhaseWaitStart is set by checkWorker right before it emits a
	// start-proxy event; SetRunningStatus only accepts a result in this phase.
	ProxyPhaseWaitStart = "wait start"
	// ProxyPhaseStartErr is set by SetRunningStatus when the start response
	// carries an error or the underlying proxy fails to run.
	ProxyPhaseStartErr = "start error"
	// ProxyPhaseRunning is set by SetRunningStatus once the underlying proxy
	// is running.
	ProxyPhaseRunning = "running"
	// ProxyPhaseCheckFailed is set by checkWorker when the health flag reports
	// a failure while the proxy is running or waiting for its start response.
	ProxyPhaseCheckFailed = "check failed"
	// ProxyPhaseClosed is set by Stop.
	ProxyPhaseClosed = "closed"
)
// Timing knobs used by checkWorker. They are variables rather than constants,
// so code inside the package can adjust them.
var (
	// statusCheckInterval is the longest checkWorker waits between two status
	// checks when no health notification arrives earlier.
	statusCheckInterval = 3 * time.Second
	// waitResponseTimeout is how long a proxy may stay in the "wait start"
	// phase before checkWorker sends the start request again.
	waitResponseTimeout = 20 * time.Second
	// startErrTimeout is how long a proxy stays in the "start error" phase
	// before checkWorker retries the start request.
	startErrTimeout = 30 * time.Second
)
- type WorkingStatus struct {
- Name string `json:"name"`
- Type string `json:"type"`
- Phase string `json:"status"`
- Err string `json:"err"`
- Cfg config.ProxyConf `json:"cfg"`
- // Got from server.
- RemoteAddr string `json:"remote_addr"`
- }
- type Wrapper struct {
- WorkingStatus
- // underlying proxy
- pxy Proxy
- // if ProxyConf has healcheck config
- // monitor will watch if it is alive
- monitor *health.Monitor
- // event handler
- handler event.Handler
- health uint32
- lastSendStartMsg time.Time
- lastStartErr time.Time
- closeCh chan struct{}
- healthNotifyCh chan struct{}
- mu sync.RWMutex
- xl *xlog.Logger
- ctx context.Context
- }
- func NewWrapper(ctx context.Context, cfg config.ProxyConf, clientCfg config.ClientCommonConf, eventHandler event.Handler, serverUDPPort int) *Wrapper {
- baseInfo := cfg.GetBaseInfo()
- xl := xlog.FromContextSafe(ctx).Spawn().AppendPrefix(baseInfo.ProxyName)
- pw := &Wrapper{
- WorkingStatus: WorkingStatus{
- Name: baseInfo.ProxyName,
- Type: baseInfo.ProxyType,
- Phase: ProxyPhaseNew,
- Cfg: cfg,
- },
- closeCh: make(chan struct{}),
- healthNotifyCh: make(chan struct{}),
- handler: eventHandler,
- xl: xl,
- ctx: xlog.NewContext(ctx, xl),
- }
- if baseInfo.HealthCheckType != "" {
- pw.health = 1 // means failed
- pw.monitor = health.NewMonitor(pw.ctx, baseInfo.HealthCheckType, baseInfo.HealthCheckIntervalS,
- baseInfo.HealthCheckTimeoutS, baseInfo.HealthCheckMaxFailed, baseInfo.HealthCheckAddr,
- baseInfo.HealthCheckURL, pw.statusNormalCallback, pw.statusFailedCallback)
- xl.Trace("enable health check monitor")
- }
- pw.pxy = NewProxy(pw.ctx, pw.Cfg, clientCfg, serverUDPPort)
- return pw
- }
- func (pw *Wrapper) SetRunningStatus(remoteAddr string, respErr string) error {
- pw.mu.Lock()
- defer pw.mu.Unlock()
- if pw.Phase != ProxyPhaseWaitStart {
- return fmt.Errorf("status not wait start, ignore start message")
- }
- pw.RemoteAddr = remoteAddr
- if respErr != "" {
- pw.Phase = ProxyPhaseStartErr
- pw.Err = respErr
- pw.lastStartErr = time.Now()
- return fmt.Errorf(pw.Err)
- }
- if err := pw.pxy.Run(); err != nil {
- pw.close()
- pw.Phase = ProxyPhaseStartErr
- pw.Err = err.Error()
- pw.lastStartErr = time.Now()
- return err
- }
- pw.Phase = ProxyPhaseRunning
- pw.Err = ""
- return nil
- }
- func (pw *Wrapper) Start() {
- go pw.checkWorker()
- if pw.monitor != nil {
- go pw.monitor.Start()
- }
- }
- func (pw *Wrapper) Stop() {
- pw.mu.Lock()
- defer pw.mu.Unlock()
- close(pw.closeCh)
- close(pw.healthNotifyCh)
- pw.pxy.Close()
- if pw.monitor != nil {
- pw.monitor.Stop()
- }
- pw.Phase = ProxyPhaseClosed
- pw.close()
- }
- func (pw *Wrapper) close() {
- pw.handler(event.EvCloseProxy, &event.CloseProxyPayload{
- CloseProxyMsg: &msg.CloseProxy{
- ProxyName: pw.Name,
- },
- })
- }
- func (pw *Wrapper) checkWorker() {
- xl := pw.xl
- if pw.monitor != nil {
- // let monitor do check request first
- time.Sleep(500 * time.Millisecond)
- }
- for {
- // check proxy status
- now := time.Now()
- if atomic.LoadUint32(&pw.health) == 0 {
- pw.mu.Lock()
- if pw.Phase == ProxyPhaseNew ||
- pw.Phase == ProxyPhaseCheckFailed ||
- (pw.Phase == ProxyPhaseWaitStart && now.After(pw.lastSendStartMsg.Add(waitResponseTimeout))) ||
- (pw.Phase == ProxyPhaseStartErr && now.After(pw.lastStartErr.Add(startErrTimeout))) {
- xl.Trace("change status from [%s] to [%s]", pw.Phase, ProxyPhaseWaitStart)
- pw.Phase = ProxyPhaseWaitStart
- var newProxyMsg msg.NewProxy
- pw.Cfg.MarshalToMsg(&newProxyMsg)
- pw.lastSendStartMsg = now
- pw.handler(event.EvStartProxy, &event.StartProxyPayload{
- NewProxyMsg: &newProxyMsg,
- })
- }
- pw.mu.Unlock()
- } else {
- pw.mu.Lock()
- if pw.Phase == ProxyPhaseRunning || pw.Phase == ProxyPhaseWaitStart {
- pw.close()
- xl.Trace("change status from [%s] to [%s]", pw.Phase, ProxyPhaseCheckFailed)
- pw.Phase = ProxyPhaseCheckFailed
- }
- pw.mu.Unlock()
- }
- select {
- case <-pw.closeCh:
- return
- case <-time.After(statusCheckInterval):
- case <-pw.healthNotifyCh:
- }
- }
- }
- func (pw *Wrapper) statusNormalCallback() {
- xl := pw.xl
- atomic.StoreUint32(&pw.health, 0)
- errors.PanicToError(func() {
- select {
- case pw.healthNotifyCh <- struct{}{}:
- default:
- }
- })
- xl.Info("health check success")
- }
- func (pw *Wrapper) statusFailedCallback() {
- xl := pw.xl
- atomic.StoreUint32(&pw.health, 1)
- errors.PanicToError(func() {
- select {
- case pw.healthNotifyCh <- struct{}{}:
- default:
- }
- })
- xl.Info("health check failed")
- }
- func (pw *Wrapper) InWorkConn(workConn net.Conn, m *msg.StartWorkConn) {
- xl := pw.xl
- pw.mu.RLock()
- pxy := pw.pxy
- pw.mu.RUnlock()
- if pxy != nil && pw.Phase == ProxyPhaseRunning {
- xl.Debug("start a new work connection, localAddr: %s remoteAddr: %s", workConn.LocalAddr().String(), workConn.RemoteAddr().String())
- go pxy.InWorkConn(workConn, m)
- } else {
- workConn.Close()
- }
- }
- func (pw *Wrapper) GetStatus() *WorkingStatus {
- pw.mu.RLock()
- defer pw.mu.RUnlock()
- ps := &WorkingStatus{
- Name: pw.Name,
- Type: pw.Type,
- Phase: pw.Phase,
- Err: pw.Err,
- Cfg: pw.Cfg,
- RemoteAddr: pw.RemoteAddr,
- }
- return ps
- }
|