@@ -0,0 +1,147 @@
+package telegram
+
+import (
+	"context"
+	"encoding/json"
+	"errors"
+	"fmt"
+	"log"
+	"strconv"
+
+	"github.com/redis/go-redis/v9"
+)
+
+// CloneGroupMembersFromMessages scrapes senders of recent messages in a
+// group/channel. This is the preferred strategy for active groups where the
+// member-list API is restricted: every message exposes its sender, and any
+// joined account can read history. In a 1k+ member active group this
+// routinely yields 100-300 usernames in one pass vs 5-20 from the members
+// API for a non-admin account.
+//
+// State lives in the same Redis keys as CloneGroupMembers (spider:tg:clone:
+// <username>:{seen,participants,total,status}). Running both strategies on
+// the same group merges results naturally — dedup by user ID is shared.
+//
+// maxMessages caps how many recent messages to scan (default 2000).
+func CloneGroupMembersFromMessages(ctx context.Context, mgr *AccountManager, rdb *redis.Client, username string, maxMessages int) (*CloneResult, error) {
+	if mgr == nil {
+		return nil, errors.New("account manager is nil")
+	}
+	if rdb == nil {
+		return nil, errors.New("redis client is nil")
+	}
+	if maxMessages <= 0 {
+		maxMessages = 2000
+	}
+
+	res := &CloneResult{Username: username, Status: "running"}
+
+	// Restore existing seen set so this pass is additive to member-API runs.
+	seen := make(map[int64]bool)
+	if ids, err := rdb.SMembers(ctx, cloneKey(username, "seen")).Result(); err == nil {
+		for _, s := range ids {
+			if id, err := strconv.ParseInt(s, 10, 64); err == nil {
+				seen[id] = true
+			}
+		}
+	}
+	if pts, err := rdb.LRange(ctx, cloneKey(username, "participants"), 0, -1).Result(); err == nil {
+		for _, s := range pts {
+			var p GroupParticipant
+			if err := json.Unmarshal([]byte(s), &p); err == nil {
+				res.Participants = append(res.Participants, p)
+			}
+		}
+	}
+	if t, err := rdb.Get(ctx, cloneKey(username, "total")).Int(); err == nil {
+		res.Total = t
+	}
+
+	acc, err := mgr.Acquire(ctx)
+	if err != nil {
+		if errors.Is(err, ErrAllCooling) {
+			res.Partial = true
+			res.Status = "paused"
+			return res, nil
+		}
+		return res, err
+	}
+
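+	// cooldownSecs is set whenever a FloodWaitError is hit below; the deferred
+	// release then hands the account to HandleFloodWait for that long instead
+	// of returning it to the pool immediately.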
+	cooldownSecs := 0
+	defer func() {
+		if cooldownSecs > 0 {
+			mgr.HandleFloodWait(acc, cooldownSecs)
+		} else {
+			mgr.Release(acc, 0)
+		}
+	}()
+
+	if err := acc.Client.Connect(ctx); err != nil {
+		return res, fmt.Errorf("connect %s: %w", acc.Account.Phone, err)
+	}
+	defer acc.Client.Disconnect()
+
+	// Best-effort auto-join so we have history-read permission.
+	if inputCh, ch, _, err := acc.Client.ResolveGroupPeer(ctx, username); err == nil && inputCh != nil {
+		_ = acc.Client.JoinChannel(ctx, inputCh)
+		if res.GroupTitle == "" && ch != nil {
+			res.GroupTitle = ch.Title
+		}
+		// Refresh authoritative total opportunistically.
+		if t, err := acc.Client.GetFullChannelTotal(ctx, inputCh); err == nil && t > 0 && t > res.Total {
+			res.Total = t
+			_ = rdb.Set(ctx, cloneKey(username, "total"), t, cloneTTL).Err()
+		}
+	}
+
+	peer, err := acc.Client.ResolveInputPeer(ctx, username)
+	if err != nil {
+		if fwe, ok := err.(*FloodWaitError); ok {
+			// Mark the run resumable before handing the cooldown to the defer.
+			cooldownSecs = fwe.Seconds
+			res.Partial = true
+			res.Status = "paused"
+			return res, nil
+		}
+		return res, fmt.Errorf("resolve peer: %w", err)
+	}
+
+	senders, err := acc.Client.FetchMessageSenders(ctx, peer, maxMessages)
+	if err != nil {
+		if fwe, ok := err.(*FloodWaitError); ok {
+			cooldownSecs = fwe.Seconds
+		} else {
+			log.Printf("[clone_msg] %s fetch history error: %v", username, err)
+		}
+		// fall through — persist whatever we got before the error.
+	}
+
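+	// Merge newly seen senders into the shared dedup set and persist both the
+	// IDs and the serialized participants in a single MULTI/EXEC pipeline.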
+	added := 0
+	pipe := rdb.TxPipeline()
+	for _, p := range senders {
+		if seen[p.ID] {
+			continue
+		}
+		seen[p.ID] = true
+		res.Participants = append(res.Participants, p)
+		pipe.SAdd(ctx, cloneKey(username, "seen"), strconv.FormatInt(p.ID, 10))
+		if b, jerr := json.Marshal(p); jerr == nil {
+			pipe.RPush(ctx, cloneKey(username, "participants"), string(b))
+		}
+		added++
+	}
+	if added > 0 {
+		pipe.Expire(ctx, cloneKey(username, "seen"), cloneTTL)
+		pipe.Expire(ctx, cloneKey(username, "participants"), cloneTTL)
+		_, _ = pipe.Exec(ctx)
+	}
+
+	log.Printf("[clone_msg] %s: scanned up to %d messages, +%d new senders with @username (pool now %d)",
+		username, maxMessages, added, len(res.Participants))
+
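+	// cooldownSecs > 0 marks the run resumable ("paused" + Partial) so the
+	// caller can retry after the flood wait; otherwise the pass is recorded
+	// as "done".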
+	if cooldownSecs > 0 {
+		res.Partial = true
+		res.Status = "paused"
+	} else {
+		res.Status = "done"
+	}
+	_ = rdb.Set(ctx, cloneKey(username, "status"), res.Status, cloneTTL).Err()
+	return res, nil
+}
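
For context, here is a minimal caller-side sketch of how this pass might be driven to completion. It relies only on what the diff above shows (`CloneGroupMembersFromMessages`, the `CloneResult` status values `"paused"`/`"done"`, and the fact that the Redis state makes repeated passes additive); the `cloneUntilDone` helper and its retry delay are illustrative, not part of this change.

```go
package telegram // sketch only; assumes it sits alongside the code above

import (
	"context"
	"time"

	"github.com/redis/go-redis/v9"
)

// cloneUntilDone re-runs the message-history pass until it reports "done".
// Each pass is additive (shared Redis state), so a pass that ends "paused"
// (all accounts cooling, or a flood wait) can simply be retried later.
func cloneUntilDone(ctx context.Context, mgr *AccountManager, rdb *redis.Client, group string) (*CloneResult, error) {
	for {
		res, err := CloneGroupMembersFromMessages(ctx, mgr, rdb, group, 2000)
		if err != nil || res.Status == "done" {
			return res, err
		}
		// "paused": wait out the cooldown before the next attempt.
		// The delay here is a placeholder; pick whatever backoff fits.
		select {
		case <-ctx.Done():
			return res, ctx.Err()
		case <-time.After(10 * time.Minute):
		}
	}
}
```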