pool.go 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260
  1. package proxy
  2. import (
  3. "fmt"
  4. "log"
  5. "strings"
  6. "sync"
  7. "sync/atomic"
  8. "time"
  9. )
  10. // Entry represents a single proxy in the pool.
  11. type Entry struct {
  12. mu sync.Mutex
  13. ID uint
  14. Name string
  15. URL string // protocol://[user:pass@]host:port
  16. Region string
  17. failures int32
  18. disabled bool
  19. coolDown time.Time
  20. }
  21. func (e *Entry) isAvailable(now time.Time) bool {
  22. e.mu.Lock()
  23. defer e.mu.Unlock()
  24. if !e.disabled {
  25. return true
  26. }
  27. if now.After(e.coolDown) {
  28. e.disabled = false
  29. e.failures = 0
  30. return true
  31. }
  32. return false
  33. }
  34. func (e *Entry) markSuccess() {
  35. e.mu.Lock()
  36. defer e.mu.Unlock()
  37. e.failures = 0
  38. e.disabled = false
  39. }
  40. func (e *Entry) markFailure(maxFailures int32, coolDur time.Duration) {
  41. e.mu.Lock()
  42. defer e.mu.Unlock()
  43. e.failures++
  44. if maxFailures > 0 && e.failures >= maxFailures {
  45. e.disabled = true
  46. e.coolDown = time.Now().Add(coolDur)
  47. log.Printf("[proxy_pool] proxy %s (%s) disabled for %v after %d failures",
  48. e.Name, e.URL, coolDur, e.failures)
  49. }
  50. }
  51. func (e *Entry) snapshot() EntrySnapshot {
  52. e.mu.Lock()
  53. defer e.mu.Unlock()
  54. return EntrySnapshot{
  55. ID: e.ID,
  56. Name: e.Name,
  57. URL: e.URL,
  58. Region: e.Region,
  59. Failures: e.failures,
  60. Disabled: e.disabled,
  61. CoolDown: e.coolDown,
  62. }
  63. }
  64. // EntrySnapshot is a point-in-time copy of an Entry for display.
  65. type EntrySnapshot struct {
  66. ID uint `json:"id"`
  67. Name string `json:"name"`
  68. URL string `json:"url"`
  69. Region string `json:"region"`
  70. Failures int32 `json:"failures"`
  71. Disabled bool `json:"disabled"`
  72. CoolDown time.Time `json:"cool_down"`
  73. }
  74. // Pool is a thread-safe proxy pool with round-robin rotation and health tracking.
  75. type Pool struct {
  76. mu sync.RWMutex
  77. entries []*Entry
  78. index atomic.Uint64
  79. maxFailures int32 // disable proxy after this many consecutive failures
  80. coolDown time.Duration // cooldown period after disabling
  81. }
  82. // NewPool creates a new proxy pool.
  83. // maxFailures: number of consecutive failures before disabling a proxy (0 = never disable).
  84. // coolDown: how long to disable a failed proxy before retrying.
  85. func NewPool(maxFailures int, coolDown time.Duration) *Pool {
  86. return &Pool{
  87. maxFailures: int32(maxFailures),
  88. coolDown: coolDown,
  89. }
  90. }
  91. // Add adds a proxy entry to the pool.
  92. func (p *Pool) Add(id uint, name, proxyURL, region string) {
  93. p.mu.Lock()
  94. defer p.mu.Unlock()
  95. p.entries = append(p.entries, &Entry{
  96. ID: id,
  97. Name: name,
  98. URL: proxyURL,
  99. Region: region,
  100. })
  101. }
  102. // Size returns the total number of proxies in the pool.
  103. func (p *Pool) Size() int {
  104. p.mu.RLock()
  105. defer p.mu.RUnlock()
  106. return len(p.entries)
  107. }
  108. // Next returns the next available proxy URL using round-robin.
  109. // Returns empty string if no proxies are available.
  110. func (p *Pool) Next() string {
  111. entry := p.NextEntry()
  112. if entry == nil {
  113. return ""
  114. }
  115. return entry.URL
  116. }
  117. // NextEntry returns the next available proxy entry using round-robin.
  118. // Skips disabled proxies (unless cooldown has expired).
  119. func (p *Pool) NextEntry() *Entry {
  120. p.mu.RLock()
  121. n := len(p.entries)
  122. entries := p.entries
  123. p.mu.RUnlock()
  124. if n == 0 {
  125. return nil
  126. }
  127. now := time.Now()
  128. // Try up to n times to find an available proxy
  129. for i := 0; i < n; i++ {
  130. idx := p.index.Add(1) - 1
  131. entry := entries[int(idx)%n]
  132. if entry.isAvailable(now) {
  133. return entry
  134. }
  135. }
  136. // All proxies disabled — return the one with earliest cooldown expiry
  137. p.mu.RLock()
  138. defer p.mu.RUnlock()
  139. var best *Entry
  140. var bestCool time.Time
  141. for _, e := range p.entries {
  142. snap := e.snapshot()
  143. if best == nil || snap.CoolDown.Before(bestCool) {
  144. best = e
  145. bestCool = snap.CoolDown
  146. }
  147. }
  148. if best != nil {
  149. log.Printf("[proxy_pool] all proxies disabled, using %s (cooldown until %s)", best.Name, bestCool.Format("15:04:05"))
  150. }
  151. return best
  152. }
  153. // ReportSuccess marks a proxy as healthy, resetting its failure count.
  154. func (p *Pool) ReportSuccess(proxyURL string) {
  155. entry := p.findByURL(proxyURL)
  156. if entry != nil {
  157. entry.markSuccess()
  158. }
  159. }
  160. // ReportFailure records a failure for the proxy.
  161. // If consecutive failures exceed maxFailures, the proxy is temporarily disabled.
  162. func (p *Pool) ReportFailure(proxyURL string) {
  163. entry := p.findByURL(proxyURL)
  164. if entry != nil {
  165. entry.markFailure(p.maxFailures, p.coolDown)
  166. }
  167. }
  168. func (p *Pool) findByURL(proxyURL string) *Entry {
  169. p.mu.RLock()
  170. defer p.mu.RUnlock()
  171. for _, e := range p.entries {
  172. if e.URL == proxyURL {
  173. return e
  174. }
  175. }
  176. return nil
  177. }
  178. // ActiveCount returns the number of currently active (non-disabled) proxies.
  179. func (p *Pool) ActiveCount() int {
  180. p.mu.RLock()
  181. defer p.mu.RUnlock()
  182. count := 0
  183. now := time.Now()
  184. for _, e := range p.entries {
  185. if e.isAvailable(now) {
  186. count++
  187. }
  188. }
  189. return count
  190. }
  191. // AllEntries returns a snapshot of all proxy entries for status display.
  192. func (p *Pool) AllEntries() []EntrySnapshot {
  193. p.mu.RLock()
  194. defer p.mu.RUnlock()
  195. result := make([]EntrySnapshot, len(p.entries))
  196. for i, e := range p.entries {
  197. result[i] = e.snapshot()
  198. }
  199. return result
  200. }
  201. // URLs returns all proxy URLs in the pool (including disabled ones).
  202. func (p *Pool) URLs() []string {
  203. p.mu.RLock()
  204. defer p.mu.RUnlock()
  205. urls := make([]string, 0, len(p.entries))
  206. for _, e := range p.entries {
  207. urls = append(urls, e.URL)
  208. }
  209. return urls
  210. }
  211. // Summary returns a human-readable summary of the pool state for logging.
  212. func (p *Pool) Summary() string {
  213. p.mu.RLock()
  214. defer p.mu.RUnlock()
  215. now := time.Now()
  216. total := len(p.entries)
  217. active := 0
  218. var parts []string
  219. for _, e := range p.entries {
  220. snap := e.snapshot()
  221. status := "ok"
  222. if snap.Disabled && now.Before(snap.CoolDown) {
  223. status = "disabled"
  224. } else {
  225. active++
  226. }
  227. if snap.Failures > 0 {
  228. parts = append(parts, fmt.Sprintf("%s(%s,failures=%d)", snap.Name, status, snap.Failures))
  229. }
  230. }
  231. summary := fmt.Sprintf("代理池: %d/%d 活跃", active, total)
  232. if len(parts) > 0 {
  233. summary += ", " + strings.Join(parts, ", ")
  234. }
  235. return summary
  236. }