package telegram

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"log"
	"strconv"
	"time"

	"github.com/redis/go-redis/v9"
)

// CloneResult is the outcome of a (possibly partial) group-clone run.
type CloneResult struct {
	Username     string             `json:"group_username"`
	GroupTitle   string             `json:"group_title"`
	Participants []GroupParticipant `json:"participants"`
	Total        int                `json:"total"`   // TG-reported total participant count
	Partial      bool               `json:"partial"` // true when the run stopped before done_queries was exhausted
	Status       string             `json:"status"`  // "done" | "paused" (all accounts cooling) | "running" (transient)
	QueriesDone  int                `json:"queries_done"`  // search queries completed across all runs
	QueriesTotal int                `json:"queries_total"` // number of search queries participantSearchQueries defines
}

const (
	cloneKeyPrefix = "spider:tg:clone:"
	cloneTTL       = 24 * time.Hour
)

func cloneKey(username, suffix string) string {
	return cloneKeyPrefix + username + ":" + suffix
}

// CloneGroupMembers fetches all visible participants of a group, persisting
// progress in Redis so that FloodWait-cooled accounts can be swapped in and
// subsequent calls pick up from where the last one stopped.
//
// Behavior:
//   - Phase 1 (the Recent filter, keyed as the empty query in done_queries)
//     runs once per clone state to seed the collection.
//   - Phase 2 iterates the extended search query set; each query is marked in
//     Redis done_queries only on success, so a FloodWait mid-query means that
//     query is retried on the next run.
//   - On FloodWait, the current account is cooled via mgr.HandleFloodWait and
//     a new account is acquired. When ErrAllCooling is returned, the function
//     returns a partial result with Status="paused".
//   - Set reset=true to discard the Redis state and start over.
func CloneGroupMembers(ctx context.Context, mgr *AccountManager, rdb *redis.Client, username string, reset bool) (*CloneResult, error) {
	if mgr == nil {
		return nil, errors.New("account manager is nil")
	}
	if rdb == nil {
		return nil, errors.New("redis client is nil")
	}
	if reset {
		_ = rdb.Del(ctx,
			cloneKey(username, "seen"),
			cloneKey(username, "done_queries"),
			cloneKey(username, "participants"),
			cloneKey(username, "total"),
			cloneKey(username, "status"),
		).Err()
	}

	res := &CloneResult{Username: username, Status: "running"}
	queries := participantSearchQueries()
	res.QueriesTotal = len(queries)

	// Restore state from Redis.
	seen := make(map[int64]bool)
	if ids, err := rdb.SMembers(ctx, cloneKey(username, "seen")).Result(); err == nil {
		for _, s := range ids {
			if id, err := strconv.ParseInt(s, 10, 64); err == nil {
				seen[id] = true
			}
		}
	}
	doneQueries := make(map[string]bool)
	if qs, err := rdb.SMembers(ctx, cloneKey(username, "done_queries")).Result(); err == nil {
		for _, q := range qs {
			doneQueries[q] = true
		}
	}
	res.QueriesDone = len(doneQueries)
	if pts, err := rdb.LRange(ctx, cloneKey(username, "participants"), 0, -1).Result(); err == nil {
		for _, s := range pts {
			var p GroupParticipant
			if err := json.Unmarshal([]byte(s), &p); err == nil {
				res.Participants = append(res.Participants, p)
			}
		}
	}
	if t, err := rdb.Get(ctx, cloneKey(username, "total")).Int(); err == nil {
		res.Total = t
	}
	_ = rdb.Set(ctx, cloneKey(username, "status"), "running", cloneTTL).Err()

	// Persist helpers.
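	//
	// Redis layout per group (derived from the cloneKey call sites below;
	// every key expires after cloneTTL):
	//
	//	spider:tg:clone:<username>:seen           SET     collected user IDs
	//	spider:tg:clone:<username>:done_queries   SET     completed search queries ("" = phase 1)
	//	spider:tg:clone:<username>:participants   LIST    JSON-encoded GroupParticipant entries
	//	spider:tg:clone:<username>:total          STRING  authoritative participant count
	//	spider:tg:clone:<username>:status         STRING  "running" | "paused" | "done"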
	addUsers := func(users []GroupParticipant) int {
		if len(users) == 0 {
			return 0
		}
		added := 0
		pipe := rdb.TxPipeline()
		for _, p := range users {
			if seen[p.ID] {
				continue
			}
			seen[p.ID] = true
			res.Participants = append(res.Participants, p)
			pipe.SAdd(ctx, cloneKey(username, "seen"), strconv.FormatInt(p.ID, 10))
			if b, err := json.Marshal(p); err == nil {
				pipe.RPush(ctx, cloneKey(username, "participants"), string(b))
			}
			added++
		}
		if added > 0 {
			pipe.Expire(ctx, cloneKey(username, "seen"), cloneTTL)
			pipe.Expire(ctx, cloneKey(username, "participants"), cloneTTL)
			_, _ = pipe.Exec(ctx)
		}
		return added
	}
	markQueryDone := func(q string) {
		if doneQueries[q] {
			return
		}
		doneQueries[q] = true
		res.QueriesDone = len(doneQueries)
		pipe := rdb.TxPipeline()
		pipe.SAdd(ctx, cloneKey(username, "done_queries"), q)
		pipe.Expire(ctx, cloneKey(username, "done_queries"), cloneTTL)
		_, _ = pipe.Exec(ctx)
	}
	setTotal := func(n int) {
		if n > res.Total {
			res.Total = n
			_ = rdb.Set(ctx, cloneKey(username, "total"), n, cloneTTL).Err()
		}
	}
	collectedEverything := func() bool {
		return res.Total > 0 && len(seen) >= res.Total
	}
	allQueriesDone := func() bool {
		// Require a known total so we know phase 1 has run at least once.
		return len(doneQueries) >= len(queries) && res.Total > 0
	}

	// Main loop: keep rotating accounts until done or all cooling.
	for {
		if ctx.Err() != nil {
			res.Partial = true
			res.Status = "paused"
			_ = rdb.Set(ctx, cloneKey(username, "status"), res.Status, cloneTTL).Err()
			return res, ctx.Err()
		}
		if collectedEverything() || allQueriesDone() {
			break
		}
		acc, err := mgr.Acquire(ctx)
		if err != nil {
			if errors.Is(err, ErrAllCooling) {
				res.Partial = true
				res.Status = "paused"
				_ = rdb.Set(ctx, cloneKey(username, "status"), res.Status, cloneTTL).Err()
				log.Printf("[clone_group] %s: all accounts cooling, partial %d/%d (queries %d/%d)",
					username, len(seen), res.Total, res.QueriesDone, res.QueriesTotal)
				return res, nil
			}
			return res, err
		}

		// Per-account work. cooldownSecs is set when the account hits FloodWait
		// and should be cooled; otherwise we release with 0 (immediate availability).
		cooldownSecs, runErr := cloneGroupWithAccount(ctx, acc, username, queries, seen, doneQueries,
			addUsers, markQueryDone, setTotal, res)
		if cooldownSecs > 0 {
			mgr.HandleFloodWait(acc, cooldownSecs)
		} else {
			mgr.Release(acc, 0)
		}
		if runErr != nil {
			// Non-FloodWait fatal error (e.g. connect failed, channel resolve failed).
			res.Partial = true
			res.Status = "paused"
			_ = rdb.Set(ctx, cloneKey(username, "status"), res.Status, cloneTTL).Err()
			return res, runErr
		}
		// Otherwise loop again to try another account or confirm done.
	}

	res.Status = "done"
	res.Partial = false
	_ = rdb.Set(ctx, cloneKey(username, "status"), res.Status, cloneTTL).Err()
	log.Printf("[clone_group] %s: done, %d/%d participants, %d/%d queries",
		username, len(seen), res.Total, res.QueriesDone, res.QueriesTotal)
	return res, nil
}
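// FloodWaitError is defined elsewhere in this package; the type assertions
// below rely only on it exposing the TG-mandated wait in seconds, i.e. a shape
// roughly like this sketch (illustrative, not the actual declaration):
//
//	type FloodWaitError struct {
//		Seconds int // parsed from the FLOOD_WAIT_<n> RPC error
//	}
//
//	func (e *FloodWaitError) Error() string {
//		return fmt.Sprintf("flood wait %ds", e.Seconds)
//	}
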
// cloneGroupWithAccount runs one account's worth of work: connect → resolve →
// (phase 1 if not yet done) → iterate undone queries until a FloodWait, all
// queries done, or the collection is saturated.
//
// Returns (cooldownSecs, err):
//   - cooldownSecs > 0 means the account hit FloodWait and should cool for that duration
//   - err != nil is a non-recoverable error; the caller should abort the whole run
//   - both zero = clean exit (either all done or saturation); the caller inspects res
func cloneGroupWithAccount(
	ctx context.Context,
	acc *ManagedAccount,
	username string,
	queries []string,
	seen map[int64]bool,
	doneQueries map[string]bool,
	addUsers func([]GroupParticipant) int,
	markQueryDone func(string),
	setTotal func(int),
	res *CloneResult,
) (int, error) {
	if err := acc.Client.Connect(ctx); err != nil {
		return 0, fmt.Errorf("connect %s: %w", acc.Account.Phone, err)
	}
	defer acc.Client.Disconnect()

	inputCh, ch, chatID, err := acc.Client.ResolveGroupPeer(ctx, username)
	if err != nil {
		if fwe, ok := err.(*FloodWaitError); ok {
			return fwe.Seconds, nil
		}
		return 0, fmt.Errorf("resolve %s: %w", username, err)
	}

	// Basic chat path: no pagination, one call returns everyone visible.
	// There is no search-query mechanism for basic chats, so the full response
	// saturates the collection in one shot.
	if inputCh == nil && chatID != 0 {
		participants, err := acc.Client.GetChatParticipantsByID(ctx, chatID)
		if err != nil {
			if fwe, ok := err.(*FloodWaitError); ok {
				return fwe.Seconds, nil
			}
			return 0, fmt.Errorf("basic chat participants %s: %w", username, err)
		}
		addUsers(participants)
		setTotal(len(participants))
		// Mark phase 1 and every query as done so the outer loop terminates immediately.
		markQueryDone("")
		for _, q := range queries {
			markQueryDone(q)
		}
		log.Printf("[clone_group] %s basic chat: +%d (done)", username, len(participants))
		return 0, nil
	}

	if res.GroupTitle == "" && ch != nil {
		res.GroupTitle = ch.Title
	}

	// Auto-join: private groups and some supergroups refuse GetParticipants for
	// non-members. USER_ALREADY_PARTICIPANT is treated as success. Failure is
	// non-fatal: we still try GetParticipants, which may work for public groups
	// that block joining but allow member listing.
	if err := acc.Client.JoinChannel(ctx, inputCh); err != nil {
		if fwe, ok := err.(*FloodWaitError); ok {
			return fwe.Seconds, nil
		}
		log.Printf("[clone_group] %s join failed (continuing anyway): %v", username, err)
	}

	// Authoritative total from channels.getFullChannel. This is the TRUE participant
	// count reported by TG (e.g. 1935), not the filtered count a restricted member
	// sees via ChannelParticipantsSearch. Without it, phase 1 returning only 4-5
	// users would prematurely satisfy the saturation check and skip phase 2.
	if fullTotal, err := acc.Client.GetFullChannelTotal(ctx, inputCh); err == nil && fullTotal > 0 {
		setTotal(fullTotal)
		log.Printf("[clone_group] %s full_channel total=%d", username, fullTotal)
	} else if err != nil {
		if fwe, ok := err.(*FloodWaitError); ok {
			return fwe.Seconds, nil
		}
		log.Printf("[clone_group] %s get_full_channel failed: %v (continuing)", username, err)
	}

	// Phase 1: Recent filter (returns up to ~200 active members). Replaces the
	// empty-query Search filter, which is often heavily restricted for non-admin
	// members (returning only 4-5 results instead of ~200).
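	// FetchRecentParticipants is assumed (its body is not in this file) to wrap
	// channels.getParticipants with the ChannelParticipantsRecent filter,
	// something like this gotd-style sketch:
	//
	//	raw.ChannelsGetParticipants(ctx, &tg.ChannelsGetParticipantsRequest{
	//		Channel: inputCh,
	//		Filter:  &tg.ChannelParticipantsRecent{},
	//		Limit:   200,
	//	})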
if !doneQueries[""] { users, recentTotal, err := acc.Client.FetchRecentParticipants(ctx, inputCh) if err != nil { if fwe, ok := err.(*FloodWaitError); ok { return fwe.Seconds, nil } log.Printf("[clone_group] phase1 (recent) error for %s: %v", username, err) return 0, nil // non-FloodWait; abort this account, let outer loop retry } added := addUsers(users) if recentTotal > res.Total { setTotal(recentTotal) } markQueryDone("") // empty string marks phase 1 as complete log.Printf("[clone_group] %s phase1 recent: +%d (visible=%d, target=%d)", username, added, recentTotal, res.Total) if res.Total > 0 && len(seen) >= res.Total { return 0, nil // saturated } if err := jitterSleep(ctx, 2*time.Second, 4*time.Second); err != nil { return 0, err } } // Phase 2: iterate undone queries with a per-account budget and // consecutive-empty-guard. The goal is to yield BEFORE TG soft-bans the // account for over-querying. Caller rotates to a fresh account after. const ( maxQueriesPerRun = 30 // hard cap per account; outer loop rotates after consecutiveEmptyLimit = 15 // if this many queries in a row add 0 new users, this account is exhausted ) successCount := 0 consecutiveEmpty := 0 for _, q := range queries { if ctx.Err() != nil { return 0, ctx.Err() } if doneQueries[q] { continue } if res.Total > 0 && len(seen) >= res.Total { return 0, nil // saturated — done } if successCount >= maxQueriesPerRun { log.Printf("[clone_group] %s budget reached (%d queries on %s); yielding to next account", username, maxQueriesPerRun, acc.Account.Phone) return 0, nil } if consecutiveEmpty >= consecutiveEmptyLimit { log.Printf("[clone_group] %s %d consecutive empty queries on %s; account exhausted, yielding", username, consecutiveEmpty, acc.Account.Phone) return 0, nil } users, total, err := acc.Client.FetchParticipantsByQuery(ctx, inputCh, q) if err != nil { if fwe, ok := err.(*FloodWaitError); ok { // Partial results from this query may be in `users` — persist them // before swapping account. Don't mark the query done. addUsers(users) log.Printf("[clone_group] %s q=%q flood wait %ds after %d new, %d/%d", username, q, fwe.Seconds, len(users), len(seen), res.Total) return fwe.Seconds, nil } log.Printf("[clone_group] %s q=%q error: %v (skip, not marking done)", username, q, err) continue } added := addUsers(users) setTotal(total) markQueryDone(q) successCount++ if added == 0 { consecutiveEmpty++ } else { consecutiveEmpty = 0 } // Slow-paced jitter: 8-15s per query to avoid TG anti-scrape detection. // A full 30-query budget takes ~6 min — intentional. if err := jitterSleep(ctx, 8*time.Second, 15*time.Second); err != nil { return 0, err } } return 0, nil }