clonegroup.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348
  1. package telegram
  2. import (
  3. "context"
  4. "encoding/json"
  5. "errors"
  6. "fmt"
  7. "log"
  8. "strconv"
  9. "time"
  10. "github.com/redis/go-redis/v9"
  11. )
// CloneResult is the outcome of a (possibly partial) group-clone run.
// It is both the live return value of CloneGroupMembers and a JSON-serializable
// snapshot (all progress fields are restored from Redis between runs).
type CloneResult struct {
	Username     string             `json:"group_username"`
	GroupTitle   string             `json:"group_title"` // filled from the resolved channel; empty for basic chats
	Participants []GroupParticipant `json:"participants"`
	Total        int                `json:"total"`         // TG-reported total participant count (authoritative target)
	Partial      bool               `json:"partial"`       // true when the run stopped before done_queries was exhausted
	Status       string             `json:"status"`        // "done" | "paused" (all accounts cooling or ctx canceled) | "running" (transient)
	QueriesDone  int                `json:"queries_done"`  // how many search queries have been completed across all runs
	QueriesTotal int                `json:"queries_total"` // how many search queries participantSearchQueries defines
}
const (
	// cloneKeyPrefix namespaces all per-group clone state in Redis
	// (seen/done_queries/participants/total/status sub-keys).
	cloneKeyPrefix = "spider:tg:clone:"
	// cloneTTL bounds how long interrupted clone state survives; after this
	// the next run effectively starts from scratch.
	cloneTTL = 24 * time.Hour
)
  27. func cloneKey(username, suffix string) string {
  28. return cloneKeyPrefix + username + ":" + suffix
  29. }
  30. // CloneGroupMembers fetches all visible participants of a group, persisting
  31. // progress in Redis so that FloodWait-cooled accounts can be swapped in and
  32. // subsequent calls pick up from where the last one stopped.
  33. //
  34. // Behavior:
  35. // - Phase 1 (empty query) runs once per session to obtain the current total.
  36. // - Phase 2 iterates the extended search query set; each query is marked in
  37. // Redis `done_queries` only on success, so a FloodWait in mid-query means
  38. // that query will be retried on the next run.
  39. // - On FloodWait, the current account is cooled via mgr.HandleFloodWait and
  40. // a new account is acquired. When ErrAllCooling is returned, the function
  41. // returns a partial result with Status="paused".
  42. // - Set reset=true to discard the Redis state and start over.
  43. func CloneGroupMembers(ctx context.Context, mgr *AccountManager, rdb *redis.Client, username string, reset bool) (*CloneResult, error) {
  44. if mgr == nil {
  45. return nil, errors.New("account manager is nil")
  46. }
  47. if rdb == nil {
  48. return nil, errors.New("redis client is nil")
  49. }
  50. if reset {
  51. _ = rdb.Del(ctx,
  52. cloneKey(username, "seen"),
  53. cloneKey(username, "done_queries"),
  54. cloneKey(username, "participants"),
  55. cloneKey(username, "total"),
  56. cloneKey(username, "status"),
  57. ).Err()
  58. }
  59. res := &CloneResult{Username: username, Status: "running"}
  60. queries := participantSearchQueries()
  61. res.QueriesTotal = len(queries)
  62. // Restore state from Redis.
  63. seen := make(map[int64]bool)
  64. if ids, err := rdb.SMembers(ctx, cloneKey(username, "seen")).Result(); err == nil {
  65. for _, s := range ids {
  66. if id, err := strconv.ParseInt(s, 10, 64); err == nil {
  67. seen[id] = true
  68. }
  69. }
  70. }
  71. doneQueries := make(map[string]bool)
  72. if qs, err := rdb.SMembers(ctx, cloneKey(username, "done_queries")).Result(); err == nil {
  73. for _, q := range qs {
  74. doneQueries[q] = true
  75. }
  76. }
  77. res.QueriesDone = len(doneQueries)
  78. if pts, err := rdb.LRange(ctx, cloneKey(username, "participants"), 0, -1).Result(); err == nil {
  79. for _, s := range pts {
  80. var p GroupParticipant
  81. if err := json.Unmarshal([]byte(s), &p); err == nil {
  82. res.Participants = append(res.Participants, p)
  83. }
  84. }
  85. }
  86. if t, err := rdb.Get(ctx, cloneKey(username, "total")).Int(); err == nil {
  87. res.Total = t
  88. }
  89. _ = rdb.Set(ctx, cloneKey(username, "status"), "running", cloneTTL).Err()
  90. // Persist helpers.
  91. addUsers := func(users []GroupParticipant) int {
  92. added := 0
  93. if len(users) == 0 {
  94. return 0
  95. }
  96. pipe := rdb.TxPipeline()
  97. for _, p := range users {
  98. if seen[p.ID] {
  99. continue
  100. }
  101. seen[p.ID] = true
  102. res.Participants = append(res.Participants, p)
  103. pipe.SAdd(ctx, cloneKey(username, "seen"), strconv.FormatInt(p.ID, 10))
  104. if b, err := json.Marshal(p); err == nil {
  105. pipe.RPush(ctx, cloneKey(username, "participants"), string(b))
  106. }
  107. added++
  108. }
  109. if added > 0 {
  110. pipe.Expire(ctx, cloneKey(username, "seen"), cloneTTL)
  111. pipe.Expire(ctx, cloneKey(username, "participants"), cloneTTL)
  112. _, _ = pipe.Exec(ctx)
  113. }
  114. return added
  115. }
  116. markQueryDone := func(q string) {
  117. if doneQueries[q] {
  118. return
  119. }
  120. doneQueries[q] = true
  121. res.QueriesDone = len(doneQueries)
  122. pipe := rdb.TxPipeline()
  123. pipe.SAdd(ctx, cloneKey(username, "done_queries"), q)
  124. pipe.Expire(ctx, cloneKey(username, "done_queries"), cloneTTL)
  125. _, _ = pipe.Exec(ctx)
  126. }
  127. setTotal := func(n int) {
  128. if n > res.Total {
  129. res.Total = n
  130. _ = rdb.Set(ctx, cloneKey(username, "total"), n, cloneTTL).Err()
  131. }
  132. }
  133. collectedEverything := func() bool {
  134. return res.Total > 0 && len(seen) >= res.Total
  135. }
  136. allQueriesDone := func() bool {
  137. return len(doneQueries) >= len(queries) && res.Total > 0 // require total known so we know we saw phase 1
  138. }
  139. // Main loop: keep rotating accounts until done or all cooling.
  140. for {
  141. if ctx.Err() != nil {
  142. res.Partial = true
  143. res.Status = "paused"
  144. _ = rdb.Set(ctx, cloneKey(username, "status"), res.Status, cloneTTL).Err()
  145. return res, ctx.Err()
  146. }
  147. if collectedEverything() || allQueriesDone() {
  148. break
  149. }
  150. acc, err := mgr.Acquire(ctx)
  151. if err != nil {
  152. if errors.Is(err, ErrAllCooling) {
  153. res.Partial = true
  154. res.Status = "paused"
  155. _ = rdb.Set(ctx, cloneKey(username, "status"), res.Status, cloneTTL).Err()
  156. log.Printf("[clone_group] %s: all accounts cooling, partial %d/%d (queries %d/%d)",
  157. username, len(seen), res.Total, res.QueriesDone, res.QueriesTotal)
  158. return res, nil
  159. }
  160. return res, err
  161. }
  162. // Per-account work. cooldownSecs is set when the account hits FloodWait
  163. // and should be cooled; otherwise we release with 0 (immediate availability).
  164. cooldownSecs, runErr := cloneGroupWithAccount(ctx, acc, username, queries, seen, doneQueries, addUsers, markQueryDone, setTotal, res)
  165. if cooldownSecs > 0 {
  166. mgr.HandleFloodWait(acc, cooldownSecs)
  167. } else {
  168. mgr.Release(acc, 0)
  169. }
  170. if runErr != nil {
  171. // Non-FloodWait fatal error (e.g. connect failed, channel resolve failed)
  172. res.Partial = true
  173. res.Status = "paused"
  174. _ = rdb.Set(ctx, cloneKey(username, "status"), res.Status, cloneTTL).Err()
  175. return res, runErr
  176. }
  177. // Otherwise loop again to try another account or confirm done.
  178. }
  179. res.Status = "done"
  180. res.Partial = false
  181. _ = rdb.Set(ctx, cloneKey(username, "status"), res.Status, cloneTTL).Err()
  182. log.Printf("[clone_group] %s: done, %d/%d participants, %d/%d queries",
  183. username, len(seen), res.Total, res.QueriesDone, res.QueriesTotal)
  184. return res, nil
  185. }
