| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260 |
- package proxy
- import (
- "fmt"
- "log"
- "strings"
- "sync"
- "sync/atomic"
- "time"
- )
- // Entry represents a single proxy in the pool.
- type Entry struct {
- mu sync.Mutex
- ID uint
- Name string
- URL string // protocol://[user:pass@]host:port
- Region string
- failures int32
- disabled bool
- coolDown time.Time
- }
- func (e *Entry) isAvailable(now time.Time) bool {
- e.mu.Lock()
- defer e.mu.Unlock()
- if !e.disabled {
- return true
- }
- if now.After(e.coolDown) {
- e.disabled = false
- e.failures = 0
- return true
- }
- return false
- }
- func (e *Entry) markSuccess() {
- e.mu.Lock()
- defer e.mu.Unlock()
- e.failures = 0
- e.disabled = false
- }
- func (e *Entry) markFailure(maxFailures int32, coolDur time.Duration) {
- e.mu.Lock()
- defer e.mu.Unlock()
- e.failures++
- if maxFailures > 0 && e.failures >= maxFailures {
- e.disabled = true
- e.coolDown = time.Now().Add(coolDur)
- log.Printf("[proxy_pool] proxy %s (%s) disabled for %v after %d failures",
- e.Name, e.URL, coolDur, e.failures)
- }
- }
- func (e *Entry) snapshot() EntrySnapshot {
- e.mu.Lock()
- defer e.mu.Unlock()
- return EntrySnapshot{
- ID: e.ID,
- Name: e.Name,
- URL: e.URL,
- Region: e.Region,
- Failures: e.failures,
- Disabled: e.disabled,
- CoolDown: e.coolDown,
- }
- }
- // EntrySnapshot is a point-in-time copy of an Entry for display.
- type EntrySnapshot struct {
- ID uint `json:"id"`
- Name string `json:"name"`
- URL string `json:"url"`
- Region string `json:"region"`
- Failures int32 `json:"failures"`
- Disabled bool `json:"disabled"`
- CoolDown time.Time `json:"cool_down"`
- }
- // Pool is a thread-safe proxy pool with round-robin rotation and health tracking.
- type Pool struct {
- mu sync.RWMutex
- entries []*Entry
- index atomic.Uint64
- maxFailures int32 // disable proxy after this many consecutive failures
- coolDown time.Duration // cooldown period after disabling
- }
- // NewPool creates a new proxy pool.
- // maxFailures: number of consecutive failures before disabling a proxy (0 = never disable).
- // coolDown: how long to disable a failed proxy before retrying.
- func NewPool(maxFailures int, coolDown time.Duration) *Pool {
- return &Pool{
- maxFailures: int32(maxFailures),
- coolDown: coolDown,
- }
- }
- // Add adds a proxy entry to the pool.
- func (p *Pool) Add(id uint, name, proxyURL, region string) {
- p.mu.Lock()
- defer p.mu.Unlock()
- p.entries = append(p.entries, &Entry{
- ID: id,
- Name: name,
- URL: proxyURL,
- Region: region,
- })
- }
- // Size returns the total number of proxies in the pool.
- func (p *Pool) Size() int {
- p.mu.RLock()
- defer p.mu.RUnlock()
- return len(p.entries)
- }
- // Next returns the next available proxy URL using round-robin.
- // Returns empty string if no proxies are available.
- func (p *Pool) Next() string {
- entry := p.NextEntry()
- if entry == nil {
- return ""
- }
- return entry.URL
- }
- // NextEntry returns the next available proxy entry using round-robin.
- // Skips disabled proxies (unless cooldown has expired).
- func (p *Pool) NextEntry() *Entry {
- p.mu.RLock()
- n := len(p.entries)
- entries := p.entries
- p.mu.RUnlock()
- if n == 0 {
- return nil
- }
- now := time.Now()
- // Try up to n times to find an available proxy
- for i := 0; i < n; i++ {
- idx := p.index.Add(1) - 1
- entry := entries[int(idx)%n]
- if entry.isAvailable(now) {
- return entry
- }
- }
- // All proxies disabled — return the one with earliest cooldown expiry
- p.mu.RLock()
- defer p.mu.RUnlock()
- var best *Entry
- var bestCool time.Time
- for _, e := range p.entries {
- snap := e.snapshot()
- if best == nil || snap.CoolDown.Before(bestCool) {
- best = e
- bestCool = snap.CoolDown
- }
- }
- if best != nil {
- log.Printf("[proxy_pool] all proxies disabled, using %s (cooldown until %s)", best.Name, bestCool.Format("15:04:05"))
- }
- return best
- }
- // ReportSuccess marks a proxy as healthy, resetting its failure count.
- func (p *Pool) ReportSuccess(proxyURL string) {
- entry := p.findByURL(proxyURL)
- if entry != nil {
- entry.markSuccess()
- }
- }
- // ReportFailure records a failure for the proxy.
- // If consecutive failures exceed maxFailures, the proxy is temporarily disabled.
- func (p *Pool) ReportFailure(proxyURL string) {
- entry := p.findByURL(proxyURL)
- if entry != nil {
- entry.markFailure(p.maxFailures, p.coolDown)
- }
- }
- func (p *Pool) findByURL(proxyURL string) *Entry {
- p.mu.RLock()
- defer p.mu.RUnlock()
- for _, e := range p.entries {
- if e.URL == proxyURL {
- return e
- }
- }
- return nil
- }
- // ActiveCount returns the number of currently active (non-disabled) proxies.
- func (p *Pool) ActiveCount() int {
- p.mu.RLock()
- defer p.mu.RUnlock()
- count := 0
- now := time.Now()
- for _, e := range p.entries {
- if e.isAvailable(now) {
- count++
- }
- }
- return count
- }
- // AllEntries returns a snapshot of all proxy entries for status display.
- func (p *Pool) AllEntries() []EntrySnapshot {
- p.mu.RLock()
- defer p.mu.RUnlock()
- result := make([]EntrySnapshot, len(p.entries))
- for i, e := range p.entries {
- result[i] = e.snapshot()
- }
- return result
- }
- // URLs returns all proxy URLs in the pool (including disabled ones).
- func (p *Pool) URLs() []string {
- p.mu.RLock()
- defer p.mu.RUnlock()
- urls := make([]string, 0, len(p.entries))
- for _, e := range p.entries {
- urls = append(urls, e.URL)
- }
- return urls
- }
- // Summary returns a human-readable summary of the pool state for logging.
- func (p *Pool) Summary() string {
- p.mu.RLock()
- defer p.mu.RUnlock()
- now := time.Now()
- total := len(p.entries)
- active := 0
- var parts []string
- for _, e := range p.entries {
- snap := e.snapshot()
- status := "ok"
- if snap.Disabled && now.Before(snap.CoolDown) {
- status = "disabled"
- } else {
- active++
- }
- if snap.Failures > 0 {
- parts = append(parts, fmt.Sprintf("%s(%s,failures=%d)", snap.Name, status, snap.Failures))
- }
- }
- summary := fmt.Sprintf("代理池: %d/%d 活跃", active, total)
- if len(parts) > 0 {
- summary += ", " + strings.Join(parts, ", ")
- }
- return summary
- }
|