account_manager.go 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176
  1. package telegram
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "sync"
  7. "time"
  8. "github.com/redis/go-redis/v9"
  9. )
  10. // ErrAllCooling 所有账号都在冷却
  11. var ErrAllCooling = errors.New("all TG accounts are cooling down")
  12. // ErrNoAccounts 没有配置账号
  13. var ErrNoAccounts = errors.New("no TG accounts configured")
  14. // ManagedAccount 带状态的账号
  15. type ManagedAccount struct {
  16. Account Account
  17. Client *Client
  18. CoolUntil time.Time // 冷却截止时间
  19. InUse bool
  20. }
  21. // AccountManager 管理多个 TG 账号
  22. type AccountManager struct {
  23. accounts []*ManagedAccount
  24. mu sync.Mutex
  25. redis *redis.Client
  26. }
  27. // NewAccountManager 创建 AccountManager
  28. func NewAccountManager(accounts []Account, rdb *redis.Client) *AccountManager {
  29. m := &AccountManager{
  30. redis: rdb,
  31. }
  32. m.Init(accounts)
  33. return m
  34. }
  35. // Init 初始化所有账号的客户端(不连接),从 Redis 恢复冷却状态
  36. func (m *AccountManager) Init(accounts []Account) {
  37. m.mu.Lock()
  38. defer m.mu.Unlock()
  39. m.accounts = make([]*ManagedAccount, 0, len(accounts))
  40. for _, acc := range accounts {
  41. m.accounts = append(m.accounts, &ManagedAccount{
  42. Account: acc,
  43. Client: New(acc),
  44. })
  45. }
  46. m.loadCooldowns()
  47. }
  48. // Acquire 获取一个可用账号(非冷却中、非使用中)
  49. // 如果所有账号都在冷却,返回等待时间最短的账号的剩余冷却时间作为错误
  50. func (m *AccountManager) Acquire(ctx context.Context) (*ManagedAccount, error) {
  51. m.mu.Lock()
  52. defer m.mu.Unlock()
  53. if len(m.accounts) == 0 {
  54. return nil, ErrNoAccounts
  55. }
  56. now := time.Now()
  57. var soonestCool *ManagedAccount
  58. for _, acc := range m.accounts {
  59. if acc.InUse {
  60. continue
  61. }
  62. if now.Before(acc.CoolUntil) {
  63. // Still cooling; track shortest cooldown
  64. if soonestCool == nil || acc.CoolUntil.Before(soonestCool.CoolUntil) {
  65. soonestCool = acc
  66. }
  67. continue
  68. }
  69. // Available
  70. acc.InUse = true
  71. return acc, nil
  72. }
  73. if soonestCool != nil {
  74. remaining := time.Until(soonestCool.CoolUntil)
  75. return nil, fmt.Errorf("%w: shortest cooldown %s", ErrAllCooling, remaining.Round(time.Second))
  76. }
  77. // All accounts are InUse
  78. return nil, fmt.Errorf("%w", ErrAllCooling)
  79. }
  80. // Release 归还账号
  81. // floodWait > 0 时标记冷却,写入 Redis 持久化
  82. // Redis key: spider:tg:floodwait:{phone} value: 冷却截止 Unix 时间戳 TTL: 冷却时长
  83. func (m *AccountManager) Release(acc *ManagedAccount, floodWait time.Duration) {
  84. m.mu.Lock()
  85. defer m.mu.Unlock()
  86. acc.InUse = false
  87. if floodWait > 0 {
  88. coolUntil := time.Now().Add(floodWait)
  89. acc.CoolUntil = coolUntil
  90. m.saveCooldown(acc.Account.Phone, coolUntil)
  91. }
  92. }
  93. // HandleFloodWait FloodWait 处理策略:
  94. // ≤60s → 标记当前账号冷却,等待后重试(返回 nil error 表示可重试)
  95. // >60s → 标记当前账号冷却,切换其他账号(返回 nil,调用方重新 Acquire)
  96. // >300s → 标记所有账号最少冷却 300s,返回 ErrAllCooling
  97. func (m *AccountManager) HandleFloodWait(acc *ManagedAccount, waitSeconds int) error {
  98. wait := time.Duration(waitSeconds) * time.Second
  99. if waitSeconds > 300 {
  100. // Mark all accounts with at least 300s cooling
  101. m.mu.Lock()
  102. minCool := time.Now().Add(300 * time.Second)
  103. for _, a := range m.accounts {
  104. a.InUse = false
  105. if a.CoolUntil.Before(minCool) {
  106. a.CoolUntil = minCool
  107. m.saveCooldown(a.Account.Phone, a.CoolUntil)
  108. }
  109. }
  110. m.mu.Unlock()
  111. return ErrAllCooling
  112. }
  113. if waitSeconds > 60 {
  114. // Mark current account cooling, caller should re-Acquire
  115. m.Release(acc, wait)
  116. return nil
  117. }
  118. // ≤60s: mark cooling and wait
  119. m.Release(acc, wait)
  120. // Caller is responsible for waiting; we just mark the cooldown
  121. return nil
  122. }
  123. // loadCooldowns 从 Redis 加载冷却状态(在持有锁时调用)
  124. func (m *AccountManager) loadCooldowns() {
  125. if m.redis == nil {
  126. return
  127. }
  128. ctx := context.Background()
  129. for _, acc := range m.accounts {
  130. key := "spider:tg:floodwait:" + acc.Account.Phone
  131. val, err := m.redis.Get(ctx, key).Int64()
  132. if err != nil {
  133. continue
  134. }
  135. coolUntil := time.Unix(val, 0)
  136. if time.Now().Before(coolUntil) {
  137. acc.CoolUntil = coolUntil
  138. }
  139. }
  140. }
  141. // saveCooldown 保存冷却状态到 Redis(在持有锁时调用)
  142. func (m *AccountManager) saveCooldown(phone string, coolUntil time.Time) {
  143. if m.redis == nil {
  144. return
  145. }
  146. ctx := context.Background()
  147. key := "spider:tg:floodwait:" + phone
  148. ttl := time.Until(coolUntil)
  149. if ttl <= 0 {
  150. return
  151. }
  152. _ = m.redis.Set(ctx, key, coolUntil.Unix(), ttl).Err()
  153. }