package telegram

import (
	"bufio"
	"context"
	"encoding/base64"
	"fmt"
	"log"
	"math/rand/v2"
	"net"
	"net/url"
	"regexp"
	"sort"
	"strings"
	"sync"
	"time"

	"github.com/gotd/td/session"
	"github.com/gotd/td/telegram"
	"github.com/gotd/td/telegram/dcs"
	"github.com/gotd/td/tg"
	"github.com/gotd/td/tgerr"
	xproxy "golang.org/x/net/proxy"
)

// tmeRegexp matches http(s)://t.me/ links, stopping at whitespace, quotes,
// angle brackets, ')' or ']'.
var tmeRegexp = regexp.MustCompile(`https?://t\.me/[^\s"'<>)\]]+`)

// Client is a Telegram client bound to a single account.
type Client struct {
	account     Account
	sessionPath string
	proxyURL    string // SOCKS5/HTTP proxy URL

	mu     sync.Mutex // guards the fields below and proxyURL
	tgc    *telegram.Client
	api    *tg.Client
	cancel context.CancelFunc
	ready  chan struct{} // closed when connected
	runErr error
}

// New creates a client for the given account. It only initialises state and
// does not connect.
func New(account Account) *Client {
	return &Client{
		account:     account,
		sessionPath: account.SessionFile,
		ready:       make(chan struct{}),
	}
}

// SetProxy sets the proxy URL for this client's connections.
func (c *Client) SetProxy(proxyURL string) {
	c.mu.Lock()
	c.proxyURL = proxyURL
	c.mu.Unlock()
}

// Connect starts the client's run loop using the session stored in the
// session file and blocks until the client is ready, the run loop exits, or
// ctx is done. No interactive login is performed, so the session has to be
// generated beforehand.
//
// If a proxy URL is configured but no dialer can be built for it, the failure
// is logged and the connection proceeds WITHOUT a proxy.
//
// NOTE(review): the original comment states that a missing session file
// yields an error; this function does not check for the file itself —
// confirm the library reports it.
func (c *Client) Connect(ctx context.Context) error {
	storage := &session.FileStorage{Path: c.sessionPath}
	opts := telegram.Options{
		SessionStorage: storage,
		NoUpdates:      true,
		Device: telegram.DeviceConfig{
			DeviceModel:    c.account.Device,
			AppVersion:     c.account.AppVersion,
			SystemVersion:  c.account.SystemVersion,
			LangPack:       c.account.LangPack,
			SystemLangCode: c.account.SystemLangCode,
			LangCode:       c.account.LangCode,
		},
	}

	// Apply proxy if configured.
	c.mu.Lock()
	proxyURL := c.proxyURL
	c.mu.Unlock()
	if proxyURL != "" {
		dialFunc, err := buildProxyDialer(proxyURL)
		if err != nil {
			log.Printf("[tg_client] failed to create proxy dialer: %v, connecting without proxy", err)
		} else {
			opts.Resolver = dcs.Plain(dcs.PlainOptions{Dial: dialFunc})
			// Never write proxy credentials to the log: print the URL with
			// its password masked.
			shown := "<redacted>"
			if u, perr := url.Parse(proxyURL); perr == nil {
				shown = u.Redacted()
			}
			log.Printf("[tg_client] connecting via proxy: %s", shown)
		}
	}

	client := telegram.NewClient(c.account.AppID, c.account.AppHash, opts)
	runCtx, cancel := context.WithCancel(ctx)

	c.mu.Lock()
	c.tgc = client
	c.cancel = cancel
	c.ready = make(chan struct{})
	c.runErr = nil
	// Drop the API handle of any previous connection so that waitReady blocks
	// on the new ready channel instead of handing out a stale client.
	c.api = nil
	readyCh := c.ready
	c.mu.Unlock()

	errCh := make(chan error, 1) // buffered: the goroutine must never block on send
	go func() {
		err := client.Run(runCtx, func(ctx context.Context) error {
			c.mu.Lock()
			c.api = client.API()
			close(readyCh)
			c.mu.Unlock()
			// Block until context is cancelled (Disconnect called).
			<-ctx.Done()
			return ctx.Err()
		})
		c.mu.Lock()
		c.runErr = err
		c.mu.Unlock()
		errCh <- err
	}()

	// Wait for ready or error.
	select {
	case <-readyCh:
		return nil
	case err := <-errCh:
		// The run loop is already gone; release the derived context.
		cancel()
		if err != nil && err != context.Canceled {
			return err
		}
		return nil
	case <-ctx.Done():
		cancel()
		return ctx.Err()
	}
}

