| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
7037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195 |
- package telegram
- import (
- "bufio"
- "context"
- "encoding/base64"
- "fmt"
- "log"
- "math/rand/v2"
- "net"
- "net/url"
- "regexp"
- "sort"
- "strings"
- "sync"
- "time"
- "github.com/gotd/td/session"
- "github.com/gotd/td/telegram"
- "github.com/gotd/td/telegram/dcs"
- "github.com/gotd/td/tg"
- "github.com/gotd/td/tgerr"
- xproxy "golang.org/x/net/proxy"
- )
// tmeRegexp matches http(s) t.me links embedded in free text. A match stops
// at the first whitespace, quote, angle bracket, closing parenthesis or
// closing square bracket.
var (
	tmeRegexp = regexp.MustCompile(`https?://t\.me/[^\s"'<>)\]]+`)
)
- // Client TG 客户端
- type Client struct {
- account Account
- sessionPath string
- proxyURL string // SOCKS5/HTTP proxy URL
- mu sync.Mutex
- tgc *telegram.Client
- api *tg.Client
- cancel context.CancelFunc
- ready chan struct{} // closed when connected
- runErr error
- }
- // New 创建客户端(不连接,只初始化)
- func New(account Account) *Client {
- return &Client{
- account: account,
- sessionPath: account.SessionFile,
- ready: make(chan struct{}),
- }
- }
- // SetProxy sets the proxy URL for this client's connections.
- func (c *Client) SetProxy(proxyURL string) {
- c.mu.Lock()
- c.proxyURL = proxyURL
- c.mu.Unlock()
- }
- // Connect 连接并认证(从 session 文件恢复)
- // session 文件不存在时返回错误(不做交互式登录,session 需要预先生成)
- func (c *Client) Connect(ctx context.Context) error {
- storage := &session.FileStorage{Path: c.sessionPath}
- opts := telegram.Options{
- SessionStorage: storage,
- NoUpdates: true,
- Device: telegram.DeviceConfig{
- DeviceModel: c.account.Device,
- AppVersion: c.account.AppVersion,
- SystemVersion: c.account.SystemVersion,
- LangPack: c.account.LangPack,
- SystemLangCode: c.account.SystemLangCode,
- LangCode: c.account.LangCode,
- },
- }
- // Apply proxy if configured
- c.mu.Lock()
- proxyURL := c.proxyURL
- c.mu.Unlock()
- if proxyURL != "" {
- dialFunc, err := buildProxyDialer(proxyURL)
- if err != nil {
- log.Printf("[tg_client] failed to create proxy dialer: %v, connecting without proxy", err)
- } else {
- opts.Resolver = dcs.Plain(dcs.PlainOptions{Dial: dialFunc})
- log.Printf("[tg_client] connecting via proxy: %s", proxyURL)
- }
- }
- client := telegram.NewClient(c.account.AppID, c.account.AppHash, opts)
- runCtx, cancel := context.WithCancel(ctx)
- c.mu.Lock()
- c.tgc = client
- c.cancel = cancel
- c.ready = make(chan struct{})
- c.runErr = nil
- readyCh := c.ready
- c.mu.Unlock()
- errCh := make(chan error, 1)
- go func() {
- err := client.Run(runCtx, func(ctx context.Context) error {
- c.mu.Lock()
- c.api = client.API()
- close(readyCh)
- c.mu.Unlock()
- // Block until context is cancelled (Disconnect called)
- <-ctx.Done()
- return ctx.Err()
- })
- c.mu.Lock()
- c.runErr = err
- c.mu.Unlock()
- errCh <- err
- }()
- // Wait for ready or error
- select {
- case <-readyCh:
- return nil
- case err := <-errCh:
- if err != nil && err != context.Canceled {
- return err
- }
- return nil
- case <-ctx.Done():
- cancel()
- return ctx.Err()
- }
- }
- // Disconnect 断开连接
- func (c *Client) Disconnect() {
- c.mu.Lock()
- cancel := c.cancel
- c.mu.Unlock()
- if cancel != nil {
- cancel()
- }
- }
- // waitReady waits for the client to be connected and returns the api client
- func (c *Client) waitReady(ctx context.Context) (*tg.Client, error) {
- c.mu.Lock()
- readyCh := c.ready
- api := c.api
- c.mu.Unlock()
- if api != nil {
- return api, nil
- }
- select {
- case <-readyCh:
- c.mu.Lock()
- api = c.api
- c.mu.Unlock()
- return api, nil
- case <-ctx.Done():
- return nil, ctx.Err()
- }
- }
- // GetChannelInfo 获取频道/用户信息,通过用户名查找
- func (c *Client) GetChannelInfo(ctx context.Context, username string) (*ChannelInfo, error) {
- api, err := c.waitReady(ctx)
- if err != nil {
- return nil, err
- }
- username = strings.TrimPrefix(username, "@")
- resolved, err := api.ContactsResolveUsername(ctx, &tg.ContactsResolveUsernameRequest{
- Username: username,
- })
- if err != nil {
- return nil, wrapFloodWait(err)
- }
- info := &ChannelInfo{Username: username}
- // Look for channel/chat in the resolved chats
- for _, ch := range resolved.Chats {
- switch v := ch.(type) {
- case *tg.Channel:
- title := v.Title
- info.Title = title
- info.IsChannel = v.GetBroadcast()
- info.IsGroup = v.GetMegagroup()
- if count, ok := v.GetParticipantsCount(); ok {
- info.MemberCount = count
- }
- // Get full channel info for About
- accessHash, hasHash := v.GetAccessHash()
- if hasHash {
- full, ferr := api.ChannelsGetFullChannel(ctx, &tg.InputChannel{
- ChannelID: v.GetID(),
- AccessHash: accessHash,
- })
- if ferr == nil {
- if cf, ok := full.FullChat.(*tg.ChannelFull); ok {
- info.About = cf.GetAbout()
- if count, ok := cf.GetParticipantsCount(); ok && info.MemberCount == 0 {
- info.MemberCount = count
- }
- }
- }
- }
- return info, nil
- case *tg.Chat:
- info.Title = v.Title
- info.IsGroup = true
- info.MemberCount = v.ParticipantsCount
- return info, nil
- }
- }
- return info, nil
- }
- // GetMessages 获取频道历史消息
- // offsetID: 从哪条消息开始(断点续传)
- // limit: 最多取多少条
- // 返回的消息按 ID 从小到大排序
- func (c *Client) GetMessages(ctx context.Context, username string, offsetID, limit int) ([]Message, error) {
- api, err := c.waitReady(ctx)
- if err != nil {
- return nil, err
- }
- username = strings.TrimPrefix(username, "@")
- peer, err := c.resolveInputPeer(ctx, api, username)
- if err != nil {
- return nil, err
- }
- result, err := api.MessagesGetHistory(ctx, &tg.MessagesGetHistoryRequest{
- Peer: peer,
- OffsetID: offsetID,
- Limit: limit,
- })
- if err != nil {
- return nil, wrapFloodWait(err)
- }
- return extractMessages(result), nil
- }
- // GetPinnedMessages 获取置顶消息
- func (c *Client) GetPinnedMessages(ctx context.Context, username string) ([]Message, error) {
- api, err := c.waitReady(ctx)
- if err != nil {
- return nil, err
- }
- username = strings.TrimPrefix(username, "@")
- peer, err := c.resolveInputPeer(ctx, api, username)
- if err != nil {
- return nil, err
- }
- result, err := api.MessagesSearch(ctx, &tg.MessagesSearchRequest{
- Peer: peer,
- Filter: &tg.InputMessagesFilterPinned{},
- Limit: 100,
- })
- if err != nil {
- return nil, wrapFloodWait(err)
- }
- return extractMessages(result), nil
- }
- // VerifyUser 验证用户名是否存在,返回用户信息
- func (c *Client) VerifyUser(ctx context.Context, username string) (*UserInfo, error) {
- api, err := c.waitReady(ctx)
- if err != nil {
- return nil, err
- }
- username = strings.TrimPrefix(username, "@")
- resolved, err := api.ContactsResolveUsername(ctx, &tg.ContactsResolveUsernameRequest{
- Username: username,
- })
- if err != nil {
- if tgerr.Is(err, "USERNAME_NOT_OCCUPIED", "USERNAME_INVALID") {
- return &UserInfo{Username: username, Exists: false}, nil
- }
- return nil, wrapFloodWait(err)
- }
- // Check if the peer is a user
- if _, ok := resolved.Peer.(*tg.PeerUser); ok {
- for _, u := range resolved.Users {
- if user, ok := u.(*tg.User); ok {
- info := &UserInfo{
- ID: user.GetID(),
- Username: username,
- IsBot: user.GetBot(),
- IsPremium: user.GetPremium(),
- Exists: true,
- }
- if fn, ok := user.GetFirstName(); ok {
- info.FirstName = fn
- }
- if ln, ok := user.GetLastName(); ok {
- info.LastName = ln
- }
- if status, ok := user.GetStatus(); ok {
- if offline, ok := status.(*tg.UserStatusOffline); ok {
- t := time.Unix(int64(offline.GetWasOnline()), 0)
- info.LastOnline = &t
- }
- }
- return info, nil
- }
- }
- }
- // Check if it's a channel or group
- for _, ch := range resolved.Chats {
- switch v := ch.(type) {
- case *tg.Channel:
- return &UserInfo{
- ID: v.GetID(),
- Username: username,
- IsChannel: v.GetBroadcast(),
- IsGroup: v.GetMegagroup(),
- Exists: true,
- }, nil
- case *tg.Chat:
- return &UserInfo{
- ID: v.ID,
- IsGroup: true,
- Exists: true,
- }, nil
- }
- }
- return &UserInfo{Username: username, Exists: false}, nil
- }
- // ResolveGroupChannel looks up a group/channel by username and returns both
- // the InputChannel handle (for subsequent API calls) and the raw Channel
- // struct (for metadata like title and participant count). Returns
- // (nil, nil, error) for basic chats (no InputChannel).
- func (c *Client) ResolveGroupChannel(ctx context.Context, username string) (*tg.InputChannel, *tg.Channel, error) {
- api, err := c.waitReady(ctx)
- if err != nil {
- return nil, nil, err
- }
- username = strings.TrimPrefix(username, "@")
- resolved, err := api.ContactsResolveUsername(ctx, &tg.ContactsResolveUsernameRequest{Username: username})
- if err != nil {
- return nil, nil, wrapFloodWait(err)
- }
- for _, ch := range resolved.Chats {
- if v, ok := ch.(*tg.Channel); ok {
- accessHash, _ := v.GetAccessHash()
- return &tg.InputChannel{ChannelID: v.GetID(), AccessHash: accessHash}, v, nil
- }
- }
- return nil, nil, fmt.Errorf("无法解析群组为超级群组: %s", username)
- }
- // FetchMessageSenders paginates channel/group history and returns distinct
- // sender information (username + display name + user ID). For active groups
- // this often yields far more usernames than participants.search (which is
- // heavily restricted for non-admins) — because every message exposes its
- // sender, and you can read history of any joined group.
- //
- // limit is the TOTAL number of messages to scan (paged 100 at a time).
- // The result contains only senders that have a public @username set.
- func (c *Client) FetchMessageSenders(ctx context.Context, peer tg.InputPeerClass, limit int) ([]GroupParticipant, error) {
- api, err := c.waitReady(ctx)
- if err != nil {
- return nil, err
- }
- if limit <= 0 {
- limit = 500
- }
- const pageSize = 100
- seen := make(map[int64]bool)
- var out []GroupParticipant
- offsetID := 0
- scanned := 0
- for scanned < limit {
- ps := pageSize
- if limit-scanned < ps {
- ps = limit - scanned
- }
- result, err := api.MessagesGetHistory(ctx, &tg.MessagesGetHistoryRequest{
- Peer: peer,
- OffsetID: offsetID,
- Limit: ps,
- })
- if err != nil {
- return out, wrapFloodWait(err)
- }
- // Build user map from response for quick lookup.
- users := make(map[int64]*tg.User)
- var msgs []tg.MessageClass
- switch v := result.(type) {
- case *tg.MessagesMessages:
- msgs = v.Messages
- for _, u := range v.Users {
- if usr, ok := u.(*tg.User); ok {
- users[usr.GetID()] = usr
- }
- }
- case *tg.MessagesMessagesSlice:
- msgs = v.Messages
- for _, u := range v.Users {
- if usr, ok := u.(*tg.User); ok {
- users[usr.GetID()] = usr
- }
- }
- case *tg.MessagesChannelMessages:
- msgs = v.Messages
- for _, u := range v.Users {
- if usr, ok := u.(*tg.User); ok {
- users[usr.GetID()] = usr
- }
- }
- default:
- return out, nil
- }
- if len(msgs) == 0 {
- break
- }
- minID := 0
- for _, raw := range msgs {
- var fromID int64
- var id int
- switch m := raw.(type) {
- case *tg.Message:
- id = m.GetID()
- if from, ok := m.GetFromID(); ok {
- if pu, ok := from.(*tg.PeerUser); ok {
- fromID = pu.UserID
- }
- }
- case *tg.MessageService:
- id = m.GetID()
- if from, ok := m.GetFromID(); ok {
- if pu, ok := from.(*tg.PeerUser); ok {
- fromID = pu.UserID
- }
- }
- }
- if id > 0 && (minID == 0 || id < minID) {
- minID = id
- }
- if fromID == 0 || seen[fromID] {
- continue
- }
- usr, ok := users[fromID]
- if !ok || usr.GetBot() {
- continue
- }
- uname, _ := usr.GetUsername()
- if uname == "" {
- continue // we only care about @-addressable accounts
- }
- seen[fromID] = true
- p := GroupParticipant{
- ID: fromID,
- Username: uname,
- IsPremium: usr.GetPremium(),
- }
- if fn, ok := usr.GetFirstName(); ok {
- p.FirstName = fn
- }
- if ln, ok := usr.GetLastName(); ok {
- p.LastName = ln
- }
- out = append(out, p)
- }
- scanned += len(msgs)
- if minID == 0 || minID == offsetID {
- break // no progress possible
- }
- offsetID = minID
- if err := jitterSleep(ctx, 1500*time.Millisecond, 3*time.Second); err != nil {
- return out, err
- }
- }
- return out, nil
- }
- // ResolveInputPeer is a public wrapper over resolveInputPeer for use outside
- // the client file. Handles @username, channels, chats.
- func (c *Client) ResolveInputPeer(ctx context.Context, username string) (tg.InputPeerClass, error) {
- api, err := c.waitReady(ctx)
- if err != nil {
- return nil, err
- }
- return c.resolveInputPeer(ctx, api, strings.TrimPrefix(username, "@"))
- }
- // GetFullChannelTotal calls channels.getFullChannel to obtain the authoritative
- // participants_count (which may be much larger than what a restricted member
- // can see via ChannelParticipantsSearch).
- func (c *Client) GetFullChannelTotal(ctx context.Context, ch *tg.InputChannel) (int, error) {
- api, err := c.waitReady(ctx)
- if err != nil {
- return 0, err
- }
- full, err := api.ChannelsGetFullChannel(ctx, ch)
- if err != nil {
- return 0, wrapFloodWait(err)
- }
- cf, ok := full.FullChat.(*tg.ChannelFull)
- if !ok {
- return 0, fmt.Errorf("unexpected FullChat type")
- }
- if n, ok := cf.GetParticipantsCount(); ok {
- return n, nil
- }
- return 0, nil
- }
- // FetchRecentParticipants paginates channels.getParticipants with the Recent
- // filter, which returns active participants sorted by recency. This is the
- // preferred Phase 1 filter for large groups — empty-string Search filter is
- // often heavily restricted (returns 4-5 results) while Recent returns up to
- // ~200 and is not treated as "searching".
- func (c *Client) FetchRecentParticipants(ctx context.Context, channel *tg.InputChannel) ([]GroupParticipant, int, error) {
- api, err := c.waitReady(ctx)
- if err != nil {
- return nil, 0, err
- }
- const pageSize = 200
- offset := 0
- totalCount := 0
- var out []GroupParticipant
- for {
- result, err := api.ChannelsGetParticipants(ctx, &tg.ChannelsGetParticipantsRequest{
- Channel: channel,
- Filter: &tg.ChannelParticipantsRecent{},
- Offset: offset,
- Limit: pageSize,
- Hash: 0,
- })
- if err != nil {
- return out, totalCount, wrapFloodWait(err)
- }
- cp, ok := result.(*tg.ChannelsChannelParticipants)
- if !ok || len(cp.Users) == 0 {
- break
- }
- if cp.Count > totalCount {
- totalCount = cp.Count
- }
- for _, u := range cp.Users {
- user, ok := u.(*tg.User)
- if !ok {
- continue
- }
- p := GroupParticipant{
- ID: user.GetID(),
- IsBot: user.GetBot(),
- IsPremium: user.GetPremium(),
- }
- if un, ok := user.GetUsername(); ok {
- p.Username = un
- }
- if fn, ok := user.GetFirstName(); ok {
- p.FirstName = fn
- }
- if ln, ok := user.GetLastName(); ok {
- p.LastName = ln
- }
- out = append(out, p)
- }
- offset += len(cp.Users)
- if offset >= cp.Count {
- break
- }
- if err := jitterSleep(ctx, 800*time.Millisecond, 1500*time.Millisecond); err != nil {
- return out, totalCount, err
- }
- }
- return out, totalCount, nil
- }
- // JoinChannel makes the current account a member of the given channel/supergroup.
- // USER_ALREADY_PARTICIPANT is treated as success. FloodWait is wrapped normally.
- // Side effect: this account becomes visibly a member of the group — make sure
- // the caller actually wants that (private groups require it to see the member
- // list, but it leaves a trace in group join/leave activity logs).
- func (c *Client) JoinChannel(ctx context.Context, ch *tg.InputChannel) error {
- api, err := c.waitReady(ctx)
- if err != nil {
- return err
- }
- _, err = api.ChannelsJoinChannel(ctx, ch)
- if err != nil {
- if tgerr.Is(err, "USER_ALREADY_PARTICIPANT") {
- return nil
- }
- return wrapFloodWait(err)
- }
- return nil
- }
- // GetChatParticipantsByID fetches members of a basic (non-supergroup) chat.
- // Basic chats have no pagination — this returns everyone in one call.
- func (c *Client) GetChatParticipantsByID(ctx context.Context, chatID int64) ([]GroupParticipant, error) {
- api, err := c.waitReady(ctx)
- if err != nil {
- return nil, err
- }
- return c.getChatParticipants(ctx, api, chatID)
- }
- // ResolveGroupPeer is a broader resolver than ResolveGroupChannel: it also
- // returns a basic-chat ID when the target is not a supergroup/channel.
- // Exactly one of (inputCh, chatID) will be non-zero on success.
- func (c *Client) ResolveGroupPeer(ctx context.Context, username string) (*tg.InputChannel, *tg.Channel, int64, error) {
- api, err := c.waitReady(ctx)
- if err != nil {
- return nil, nil, 0, err
- }
- username = strings.TrimPrefix(username, "@")
- resolved, err := api.ContactsResolveUsername(ctx, &tg.ContactsResolveUsernameRequest{Username: username})
- if err != nil {
- return nil, nil, 0, wrapFloodWait(err)
- }
- for _, ch := range resolved.Chats {
- if v, ok := ch.(*tg.Channel); ok {
- accessHash, _ := v.GetAccessHash()
- return &tg.InputChannel{ChannelID: v.GetID(), AccessHash: accessHash}, v, 0, nil
- }
- }
- if p, ok := resolved.Peer.(*tg.PeerChat); ok {
- return nil, nil, p.ChatID, nil
- }
- return nil, nil, 0, fmt.Errorf("无法解析群组: %s", username)
- }
- // FetchParticipantsByQuery runs ChannelParticipantsSearch for one query string,
- // paginating through all pages. Returns the users surfaced by this query and
- // the total count reported by TG. On FloodWait, returns a *FloodWaitError.
- // The caller is responsible for deduping across queries.
- func (c *Client) FetchParticipantsByQuery(ctx context.Context, channel *tg.InputChannel, query string) ([]GroupParticipant, int, error) {
- api, err := c.waitReady(ctx)
- if err != nil {
- return nil, 0, err
- }
- const pageSize = 200
- offset := 0
- totalCount := 0
- var out []GroupParticipant
- for {
- result, err := api.ChannelsGetParticipants(ctx, &tg.ChannelsGetParticipantsRequest{
- Channel: channel,
- Filter: &tg.ChannelParticipantsSearch{Q: query},
- Offset: offset,
- Limit: pageSize,
- Hash: 0,
- })
- if err != nil {
- return out, totalCount, wrapFloodWait(err)
- }
- cp, ok := result.(*tg.ChannelsChannelParticipants)
- if !ok || len(cp.Users) == 0 {
- break
- }
- if cp.Count > totalCount {
- totalCount = cp.Count
- }
- for _, u := range cp.Users {
- user, ok := u.(*tg.User)
- if !ok {
- continue
- }
- p := GroupParticipant{
- ID: user.GetID(),
- IsBot: user.GetBot(),
- IsPremium: user.GetPremium(),
- }
- if un, ok := user.GetUsername(); ok {
- p.Username = un
- }
- if fn, ok := user.GetFirstName(); ok {
- p.FirstName = fn
- }
- if ln, ok := user.GetLastName(); ok {
- p.LastName = ln
- }
- out = append(out, p)
- }
- offset += len(cp.Users)
- if offset >= cp.Count {
- break
- }
- if err := jitterSleep(ctx, 800*time.Millisecond, 1500*time.Millisecond); err != nil {
- return out, totalCount, err
- }
- }
- return out, totalCount, nil
- }
- // GetGroupParticipants 获取群组/超级群组的成员列表(分页拉取全部)
- func (c *Client) GetGroupParticipants(ctx context.Context, username string) ([]GroupParticipant, error) {
- api, err := c.waitReady(ctx)
- if err != nil {
- return nil, err
- }
- username = strings.TrimPrefix(username, "@")
- // Resolve the channel/group
- resolved, err := api.ContactsResolveUsername(ctx, &tg.ContactsResolveUsernameRequest{
- Username: username,
- })
- if err != nil {
- return nil, wrapFloodWait(err)
- }
- // Find the channel in resolved chats
- var inputChannel *tg.InputChannel
- for _, ch := range resolved.Chats {
- switch v := ch.(type) {
- case *tg.Channel:
- accessHash, _ := v.GetAccessHash()
- inputChannel = &tg.InputChannel{
- ChannelID: v.GetID(),
- AccessHash: accessHash,
- }
- }
- }
- if inputChannel == nil {
- // Try as basic chat - get participants via MessagesGetFullChat
- if p, ok := resolved.Peer.(*tg.PeerChat); ok {
- return c.getChatParticipants(ctx, api, p.ChatID)
- }
- return nil, fmt.Errorf("无法解析群组: %s", username)
- }
- // Strategy: use ChannelParticipantsSearch with empty query (returns more than Recent),
- // then iterate alphabet queries to discover members beyond the 200 limit per query.
- seen := make(map[int64]bool)
- var allParticipants []GroupParticipant
- // Helper to extract users from a page
- extractUsers := func(cp *tg.ChannelsChannelParticipants) int {
- added := 0
- for _, u := range cp.Users {
- user, ok := u.(*tg.User)
- if !ok || seen[user.GetID()] {
- continue
- }
- seen[user.GetID()] = true
- p := GroupParticipant{
- ID: user.GetID(),
- IsBot: user.GetBot(),
- IsPremium: user.GetPremium(),
- }
- if un, ok := user.GetUsername(); ok {
- p.Username = un
- }
- if fn, ok := user.GetFirstName(); ok {
- p.FirstName = fn
- }
- if ln, ok := user.GetLastName(); ok {
- p.LastName = ln
- }
- allParticipants = append(allParticipants, p)
- added++
- }
- return added
- }
- // Phase 1: Search with empty query (gets up to ~200)
- totalCount := 0
- if err := c.fetchParticipantPages(ctx, api, inputChannel, "", seen, extractUsers, &totalCount); err != nil {
- if len(allParticipants) > 0 {
- return allParticipants, err
- }
- return nil, err
- }
- // Phase 2: If group has more members than we found, search by character sets to discover more.
- // We pace queries with jitter (2–4s) to avoid looking like a bot scanner and triggering FloodWait.
- // If FloodWait does hit, stop early and return what we already have — the calling task can
- // re-attempt later after the account cools down.
- if totalCount > len(allParticipants) && totalCount <= 10000 {
- queries := participantSearchQueries()
- for _, q := range queries {
- if ctx.Err() != nil {
- break
- }
- if len(allParticipants) >= totalCount {
- break // already collected everyone visible
- }
- beforeCount := len(allParticipants)
- err := c.fetchParticipantPages(ctx, api, inputChannel, q, seen, extractUsers, nil)
- if err != nil {
- if fwe, ok := err.(*FloodWaitError); ok {
- log.Printf("[tg_client] flood wait %ds during search q=%q for %s; returning %d/%d",
- fwe.Seconds, q, username, len(allParticipants), totalCount)
- } else {
- log.Printf("[tg_client] search q=%q for %s: %v (returning partial)", q, username, err)
- }
- break
- }
- if len(allParticipants) == beforeCount {
- continue // no new results; skip sleep and try next query
- }
- if err := jitterSleep(ctx, 2*time.Second, 4*time.Second); err != nil {
- return allParticipants, err
- }
- }
- }
- log.Printf("[tg_client] fetched %d/%d participants for %s", len(allParticipants), totalCount, username)
- return allParticipants, nil
- }
- // jitterSleep sleeps a random duration in [min, max) while respecting ctx.
- // Returns ctx.Err() if cancelled. Used to spread out TG API calls and avoid
- // looking like a deterministic scanner.
- func jitterSleep(ctx context.Context, min, max time.Duration) error {
- d := min + time.Duration(rand.Int64N(int64(max-min)))
- select {
- case <-ctx.Done():
- return ctx.Err()
- case <-time.After(d):
- return nil
- }
- }
// participantSearchQueries returns the single-character search queries used
// to enumerate group members: Latin a-z, digits 0-9, Cyrillic а-я, a handful
// of Hiragana and Hangul syllables, and common CJK surname, given-name and
// nickname characters — 168 queries in total, in that order.
//
// The intent, per the participant-search strategy in this file, is that wider
// starter-character coverage surfaces more users on groups that exceed the
// per-query result cap.
func participantSearchQueries() []string {
	out := make([]string, 0, 170)

	// addRange appends every rune in [lo, hi] as a one-character query.
	addRange := func(lo, hi rune) {
		for r := lo; r <= hi; r++ {
			out = append(out, string(r))
		}
	}
	addRange('a', 'z') // Latin
	addRange('0', '9') // digits
	addRange('а', 'я') // Cyrillic

	groups := [][]string{
		// Japanese Hiragana — common name-starter syllables.
		{"あ", "い", "う", "え", "お", "か", "さ", "た", "な", "ま"},
		// Korean Hangul — common initial syllables.
		{"가", "나", "다", "라", "마", "바", "사", "아", "자", "차", "카", "타", "파", "하"},
		// CJK: high-frequency Chinese surnames.
		{
			"王", "李", "张", "刘", "陈", "杨", "黄", "赵", "周", "吴",
			"徐", "孙", "马", "朱", "胡", "林", "何", "高", "郭", "罗",
			"谢", "宋", "唐", "许", "邓", "梁", "韩", "曹", "彭", "余",
			"潘", "袁", "蒋", "蔡", "卢", "田", "董", "叶", "程", "姜",
		},
		// CJK: common given-name characters.
		{
			"伟", "芳", "娜", "秀", "敏", "静", "丽", "强", "磊", "军",
			"洋", "勇", "艳", "杰", "涛", "明", "超", "霞", "平", "刚",
		},
		// CJK: common modifiers and city prefixes (nicknames/titles).
		{
			"大", "小", "新", "老", "中", "天", "金", "一", "龙", "虎",
			"京", "沪", "深", "广", "杭", "苏",
		},
	}
	for _, g := range groups {
		out = append(out, g...)
	}
	return out
}
- // fetchParticipantPages paginates through ChannelParticipantsSearch results.
- func (c *Client) fetchParticipantPages(
- ctx context.Context,
- api *tg.Client,
- channel *tg.InputChannel,
- query string,
- seen map[int64]bool,
- extractUsers func(*tg.ChannelsChannelParticipants) int,
- outTotalCount *int,
- ) error {
- const pageSize = 200
- offset := 0
- for {
- result, err := api.ChannelsGetParticipants(ctx, &tg.ChannelsGetParticipantsRequest{
- Channel: channel,
- Filter: &tg.ChannelParticipantsSearch{Q: query},
- Offset: offset,
- Limit: pageSize,
- Hash: 0,
- })
- if err != nil {
- return wrapFloodWait(err)
- }
- cp, ok := result.(*tg.ChannelsChannelParticipants)
- if !ok || len(cp.Users) == 0 {
- break
- }
- if outTotalCount != nil && cp.Count > *outTotalCount {
- *outTotalCount = cp.Count
- }
- added := extractUsers(cp)
- offset += len(cp.Users)
- // If no new users were added in this page, stop
- if added == 0 || offset >= cp.Count {
- break
- }
- // Page interval: jittered to avoid a detectable request cadence.
- if err := jitterSleep(ctx, 800*time.Millisecond, 1500*time.Millisecond); err != nil {
- return err
- }
- }
- return nil
- }
- // getChatParticipants 获取普通群组的成员
- func (c *Client) getChatParticipants(ctx context.Context, api *tg.Client, chatID int64) ([]GroupParticipant, error) {
- full, err := api.MessagesGetFullChat(ctx, chatID)
- if err != nil {
- return nil, wrapFloodWait(err)
- }
- var participants []GroupParticipant
- for _, u := range full.Users {
- user, ok := u.(*tg.User)
- if !ok {
- continue
- }
- p := GroupParticipant{
- ID: user.GetID(),
- IsBot: user.GetBot(),
- IsPremium: user.GetPremium(),
- }
- if un, ok := user.GetUsername(); ok {
- p.Username = un
- }
- if fn, ok := user.GetFirstName(); ok {
- p.FirstName = fn
- }
- if ln, ok := user.GetLastName(); ok {
- p.LastName = ln
- }
- participants = append(participants, p)
- }
- return participants, nil
- }
- // buildProxyDialer creates a DialFunc that routes connections through the given proxy URL.
- // Supports socks5://, http://, https:// proxy protocols.
- func buildProxyDialer(rawURL string) (dcs.DialFunc, error) {
- u, err := url.Parse(rawURL)
- if err != nil {
- return nil, fmt.Errorf("parse proxy URL: %w", err)
- }
- switch u.Scheme {
- case "socks5", "socks5h":
- var auth *xproxy.Auth
- if u.User != nil {
- auth = &xproxy.Auth{User: u.User.Username()}
- if p, ok := u.User.Password(); ok {
- auth.Password = p
- }
- }
- dialer, err := xproxy.SOCKS5("tcp", u.Host, auth, xproxy.Direct)
- if err != nil {
- return nil, fmt.Errorf("create SOCKS5 dialer: %w", err)
- }
- ctxDialer, ok := dialer.(xproxy.ContextDialer)
- if !ok {
- return nil, fmt.Errorf("SOCKS5 dialer does not support DialContext")
- }
- return ctxDialer.DialContext, nil
- case "http", "https":
- // For HTTP proxies, use CONNECT tunneling
- return func(ctx context.Context, network, addr string) (net.Conn, error) {
- proxyConn, err := (&net.Dialer{Timeout: 15 * time.Second}).DialContext(ctx, "tcp", u.Host)
- if err != nil {
- return nil, fmt.Errorf("connect to proxy: %w", err)
- }
- // Set deadline for the CONNECT handshake
- if deadline, ok := ctx.Deadline(); ok {
- proxyConn.SetDeadline(deadline)
- } else {
- proxyConn.SetDeadline(time.Now().Add(15 * time.Second))
- }
- // Send CONNECT request
- connectReq := fmt.Sprintf("CONNECT %s HTTP/1.1\r\nHost: %s\r\n", addr, addr)
- if u.User != nil {
- pass, _ := u.User.Password()
- connectReq += fmt.Sprintf("Proxy-Authorization: Basic %s\r\n",
- encodeBasicAuth(u.User.Username(), pass))
- }
- connectReq += "\r\n"
- if _, err := proxyConn.Write([]byte(connectReq)); err != nil {
- proxyConn.Close()
- return nil, fmt.Errorf("write CONNECT: %w", err)
- }
- // Read HTTP response using bufio for proper line parsing
- br := bufio.NewReader(proxyConn)
- statusLine, err := br.ReadString('\n')
- if err != nil {
- proxyConn.Close()
- return nil, fmt.Errorf("read CONNECT status: %w", err)
- }
- // Parse status code from "HTTP/1.x NNN reason"
- parts := strings.SplitN(strings.TrimSpace(statusLine), " ", 3)
- if len(parts) < 2 || parts[1] != "200" {
- proxyConn.Close()
- return nil, fmt.Errorf("CONNECT failed: %s", strings.TrimSpace(statusLine))
- }
- // Consume remaining headers until empty line
- for {
- line, err := br.ReadString('\n')
- if err != nil {
- proxyConn.Close()
- return nil, fmt.Errorf("read CONNECT headers: %w", err)
- }
- if strings.TrimSpace(line) == "" {
- break
- }
- }
- // Clear deadline — gotd manages its own timeouts
- proxyConn.SetDeadline(time.Time{})
- return proxyConn, nil
- }, nil
- default:
- return nil, fmt.Errorf("unsupported proxy scheme: %s", u.Scheme)
- }
- }
// encodeBasicAuth builds the credential part of a Basic proxy-authorization
// header: the standard base64 encoding of "user:password".
func encodeBasicAuth(user, password string) string {
	raw := make([]byte, 0, len(user)+1+len(password))
	raw = append(raw, user...)
	raw = append(raw, ':')
	raw = append(raw, password...)
	return base64.StdEncoding.EncodeToString(raw)
}
- // resolveInputPeer resolves a username to an InputPeer
- func (c *Client) resolveInputPeer(ctx context.Context, api *tg.Client, username string) (tg.InputPeerClass, error) {
- resolved, err := api.ContactsResolveUsername(ctx, &tg.ContactsResolveUsernameRequest{
- Username: username,
- })
- if err != nil {
- return nil, wrapFloodWait(err)
- }
- switch p := resolved.Peer.(type) {
- case *tg.PeerChannel:
- for _, ch := range resolved.Chats {
- if channel, ok := ch.(*tg.Channel); ok && channel.GetID() == p.ChannelID {
- accessHash, _ := channel.GetAccessHash()
- return &tg.InputPeerChannel{
- ChannelID: p.ChannelID,
- AccessHash: accessHash,
- }, nil
- }
- }
- return &tg.InputPeerChannel{ChannelID: p.ChannelID}, nil
- case *tg.PeerUser:
- for _, u := range resolved.Users {
- if user, ok := u.(*tg.User); ok && user.GetID() == p.UserID {
- accessHash, _ := user.GetAccessHash()
- return &tg.InputPeerUser{
- UserID: p.UserID,
- AccessHash: accessHash,
- }, nil
- }
- }
- return &tg.InputPeerUser{UserID: p.UserID}, nil
- case *tg.PeerChat:
- return &tg.InputPeerChat{ChatID: p.ChatID}, nil
- }
- return &tg.InputPeerEmpty{}, nil
- }
- // extractMessages extracts messages from a MessagesMessagesClass
- func extractMessages(result tg.MessagesMessagesClass) []Message {
- var rawMsgs []tg.MessageClass
- switch v := result.(type) {
- case *tg.MessagesMessages:
- rawMsgs = v.Messages
- case *tg.MessagesMessagesSlice:
- rawMsgs = v.Messages
- case *tg.MessagesChannelMessages:
- rawMsgs = v.Messages
- case *tg.MessagesMessagesNotModified:
- return nil
- }
- var msgs []Message
- for _, raw := range rawMsgs {
- switch m := raw.(type) {
- case *tg.Message:
- msg := Message{
- ID: m.GetID(),
- Text: m.GetMessage(),
- IsService: false,
- }
- // Extract forward source channel username
- if fwd, ok := m.GetFwdFrom(); ok {
- if fromID, ok := fwd.GetFromID(); ok {
- if peerCh, ok := fromID.(*tg.PeerChannel); ok {
- _ = peerCh // We'd need channel map to resolve username; skip for now
- }
- }
- }
- // Extract t.me links from text
- msg.Links = tmeRegexp.FindAllString(msg.Text, -1)
- msgs = append(msgs, msg)
- case *tg.MessageService:
- msgs = append(msgs, Message{
- ID: m.GetID(),
- IsService: true,
- })
- }
- }
- // Sort by ID ascending
- sort.Slice(msgs, func(i, j int) bool {
- return msgs[i].ID < msgs[j].ID
- })
- return msgs
- }
- // isFloodWait 检查错误是否是 FloodWait,提取等待时间
- func isFloodWait(err error) (bool, int) {
- if d, ok := tgerr.AsFloodWait(err); ok {
- return true, int(d.Seconds())
- }
- return false, 0
- }
- // wrapFloodWait wraps a FloodWait error into FloodWaitError
- func wrapFloodWait(err error) error {
- if ok, secs := isFloodWait(err); ok {
- return &FloodWaitError{Seconds: secs}
- }
- return err
- }
|