package telegram import ( "context" "errors" "fmt" "sync" "time" "github.com/redis/go-redis/v9" ) // ErrAllCooling 所有账号都在冷却 var ErrAllCooling = errors.New("all TG accounts are cooling down") // ErrNoAccounts 没有配置账号 var ErrNoAccounts = errors.New("no TG accounts configured") // ManagedAccount 带状态的账号 type ManagedAccount struct { Account Account Client *Client CoolUntil time.Time // 冷却截止时间 InUse bool } // AccountManager 管理多个 TG 账号 type AccountManager struct { accounts []*ManagedAccount mu sync.Mutex redis *redis.Client proxyURL string // optional proxy for TG connections } // NewAccountManager 创建 AccountManager func NewAccountManager(accounts []Account, rdb *redis.Client) *AccountManager { m := &AccountManager{ redis: rdb, } m.Init(accounts) return m } // Init 初始化所有账号的客户端(不连接),从 Redis 恢复冷却状态 func (m *AccountManager) Init(accounts []Account) { m.mu.Lock() defer m.mu.Unlock() m.accounts = make([]*ManagedAccount, 0, len(accounts)) for _, acc := range accounts { m.accounts = append(m.accounts, &ManagedAccount{ Account: acc, Client: New(acc), }) } m.loadCooldowns() } // Acquire 获取一个可用账号(非冷却中、非使用中) // 如果所有账号都在冷却,返回等待时间最短的账号的剩余冷却时间作为错误 func (m *AccountManager) Acquire(ctx context.Context) (*ManagedAccount, error) { m.mu.Lock() defer m.mu.Unlock() if len(m.accounts) == 0 { return nil, ErrNoAccounts } now := time.Now() var soonestCool *ManagedAccount for _, acc := range m.accounts { if acc.InUse { continue } if now.Before(acc.CoolUntil) { // Still cooling; track shortest cooldown if soonestCool == nil || acc.CoolUntil.Before(soonestCool.CoolUntil) { soonestCool = acc } continue } // Available — apply current proxy to the client acc.InUse = true if m.proxyURL != "" { acc.Client.SetProxy(m.proxyURL) } return acc, nil } if soonestCool != nil { remaining := time.Until(soonestCool.CoolUntil) return nil, fmt.Errorf("%w: shortest cooldown %s", ErrAllCooling, remaining.Round(time.Second)) } // All accounts are InUse return nil, fmt.Errorf("%w", ErrAllCooling) } // Release 归还账号 // floodWait > 0 时标记冷却,写入 Redis 持久化 // Redis key: spider:tg:floodwait:{phone} value: 冷却截止 Unix 时间戳 TTL: 冷却时长 func (m *AccountManager) Release(acc *ManagedAccount, floodWait time.Duration) { m.mu.Lock() defer m.mu.Unlock() acc.InUse = false if floodWait > 0 { coolUntil := time.Now().Add(floodWait) acc.CoolUntil = coolUntil m.saveCooldown(acc.Account.Phone, coolUntil) } } // HandleFloodWait FloodWait 处理策略: // ≤60s → 标记当前账号冷却,等待后重试(返回 nil error 表示可重试) // >60s → 标记当前账号冷却,切换其他账号(返回 nil,调用方重新 Acquire) // >300s → 标记所有账号最少冷却 300s,返回 ErrAllCooling func (m *AccountManager) HandleFloodWait(acc *ManagedAccount, waitSeconds int) error { wait := time.Duration(waitSeconds) * time.Second if waitSeconds > 300 { // Mark all accounts with at least 300s cooling m.mu.Lock() minCool := time.Now().Add(300 * time.Second) for _, a := range m.accounts { a.InUse = false if a.CoolUntil.Before(minCool) { a.CoolUntil = minCool m.saveCooldown(a.Account.Phone, a.CoolUntil) } } m.mu.Unlock() return ErrAllCooling } if waitSeconds > 60 { // Mark current account cooling, caller should re-Acquire m.Release(acc, wait) return nil } // ≤60s: mark cooling and wait m.Release(acc, wait) // Caller is responsible for waiting; we just mark the cooldown return nil } // GetStatuses returns the current status of all managed accounts. func (m *AccountManager) GetStatuses() map[string]string { m.mu.Lock() defer m.mu.Unlock() result := make(map[string]string, len(m.accounts)) now := time.Now() for _, acc := range m.accounts { switch { case acc.InUse: result[acc.Account.Phone] = "online" case now.Before(acc.CoolUntil): result[acc.Account.Phone] = "cooling" default: result[acc.Account.Phone] = "idle" } } return result } // SetProxy sets the proxy URL for TG connections. // Disconnects all idle clients so the next Acquire+Connect uses the new proxy. func (m *AccountManager) SetProxy(proxyURL string) { m.mu.Lock() defer m.mu.Unlock() if m.proxyURL == proxyURL { return } m.proxyURL = proxyURL // Disconnect idle clients so they reconnect with new proxy on next use for _, acc := range m.accounts { if !acc.InUse { acc.Client.Disconnect() acc.Client.SetProxy(proxyURL) } } } // GetProxy returns the current proxy URL. func (m *AccountManager) GetProxy() string { m.mu.Lock() defer m.mu.Unlock() return m.proxyURL } // loadCooldowns 从 Redis 加载冷却状态(在持有锁时调用) func (m *AccountManager) loadCooldowns() { if m.redis == nil { return } ctx := context.Background() for _, acc := range m.accounts { key := "spider:tg:floodwait:" + acc.Account.Phone val, err := m.redis.Get(ctx, key).Int64() if err != nil { continue } coolUntil := time.Unix(val, 0) if time.Now().Before(coolUntil) { acc.CoolUntil = coolUntil } } } // saveCooldown 保存冷却状态到 Redis(在持有锁时调用) func (m *AccountManager) saveCooldown(phone string, coolUntil time.Time) { if m.redis == nil { return } ctx := context.Background() key := "spider:tg:floodwait:" + phone ttl := time.Until(coolUntil) if ttl <= 0 { return } _ = m.redis.Set(ctx, key, coolUntil.Unix(), ttl).Err() }