// Disconnect cancels the run context created by Connect, if any.
func (c *Client) Disconnect() {
	c.mu.Lock()
	cancel := c.cancel
	c.mu.Unlock()
	if cancel != nil {
		cancel()
	}
}

// waitReady waits for the client to be connected and returns the api client.
// It returns ctx.Err() if ctx is done first.
func (c *Client) waitReady(ctx context.Context) (*tg.Client, error) {
	c.mu.Lock()
	readyCh := c.ready
	api := c.api
	c.mu.Unlock()
	if api != nil {
		return api, nil
	}
	select {
	case <-readyCh:
		c.mu.Lock()
		api = c.api
		c.mu.Unlock()
		return api, nil
	case <-ctx.Done():
		return nil, ctx.Err()
	}
}

// GetChannelInfo looks up a channel or group by username (a leading "@" is
// stripped) and returns its basic information. For channels it additionally
// tries channels.getFullChannel to fill in About; a failure of that second
// call is ignored. If no channel or chat is found in the resolved result, an
// info value carrying only the username is returned.
func (c *Client) GetChannelInfo(ctx context.Context, username string) (*ChannelInfo, error) {
	api, err := c.waitReady(ctx)
	if err != nil {
		return nil, err
	}
	username = strings.TrimPrefix(username, "@")
	resolved, err := api.ContactsResolveUsername(ctx, &tg.ContactsResolveUsernameRequest{
		Username: username,
	})
	if err != nil {
		return nil, wrapFloodWait(err)
	}

	info := &ChannelInfo{Username: username}
	// Look for channel/chat in the resolved chats; the first one wins.
	for _, ch := range resolved.Chats {
		switch v := ch.(type) {
		case *tg.Channel:
			info.Title = v.Title
			info.IsChannel = v.GetBroadcast()
			info.IsGroup = v.GetMegagroup()
			if count, ok := v.GetParticipantsCount(); ok {
				info.MemberCount = count
			}
			// Get full channel info for About (best effort).
			if accessHash, hasHash := v.GetAccessHash(); hasHash {
				full, ferr := api.ChannelsGetFullChannel(ctx, &tg.InputChannel{
					ChannelID:  v.GetID(),
					AccessHash: accessHash,
				})
				if ferr == nil {
					if cf, ok := full.FullChat.(*tg.ChannelFull); ok {
						info.About = cf.GetAbout()
						if count, ok := cf.GetParticipantsCount(); ok && info.MemberCount == 0 {
							info.MemberCount = count
						}
					}
				}
			}
			return info, nil
		case *tg.Chat:
			info.Title = v.Title
			info.IsGroup = true
			info.MemberCount = v.ParticipantsCount
			return info, nil
		}
	}
	return info, nil
}

// GetMessages fetches one page of history for the given username.
//
// offsetID is the message ID to continue from (resume point) and limit is the
// maximum number of messages to fetch. The returned messages are sorted by ID
// in ascending order.
func (c *Client) GetMessages(ctx context.Context, username string, offsetID, limit int) ([]Message, error) {
	api, err := c.waitReady(ctx)
	if err != nil {
		return nil, err
	}
	username = strings.TrimPrefix(username, "@")
	peer, err := c.resolveInputPeer(ctx, api, username)
	if err != nil {
		return nil, err
	}
	result, err := api.MessagesGetHistory(ctx, &tg.MessagesGetHistoryRequest{
		Peer:     peer,
		OffsetID: offsetID,
		Limit:    limit,
	})
	if err != nil {
		return nil, wrapFloodWait(err)
	}
	return extractMessages(result), nil
}

// GetPinnedMessages fetches up to 100 pinned messages of the given username
// using a pinned-message search filter.
func (c *Client) GetPinnedMessages(ctx context.Context, username string) ([]Message, error) {
	api, err := c.waitReady(ctx)
	if err != nil {
		return nil, err
	}
	username = strings.TrimPrefix(username, "@")
	peer, err := c.resolveInputPeer(ctx, api, username)
	if err != nil {
		return nil, err
	}
	result, err := api.MessagesSearch(ctx, &tg.MessagesSearchRequest{
		Peer:   peer,
		Filter: &tg.InputMessagesFilterPinned{},
		Limit:  100,
	})
	if err != nil {
		return nil, wrapFloodWait(err)
	}
	return extractMessages(result), nil
}

