| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225 |
- package telegram
- import (
- "context"
- "errors"
- "fmt"
- "sync"
- "time"
- "github.com/redis/go-redis/v9"
- )
// Sentinel errors returned by AccountManager. Callers should match them with
// errors.Is, because Acquire returns ErrAllCooling wrapped with extra context.
var (
	// ErrAllCooling reports that no account can be handed out right now.
	ErrAllCooling = errors.New("all TG accounts are cooling down")

	// ErrNoAccounts reports that the manager has no accounts configured.
	ErrNoAccounts = errors.New("no TG accounts configured")
)
- // ManagedAccount 带状态的账号
- type ManagedAccount struct {
- Account Account
- Client *Client
- CoolUntil time.Time // 冷却截止时间
- InUse bool
- }
- // AccountManager 管理多个 TG 账号
- type AccountManager struct {
- accounts []*ManagedAccount
- mu sync.Mutex
- redis *redis.Client
- proxyURL string // optional proxy for TG connections
- }
- // NewAccountManager 创建 AccountManager
- func NewAccountManager(accounts []Account, rdb *redis.Client) *AccountManager {
- m := &AccountManager{
- redis: rdb,
- }
- m.Init(accounts)
- return m
- }
- // Init 初始化所有账号的客户端(不连接),从 Redis 恢复冷却状态
- func (m *AccountManager) Init(accounts []Account) {
- m.mu.Lock()
- defer m.mu.Unlock()
- m.accounts = make([]*ManagedAccount, 0, len(accounts))
- for _, acc := range accounts {
- m.accounts = append(m.accounts, &ManagedAccount{
- Account: acc,
- Client: New(acc),
- })
- }
- m.loadCooldowns()
- }
- // Acquire 获取一个可用账号(非冷却中、非使用中)
- // 如果所有账号都在冷却,返回等待时间最短的账号的剩余冷却时间作为错误
- func (m *AccountManager) Acquire(ctx context.Context) (*ManagedAccount, error) {
- m.mu.Lock()
- defer m.mu.Unlock()
- if len(m.accounts) == 0 {
- return nil, ErrNoAccounts
- }
- now := time.Now()
- var soonestCool *ManagedAccount
- for _, acc := range m.accounts {
- if acc.InUse {
- continue
- }
- if now.Before(acc.CoolUntil) {
- // Still cooling; track shortest cooldown
- if soonestCool == nil || acc.CoolUntil.Before(soonestCool.CoolUntil) {
- soonestCool = acc
- }
- continue
- }
- // Available — apply current proxy to the client
- acc.InUse = true
- if m.proxyURL != "" {
- acc.Client.SetProxy(m.proxyURL)
- }
- return acc, nil
- }
- if soonestCool != nil {
- remaining := time.Until(soonestCool.CoolUntil)
- return nil, fmt.Errorf("%w: shortest cooldown %s", ErrAllCooling, remaining.Round(time.Second))
- }
- // All accounts are InUse
- return nil, fmt.Errorf("%w", ErrAllCooling)
- }
- // Release 归还账号
- // floodWait > 0 时标记冷却,写入 Redis 持久化
- // Redis key: spider:tg:floodwait:{phone} value: 冷却截止 Unix 时间戳 TTL: 冷却时长
- func (m *AccountManager) Release(acc *ManagedAccount, floodWait time.Duration) {
- m.mu.Lock()
- defer m.mu.Unlock()
- acc.InUse = false
- if floodWait > 0 {
- coolUntil := time.Now().Add(floodWait)
- acc.CoolUntil = coolUntil
- m.saveCooldown(acc.Account.Phone, coolUntil)
- }
- }
- // HandleFloodWait FloodWait 处理策略:
- // ≤60s → 标记当前账号冷却,等待后重试(返回 nil error 表示可重试)
- // >60s → 标记当前账号冷却,切换其他账号(返回 nil,调用方重新 Acquire)
- // >300s → 标记所有账号最少冷却 300s,返回 ErrAllCooling
- func (m *AccountManager) HandleFloodWait(acc *ManagedAccount, waitSeconds int) error {
- wait := time.Duration(waitSeconds) * time.Second
- if waitSeconds > 300 {
- // Mark all accounts with at least 300s cooling
- m.mu.Lock()
- minCool := time.Now().Add(300 * time.Second)
- for _, a := range m.accounts {
- a.InUse = false
- if a.CoolUntil.Before(minCool) {
- a.CoolUntil = minCool
- m.saveCooldown(a.Account.Phone, a.CoolUntil)
- }
- }
- m.mu.Unlock()
- return ErrAllCooling
- }
- if waitSeconds > 60 {
- // Mark current account cooling, caller should re-Acquire
- m.Release(acc, wait)
- return nil
- }
- // ≤60s: mark cooling and wait
- m.Release(acc, wait)
- // Caller is responsible for waiting; we just mark the cooldown
- return nil
- }
- // GetStatuses returns the current status of all managed accounts.
- func (m *AccountManager) GetStatuses() map[string]string {
- m.mu.Lock()
- defer m.mu.Unlock()
- result := make(map[string]string, len(m.accounts))
- now := time.Now()
- for _, acc := range m.accounts {
- switch {
- case acc.InUse:
- result[acc.Account.Phone] = "online"
- case now.Before(acc.CoolUntil):
- result[acc.Account.Phone] = "cooling"
- default:
- result[acc.Account.Phone] = "idle"
- }
- }
- return result
- }
- // SetProxy sets the proxy URL for TG connections.
- // Disconnects all idle clients so the next Acquire+Connect uses the new proxy.
- func (m *AccountManager) SetProxy(proxyURL string) {
- m.mu.Lock()
- defer m.mu.Unlock()
- if m.proxyURL == proxyURL {
- return
- }
- m.proxyURL = proxyURL
- // Disconnect idle clients so they reconnect with new proxy on next use
- for _, acc := range m.accounts {
- if !acc.InUse {
- acc.Client.Disconnect()
- acc.Client.SetProxy(proxyURL)
- }
- }
- }
- // GetProxy returns the current proxy URL.
- func (m *AccountManager) GetProxy() string {
- m.mu.Lock()
- defer m.mu.Unlock()
- return m.proxyURL
- }
- // loadCooldowns 从 Redis 加载冷却状态(在持有锁时调用)
- func (m *AccountManager) loadCooldowns() {
- if m.redis == nil {
- return
- }
- ctx := context.Background()
- for _, acc := range m.accounts {
- key := "spider:tg:floodwait:" + acc.Account.Phone
- val, err := m.redis.Get(ctx, key).Int64()
- if err != nil {
- continue
- }
- coolUntil := time.Unix(val, 0)
- if time.Now().Before(coolUntil) {
- acc.CoolUntil = coolUntil
- }
- }
- }
- // saveCooldown 保存冷却状态到 Redis(在持有锁时调用)
- func (m *AccountManager) saveCooldown(phone string, coolUntil time.Time) {
- if m.redis == nil {
- return
- }
- ctx := context.Background()
- key := "spider:tg:floodwait:" + phone
- ttl := time.Until(coolUntil)
- if ttl <= 0 {
- return
- }
- _ = m.redis.Set(ctx, key, coolUntil.Unix(), ttl).Err()
- }
|