account_manager.go 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225
  1. package telegram
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "sync"
  7. "time"
  8. "github.com/redis/go-redis/v9"
  9. )
  10. // ErrAllCooling 所有账号都在冷却
  11. var ErrAllCooling = errors.New("all TG accounts are cooling down")
  12. // ErrNoAccounts 没有配置账号
  13. var ErrNoAccounts = errors.New("no TG accounts configured")
  14. // ManagedAccount 带状态的账号
  15. type ManagedAccount struct {
  16. Account Account
  17. Client *Client
  18. CoolUntil time.Time // 冷却截止时间
  19. InUse bool
  20. }
  21. // AccountManager 管理多个 TG 账号
  22. type AccountManager struct {
  23. accounts []*ManagedAccount
  24. mu sync.Mutex
  25. redis *redis.Client
  26. proxyURL string // optional proxy for TG connections
  27. }
  28. // NewAccountManager 创建 AccountManager
  29. func NewAccountManager(accounts []Account, rdb *redis.Client) *AccountManager {
  30. m := &AccountManager{
  31. redis: rdb,
  32. }
  33. m.Init(accounts)
  34. return m
  35. }
  36. // Init 初始化所有账号的客户端(不连接),从 Redis 恢复冷却状态
  37. func (m *AccountManager) Init(accounts []Account) {
  38. m.mu.Lock()
  39. defer m.mu.Unlock()
  40. m.accounts = make([]*ManagedAccount, 0, len(accounts))
  41. for _, acc := range accounts {
  42. m.accounts = append(m.accounts, &ManagedAccount{
  43. Account: acc,
  44. Client: New(acc),
  45. })
  46. }
  47. m.loadCooldowns()
  48. }
  49. // Acquire 获取一个可用账号(非冷却中、非使用中)
  50. // 如果所有账号都在冷却,返回等待时间最短的账号的剩余冷却时间作为错误
  51. func (m *AccountManager) Acquire(ctx context.Context) (*ManagedAccount, error) {
  52. m.mu.Lock()
  53. defer m.mu.Unlock()
  54. if len(m.accounts) == 0 {
  55. return nil, ErrNoAccounts
  56. }
  57. now := time.Now()
  58. var soonestCool *ManagedAccount
  59. for _, acc := range m.accounts {
  60. if acc.InUse {
  61. continue
  62. }
  63. if now.Before(acc.CoolUntil) {
  64. // Still cooling; track shortest cooldown
  65. if soonestCool == nil || acc.CoolUntil.Before(soonestCool.CoolUntil) {
  66. soonestCool = acc
  67. }
  68. continue
  69. }
  70. // Available — apply current proxy to the client
  71. acc.InUse = true
  72. if m.proxyURL != "" {
  73. acc.Client.SetProxy(m.proxyURL)
  74. }
  75. return acc, nil
  76. }
  77. if soonestCool != nil {
  78. remaining := time.Until(soonestCool.CoolUntil)
  79. return nil, fmt.Errorf("%w: shortest cooldown %s", ErrAllCooling, remaining.Round(time.Second))
  80. }
  81. // All accounts are InUse
  82. return nil, fmt.Errorf("%w", ErrAllCooling)
  83. }
  84. // Release 归还账号
  85. // floodWait > 0 时标记冷却,写入 Redis 持久化
  86. // Redis key: spider:tg:floodwait:{phone} value: 冷却截止 Unix 时间戳 TTL: 冷却时长
  87. func (m *AccountManager) Release(acc *ManagedAccount, floodWait time.Duration) {
  88. m.mu.Lock()
  89. defer m.mu.Unlock()
  90. acc.InUse = false
  91. if floodWait > 0 {
  92. coolUntil := time.Now().Add(floodWait)
  93. acc.CoolUntil = coolUntil
  94. m.saveCooldown(acc.Account.Phone, coolUntil)
  95. }
  96. }
  97. // HandleFloodWait FloodWait 处理策略:
  98. // ≤60s → 标记当前账号冷却,等待后重试(返回 nil error 表示可重试)
  99. // >60s → 标记当前账号冷却,切换其他账号(返回 nil,调用方重新 Acquire)
  100. // >300s → 标记所有账号最少冷却 300s,返回 ErrAllCooling
  101. func (m *AccountManager) HandleFloodWait(acc *ManagedAccount, waitSeconds int) error {
  102. wait := time.Duration(waitSeconds) * time.Second
  103. if waitSeconds > 300 {
  104. // Mark all accounts with at least 300s cooling
  105. m.mu.Lock()
  106. minCool := time.Now().Add(300 * time.Second)
  107. for _, a := range m.accounts {
  108. a.InUse = false
  109. if a.CoolUntil.Before(minCool) {
  110. a.CoolUntil = minCool
  111. m.saveCooldown(a.Account.Phone, a.CoolUntil)
  112. }
  113. }
  114. m.mu.Unlock()
  115. return ErrAllCooling
  116. }
  117. if waitSeconds > 60 {
  118. // Mark current account cooling, caller should re-Acquire
  119. m.Release(acc, wait)
  120. return nil
  121. }
  122. // ≤60s: mark cooling and wait
  123. m.Release(acc, wait)
  124. // Caller is responsible for waiting; we just mark the cooldown
  125. return nil
  126. }
  127. // GetStatuses returns the current status of all managed accounts.
  128. func (m *AccountManager) GetStatuses() map[string]string {
  129. m.mu.Lock()
  130. defer m.mu.Unlock()
  131. result := make(map[string]string, len(m.accounts))
  132. now := time.Now()
  133. for _, acc := range m.accounts {
  134. switch {
  135. case acc.InUse:
  136. result[acc.Account.Phone] = "online"
  137. case now.Before(acc.CoolUntil):
  138. result[acc.Account.Phone] = "cooling"
  139. default:
  140. result[acc.Account.Phone] = "idle"
  141. }
  142. }
  143. return result
  144. }
  145. // SetProxy sets the proxy URL for TG connections.
  146. // Disconnects all idle clients so the next Acquire+Connect uses the new proxy.
  147. func (m *AccountManager) SetProxy(proxyURL string) {
  148. m.mu.Lock()
  149. defer m.mu.Unlock()
  150. if m.proxyURL == proxyURL {
  151. return
  152. }
  153. m.proxyURL = proxyURL
  154. // Disconnect idle clients so they reconnect with new proxy on next use
  155. for _, acc := range m.accounts {
  156. if !acc.InUse {
  157. acc.Client.Disconnect()
  158. acc.Client.SetProxy(proxyURL)
  159. }
  160. }
  161. }
  162. // GetProxy returns the current proxy URL.
  163. func (m *AccountManager) GetProxy() string {
  164. m.mu.Lock()
  165. defer m.mu.Unlock()
  166. return m.proxyURL
  167. }
  168. // loadCooldowns 从 Redis 加载冷却状态(在持有锁时调用)
  169. func (m *AccountManager) loadCooldowns() {
  170. if m.redis == nil {
  171. return
  172. }
  173. ctx := context.Background()
  174. for _, acc := range m.accounts {
  175. key := "spider:tg:floodwait:" + acc.Account.Phone
  176. val, err := m.redis.Get(ctx, key).Int64()
  177. if err != nil {
  178. continue
  179. }
  180. coolUntil := time.Unix(val, 0)
  181. if time.Now().Before(coolUntil) {
  182. acc.CoolUntil = coolUntil
  183. }
  184. }
  185. }
  186. // saveCooldown 保存冷却状态到 Redis(在持有锁时调用)
  187. func (m *AccountManager) saveCooldown(phone string, coolUntil time.Time) {
  188. if m.redis == nil {
  189. return
  190. }
  191. ctx := context.Background()
  192. key := "spider:tg:floodwait:" + phone
  193. ttl := time.Until(coolUntil)
  194. if ttl <= 0 {
  195. return
  196. }
  197. _ = m.redis.Set(ctx, key, coolUntil.Unix(), ttl).Err()
  198. }