// VerifyUser checks whether a username exists and returns what is known about
// it. USERNAME_NOT_OCCUPIED and USERNAME_INVALID are reported as a result with
// Exists == false rather than as an error. Usernames that resolve to a channel
// or group are reported with IsChannel/IsGroup set.
func (c *Client) VerifyUser(ctx context.Context, username string) (*UserInfo, error) {
	api, err := c.waitReady(ctx)
	if err != nil {
		return nil, err
	}
	username = strings.TrimPrefix(username, "@")
	resolved, err := api.ContactsResolveUsername(ctx, &tg.ContactsResolveUsernameRequest{
		Username: username,
	})
	if err != nil {
		if tgerr.Is(err, "USERNAME_NOT_OCCUPIED", "USERNAME_INVALID") {
			return &UserInfo{Username: username, Exists: false}, nil
		}
		return nil, wrapFloodWait(err)
	}

	// Check if the peer is a user. Pick the entry whose ID matches the
	// resolved peer (same rule as resolveInputPeer) rather than whichever
	// user happens to come first.
	if peer, ok := resolved.Peer.(*tg.PeerUser); ok {
		for _, u := range resolved.Users {
			user, ok := u.(*tg.User)
			if !ok || user.GetID() != peer.UserID {
				continue
			}
			info := &UserInfo{
				ID:        user.GetID(),
				Username:  username,
				IsBot:     user.GetBot(),
				IsPremium: user.GetPremium(),
				Exists:    true,
			}
			if fn, ok := user.GetFirstName(); ok {
				info.FirstName = fn
			}
			if ln, ok := user.GetLastName(); ok {
				info.LastName = ln
			}
			if status, ok := user.GetStatus(); ok {
				if offline, ok := status.(*tg.UserStatusOffline); ok {
					t := time.Unix(int64(offline.GetWasOnline()), 0)
					info.LastOnline = &t
				}
			}
			return info, nil
		}
	}

	// Check if it's a channel or group.
	for _, ch := range resolved.Chats {
		switch v := ch.(type) {
		case *tg.Channel:
			return &UserInfo{
				ID:        v.GetID(),
				Username:  username,
				IsChannel: v.GetBroadcast(),
				IsGroup:   v.GetMegagroup(),
				Exists:    true,
			}, nil
		case *tg.Chat:
			return &UserInfo{
				ID:       v.ID,
				Username: username, // was missing; every other branch reports it
				IsGroup:  true,
				Exists:   true,
			}, nil
		}
	}
	return &UserInfo{Username: username, Exists: false}, nil
}

