server.go 2.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140
  1. package streamserver
  2. import (
  3. "bufio"
  4. "fmt"
  5. "io"
  6. "net"
  7. "strconv"
  8. libnet "github.com/fatedier/frp/pkg/util/net"
  9. "github.com/fatedier/frp/test/e2e/pkg/rpc"
  10. )
  11. type Type string
  12. const (
  13. TCP Type = "tcp"
  14. UDP Type = "udp"
  15. Unix Type = "unix"
  16. )
  17. type Server struct {
  18. netType Type
  19. bindAddr string
  20. bindPort int
  21. respContent []byte
  22. handler func(net.Conn)
  23. l net.Listener
  24. }
  25. type Option func(*Server) *Server
  26. func New(netType Type, options ...Option) *Server {
  27. s := &Server{
  28. netType: netType,
  29. bindAddr: "127.0.0.1",
  30. }
  31. s.handler = s.handle
  32. for _, option := range options {
  33. s = option(s)
  34. }
  35. return s
  36. }
  37. func WithBindAddr(addr string) Option {
  38. return func(s *Server) *Server {
  39. s.bindAddr = addr
  40. return s
  41. }
  42. }
  43. func WithBindPort(port int) Option {
  44. return func(s *Server) *Server {
  45. s.bindPort = port
  46. return s
  47. }
  48. }
  49. func WithRespContent(content []byte) Option {
  50. return func(s *Server) *Server {
  51. s.respContent = content
  52. return s
  53. }
  54. }
  55. func WithCustomHandler(handler func(net.Conn)) Option {
  56. return func(s *Server) *Server {
  57. s.handler = handler
  58. return s
  59. }
  60. }
  61. func (s *Server) Run() error {
  62. if err := s.initListener(); err != nil {
  63. return err
  64. }
  65. go func() {
  66. for {
  67. c, err := s.l.Accept()
  68. if err != nil {
  69. return
  70. }
  71. go s.handler(c)
  72. }
  73. }()
  74. return nil
  75. }
  76. func (s *Server) Close() error {
  77. if s.l != nil {
  78. return s.l.Close()
  79. }
  80. return nil
  81. }
  82. func (s *Server) initListener() (err error) {
  83. switch s.netType {
  84. case TCP:
  85. s.l, err = net.Listen("tcp", net.JoinHostPort(s.bindAddr, strconv.Itoa(s.bindPort)))
  86. case UDP:
  87. s.l, err = libnet.ListenUDP(s.bindAddr, s.bindPort)
  88. case Unix:
  89. s.l, err = net.Listen("unix", s.bindAddr)
  90. default:
  91. return fmt.Errorf("unknown server type: %s", s.netType)
  92. }
  93. return err
  94. }
  95. func (s *Server) handle(c net.Conn) {
  96. defer c.Close()
  97. var reader io.Reader = c
  98. if s.netType == UDP {
  99. reader = bufio.NewReader(c)
  100. }
  101. for {
  102. buf, err := rpc.ReadBytes(reader)
  103. if err != nil {
  104. return
  105. }
  106. if len(s.respContent) > 0 {
  107. buf = s.respContent
  108. }
  109. rpc.WriteBytes(c, buf)
  110. }
  111. }
  112. func (s *Server) BindAddr() string {
  113. return s.bindAddr
  114. }
  115. func (s *Server) BindPort() int {
  116. return s.bindPort
  117. }