| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147 |
- package telegram
- import (
- "context"
- "encoding/json"
- "errors"
- "fmt"
- "log"
- "strconv"
- "github.com/redis/go-redis/v9"
- )
- // CloneGroupMembersFromMessages scrapes senders of recent messages in a
- // group/channel. This is the preferred strategy for active groups where the
- // member-list API is restricted: every message exposes its sender, and any
- // joined account can read history. In a 1k+ member active group this
- // routinely yields 100-300 usernames in one pass vs 5-20 from the members
- // API for a non-admin account.
- //
- // State lives in the same Redis keys as CloneGroupMembers (spider:tg:clone:
- // <username>:{seen,participants,total,status}). Running both strategies on
- // the same group merges results naturally — dedup by user ID is shared.
- //
- // maxMessages caps how many recent messages to scan (default 2000).
- func CloneGroupMembersFromMessages(ctx context.Context, mgr *AccountManager, rdb *redis.Client, username string, maxMessages int) (*CloneResult, error) {
- if mgr == nil {
- return nil, errors.New("account manager is nil")
- }
- if rdb == nil {
- return nil, errors.New("redis client is nil")
- }
- if maxMessages <= 0 {
- maxMessages = 2000
- }
- res := &CloneResult{Username: username, Status: "running"}
- // Restore existing seen set so this pass is additive to member-API runs.
- seen := make(map[int64]bool)
- if ids, err := rdb.SMembers(ctx, cloneKey(username, "seen")).Result(); err == nil {
- for _, s := range ids {
- if id, err := strconv.ParseInt(s, 10, 64); err == nil {
- seen[id] = true
- }
- }
- }
- if pts, err := rdb.LRange(ctx, cloneKey(username, "participants"), 0, -1).Result(); err == nil {
- for _, s := range pts {
- var p GroupParticipant
- if err := json.Unmarshal([]byte(s), &p); err == nil {
- res.Participants = append(res.Participants, p)
- }
- }
- }
- if t, err := rdb.Get(ctx, cloneKey(username, "total")).Int(); err == nil {
- res.Total = t
- }
- acc, err := mgr.Acquire(ctx)
- if err != nil {
- if errors.Is(err, ErrAllCooling) {
- res.Partial = true
- res.Status = "paused"
- return res, nil
- }
- return res, err
- }
- cooldownSecs := 0
- defer func() {
- if cooldownSecs > 0 {
- mgr.HandleFloodWait(acc, cooldownSecs)
- } else {
- mgr.Release(acc, 0)
- }
- }()
- if err := acc.Client.Connect(ctx); err != nil {
- return res, fmt.Errorf("connect %s: %w", acc.Account.Phone, err)
- }
- defer acc.Client.Disconnect()
- // Best-effort auto-join so we have history-read permission.
- if inputCh, ch, _, err := acc.Client.ResolveGroupPeer(ctx, username); err == nil && inputCh != nil {
- _ = acc.Client.JoinChannel(ctx, inputCh)
- if res.GroupTitle == "" && ch != nil {
- res.GroupTitle = ch.Title
- }
- // Refresh authoritative total opportunistically.
- if t, err := acc.Client.GetFullChannelTotal(ctx, inputCh); err == nil && t > 0 && t > res.Total {
- res.Total = t
- _ = rdb.Set(ctx, cloneKey(username, "total"), t, cloneTTL).Err()
- }
- }
- peer, err := acc.Client.ResolveInputPeer(ctx, username)
- if err != nil {
- if fwe, ok := err.(*FloodWaitError); ok {
- cooldownSecs = fwe.Seconds
- return res, nil
- }
- return res, fmt.Errorf("resolve peer: %w", err)
- }
- senders, err := acc.Client.FetchMessageSenders(ctx, peer, maxMessages)
- if err != nil {
- if fwe, ok := err.(*FloodWaitError); ok {
- cooldownSecs = fwe.Seconds
- } else {
- log.Printf("[clone_msg] %s fetch history error: %v", username, err)
- }
- // fall through — persist whatever we got before the error.
- }
- added := 0
- pipe := rdb.TxPipeline()
- for _, p := range senders {
- if seen[p.ID] {
- continue
- }
- seen[p.ID] = true
- res.Participants = append(res.Participants, p)
- pipe.SAdd(ctx, cloneKey(username, "seen"), strconv.FormatInt(p.ID, 10))
- if b, jerr := json.Marshal(p); jerr == nil {
- pipe.RPush(ctx, cloneKey(username, "participants"), string(b))
- }
- added++
- }
- if added > 0 {
- pipe.Expire(ctx, cloneKey(username, "seen"), cloneTTL)
- pipe.Expire(ctx, cloneKey(username, "participants"), cloneTTL)
- _, _ = pipe.Exec(ctx)
- }
- log.Printf("[clone_msg] %s: scanned up to %d messages, +%d new senders with @username (pool now %d)",
- username, maxMessages, added, len(res.Participants))
- if cooldownSecs > 0 {
- res.Partial = true
- res.Status = "paused"
- } else {
- res.Status = "done"
- }
- _ = rdb.Set(ctx, cloneKey(username, "status"), res.Status, cloneTTL).Err()
- return res, nil
- }
|