| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330 |
- package telegram
- import (
- "context"
- "encoding/json"
- "errors"
- "fmt"
- "log"
- "strconv"
- "time"
- "github.com/redis/go-redis/v9"
- )
// CloneResult is the outcome of a (possibly partial) group-clone run.
//
// It carries both the participants collected so far and enough progress
// bookkeeping (QueriesDone / QueriesTotal / Status) for a caller to decide
// whether to resume the run later with another call to CloneGroupMembers.
type CloneResult struct {
	Username     string             `json:"group_username"`
	GroupTitle   string             `json:"group_title"`
	Participants []GroupParticipant `json:"participants"`
	Total        int                `json:"total"`         // TG-reported total participant count
	Partial      bool               `json:"partial"`       // true when the run stopped before done_queries was exhausted
	Status       string             `json:"status"`        // "done" | "paused" (all accounts cooling) | "running" (transient)
	QueriesDone  int                `json:"queries_done"`  // how many search queries have been completed across all runs
	QueriesTotal int                `json:"queries_total"` // how many search queries participantSearchQueries defines
}
- const (
- cloneKeyPrefix = "spider:tg:clone:"
- cloneTTL = 24 * time.Hour
- )
- func cloneKey(username, suffix string) string {
- return cloneKeyPrefix + username + ":" + suffix
- }
- // CloneGroupMembers fetches all visible participants of a group, persisting
- // progress in Redis so that FloodWait-cooled accounts can be swapped in and
- // subsequent calls pick up from where the last one stopped.
- //
- // Behavior:
- // - Phase 1 (empty query) runs once per session to obtain the current total.
- // - Phase 2 iterates the extended search query set; each query is marked in
- // Redis `done_queries` only on success, so a FloodWait in mid-query means
- // that query will be retried on the next run.
- // - On FloodWait, the current account is cooled via mgr.HandleFloodWait and
- // a new account is acquired. When ErrAllCooling is returned, the function
- // returns a partial result with Status="paused".
- // - Set reset=true to discard the Redis state and start over.
- func CloneGroupMembers(ctx context.Context, mgr *AccountManager, rdb *redis.Client, username string, reset bool) (*CloneResult, error) {
- if mgr == nil {
- return nil, errors.New("account manager is nil")
- }
- if rdb == nil {
- return nil, errors.New("redis client is nil")
- }
- if reset {
- _ = rdb.Del(ctx,
- cloneKey(username, "seen"),
- cloneKey(username, "done_queries"),
- cloneKey(username, "participants"),
- cloneKey(username, "total"),
- cloneKey(username, "status"),
- ).Err()
- }
- res := &CloneResult{Username: username, Status: "running"}
- queries := participantSearchQueries()
- res.QueriesTotal = len(queries)
- // Restore state from Redis.
- seen := make(map[int64]bool)
- if ids, err := rdb.SMembers(ctx, cloneKey(username, "seen")).Result(); err == nil {
- for _, s := range ids {
- if id, err := strconv.ParseInt(s, 10, 64); err == nil {
- seen[id] = true
- }
- }
- }
- doneQueries := make(map[string]bool)
- if qs, err := rdb.SMembers(ctx, cloneKey(username, "done_queries")).Result(); err == nil {
- for _, q := range qs {
- doneQueries[q] = true
- }
- }
- res.QueriesDone = len(doneQueries)
- if pts, err := rdb.LRange(ctx, cloneKey(username, "participants"), 0, -1).Result(); err == nil {
- for _, s := range pts {
- var p GroupParticipant
- if err := json.Unmarshal([]byte(s), &p); err == nil {
- res.Participants = append(res.Participants, p)
- }
- }
- }
- if t, err := rdb.Get(ctx, cloneKey(username, "total")).Int(); err == nil {
- res.Total = t
- }
- _ = rdb.Set(ctx, cloneKey(username, "status"), "running", cloneTTL).Err()
- // Persist helpers.
- addUsers := func(users []GroupParticipant) int {
- added := 0
- if len(users) == 0 {
- return 0
- }
- pipe := rdb.TxPipeline()
- for _, p := range users {
- if seen[p.ID] {
- continue
- }
- seen[p.ID] = true
- res.Participants = append(res.Participants, p)
- pipe.SAdd(ctx, cloneKey(username, "seen"), strconv.FormatInt(p.ID, 10))
- if b, err := json.Marshal(p); err == nil {
- pipe.RPush(ctx, cloneKey(username, "participants"), string(b))
- }
- added++
- }
- if added > 0 {
- pipe.Expire(ctx, cloneKey(username, "seen"), cloneTTL)
- pipe.Expire(ctx, cloneKey(username, "participants"), cloneTTL)
- _, _ = pipe.Exec(ctx)
- }
- return added
- }
- markQueryDone := func(q string) {
- if doneQueries[q] {
- return
- }
- doneQueries[q] = true
- res.QueriesDone = len(doneQueries)
- pipe := rdb.TxPipeline()
- pipe.SAdd(ctx, cloneKey(username, "done_queries"), q)
- pipe.Expire(ctx, cloneKey(username, "done_queries"), cloneTTL)
- _, _ = pipe.Exec(ctx)
- }
- setTotal := func(n int) {
- if n > res.Total {
- res.Total = n
- _ = rdb.Set(ctx, cloneKey(username, "total"), n, cloneTTL).Err()
- }
- }
- collectedEverything := func() bool {
- return res.Total > 0 && len(seen) >= res.Total
- }
- allQueriesDone := func() bool {
- return len(doneQueries) >= len(queries) && res.Total > 0 // require total known so we know we saw phase 1
- }
- // Main loop: keep rotating accounts until done or all cooling.
- for {
- if ctx.Err() != nil {
- res.Partial = true
- res.Status = "paused"
- _ = rdb.Set(ctx, cloneKey(username, "status"), res.Status, cloneTTL).Err()
- return res, ctx.Err()
- }
- if collectedEverything() || allQueriesDone() {
- break
- }
- acc, err := mgr.Acquire(ctx)
- if err != nil {
- if errors.Is(err, ErrAllCooling) {
- res.Partial = true
- res.Status = "paused"
- _ = rdb.Set(ctx, cloneKey(username, "status"), res.Status, cloneTTL).Err()
- log.Printf("[clone_group] %s: all accounts cooling, partial %d/%d (queries %d/%d)",
- username, len(seen), res.Total, res.QueriesDone, res.QueriesTotal)
- return res, nil
- }
- return res, err
- }
- // Per-account work. cooldownSecs is set when the account hits FloodWait
- // and should be cooled; otherwise we release with 0 (immediate availability).
- cooldownSecs, runErr := cloneGroupWithAccount(ctx, acc, username, queries, seen, doneQueries, addUsers, markQueryDone, setTotal, res)
- if cooldownSecs > 0 {
- mgr.HandleFloodWait(acc, cooldownSecs)
- } else {
- mgr.Release(acc, 0)
- }
- if runErr != nil {
- // Non-FloodWait fatal error (e.g. connect failed, channel resolve failed)
- res.Partial = true
- res.Status = "paused"
- _ = rdb.Set(ctx, cloneKey(username, "status"), res.Status, cloneTTL).Err()
- return res, runErr
- }
- // Otherwise loop again to try another account or confirm done.
- }
- res.Status = "done"
- res.Partial = false
- _ = rdb.Set(ctx, cloneKey(username, "status"), res.Status, cloneTTL).Err()
- log.Printf("[clone_group] %s: done, %d/%d participants, %d/%d queries",
- username, len(seen), res.Total, res.QueriesDone, res.QueriesTotal)
- return res, nil
- }
- // cloneGroupWithAccount runs one account's worth of work: connect → resolve →
- // (phase 1 if total unknown) → iterate undone queries until a FloodWait, all
- // queries done, or collection is saturated. Returns (cooldownSecs, err):
- // - cooldownSecs > 0 means the account hit FloodWait and should cool for that duration
- // - err != nil is a non-recoverable error; caller should abort the whole run
- // - both zero = clean exit (either all done or saturation); caller inspects res
- func cloneGroupWithAccount(
- ctx context.Context,
- acc *ManagedAccount,
- username string,
- queries []string,
- seen map[int64]bool,
- doneQueries map[string]bool,
- addUsers func([]GroupParticipant) int,
- markQueryDone func(string),
- setTotal func(int),
- res *CloneResult,
- ) (int, error) {
- if err := acc.Client.Connect(ctx); err != nil {
- return 0, fmt.Errorf("connect %s: %w", acc.Account.Phone, err)
- }
- defer acc.Client.Disconnect()
- inputCh, ch, chatID, err := acc.Client.ResolveGroupPeer(ctx, username)
- if err != nil {
- if fwe, ok := err.(*FloodWaitError); ok {
- return fwe.Seconds, nil
- }
- return 0, fmt.Errorf("resolve %s: %w", username, err)
- }
- // Basic chat path: no pagination, one call returns everyone visible.
- // There's no search query mechanism for basic chats, so the full response
- // saturates the collection in one shot.
- if inputCh == nil && chatID != 0 {
- participants, err := acc.Client.GetChatParticipantsByID(ctx, chatID)
- if err != nil {
- if fwe, ok := err.(*FloodWaitError); ok {
- return fwe.Seconds, nil
- }
- return 0, fmt.Errorf("basic chat participants %s: %w", username, err)
- }
- addUsers(participants)
- setTotal(len(participants))
- // Mark phase 1 and every query as done to terminate the outer loop immediately.
- markQueryDone("")
- for _, q := range queries {
- markQueryDone(q)
- }
- log.Printf("[clone_group] %s basic chat: +%d (done)", username, len(participants))
- return 0, nil
- }
- if res.GroupTitle == "" && ch != nil {
- res.GroupTitle = ch.Title
- }
- // Auto-join: private groups and some supergroups refuse GetParticipants for
- // non-members. USER_ALREADY_PARTICIPANT is treated as success. Failure is
- // non-fatal (we still try GetParticipants — may work for public groups
- // that block joining but allow member listing).
- if err := acc.Client.JoinChannel(ctx, inputCh); err != nil {
- if fwe, ok := err.(*FloodWaitError); ok {
- return fwe.Seconds, nil
- }
- log.Printf("[clone_group] %s join failed (continuing anyway): %v", username, err)
- }
- // Phase 1: empty query. Run only once per clone session (when total is not
- // yet known). This both seeds seen with the visible batch AND captures total.
- if res.Total == 0 {
- users, total, err := acc.Client.FetchParticipantsByQuery(ctx, inputCh, "")
- if err != nil {
- if fwe, ok := err.(*FloodWaitError); ok {
- return fwe.Seconds, nil
- }
- log.Printf("[clone_group] phase1 error for %s: %v", username, err)
- return 0, nil // non-FloodWait; abort this account, let outer loop retry
- }
- added := addUsers(users)
- setTotal(total)
- markQueryDone("") // empty string marks phase 1 as complete
- log.Printf("[clone_group] %s phase1: +%d (total=%d)", username, added, total)
- if res.Total > 0 && len(seen) >= res.Total {
- return 0, nil // saturated
- }
- if err := jitterSleep(ctx, 2*time.Second, 4*time.Second); err != nil {
- return 0, err
- }
- }
- // Phase 2: iterate undone queries.
- for _, q := range queries {
- if ctx.Err() != nil {
- return 0, ctx.Err()
- }
- if doneQueries[q] {
- continue
- }
- if res.Total > 0 && len(seen) >= res.Total {
- return 0, nil // saturated
- }
- users, total, err := acc.Client.FetchParticipantsByQuery(ctx, inputCh, q)
- if err != nil {
- if fwe, ok := err.(*FloodWaitError); ok {
- // Partial results from this query may be in `users` — persist them
- // before swapping account. Don't mark the query done.
- addUsers(users)
- log.Printf("[clone_group] %s q=%q flood wait %ds after %d new, %d/%d",
- username, q, fwe.Seconds, len(users), len(seen), res.Total)
- return fwe.Seconds, nil
- }
- log.Printf("[clone_group] %s q=%q error: %v (skip, not marking done)", username, q, err)
- continue
- }
- addUsers(users)
- setTotal(total)
- markQueryDone(q)
- if err := jitterSleep(ctx, 2*time.Second, 4*time.Second); err != nil {
- return 0, err
- }
- }
- return 0, nil
- }
|