messagecloning.go

package telegram

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"log"
	"strconv"

	"github.com/redis/go-redis/v9"
)
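
// cloneKey and cloneTTL are shared with CloneGroupMembers and are defined in
// its file. A minimal sketch of the assumed shape (illustrative only; the
// real definitions live alongside the member-API strategy):
//
//	const cloneTTL = 24 * time.Hour // assumed retention window for clone state
//
//	func cloneKey(username, field string) string {
//		// Matches the key layout documented below:
//		// spider:tg:clone:<username>:{seen,participants,total,status}
//		return fmt.Sprintf("spider:tg:clone:%s:%s", username, field)
//	}
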
// CloneGroupMembersFromMessages scrapes senders of recent messages in a
// group/channel. This is the preferred strategy for active groups where the
// member-list API is restricted: every message exposes its sender, and any
// joined account can read history. In a 1k+ member active group this
// routinely yields 100-300 usernames in one pass vs 5-20 from the members
// API for a non-admin account.
//
// State lives in the same Redis keys as CloneGroupMembers (spider:tg:clone:
// <username>:{seen,participants,total,status}). Running both strategies on
// the same group merges results naturally; dedup by user ID is shared.
//
// maxMessages caps how many recent messages to scan (default 2000).
func CloneGroupMembersFromMessages(ctx context.Context, mgr *AccountManager, rdb *redis.Client, username string, maxMessages int) (*CloneResult, error) {
	if mgr == nil {
		return nil, errors.New("account manager is nil")
	}
	if rdb == nil {
		return nil, errors.New("redis client is nil")
	}
	if maxMessages <= 0 {
		maxMessages = 2000
	}
	res := &CloneResult{Username: username, Status: "running"}

	// Restore existing seen set so this pass is additive to member-API runs.
	seen := make(map[int64]bool)
	if ids, err := rdb.SMembers(ctx, cloneKey(username, "seen")).Result(); err == nil {
		for _, s := range ids {
			if id, err := strconv.ParseInt(s, 10, 64); err == nil {
				seen[id] = true
			}
		}
	}
	if pts, err := rdb.LRange(ctx, cloneKey(username, "participants"), 0, -1).Result(); err == nil {
		for _, s := range pts {
			var p GroupParticipant
			if err := json.Unmarshal([]byte(s), &p); err == nil {
				res.Participants = append(res.Participants, p)
			}
		}
	}
	if t, err := rdb.Get(ctx, cloneKey(username, "total")).Int(); err == nil {
		res.Total = t
	}

	acc, err := mgr.Acquire(ctx)
	if err != nil {
		if errors.Is(err, ErrAllCooling) {
			res.Partial = true
			res.Status = "paused"
			return res, nil
		}
		return res, err
	}
	// Route the account back through the manager on exit: a recorded flood
	// wait sends it into cooldown, otherwise it is released immediately.
	cooldownSecs := 0
	defer func() {
		if cooldownSecs > 0 {
			mgr.HandleFloodWait(acc, cooldownSecs)
		} else {
			mgr.Release(acc, 0)
		}
	}()

	if err := acc.Client.Connect(ctx); err != nil {
		return res, fmt.Errorf("connect %s: %w", acc.Account.Phone, err)
	}
	defer acc.Client.Disconnect()

	// Best-effort auto-join so we have history-read permission.
	if inputCh, ch, _, err := acc.Client.ResolveGroupPeer(ctx, username); err == nil && inputCh != nil {
		_ = acc.Client.JoinChannel(ctx, inputCh)
		if res.GroupTitle == "" && ch != nil {
			res.GroupTitle = ch.Title
		}
		// Refresh authoritative total opportunistically.
		if t, err := acc.Client.GetFullChannelTotal(ctx, inputCh); err == nil && t > 0 && t > res.Total {
			res.Total = t
			_ = rdb.Set(ctx, cloneKey(username, "total"), t, cloneTTL).Err()
		}
	}

	peer, err := acc.Client.ResolveInputPeer(ctx, username)
	if err != nil {
		var fwe *FloodWaitError
		if errors.As(err, &fwe) {
			// Flood wait: park the account and report a resumable pause,
			// mirroring the ErrAllCooling and end-of-pass paths.
			cooldownSecs = fwe.Seconds
			res.Partial = true
			res.Status = "paused"
			return res, nil
		}
		return res, fmt.Errorf("resolve peer: %w", err)
	}

	senders, err := acc.Client.FetchMessageSenders(ctx, peer, maxMessages)
	if err != nil {
		var fwe *FloodWaitError
		if errors.As(err, &fwe) {
			cooldownSecs = fwe.Seconds
		} else {
			log.Printf("[clone_msg] %s fetch history error: %v", username, err)
		}
		// Fall through: persist whatever was fetched before the error.
	}

	added := 0
	pipe := rdb.TxPipeline()
	for _, p := range senders {
		if seen[p.ID] {
			continue
		}
		seen[p.ID] = true
		res.Participants = append(res.Participants, p)
		pipe.SAdd(ctx, cloneKey(username, "seen"), strconv.FormatInt(p.ID, 10))
		if b, jerr := json.Marshal(p); jerr == nil {
			pipe.RPush(ctx, cloneKey(username, "participants"), string(b))
		}
		added++
	}
	if added > 0 {
		pipe.Expire(ctx, cloneKey(username, "seen"), cloneTTL)
		pipe.Expire(ctx, cloneKey(username, "participants"), cloneTTL)
		_, _ = pipe.Exec(ctx)
	}

	log.Printf("[clone_msg] %s: scanned up to %d messages, +%d new senders with @username (pool now %d)",
		username, maxMessages, added, len(res.Participants))
	if cooldownSecs > 0 {
		res.Partial = true
		res.Status = "paused"
	} else {
		res.Status = "done"
	}
	_ = rdb.Set(ctx, cloneKey(username, "status"), res.Status, cloneTTL).Err()
	return res, nil
}
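
// A usage sketch (illustrative; how mgr and rdb are constructed depends on
// the rest of the package). Because both strategies share the seen and
// participants keys, running the member-API pass and this message pass back
// to back merges their results:
//
//	res, err := CloneGroupMembersFromMessages(ctx, mgr, rdb, "somegroup", 2000)
//	if err != nil {
//		log.Fatalf("clone: %v", err)
//	}
//	if res.Status == "paused" {
//		// All accounts are cooling down; re-run later to resume. Progress is
//		// already persisted in Redis, so the next pass is purely additive.
//	}
//	log.Printf("collected %d participants of ~%d", len(res.Participants), res.Total)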