// ResolveGroupChannel looks up a group/channel by username and returns both
// the InputChannel handle (for subsequent API calls) and the raw Channel
// struct (for metadata like title and participant count). Returns
// (nil, nil, error) for basic chats (no InputChannel).
func (c *Client) ResolveGroupChannel(ctx context.Context, username string) (*tg.InputChannel, *tg.Channel, error) { api, err := c.waitReady(ctx) if err != nil { return nil, nil, err } username = strings.TrimPrefix(username, "@") resolved, err := api.ContactsResolveUsername(ctx, &tg.ContactsResolveUsernameRequest{Username: username}) if err != nil { return nil, nil, wrapFloodWait(err) } for _, ch := range resolved.Chats { if v, ok := ch.(*tg.Channel); ok { accessHash, _ := v.GetAccessHash() return &tg.InputChannel{ChannelID: v.GetID(), AccessHash: accessHash}, v, nil } } return nil, nil, fmt.Errorf("无法解析群组为超级群组: %s", username) } // FetchMessageSenders paginates channel/group history and returns distinct // sender information (username + display name + user ID). For active groups // this often yields far more usernames than participants.search (which is // heavily restricted for non-admins) — because every message exposes its // sender, and you can read history of any joined group. // // limit is the TOTAL number of messages to scan (paged 100 at a time). // The result contains only senders that have a public @username set. func (c *Client) FetchMessageSenders(ctx context.Context, peer tg.InputPeerClass, limit int) ([]GroupParticipant, error) { api, err := c.waitReady(ctx) if err != nil { return nil, err } if limit <= 0 { limit = 500 } const pageSize = 100 seen := make(map[int64]bool) var out []GroupParticipant offsetID := 0 scanned := 0 for scanned < limit { ps := pageSize if limit-scanned < ps { ps = limit - scanned } result, err := api.MessagesGetHistory(ctx, &tg.MessagesGetHistoryRequest{ Peer: peer, OffsetID: offsetID, Limit: ps, }) if err != nil { return out, wrapFloodWait(err) } // Build user map from response for quick lookup. 
users := make(map[int64]*tg.User) var msgs []tg.MessageClass switch v := result.(type) { case *tg.MessagesMessages: msgs = v.Messages for _, u := range v.Users { if usr, ok := u.(*tg.User); ok { users[usr.GetID()] = usr } } case *tg.MessagesMessagesSlice: msgs = v.Messages for _, u := range v.Users { if usr, ok := u.(*tg.User); ok { users[usr.GetID()] = usr } } case *tg.MessagesChannelMessages: msgs = v.Messages for _, u := range v.Users { if usr, ok := u.(*tg.User); ok { users[usr.GetID()] = usr } } default: return out, nil } if len(msgs) == 0 { break } minID := 0 for _, raw := range msgs { var fromID int64 var id int switch m := raw.(type) { case *tg.Message: id = m.GetID() if from, ok := m.GetFromID(); ok { if pu, ok := from.(*tg.PeerUser); ok { fromID = pu.UserID } } case *tg.MessageService: id = m.GetID() if from, ok := m.GetFromID(); ok { if pu, ok := from.(*tg.PeerUser); ok { fromID = pu.UserID } } } if id > 0 && (minID == 0 || id < minID) { minID = id } if fromID == 0 || seen[fromID] { continue } usr, ok := users[fromID] if !ok || usr.GetBot() { continue } uname, _ := usr.GetUsername() if uname == "" { continue // we only care about @-addressable accounts } seen[fromID] = true p := GroupParticipant{ ID: fromID, Username: uname, IsPremium: usr.GetPremium(), } if fn, ok := usr.GetFirstName(); ok { p.FirstName = fn } if ln, ok := usr.GetLastName(); ok { p.LastName = ln } out = append(out, p) } scanned += len(msgs) if minID == 0 || minID == offsetID { break // no progress possible } offsetID = minID if err := jitterSleep(ctx, 1500*time.Millisecond, 3*time.Second); err != nil { return out, err } } return out, nil } // ResolveInputPeer is a public wrapper over resolveInputPeer for use outside // the client file. Handles @username, channels, chats. 
func (c *Client) ResolveInputPeer(ctx context.Context, username string) (tg.InputPeerClass, error) { api, err := c.waitReady(ctx) if err != nil { return nil, err } return c.resolveInputPeer(ctx, api, strings.TrimPrefix(username, "@")) } // GetFullChannelTotal calls channels.getFullChannel to obtain the authoritative // participants_count (which may be much larger than what a restricted member // can see via ChannelParticipantsSearch). func (c *Client) GetFullChannelTotal(ctx context.Context, ch *tg.InputChannel) (int, error) { api, err := c.waitReady(ctx) if err != nil { return 0, err } full, err := api.ChannelsGetFullChannel(ctx, ch) if err != nil { return 0, wrapFloodWait(err) } cf, ok := full.FullChat.(*tg.ChannelFull) if !ok { return 0, fmt.Errorf("unexpected FullChat type") } if n, ok := cf.GetParticipantsCount(); ok { return n, nil } return 0, nil } // FetchRecentParticipants paginates channels.getParticipants with the Recent // filter, which returns active participants sorted by recency. This is the // preferred Phase 1 filter for large groups — empty-string Search filter is // often heavily restricted (returns 4-5 results) while Recent returns up to // ~200 and is not treated as "searching". 
func (c *Client) FetchRecentParticipants(ctx context.Context, channel *tg.InputChannel) ([]GroupParticipant, int, error) { api, err := c.waitReady(ctx) if err != nil { return nil, 0, err } const pageSize = 200 offset := 0 totalCount := 0 var out []GroupParticipant for { result, err := api.ChannelsGetParticipants(ctx, &tg.ChannelsGetParticipantsRequest{ Channel: channel, Filter: &tg.ChannelParticipantsRecent{}, Offset: offset, Limit: pageSize, Hash: 0, }) if err != nil { return out, totalCount, wrapFloodWait(err) } cp, ok := result.(*tg.ChannelsChannelParticipants) if !ok || len(cp.Users) == 0 { break } if cp.Count > totalCount { totalCount = cp.Count } for _, u := range cp.Users { user, ok := u.(*tg.User) if !ok { continue } p := GroupParticipant{ ID: user.GetID(), IsBot: user.GetBot(), IsPremium: user.GetPremium(), } if un, ok := user.GetUsername(); ok { p.Username = un } if fn, ok := user.GetFirstName(); ok { p.FirstName = fn } if ln, ok := user.GetLastName(); ok { p.LastName = ln } out = append(out, p) } offset += len(cp.Users) if offset >= cp.Count { break } if err := jitterSleep(ctx, 800*time.Millisecond, 1500*time.Millisecond); err != nil { return out, totalCount, err } } return out, totalCount, nil } // JoinChannel makes the current account a member of the given channel/supergroup. // USER_ALREADY_PARTICIPANT is treated as success. FloodWait is wrapped normally. // Side effect: this account becomes visibly a member of the group — make sure // the caller actually wants that (private groups require it to see the member // list, but it leaves a trace in group join/leave activity logs). func (c *Client) JoinChannel(ctx context.Context, ch *tg.InputChannel) error { api, err := c.waitReady(ctx) if err != nil { return err } _, err = api.ChannelsJoinChannel(ctx, ch) if err != nil { if tgerr.Is(err, "USER_ALREADY_PARTICIPANT") { return nil } return wrapFloodWait(err) } return nil } // GetChatParticipantsByID fetches members of a basic (non-supergroup) chat. 
// Basic chats have no pagination — this returns everyone in one call. func (c *Client) GetChatParticipantsByID(ctx context.Context, chatID int64) ([]GroupParticipant, error) { api, err := c.waitReady(ctx) if err != nil { return nil, err } return c.getChatParticipants(ctx, api, chatID) } // ResolveGroupPeer is a broader resolver than ResolveGroupChannel: it also // returns a basic-chat ID when the target is not a supergroup/channel. // Exactly one of (inputCh, chatID) will be non-zero on success. func (c *Client) ResolveGroupPeer(ctx context.Context, username string) (*tg.InputChannel, *tg.Channel, int64, error) { api, err := c.waitReady(ctx) if err != nil { return nil, nil, 0, err } username = strings.TrimPrefix(username, "@") resolved, err := api.ContactsResolveUsername(ctx, &tg.ContactsResolveUsernameRequest{Username: username}) if err != nil { return nil, nil, 0, wrapFloodWait(err) } for _, ch := range resolved.Chats { if v, ok := ch.(*tg.Channel); ok { accessHash, _ := v.GetAccessHash() return &tg.InputChannel{ChannelID: v.GetID(), AccessHash: accessHash}, v, 0, nil } } if p, ok := resolved.Peer.(*tg.PeerChat); ok { return nil, nil, p.ChatID, nil } return nil, nil, 0, fmt.Errorf("无法解析群组: %s", username) } // FetchParticipantsByQuery runs ChannelParticipantsSearch for one query string, // paginating through all pages. Returns the users surfaced by this query and // the total count reported by TG. On FloodWait, returns a *FloodWaitError. // The caller is responsible for deduping across queries. 
func (c *Client) FetchParticipantsByQuery(ctx context.Context, channel *tg.InputChannel, query string) ([]GroupParticipant, int, error) { api, err := c.waitReady(ctx) if err != nil { return nil, 0, err } const pageSize = 200 offset := 0 totalCount := 0 var out []GroupParticipant for { result, err := api.ChannelsGetParticipants(ctx, &tg.ChannelsGetParticipantsRequest{ Channel: channel, Filter: &tg.ChannelParticipantsSearch{Q: query}, Offset: offset, Limit: pageSize, Hash: 0, }) if err != nil { return out, totalCount, wrapFloodWait(err) } cp, ok := result.(*tg.ChannelsChannelParticipants) if !ok || len(cp.Users) == 0 { break } if cp.Count > totalCount { totalCount = cp.Count } for _, u := range cp.Users { user, ok := u.(*tg.User) if !ok { continue } p := GroupParticipant{ ID: user.GetID(), IsBot: user.GetBot(), IsPremium: user.GetPremium(), } if un, ok := user.GetUsername(); ok { p.Username = un } if fn, ok := user.GetFirstName(); ok { p.FirstName = fn } if ln, ok := user.GetLastName(); ok { p.LastName = ln } out = append(out, p) } offset += len(cp.Users) if offset >= cp.Count { break } if err := jitterSleep(ctx, 800*time.Millisecond, 1500*time.Millisecond); err != nil { return out, totalCount, err } } return out, totalCount, nil } // GetGroupParticipants 获取群组/超级群组的成员列表(分页拉取全部) func (c *Client) GetGroupParticipants(ctx context.Context, username string) ([]GroupParticipant, error) { api, err := c.waitReady(ctx) if err != nil { return nil, err } username = strings.TrimPrefix(username, "@") // Resolve the channel/group resolved, err := api.ContactsResolveUsername(ctx, &tg.ContactsResolveUsernameRequest{ Username: username, }) if err != nil { return nil, wrapFloodWait(err) } // Find the channel in resolved chats var inputChannel *tg.InputChannel for _, ch := range resolved.Chats { switch v := ch.(type) { case *tg.Channel: accessHash, _ := v.GetAccessHash() inputChannel = &tg.InputChannel{ ChannelID: v.GetID(), AccessHash: accessHash, } } } if inputChannel == nil { // Try 
as basic chat - get participants via MessagesGetFullChat if p, ok := resolved.Peer.(*tg.PeerChat); ok { return c.getChatParticipants(ctx, api, p.ChatID) } return nil, fmt.Errorf("无法解析群组: %s", username) } // Strategy: use ChannelParticipantsSearch with empty query (returns more than Recent), // then iterate alphabet queries to discover members beyond the 200 limit per query. seen := make(map[int64]bool) var allParticipants []GroupParticipant // Helper to extract users from a page extractUsers := func(cp *tg.ChannelsChannelParticipants) int { added := 0 for _, u := range cp.Users { user, ok := u.(*tg.User) if !ok || seen[user.GetID()] { continue } seen[user.GetID()] = true p := GroupParticipant{ ID: user.GetID(), IsBot: user.GetBot(), IsPremium: user.GetPremium(), } if un, ok := user.GetUsername(); ok { p.Username = un } if fn, ok := user.GetFirstName(); ok { p.FirstName = fn } if ln, ok := user.GetLastName(); ok { p.LastName = ln } allParticipants = append(allParticipants, p) added++ } return added } // Phase 1: Search with empty query (gets up to ~200) totalCount := 0 if err := c.fetchParticipantPages(ctx, api, inputChannel, "", seen, extractUsers, &totalCount); err != nil { if len(allParticipants) > 0 { return allParticipants, err } return nil, err } // Phase 2: If group has more members than we found, search by character sets to discover more. // We pace queries with jitter (2–4s) to avoid looking like a bot scanner and triggering FloodWait. // If FloodWait does hit, stop early and return what we already have — the calling task can // re-attempt later after the account cools down. 
if totalCount > len(allParticipants) && totalCount <= 10000 { queries := participantSearchQueries() for _, q := range queries { if ctx.Err() != nil { break } if len(allParticipants) >= totalCount { break // already collected everyone visible } beforeCount := len(allParticipants) err := c.fetchParticipantPages(ctx, api, inputChannel, q, seen, extractUsers, nil) if err != nil { if fwe, ok := err.(*FloodWaitError); ok { log.Printf("[tg_client] flood wait %ds during search q=%q for %s; returning %d/%d", fwe.Seconds, q, username, len(allParticipants), totalCount) } else { log.Printf("[tg_client] search q=%q for %s: %v (returning partial)", q, username, err) } break } if len(allParticipants) == beforeCount { continue // no new results; skip sleep and try next query } if err := jitterSleep(ctx, 2*time.Second, 4*time.Second); err != nil { return allParticipants, err } } } log.Printf("[tg_client] fetched %d/%d participants for %s", len(allParticipants), totalCount, username) return allParticipants, nil } // jitterSleep sleeps a random duration in [min, max) while respecting ctx. // Returns ctx.Err() if cancelled. Used to spread out TG API calls and avoid // looking like a deterministic scanner. func jitterSleep(ctx context.Context, min, max time.Duration) error { d := min + time.Duration(rand.Int64N(int64(max-min))) select { case <-ctx.Done(): return ctx.Err() case <-time.After(d): return nil } } // participantSearchQueries returns search queries covering Latin, Cyrillic, Japanese, // Korean, and CJK scripts. TG's ChannelParticipantsSearch does substring matching on // first_name + last_name + username, so more starter-character coverage = more users // surfaced on groups beyond the 200-per-query cap. Total ~150 queries. 
func participantSearchQueries() []string {
	out := make([]string, 0, 170)

	// addRange appends every code point in [first, last] as a one-character query.
	addRange := func(first, last rune) {
		for r := first; r <= last; r++ {
			out = append(out, string(r))
		}
	}

	addRange('a', 'z') // Latin a-z
	addRange('0', '9') // Digits 0-9
	addRange('а', 'я') // Cyrillic а-я

	// The remaining scripts are hand-picked lists, appended in a fixed order.
	groups := [][]string{
		// Japanese Hiragana — common name-starter syllables
		{"あ", "い", "う", "え", "お", "か", "さ", "た", "な", "ま"},
		// Korean Hangul — common initial syllables
		{"가", "나", "다", "라", "마", "바", "사", "아", "자", "차", "카", "타", "파", "하"},
		// CJK: top Chinese surnames (high-frequency entries of the Hundred Family Surnames)
		{
			"王", "李", "张", "刘", "陈", "杨", "黄", "赵", "周", "吴",
			"徐", "孙", "马", "朱", "胡", "林", "何", "高", "郭", "罗",
			"谢", "宋", "唐", "许", "邓", "梁", "韩", "曹", "彭", "余",
			"潘", "袁", "蒋", "蔡", "卢", "田", "董", "叶", "程", "姜",
		},
		// CJK: common given-name characters (frequent in two-character names)
		{
			"伟", "芳", "娜", "秀", "敏", "静", "丽", "强", "磊", "军",
			"洋", "勇", "艳", "杰", "涛", "明", "超", "霞", "平", "刚",
		},
		// CJK: common modifiers and city prefixes (covers nicknames/titles)
		{
			"大", "小", "新", "老", "中", "天", "金", "一", "龙", "虎",
			"京", "沪", "深", "广", "杭", "苏",
		},
	}
	for _, group := range groups {
		out = append(out, group...)
	}
	return out
}

// fetchParticipantPages paginates through ChannelParticipantsSearch results.
func (c *Client) fetchParticipantPages( ctx context.Context, api *tg.Client, channel *tg.InputChannel, query string, seen map[int64]bool, extractUsers func(*tg.ChannelsChannelParticipants) int, outTotalCount *int, ) error { const pageSize = 200 offset := 0 for { result, err := api.ChannelsGetParticipants(ctx, &tg.ChannelsGetParticipantsRequest{ Channel: channel, Filter: &tg.ChannelParticipantsSearch{Q: query}, Offset: offset, Limit: pageSize, Hash: 0, }) if err != nil { return wrapFloodWait(err) } cp, ok := result.(*tg.ChannelsChannelParticipants) if !ok || len(cp.Users) == 0 { break } if outTotalCount != nil && cp.Count > *outTotalCount { *outTotalCount = cp.Count } added := extractUsers(cp) offset += len(cp.Users) // If no new users were added in this page, stop if added == 0 || offset >= cp.Count { break } // Page interval: jittered to avoid a detectable request cadence. if err := jitterSleep(ctx, 800*time.Millisecond, 1500*time.Millisecond); err != nil { return err } } return nil } // getChatParticipants 获取普通群组的成员 func (c *Client) getChatParticipants(ctx context.Context, api *tg.Client, chatID int64) ([]GroupParticipant, error) { full, err := api.MessagesGetFullChat(ctx, chatID) if err != nil { return nil, wrapFloodWait(err) } var participants []GroupParticipant for _, u := range full.Users { user, ok := u.(*tg.User) if !ok { continue } p := GroupParticipant{ ID: user.GetID(), IsBot: user.GetBot(), IsPremium: user.GetPremium(), } if un, ok := user.GetUsername(); ok { p.Username = un } if fn, ok := user.GetFirstName(); ok { p.FirstName = fn } if ln, ok := user.GetLastName(); ok { p.LastName = ln } participants = append(participants, p) } return participants, nil } // buildProxyDialer creates a DialFunc that routes connections through the given proxy URL. // Supports socks5://, http://, https:// proxy protocols. 
func buildProxyDialer(rawURL string) (dcs.DialFunc, error) { u, err := url.Parse(rawURL) if err != nil { return nil, fmt.Errorf("parse proxy URL: %w", err) } switch u.Scheme { case "socks5", "socks5h": var auth *xproxy.Auth if u.User != nil { auth = &xproxy.Auth{User: u.User.Username()} if p, ok := u.User.Password(); ok { auth.Password = p } } dialer, err := xproxy.SOCKS5("tcp", u.Host, auth, xproxy.Direct) if err != nil { return nil, fmt.Errorf("create SOCKS5 dialer: %w", err) } ctxDialer, ok := dialer.(xproxy.ContextDialer) if !ok { return nil, fmt.Errorf("SOCKS5 dialer does not support DialContext") } return ctxDialer.DialContext, nil case "http", "https": // For HTTP proxies, use CONNECT tunneling return func(ctx context.Context, network, addr string) (net.Conn, error) { proxyConn, err := (&net.Dialer{Timeout: 15 * time.Second}).DialContext(ctx, "tcp", u.Host) if err != nil { return nil, fmt.Errorf("connect to proxy: %w", err) } // Set deadline for the CONNECT handshake if deadline, ok := ctx.Deadline(); ok { proxyConn.SetDeadline(deadline) } else { proxyConn.SetDeadline(time.Now().Add(15 * time.Second)) } // Send CONNECT request connectReq := fmt.Sprintf("CONNECT %s HTTP/1.1\r\nHost: %s\r\n", addr, addr) if u.User != nil { pass, _ := u.User.Password() connectReq += fmt.Sprintf("Proxy-Authorization: Basic %s\r\n", encodeBasicAuth(u.User.Username(), pass)) } connectReq += "\r\n" if _, err := proxyConn.Write([]byte(connectReq)); err != nil { proxyConn.Close() return nil, fmt.Errorf("write CONNECT: %w", err) } // Read HTTP response using bufio for proper line parsing br := bufio.NewReader(proxyConn) statusLine, err := br.ReadString('\n') if err != nil { proxyConn.Close() return nil, fmt.Errorf("read CONNECT status: %w", err) } // Parse status code from "HTTP/1.x NNN reason" parts := strings.SplitN(strings.TrimSpace(statusLine), " ", 3) if len(parts) < 2 || parts[1] != "200" { proxyConn.Close() return nil, fmt.Errorf("CONNECT failed: %s", 
strings.TrimSpace(statusLine)) } // Consume remaining headers until empty line for { line, err := br.ReadString('\n') if err != nil { proxyConn.Close() return nil, fmt.Errorf("read CONNECT headers: %w", err) } if strings.TrimSpace(line) == "" { break } } // Clear deadline — gotd manages its own timeouts proxyConn.SetDeadline(time.Time{}) return proxyConn, nil }, nil default: return nil, fmt.Errorf("unsupported proxy scheme: %s", u.Scheme) } } // encodeBasicAuth returns base64-encoded "user:password" for proxy auth. func encodeBasicAuth(user, password string) string { return base64.StdEncoding.EncodeToString([]byte(user + ":" + password)) } // resolveInputPeer resolves a username to an InputPeer func (c *Client) resolveInputPeer(ctx context.Context, api *tg.Client, username string) (tg.InputPeerClass, error) { resolved, err := api.ContactsResolveUsername(ctx, &tg.ContactsResolveUsernameRequest{ Username: username, }) if err != nil { return nil, wrapFloodWait(err) } switch p := resolved.Peer.(type) { case *tg.PeerChannel: for _, ch := range resolved.Chats { if channel, ok := ch.(*tg.Channel); ok && channel.GetID() == p.ChannelID { accessHash, _ := channel.GetAccessHash() return &tg.InputPeerChannel{ ChannelID: p.ChannelID, AccessHash: accessHash, }, nil } } return &tg.InputPeerChannel{ChannelID: p.ChannelID}, nil case *tg.PeerUser: for _, u := range resolved.Users { if user, ok := u.(*tg.User); ok && user.GetID() == p.UserID { accessHash, _ := user.GetAccessHash() return &tg.InputPeerUser{ UserID: p.UserID, AccessHash: accessHash, }, nil } } return &tg.InputPeerUser{UserID: p.UserID}, nil case *tg.PeerChat: return &tg.InputPeerChat{ChatID: p.ChatID}, nil } return &tg.InputPeerEmpty{}, nil } // extractMessages extracts messages from a MessagesMessagesClass func extractMessages(result tg.MessagesMessagesClass) []Message { var rawMsgs []tg.MessageClass switch v := result.(type) { case *tg.MessagesMessages: rawMsgs = v.Messages case *tg.MessagesMessagesSlice: rawMsgs = 
v.Messages case *tg.MessagesChannelMessages: rawMsgs = v.Messages case *tg.MessagesMessagesNotModified: return nil } var msgs []Message for _, raw := range rawMsgs { switch m := raw.(type) { case *tg.Message: msg := Message{ ID: m.GetID(), Text: m.GetMessage(), IsService: false, } // Extract forward source channel username if fwd, ok := m.GetFwdFrom(); ok { if fromID, ok := fwd.GetFromID(); ok { if peerCh, ok := fromID.(*tg.PeerChannel); ok { _ = peerCh // We'd need channel map to resolve username; skip for now } } } // Extract t.me links from text msg.Links = tmeRegexp.FindAllString(msg.Text, -1) msgs = append(msgs, msg) case *tg.MessageService: msgs = append(msgs, Message{ ID: m.GetID(), IsService: true, }) } } // Sort by ID ascending sort.Slice(msgs, func(i, j int) bool { return msgs[i].ID < msgs[j].ID }) return msgs } // isFloodWait 检查错误是否是 FloodWait,提取等待时间 func isFloodWait(err error) (bool, int) { if d, ok := tgerr.AsFloodWait(err); ok { return true, int(d.Seconds()) } return false, 0 } // wrapFloodWait wraps a FloodWait error into FloodWaitError func wrapFloodWait(err error) error { if ok, secs := isFloodWait(err); ok { return &FloodWaitError{Seconds: secs} } return err }