// cloneGroupWithAccount runs one account's worth of work: connect → resolve →
// (phase 1 if total unknown) → iterate undone queries until a FloodWait, all
// queries done, or collection is saturated. Returns (cooldownSecs, err):
//   - cooldownSecs > 0 means the account hit FloodWait and should cool for that duration
//   - err != nil is a non-recoverable error; caller should abort the whole run
//   - both zero = clean exit (either all done or saturation); caller inspects res
//
// Progress is persisted through the addUsers/markQueryDone/setTotal closures
// supplied by CloneGroupMembers, so any early return leaves durable state
// behind for the next account to resume from.
func cloneGroupWithAccount(
	ctx context.Context,
	acc *ManagedAccount,
	username string,
	queries []string, // phase-2 search queries; "" is reserved for the phase-1 marker
	seen map[int64]bool, // shared dedupe set; read here, mutated via addUsers
	doneQueries map[string]bool, // shared completion set; read here, mutated via markQueryDone
	addUsers func([]GroupParticipant) int,
	markQueryDone func(string),
	setTotal func(int),
	res *CloneResult,
) (int, error) {
	if err := acc.Client.Connect(ctx); err != nil {
		return 0, fmt.Errorf("connect %s: %w", acc.Account.Phone, err)
	}
	defer acc.Client.Disconnect()
	inputCh, ch, chatID, err := acc.Client.ResolveGroupPeer(ctx, username)
	if err != nil {
		// FloodWait during resolve is recoverable: cool this account and let
		// the caller rotate to another one.
		if fwe, ok := err.(*FloodWaitError); ok {
			return fwe.Seconds, nil
		}
		return 0, fmt.Errorf("resolve %s: %w", username, err)
	}
	// Basic chat path: no pagination, one call returns everyone visible.
	// There's no search query mechanism for basic chats, so the full response
	// saturates the collection in one shot.
	if inputCh == nil && chatID != 0 {
		participants, err := acc.Client.GetChatParticipantsByID(ctx, chatID)
		if err != nil {
			if fwe, ok := err.(*FloodWaitError); ok {
				return fwe.Seconds, nil
			}
			return 0, fmt.Errorf("basic chat participants %s: %w", username, err)
		}
		addUsers(participants)
		setTotal(len(participants))
		// Mark phase 1 and every query as done to terminate the outer loop immediately.
		markQueryDone("")
		for _, q := range queries {
			markQueryDone(q)
		}
		log.Printf("[clone_group] %s basic chat: +%d (done)", username, len(participants))
		return 0, nil
	}
	// Capture the title once; ch may be nil when the peer resolved by ID only.
	if res.GroupTitle == "" && ch != nil {
		res.GroupTitle = ch.Title
	}
	// Auto-join: private groups and some supergroups refuse GetParticipants for
	// non-members. USER_ALREADY_PARTICIPANT is treated as success. Failure is
	// non-fatal (we still try GetParticipants — may work for public groups
	// that block joining but allow member listing).
	if err := acc.Client.JoinChannel(ctx, inputCh); err != nil {
		if fwe, ok := err.(*FloodWaitError); ok {
			return fwe.Seconds, nil
		}
		log.Printf("[clone_group] %s join failed (continuing anyway): %v", username, err)
	}
	// Authoritative total from channels.getFullChannel. This is the TRUE participant
	// count reported by TG (e.g. 1935), not the filtered count a restricted member
	// sees via ChannelParticipantsSearch. Without this, phase 1 returning only 4-5
	// users would prematurely satisfy the saturation check and skip phase 2.
	if fullTotal, err := acc.Client.GetFullChannelTotal(ctx, inputCh); err == nil && fullTotal > 0 {
		setTotal(fullTotal)
		log.Printf("[clone_group] %s full_channel total=%d", username, fullTotal)
	} else if err != nil {
		if fwe, ok := err.(*FloodWaitError); ok {
			return fwe.Seconds, nil
		}
		// Non-fatal: phase 1/2 totals still provide a (weaker) target.
		log.Printf("[clone_group] %s get_full_channel failed: %v (continuing)", username, err)
	}
	// Phase 1: Recent filter (returns up to ~200 active members). Replaces the
	// empty-Q Search filter, which is often heavily restricted for non-admin
	// members (returns only 4-5 results instead of ~200).
	if !doneQueries[""] {
		users, recentTotal, err := acc.Client.FetchRecentParticipants(ctx, inputCh)
		if err != nil {
			if fwe, ok := err.(*FloodWaitError); ok {
				return fwe.Seconds, nil
			}
			log.Printf("[clone_group] phase1 (recent) error for %s: %v", username, err)
			// NOTE(review): returning (0, nil) here means the caller rotates to
			// the next account; if the error is persistent (not account-bound)
			// this can spin through accounts without progress — confirm intended.
			return 0, nil // non-FloodWait; abort this account, let outer loop retry
		}
		added := addUsers(users)
		if recentTotal > res.Total {
			setTotal(recentTotal)
		}
		markQueryDone("") // empty string marks phase 1 as complete
		log.Printf("[clone_group] %s phase1 recent: +%d (visible=%d, target=%d)",
			username, added, recentTotal, res.Total)
		if res.Total > 0 && len(seen) >= res.Total {
			return 0, nil // saturated
		}
		// Randomized pause between TG calls to look less bot-like.
		if err := jitterSleep(ctx, 2*time.Second, 4*time.Second); err != nil {
			return 0, err
		}
	}
	// Phase 2: iterate undone queries.
	for _, q := range queries {
		if ctx.Err() != nil {
			return 0, ctx.Err()
		}
		if doneQueries[q] {
			continue
		}
		// Re-check saturation before every query; earlier queries may have
		// already collected everyone.
		if res.Total > 0 && len(seen) >= res.Total {
			return 0, nil // saturated
		}
		users, total, err := acc.Client.FetchParticipantsByQuery(ctx, inputCh, q)
		if err != nil {
			if fwe, ok := err.(*FloodWaitError); ok {
				// Partial results from this query may be in `users` — persist them
				// before swapping account. Don't mark the query done.
				addUsers(users)
				log.Printf("[clone_group] %s q=%q flood wait %ds after %d new, %d/%d",
					username, q, fwe.Seconds, len(users), len(seen), res.Total)
				return fwe.Seconds, nil
			}
			log.Printf("[clone_group] %s q=%q error: %v (skip, not marking done)", username, q, err)
			continue
		}
		addUsers(users)
		setTotal(total)
		markQueryDone(q)
		if err := jitterSleep(ctx, 2*time.Second, 4*time.Second); err != nil {
			return 0, err
		}
	}
	return 0, nil
}