group.go 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238
  1. package features
  2. import (
  3. "fmt"
  4. "strconv"
  5. "sync"
  6. "time"
  7. "github.com/fatedier/frp/test/e2e/framework"
  8. "github.com/fatedier/frp/test/e2e/framework/consts"
  9. "github.com/fatedier/frp/test/e2e/mock/server/httpserver"
  10. "github.com/fatedier/frp/test/e2e/mock/server/streamserver"
  11. "github.com/fatedier/frp/test/e2e/pkg/request"
  12. . "github.com/onsi/ginkgo"
  13. )
  14. var _ = Describe("[Feature: Group]", func() {
  15. f := framework.NewDefaultFramework()
  16. newHTTPServer := func(port int, respContent string) *httpserver.Server {
  17. return httpserver.New(
  18. httpserver.WithBindPort(port),
  19. httpserver.WithHandler(framework.SpecifiedHTTPBodyHandler([]byte(respContent))),
  20. )
  21. }
  22. validateFooBarResponse := func(resp *request.Response) bool {
  23. if string(resp.Content) == "foo" || string(resp.Content) == "bar" {
  24. return true
  25. }
  26. return false
  27. }
  28. doFooBarHTTPRequest := func(vhostPort int, host string) []string {
  29. results := []string{}
  30. var wait sync.WaitGroup
  31. var mu sync.Mutex
  32. expectFn := func() {
  33. framework.NewRequestExpect(f).Port(vhostPort).
  34. RequestModify(func(r *request.Request) {
  35. r.HTTP().HTTPHost(host)
  36. }).
  37. Ensure(validateFooBarResponse, func(resp *request.Response) bool {
  38. mu.Lock()
  39. defer mu.Unlock()
  40. results = append(results, string(resp.Content))
  41. return true
  42. })
  43. }
  44. for i := 0; i < 10; i++ {
  45. wait.Add(1)
  46. go func() {
  47. defer wait.Done()
  48. expectFn()
  49. }()
  50. }
  51. wait.Wait()
  52. return results
  53. }
  54. Describe("Load Balancing", func() {
  55. It("TCP", func() {
  56. serverConf := consts.DefaultServerConfig
  57. clientConf := consts.DefaultClientConfig
  58. fooPort := f.AllocPort()
  59. fooServer := streamserver.New(streamserver.TCP, streamserver.WithBindPort(fooPort), streamserver.WithRespContent([]byte("foo")))
  60. f.RunServer("", fooServer)
  61. barPort := f.AllocPort()
  62. barServer := streamserver.New(streamserver.TCP, streamserver.WithBindPort(barPort), streamserver.WithRespContent([]byte("bar")))
  63. f.RunServer("", barServer)
  64. remotePort := f.AllocPort()
  65. clientConf += fmt.Sprintf(`
  66. [foo]
  67. type = tcp
  68. local_port = %d
  69. remote_port = %d
  70. group = test
  71. group_key = 123
  72. [bar]
  73. type = tcp
  74. local_port = %d
  75. remote_port = %d
  76. group = test
  77. group_key = 123
  78. `, fooPort, remotePort, barPort, remotePort)
  79. f.RunProcesses([]string{serverConf}, []string{clientConf})
  80. fooCount := 0
  81. barCount := 0
  82. for i := 0; i < 10; i++ {
  83. framework.NewRequestExpect(f).Explain("times " + strconv.Itoa(i)).Port(remotePort).Ensure(func(resp *request.Response) bool {
  84. switch string(resp.Content) {
  85. case "foo":
  86. fooCount++
  87. case "bar":
  88. barCount++
  89. default:
  90. return false
  91. }
  92. return true
  93. })
  94. }
  95. framework.ExpectTrue(fooCount > 1 && barCount > 1, "fooCount: %d, barCount: %d", fooCount, barCount)
  96. })
  97. })
  98. Describe("Health Check", func() {
  99. It("TCP", func() {
  100. serverConf := consts.DefaultServerConfig
  101. clientConf := consts.DefaultClientConfig
  102. fooPort := f.AllocPort()
  103. fooServer := streamserver.New(streamserver.TCP, streamserver.WithBindPort(fooPort), streamserver.WithRespContent([]byte("foo")))
  104. f.RunServer("", fooServer)
  105. barPort := f.AllocPort()
  106. barServer := streamserver.New(streamserver.TCP, streamserver.WithBindPort(barPort), streamserver.WithRespContent([]byte("bar")))
  107. f.RunServer("", barServer)
  108. remotePort := f.AllocPort()
  109. clientConf += fmt.Sprintf(`
  110. [foo]
  111. type = tcp
  112. local_port = %d
  113. remote_port = %d
  114. group = test
  115. group_key = 123
  116. health_check_type = tcp
  117. health_check_interval_s = 1
  118. [bar]
  119. type = tcp
  120. local_port = %d
  121. remote_port = %d
  122. group = test
  123. group_key = 123
  124. health_check_type = tcp
  125. health_check_interval_s = 1
  126. `, fooPort, remotePort, barPort, remotePort)
  127. f.RunProcesses([]string{serverConf}, []string{clientConf})
  128. // check foo and bar is ok
  129. results := []string{}
  130. for i := 0; i < 10; i++ {
  131. framework.NewRequestExpect(f).Port(remotePort).Ensure(validateFooBarResponse, func(resp *request.Response) bool {
  132. results = append(results, string(resp.Content))
  133. return true
  134. })
  135. }
  136. framework.ExpectContainElements(results, []string{"foo", "bar"})
  137. // close bar server, check foo is ok
  138. barServer.Close()
  139. time.Sleep(2 * time.Second)
  140. for i := 0; i < 10; i++ {
  141. framework.NewRequestExpect(f).Port(remotePort).ExpectResp([]byte("foo")).Ensure()
  142. }
  143. // resume bar server, check foo and bar is ok
  144. f.RunServer("", barServer)
  145. time.Sleep(2 * time.Second)
  146. results = []string{}
  147. for i := 0; i < 10; i++ {
  148. framework.NewRequestExpect(f).Port(remotePort).Ensure(validateFooBarResponse, func(resp *request.Response) bool {
  149. results = append(results, string(resp.Content))
  150. return true
  151. })
  152. }
  153. framework.ExpectContainElements(results, []string{"foo", "bar"})
  154. })
  155. It("HTTP", func() {
  156. vhostPort := f.AllocPort()
  157. serverConf := consts.DefaultServerConfig + fmt.Sprintf(`
  158. vhost_http_port = %d
  159. `, vhostPort)
  160. clientConf := consts.DefaultClientConfig
  161. fooPort := f.AllocPort()
  162. fooServer := newHTTPServer(fooPort, "foo")
  163. f.RunServer("", fooServer)
  164. barPort := f.AllocPort()
  165. barServer := newHTTPServer(barPort, "bar")
  166. f.RunServer("", barServer)
  167. clientConf += fmt.Sprintf(`
  168. [foo]
  169. type = http
  170. local_port = %d
  171. custom_domains = example.com
  172. group = test
  173. group_key = 123
  174. health_check_type = http
  175. health_check_interval_s = 1
  176. health_check_url = /healthz
  177. [bar]
  178. type = http
  179. local_port = %d
  180. custom_domains = example.com
  181. group = test
  182. group_key = 123
  183. health_check_type = http
  184. health_check_interval_s = 1
  185. health_check_url = /healthz
  186. `, fooPort, barPort)
  187. f.RunProcesses([]string{serverConf}, []string{clientConf})
  188. // check foo and bar is ok
  189. results := doFooBarHTTPRequest(vhostPort, "example.com")
  190. framework.ExpectContainElements(results, []string{"foo", "bar"})
  191. // close bar server, check foo is ok
  192. barServer.Close()
  193. time.Sleep(2 * time.Second)
  194. results = doFooBarHTTPRequest(vhostPort, "example.com")
  195. framework.ExpectContainElements(results, []string{"foo"})
  196. framework.ExpectNotContainElements(results, []string{"bar"})
  197. // resume bar server, check foo and bar is ok
  198. f.RunServer("", barServer)
  199. time.Sleep(2 * time.Second)
  200. results = doFooBarHTTPRequest(vhostPort, "example.com")
  201. framework.ExpectContainElements(results, []string{"foo", "bar"})
  202. })
  203. })
  204. })