client.go 32 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195
  1. package telegram
  2. import (
  3. "bufio"
  4. "context"
  5. "encoding/base64"
  6. "fmt"
  7. "log"
  8. "math/rand/v2"
  9. "net"
  10. "net/url"
  11. "regexp"
  12. "sort"
  13. "strings"
  14. "sync"
  15. "time"
  16. "github.com/gotd/td/session"
  17. "github.com/gotd/td/telegram"
  18. "github.com/gotd/td/telegram/dcs"
  19. "github.com/gotd/td/tg"
  20. "github.com/gotd/td/tgerr"
  21. xproxy "golang.org/x/net/proxy"
  22. )
// tmeRegexp matches http:// or https:// links to t.me. A match runs from the
// scheme up to (not including) the first whitespace character, quote, angle
// bracket, closing parenthesis or closing square bracket.
var tmeRegexp = regexp.MustCompile(`https?://t\.me/[^\s"'<>)\]]+`)
// Client is a Telegram (TG) client bound to a single account.
type Client struct {
	// account supplies the app credentials and device fields used by Connect.
	account Account
	// sessionPath is the session file path; New copies it from account.SessionFile.
	sessionPath string
	proxyURL    string // SOCKS5/HTTP proxy URL

	// mu is held while reading or writing proxyURL, tgc, api, cancel, ready
	// and runErr.
	mu  sync.Mutex
	tgc *telegram.Client
	// api is nil until the Run callback in Connect stores client.API().
	api *tg.Client
	// cancel stops the run context created by Connect; nil before Connect.
	cancel context.CancelFunc
	ready  chan struct{} // closed when connected
	// runErr records the value returned by telegram.Client.Run.
	runErr error
}
  36. // New 创建客户端(不连接,只初始化)
  37. func New(account Account) *Client {
  38. return &Client{
  39. account: account,
  40. sessionPath: account.SessionFile,
  41. ready: make(chan struct{}),
  42. }
  43. }
  44. // SetProxy sets the proxy URL for this client's connections.
  45. func (c *Client) SetProxy(proxyURL string) {
  46. c.mu.Lock()
  47. c.proxyURL = proxyURL
  48. c.mu.Unlock()
  49. }
  50. // Connect 连接并认证(从 session 文件恢复)
  51. // session 文件不存在时返回错误(不做交互式登录,session 需要预先生成)
  52. func (c *Client) Connect(ctx context.Context) error {
  53. storage := &session.FileStorage{Path: c.sessionPath}
  54. opts := telegram.Options{
  55. SessionStorage: storage,
  56. NoUpdates: true,
  57. Device: telegram.DeviceConfig{
  58. DeviceModel: c.account.Device,
  59. AppVersion: c.account.AppVersion,
  60. SystemVersion: c.account.SystemVersion,
  61. LangPack: c.account.LangPack,
  62. SystemLangCode: c.account.SystemLangCode,
  63. LangCode: c.account.LangCode,
  64. },
  65. }
  66. // Apply proxy if configured
  67. c.mu.Lock()
  68. proxyURL := c.proxyURL
  69. c.mu.Unlock()
  70. if proxyURL != "" {
  71. dialFunc, err := buildProxyDialer(proxyURL)
  72. if err != nil {
  73. log.Printf("[tg_client] failed to create proxy dialer: %v, connecting without proxy", err)
  74. } else {
  75. opts.Resolver = dcs.Plain(dcs.PlainOptions{Dial: dialFunc})
  76. log.Printf("[tg_client] connecting via proxy: %s", proxyURL)
  77. }
  78. }
  79. client := telegram.NewClient(c.account.AppID, c.account.AppHash, opts)
  80. runCtx, cancel := context.WithCancel(ctx)
  81. c.mu.Lock()
  82. c.tgc = client
  83. c.cancel = cancel
  84. c.ready = make(chan struct{})
  85. c.runErr = nil
  86. readyCh := c.ready
  87. c.mu.Unlock()
  88. errCh := make(chan error, 1)
  89. go func() {
  90. err := client.Run(runCtx, func(ctx context.Context) error {
  91. c.mu.Lock()
  92. c.api = client.API()
  93. close(readyCh)
  94. c.mu.Unlock()
  95. // Block until context is cancelled (Disconnect called)
  96. <-ctx.Done()
  97. return ctx.Err()
  98. })
  99. c.mu.Lock()
  100. c.runErr = err
  101. c.mu.Unlock()
  102. errCh <- err
  103. }()
  104. // Wait for ready or error
  105. select {
  106. case <-readyCh:
  107. return nil
  108. case err := <-errCh:
  109. if err != nil && err != context.Canceled {
  110. return err
  111. }
  112. return nil
  113. case <-ctx.Done():
  114. cancel()
  115. return ctx.Err()
  116. }
  117. }
  118. // Disconnect 断开连接
  119. func (c *Client) Disconnect() {
  120. c.mu.Lock()
  121. cancel := c.cancel
  122. c.mu.Unlock()
  123. if cancel != nil {
  124. cancel()
  125. }
  126. }
  127. // waitReady waits for the client to be connected and returns the api client
  128. func (c *Client) waitReady(ctx context.Context) (*tg.Client, error) {
  129. c.mu.Lock()
  130. readyCh := c.ready
  131. api := c.api
  132. c.mu.Unlock()
  133. if api != nil {
  134. return api, nil
  135. }
  136. select {
  137. case <-readyCh:
  138. c.mu.Lock()
  139. api = c.api
  140. c.mu.Unlock()
  141. return api, nil
  142. case <-ctx.Done():
  143. return nil, ctx.Err()
  144. }
  145. }
  146. // GetChannelInfo 获取频道/用户信息,通过用户名查找
  147. func (c *Client) GetChannelInfo(ctx context.Context, username string) (*ChannelInfo, error) {
  148. api, err := c.waitReady(ctx)
  149. if err != nil {
  150. return nil, err
  151. }
  152. username = strings.TrimPrefix(username, "@")
  153. resolved, err := api.ContactsResolveUsername(ctx, &tg.ContactsResolveUsernameRequest{
  154. Username: username,
  155. })
  156. if err != nil {
  157. return nil, wrapFloodWait(err)
  158. }
  159. info := &ChannelInfo{Username: username}
  160. // Look for channel/chat in the resolved chats
  161. for _, ch := range resolved.Chats {
  162. switch v := ch.(type) {
  163. case *tg.Channel:
  164. title := v.Title
  165. info.Title = title
  166. info.IsChannel = v.GetBroadcast()
  167. info.IsGroup = v.GetMegagroup()
  168. if count, ok := v.GetParticipantsCount(); ok {
  169. info.MemberCount = count
  170. }
  171. // Get full channel info for About
  172. accessHash, hasHash := v.GetAccessHash()
  173. if hasHash {
  174. full, ferr := api.ChannelsGetFullChannel(ctx, &tg.InputChannel{
  175. ChannelID: v.GetID(),
  176. AccessHash: accessHash,
  177. })
  178. if ferr == nil {
  179. if cf, ok := full.FullChat.(*tg.ChannelFull); ok {
  180. info.About = cf.GetAbout()
  181. if count, ok := cf.GetParticipantsCount(); ok && info.MemberCount == 0 {
  182. info.MemberCount = count
  183. }
  184. }
  185. }
  186. }
  187. return info, nil
  188. case *tg.Chat:
  189. info.Title = v.Title
  190. info.IsGroup = true
  191. info.MemberCount = v.ParticipantsCount
  192. return info, nil
  193. }
  194. }
  195. return info, nil
  196. }
  197. // GetMessages 获取频道历史消息
  198. // offsetID: 从哪条消息开始(断点续传)
  199. // limit: 最多取多少条
  200. // 返回的消息按 ID 从小到大排序
  201. func (c *Client) GetMessages(ctx context.Context, username string, offsetID, limit int) ([]Message, error) {
  202. api, err := c.waitReady(ctx)
  203. if err != nil {
  204. return nil, err
  205. }
  206. username = strings.TrimPrefix(username, "@")
  207. peer, err := c.resolveInputPeer(ctx, api, username)
  208. if err != nil {
  209. return nil, err
  210. }
  211. result, err := api.MessagesGetHistory(ctx, &tg.MessagesGetHistoryRequest{
  212. Peer: peer,
  213. OffsetID: offsetID,
  214. Limit: limit,
  215. })
  216. if err != nil {
  217. return nil, wrapFloodWait(err)
  218. }
  219. return extractMessages(result), nil
  220. }
  221. // GetPinnedMessages 获取置顶消息
  222. func (c *Client) GetPinnedMessages(ctx context.Context, username string) ([]Message, error) {
  223. api, err := c.waitReady(ctx)
  224. if err != nil {
  225. return nil, err
  226. }
  227. username = strings.TrimPrefix(username, "@")
  228. peer, err := c.resolveInputPeer(ctx, api, username)
  229. if err != nil {
  230. return nil, err
  231. }
  232. result, err := api.MessagesSearch(ctx, &tg.MessagesSearchRequest{
  233. Peer: peer,
  234. Filter: &tg.InputMessagesFilterPinned{},
  235. Limit: 100,
  236. })
  237. if err != nil {
  238. return nil, wrapFloodWait(err)
  239. }
  240. return extractMessages(result), nil
  241. }
  242. // VerifyUser 验证用户名是否存在,返回用户信息
  243. func (c *Client) VerifyUser(ctx context.Context, username string) (*UserInfo, error) {
  244. api, err := c.waitReady(ctx)
  245. if err != nil {
  246. return nil, err
  247. }
  248. username = strings.TrimPrefix(username, "@")
  249. resolved, err := api.ContactsResolveUsername(ctx, &tg.ContactsResolveUsernameRequest{
  250. Username: username,
  251. })
  252. if err != nil {
  253. if tgerr.Is(err, "USERNAME_NOT_OCCUPIED", "USERNAME_INVALID") {
  254. return &UserInfo{Username: username, Exists: false}, nil
  255. }
  256. return nil, wrapFloodWait(err)
  257. }
  258. // Check if the peer is a user
  259. if _, ok := resolved.Peer.(*tg.PeerUser); ok {
  260. for _, u := range resolved.Users {
  261. if user, ok := u.(*tg.User); ok {
  262. info := &UserInfo{
  263. ID: user.GetID(),
  264. Username: username,
  265. IsBot: user.GetBot(),
  266. IsPremium: user.GetPremium(),
  267. Exists: true,
  268. }
  269. if fn, ok := user.GetFirstName(); ok {
  270. info.FirstName = fn
  271. }
  272. if ln, ok := user.GetLastName(); ok {
  273. info.LastName = ln
  274. }
  275. if status, ok := user.GetStatus(); ok {
  276. if offline, ok := status.(*tg.UserStatusOffline); ok {
  277. t := time.Unix(int64(offline.GetWasOnline()), 0)
  278. info.LastOnline = &t
  279. }
  280. }
  281. return info, nil
  282. }
  283. }
  284. }
  285. // Check if it's a channel or group
  286. for _, ch := range resolved.Chats {
  287. switch v := ch.(type) {
  288. case *tg.Channel:
  289. return &UserInfo{
  290. ID: v.GetID(),
  291. Username: username,
  292. IsChannel: v.GetBroadcast(),
  293. IsGroup: v.GetMegagroup(),
  294. Exists: true,
  295. }, nil
  296. case *tg.Chat:
  297. return &UserInfo{
  298. ID: v.ID,
  299. IsGroup: true,
  300. Exists: true,
  301. }, nil
  302. }
  303. }
  304. return &UserInfo{Username: username, Exists: false}, nil
  305. }
  306. // ResolveGroupChannel looks up a group/channel by username and returns both
  307. // the InputChannel handle (for subsequent API calls) and the raw Channel
  308. // struct (for metadata like title and participant count). Returns
  309. // (nil, nil, error) for basic chats (no InputChannel).
  310. func (c *Client) ResolveGroupChannel(ctx context.Context, username string) (*tg.InputChannel, *tg.Channel, error) {
  311. api, err := c.waitReady(ctx)
  312. if err != nil {
  313. return nil, nil, err
  314. }
  315. username = strings.TrimPrefix(username, "@")
  316. resolved, err := api.ContactsResolveUsername(ctx, &tg.ContactsResolveUsernameRequest{Username: username})
  317. if err != nil {
  318. return nil, nil, wrapFloodWait(err)
  319. }
  320. for _, ch := range resolved.Chats {
  321. if v, ok := ch.(*tg.Channel); ok {
  322. accessHash, _ := v.GetAccessHash()
  323. return &tg.InputChannel{ChannelID: v.GetID(), AccessHash: accessHash}, v, nil
  324. }
  325. }
  326. return nil, nil, fmt.Errorf("无法解析群组为超级群组: %s", username)
  327. }
  328. // FetchMessageSenders paginates channel/group history and returns distinct
  329. // sender information (username + display name + user ID). For active groups
  330. // this often yields far more usernames than participants.search (which is
  331. // heavily restricted for non-admins) — because every message exposes its
  332. // sender, and you can read history of any joined group.
  333. //
  334. // limit is the TOTAL number of messages to scan (paged 100 at a time).
  335. // The result contains only senders that have a public @username set.
  336. func (c *Client) FetchMessageSenders(ctx context.Context, peer tg.InputPeerClass, limit int) ([]GroupParticipant, error) {
  337. api, err := c.waitReady(ctx)
  338. if err != nil {
  339. return nil, err
  340. }
  341. if limit <= 0 {
  342. limit = 500
  343. }
  344. const pageSize = 100
  345. seen := make(map[int64]bool)
  346. var out []GroupParticipant
  347. offsetID := 0
  348. scanned := 0
  349. for scanned < limit {
  350. ps := pageSize
  351. if limit-scanned < ps {
  352. ps = limit - scanned
  353. }
  354. result, err := api.MessagesGetHistory(ctx, &tg.MessagesGetHistoryRequest{
  355. Peer: peer,
  356. OffsetID: offsetID,
  357. Limit: ps,
  358. })
  359. if err != nil {
  360. return out, wrapFloodWait(err)
  361. }
  362. // Build user map from response for quick lookup.
  363. users := make(map[int64]*tg.User)
  364. var msgs []tg.MessageClass
  365. switch v := result.(type) {
  366. case *tg.MessagesMessages:
  367. msgs = v.Messages
  368. for _, u := range v.Users {
  369. if usr, ok := u.(*tg.User); ok {
  370. users[usr.GetID()] = usr
  371. }
  372. }
  373. case *tg.MessagesMessagesSlice:
  374. msgs = v.Messages
  375. for _, u := range v.Users {
  376. if usr, ok := u.(*tg.User); ok {
  377. users[usr.GetID()] = usr
  378. }
  379. }
  380. case *tg.MessagesChannelMessages:
  381. msgs = v.Messages
  382. for _, u := range v.Users {
  383. if usr, ok := u.(*tg.User); ok {
  384. users[usr.GetID()] = usr
  385. }
  386. }
  387. default:
  388. return out, nil
  389. }
  390. if len(msgs) == 0 {
  391. break
  392. }
  393. minID := 0
  394. for _, raw := range msgs {
  395. var fromID int64
  396. var id int
  397. switch m := raw.(type) {
  398. case *tg.Message:
  399. id = m.GetID()
  400. if from, ok := m.GetFromID(); ok {
  401. if pu, ok := from.(*tg.PeerUser); ok {
  402. fromID = pu.UserID
  403. }
  404. }
  405. case *tg.MessageService:
  406. id = m.GetID()
  407. if from, ok := m.GetFromID(); ok {
  408. if pu, ok := from.(*tg.PeerUser); ok {
  409. fromID = pu.UserID
  410. }
  411. }
  412. }
  413. if id > 0 && (minID == 0 || id < minID) {
  414. minID = id
  415. }
  416. if fromID == 0 || seen[fromID] {
  417. continue
  418. }
  419. usr, ok := users[fromID]
  420. if !ok || usr.GetBot() {
  421. continue
  422. }
  423. uname, _ := usr.GetUsername()
  424. if uname == "" {
  425. continue // we only care about @-addressable accounts
  426. }
  427. seen[fromID] = true
  428. p := GroupParticipant{
  429. ID: fromID,
  430. Username: uname,
  431. IsPremium: usr.GetPremium(),
  432. }
  433. if fn, ok := usr.GetFirstName(); ok {
  434. p.FirstName = fn
  435. }
  436. if ln, ok := usr.GetLastName(); ok {
  437. p.LastName = ln
  438. }
  439. out = append(out, p)
  440. }
  441. scanned += len(msgs)
  442. if minID == 0 || minID == offsetID {
  443. break // no progress possible
  444. }
  445. offsetID = minID
  446. if err := jitterSleep(ctx, 1500*time.Millisecond, 3*time.Second); err != nil {
  447. return out, err
  448. }
  449. }
  450. return out, nil
  451. }
  452. // ResolveInputPeer is a public wrapper over resolveInputPeer for use outside
  453. // the client file. Handles @username, channels, chats.
  454. func (c *Client) ResolveInputPeer(ctx context.Context, username string) (tg.InputPeerClass, error) {
  455. api, err := c.waitReady(ctx)
  456. if err != nil {
  457. return nil, err
  458. }
  459. return c.resolveInputPeer(ctx, api, strings.TrimPrefix(username, "@"))
  460. }
  461. // GetFullChannelTotal calls channels.getFullChannel to obtain the authoritative
  462. // participants_count (which may be much larger than what a restricted member
  463. // can see via ChannelParticipantsSearch).
  464. func (c *Client) GetFullChannelTotal(ctx context.Context, ch *tg.InputChannel) (int, error) {
  465. api, err := c.waitReady(ctx)
  466. if err != nil {
  467. return 0, err
  468. }
  469. full, err := api.ChannelsGetFullChannel(ctx, ch)
  470. if err != nil {
  471. return 0, wrapFloodWait(err)
  472. }
  473. cf, ok := full.FullChat.(*tg.ChannelFull)
  474. if !ok {
  475. return 0, fmt.Errorf("unexpected FullChat type")
  476. }
  477. if n, ok := cf.GetParticipantsCount(); ok {
  478. return n, nil
  479. }
  480. return 0, nil
  481. }
  482. // FetchRecentParticipants paginates channels.getParticipants with the Recent
  483. // filter, which returns active participants sorted by recency. This is the
  484. // preferred Phase 1 filter for large groups — empty-string Search filter is
  485. // often heavily restricted (returns 4-5 results) while Recent returns up to
  486. // ~200 and is not treated as "searching".
  487. func (c *Client) FetchRecentParticipants(ctx context.Context, channel *tg.InputChannel) ([]GroupParticipant, int, error) {
  488. api, err := c.waitReady(ctx)
  489. if err != nil {
  490. return nil, 0, err
  491. }
  492. const pageSize = 200
  493. offset := 0
  494. totalCount := 0
  495. var out []GroupParticipant
  496. for {
  497. result, err := api.ChannelsGetParticipants(ctx, &tg.ChannelsGetParticipantsRequest{
  498. Channel: channel,
  499. Filter: &tg.ChannelParticipantsRecent{},
  500. Offset: offset,
  501. Limit: pageSize,
  502. Hash: 0,
  503. })
  504. if err != nil {
  505. return out, totalCount, wrapFloodWait(err)
  506. }
  507. cp, ok := result.(*tg.ChannelsChannelParticipants)
  508. if !ok || len(cp.Users) == 0 {
  509. break
  510. }
  511. if cp.Count > totalCount {
  512. totalCount = cp.Count
  513. }
  514. for _, u := range cp.Users {
  515. user, ok := u.(*tg.User)
  516. if !ok {
  517. continue
  518. }
  519. p := GroupParticipant{
  520. ID: user.GetID(),
  521. IsBot: user.GetBot(),
  522. IsPremium: user.GetPremium(),
  523. }
  524. if un, ok := user.GetUsername(); ok {
  525. p.Username = un
  526. }
  527. if fn, ok := user.GetFirstName(); ok {
  528. p.FirstName = fn
  529. }
  530. if ln, ok := user.GetLastName(); ok {
  531. p.LastName = ln
  532. }
  533. out = append(out, p)
  534. }
  535. offset += len(cp.Users)
  536. if offset >= cp.Count {
  537. break
  538. }
  539. if err := jitterSleep(ctx, 800*time.Millisecond, 1500*time.Millisecond); err != nil {
  540. return out, totalCount, err
  541. }
  542. }
  543. return out, totalCount, nil
  544. }
  545. // JoinChannel makes the current account a member of the given channel/supergroup.
  546. // USER_ALREADY_PARTICIPANT is treated as success. FloodWait is wrapped normally.
  547. // Side effect: this account becomes visibly a member of the group — make sure
  548. // the caller actually wants that (private groups require it to see the member
  549. // list, but it leaves a trace in group join/leave activity logs).
  550. func (c *Client) JoinChannel(ctx context.Context, ch *tg.InputChannel) error {
  551. api, err := c.waitReady(ctx)
  552. if err != nil {
  553. return err
  554. }
  555. _, err = api.ChannelsJoinChannel(ctx, ch)
  556. if err != nil {
  557. if tgerr.Is(err, "USER_ALREADY_PARTICIPANT") {
  558. return nil
  559. }
  560. return wrapFloodWait(err)
  561. }
  562. return nil
  563. }
  564. // GetChatParticipantsByID fetches members of a basic (non-supergroup) chat.
  565. // Basic chats have no pagination — this returns everyone in one call.
  566. func (c *Client) GetChatParticipantsByID(ctx context.Context, chatID int64) ([]GroupParticipant, error) {
  567. api, err := c.waitReady(ctx)
  568. if err != nil {
  569. return nil, err
  570. }
  571. return c.getChatParticipants(ctx, api, chatID)
  572. }
  573. // ResolveGroupPeer is a broader resolver than ResolveGroupChannel: it also
  574. // returns a basic-chat ID when the target is not a supergroup/channel.
  575. // Exactly one of (inputCh, chatID) will be non-zero on success.
  576. func (c *Client) ResolveGroupPeer(ctx context.Context, username string) (*tg.InputChannel, *tg.Channel, int64, error) {
  577. api, err := c.waitReady(ctx)
  578. if err != nil {
  579. return nil, nil, 0, err
  580. }
  581. username = strings.TrimPrefix(username, "@")
  582. resolved, err := api.ContactsResolveUsername(ctx, &tg.ContactsResolveUsernameRequest{Username: username})
  583. if err != nil {
  584. return nil, nil, 0, wrapFloodWait(err)
  585. }
  586. for _, ch := range resolved.Chats {
  587. if v, ok := ch.(*tg.Channel); ok {
  588. accessHash, _ := v.GetAccessHash()
  589. return &tg.InputChannel{ChannelID: v.GetID(), AccessHash: accessHash}, v, 0, nil
  590. }
  591. }
  592. if p, ok := resolved.Peer.(*tg.PeerChat); ok {
  593. return nil, nil, p.ChatID, nil
  594. }
  595. return nil, nil, 0, fmt.Errorf("无法解析群组: %s", username)
  596. }
  597. // FetchParticipantsByQuery runs ChannelParticipantsSearch for one query string,
  598. // paginating through all pages. Returns the users surfaced by this query and
  599. // the total count reported by TG. On FloodWait, returns a *FloodWaitError.
  600. // The caller is responsible for deduping across queries.
  601. func (c *Client) FetchParticipantsByQuery(ctx context.Context, channel *tg.InputChannel, query string) ([]GroupParticipant, int, error) {
  602. api, err := c.waitReady(ctx)
  603. if err != nil {
  604. return nil, 0, err
  605. }
  606. const pageSize = 200
  607. offset := 0
  608. totalCount := 0
  609. var out []GroupParticipant
  610. for {
  611. result, err := api.ChannelsGetParticipants(ctx, &tg.ChannelsGetParticipantsRequest{
  612. Channel: channel,
  613. Filter: &tg.ChannelParticipantsSearch{Q: query},
  614. Offset: offset,
  615. Limit: pageSize,
  616. Hash: 0,
  617. })
  618. if err != nil {
  619. return out, totalCount, wrapFloodWait(err)
  620. }
  621. cp, ok := result.(*tg.ChannelsChannelParticipants)
  622. if !ok || len(cp.Users) == 0 {
  623. break
  624. }
  625. if cp.Count > totalCount {
  626. totalCount = cp.Count
  627. }
  628. for _, u := range cp.Users {
  629. user, ok := u.(*tg.User)
  630. if !ok {
  631. continue
  632. }
  633. p := GroupParticipant{
  634. ID: user.GetID(),
  635. IsBot: user.GetBot(),
  636. IsPremium: user.GetPremium(),
  637. }
  638. if un, ok := user.GetUsername(); ok {
  639. p.Username = un
  640. }
  641. if fn, ok := user.GetFirstName(); ok {
  642. p.FirstName = fn
  643. }
  644. if ln, ok := user.GetLastName(); ok {
  645. p.LastName = ln
  646. }
  647. out = append(out, p)
  648. }
  649. offset += len(cp.Users)
  650. if offset >= cp.Count {
  651. break
  652. }
  653. if err := jitterSleep(ctx, 800*time.Millisecond, 1500*time.Millisecond); err != nil {
  654. return out, totalCount, err
  655. }
  656. }
  657. return out, totalCount, nil
  658. }
  659. // GetGroupParticipants 获取群组/超级群组的成员列表(分页拉取全部)
  660. func (c *Client) GetGroupParticipants(ctx context.Context, username string) ([]GroupParticipant, error) {
  661. api, err := c.waitReady(ctx)
  662. if err != nil {
  663. return nil, err
  664. }
  665. username = strings.TrimPrefix(username, "@")
  666. // Resolve the channel/group
  667. resolved, err := api.ContactsResolveUsername(ctx, &tg.ContactsResolveUsernameRequest{
  668. Username: username,
  669. })
  670. if err != nil {
  671. return nil, wrapFloodWait(err)
  672. }
  673. // Find the channel in resolved chats
  674. var inputChannel *tg.InputChannel
  675. for _, ch := range resolved.Chats {
  676. switch v := ch.(type) {
  677. case *tg.Channel:
  678. accessHash, _ := v.GetAccessHash()
  679. inputChannel = &tg.InputChannel{
  680. ChannelID: v.GetID(),
  681. AccessHash: accessHash,
  682. }
  683. }
  684. }
  685. if inputChannel == nil {
  686. // Try as basic chat - get participants via MessagesGetFullChat
  687. if p, ok := resolved.Peer.(*tg.PeerChat); ok {
  688. return c.getChatParticipants(ctx, api, p.ChatID)
  689. }
  690. return nil, fmt.Errorf("无法解析群组: %s", username)
  691. }
  692. // Strategy: use ChannelParticipantsSearch with empty query (returns more than Recent),
  693. // then iterate alphabet queries to discover members beyond the 200 limit per query.
  694. seen := make(map[int64]bool)
  695. var allParticipants []GroupParticipant
  696. // Helper to extract users from a page
  697. extractUsers := func(cp *tg.ChannelsChannelParticipants) int {
  698. added := 0
  699. for _, u := range cp.Users {
  700. user, ok := u.(*tg.User)
  701. if !ok || seen[user.GetID()] {
  702. continue
  703. }
  704. seen[user.GetID()] = true
  705. p := GroupParticipant{
  706. ID: user.GetID(),
  707. IsBot: user.GetBot(),
  708. IsPremium: user.GetPremium(),
  709. }
  710. if un, ok := user.GetUsername(); ok {
  711. p.Username = un
  712. }
  713. if fn, ok := user.GetFirstName(); ok {
  714. p.FirstName = fn
  715. }
  716. if ln, ok := user.GetLastName(); ok {
  717. p.LastName = ln
  718. }
  719. allParticipants = append(allParticipants, p)
  720. added++
  721. }
  722. return added
  723. }
  724. // Phase 1: Search with empty query (gets up to ~200)
  725. totalCount := 0
  726. if err := c.fetchParticipantPages(ctx, api, inputChannel, "", seen, extractUsers, &totalCount); err != nil {
  727. if len(allParticipants) > 0 {
  728. return allParticipants, err
  729. }
  730. return nil, err
  731. }
  732. // Phase 2: If group has more members than we found, search by character sets to discover more.
  733. // We pace queries with jitter (2–4s) to avoid looking like a bot scanner and triggering FloodWait.
  734. // If FloodWait does hit, stop early and return what we already have — the calling task can
  735. // re-attempt later after the account cools down.
  736. if totalCount > len(allParticipants) && totalCount <= 10000 {
  737. queries := participantSearchQueries()
  738. for _, q := range queries {
  739. if ctx.Err() != nil {
  740. break
  741. }
  742. if len(allParticipants) >= totalCount {
  743. break // already collected everyone visible
  744. }
  745. beforeCount := len(allParticipants)
  746. err := c.fetchParticipantPages(ctx, api, inputChannel, q, seen, extractUsers, nil)
  747. if err != nil {
  748. if fwe, ok := err.(*FloodWaitError); ok {
  749. log.Printf("[tg_client] flood wait %ds during search q=%q for %s; returning %d/%d",
  750. fwe.Seconds, q, username, len(allParticipants), totalCount)
  751. } else {
  752. log.Printf("[tg_client] search q=%q for %s: %v (returning partial)", q, username, err)
  753. }
  754. break
  755. }
  756. if len(allParticipants) == beforeCount {
  757. continue // no new results; skip sleep and try next query
  758. }
  759. if err := jitterSleep(ctx, 2*time.Second, 4*time.Second); err != nil {
  760. return allParticipants, err
  761. }
  762. }
  763. }
  764. log.Printf("[tg_client] fetched %d/%d participants for %s", len(allParticipants), totalCount, username)
  765. return allParticipants, nil
  766. }
  767. // jitterSleep sleeps a random duration in [min, max) while respecting ctx.
  768. // Returns ctx.Err() if cancelled. Used to spread out TG API calls and avoid
  769. // looking like a deterministic scanner.
  770. func jitterSleep(ctx context.Context, min, max time.Duration) error {
  771. d := min + time.Duration(rand.Int64N(int64(max-min)))
  772. select {
  773. case <-ctx.Done():
  774. return ctx.Err()
  775. case <-time.After(d):
  776. return nil
  777. }
  778. }
  779. // participantSearchQueries returns search queries covering Latin, Cyrillic, Japanese,
  780. // Korean, and CJK scripts. TG's ChannelParticipantsSearch does substring matching on
  781. // first_name + last_name + username, so more starter-character coverage = more users
  782. // surfaced on groups beyond the 200-per-query cap. Total ~150 queries.
  783. func participantSearchQueries() []string {
  784. queries := make([]string, 0, 170)
  785. // Latin a-z
  786. for c := 'a'; c <= 'z'; c++ {
  787. queries = append(queries, string(c))
  788. }
  789. // Digits 0-9
  790. for c := '0'; c <= '9'; c++ {
  791. queries = append(queries, string(c))
  792. }
  793. // Cyrillic а-я
  794. for c := 'а'; c <= 'я'; c++ {
  795. queries = append(queries, string(c))
  796. }
  797. // Japanese Hiragana — common name-starter syllables
  798. queries = append(queries,
  799. "あ", "い", "う", "え", "お",
  800. "か", "さ", "た", "な", "ま",
  801. )
  802. // Korean Hangul — common initial syllables
  803. queries = append(queries,
  804. "가", "나", "다", "라", "마", "바", "사", "아", "자", "차",
  805. "카", "타", "파", "하",
  806. )
  807. // CJK: top Chinese surnames (百家姓 high frequency)
  808. surnames := []string{
  809. "王", "李", "张", "刘", "陈", "杨", "黄", "赵", "周", "吴",
  810. "徐", "孙", "马", "朱", "胡", "林", "何", "高", "郭", "罗",
  811. "谢", "宋", "唐", "许", "邓", "梁", "韩", "曹", "彭", "余",
  812. "潘", "袁", "蒋", "蔡", "卢", "田", "董", "叶", "程", "姜",
  813. }
  814. queries = append(queries, surnames...)
  815. // CJK: common given-name characters (高频二字名)
  816. given := []string{
  817. "伟", "芳", "娜", "秀", "敏", "静", "丽", "强", "磊", "军",
  818. "洋", "勇", "艳", "杰", "涛", "明", "超", "霞", "平", "刚",
  819. }
  820. queries = append(queries, given...)
  821. // CJK: common modifiers and city prefixes (covers nicknames/titles)
  822. misc := []string{
  823. "大", "小", "新", "老", "中", "天", "金", "一", "龙", "虎",
  824. "京", "沪", "深", "广", "杭", "苏",
  825. }
  826. queries = append(queries, misc...)
  827. return queries
  828. }
  829. // fetchParticipantPages paginates through ChannelParticipantsSearch results.
  830. func (c *Client) fetchParticipantPages(
  831. ctx context.Context,
  832. api *tg.Client,
  833. channel *tg.InputChannel,
  834. query string,
  835. seen map[int64]bool,
  836. extractUsers func(*tg.ChannelsChannelParticipants) int,
  837. outTotalCount *int,
  838. ) error {
  839. const pageSize = 200
  840. offset := 0
  841. for {
  842. result, err := api.ChannelsGetParticipants(ctx, &tg.ChannelsGetParticipantsRequest{
  843. Channel: channel,
  844. Filter: &tg.ChannelParticipantsSearch{Q: query},
  845. Offset: offset,
  846. Limit: pageSize,
  847. Hash: 0,
  848. })
  849. if err != nil {
  850. return wrapFloodWait(err)
  851. }
  852. cp, ok := result.(*tg.ChannelsChannelParticipants)
  853. if !ok || len(cp.Users) == 0 {
  854. break
  855. }
  856. if outTotalCount != nil && cp.Count > *outTotalCount {
  857. *outTotalCount = cp.Count
  858. }
  859. added := extractUsers(cp)
  860. offset += len(cp.Users)
  861. // If no new users were added in this page, stop
  862. if added == 0 || offset >= cp.Count {
  863. break
  864. }
  865. // Page interval: jittered to avoid a detectable request cadence.
  866. if err := jitterSleep(ctx, 800*time.Millisecond, 1500*time.Millisecond); err != nil {
  867. return err
  868. }
  869. }
  870. return nil
  871. }
  872. // getChatParticipants 获取普通群组的成员
  873. func (c *Client) getChatParticipants(ctx context.Context, api *tg.Client, chatID int64) ([]GroupParticipant, error) {
  874. full, err := api.MessagesGetFullChat(ctx, chatID)
  875. if err != nil {
  876. return nil, wrapFloodWait(err)
  877. }
  878. var participants []GroupParticipant
  879. for _, u := range full.Users {
  880. user, ok := u.(*tg.User)
  881. if !ok {
  882. continue
  883. }
  884. p := GroupParticipant{
  885. ID: user.GetID(),
  886. IsBot: user.GetBot(),
  887. IsPremium: user.GetPremium(),
  888. }
  889. if un, ok := user.GetUsername(); ok {
  890. p.Username = un
  891. }
  892. if fn, ok := user.GetFirstName(); ok {
  893. p.FirstName = fn
  894. }
  895. if ln, ok := user.GetLastName(); ok {
  896. p.LastName = ln
  897. }
  898. participants = append(participants, p)
  899. }
  900. return participants, nil
  901. }
  902. // buildProxyDialer creates a DialFunc that routes connections through the given proxy URL.
  903. // Supports socks5://, http://, https:// proxy protocols.
  904. func buildProxyDialer(rawURL string) (dcs.DialFunc, error) {
  905. u, err := url.Parse(rawURL)
  906. if err != nil {
  907. return nil, fmt.Errorf("parse proxy URL: %w", err)
  908. }
  909. switch u.Scheme {
  910. case "socks5", "socks5h":
  911. var auth *xproxy.Auth
  912. if u.User != nil {
  913. auth = &xproxy.Auth{User: u.User.Username()}
  914. if p, ok := u.User.Password(); ok {
  915. auth.Password = p
  916. }
  917. }
  918. dialer, err := xproxy.SOCKS5("tcp", u.Host, auth, xproxy.Direct)
  919. if err != nil {
  920. return nil, fmt.Errorf("create SOCKS5 dialer: %w", err)
  921. }
  922. ctxDialer, ok := dialer.(xproxy.ContextDialer)
  923. if !ok {
  924. return nil, fmt.Errorf("SOCKS5 dialer does not support DialContext")
  925. }
  926. return ctxDialer.DialContext, nil
  927. case "http", "https":
  928. // For HTTP proxies, use CONNECT tunneling
  929. return func(ctx context.Context, network, addr string) (net.Conn, error) {
  930. proxyConn, err := (&net.Dialer{Timeout: 15 * time.Second}).DialContext(ctx, "tcp", u.Host)
  931. if err != nil {
  932. return nil, fmt.Errorf("connect to proxy: %w", err)
  933. }
  934. // Set deadline for the CONNECT handshake
  935. if deadline, ok := ctx.Deadline(); ok {
  936. proxyConn.SetDeadline(deadline)
  937. } else {
  938. proxyConn.SetDeadline(time.Now().Add(15 * time.Second))
  939. }
  940. // Send CONNECT request
  941. connectReq := fmt.Sprintf("CONNECT %s HTTP/1.1\r\nHost: %s\r\n", addr, addr)
  942. if u.User != nil {
  943. pass, _ := u.User.Password()
  944. connectReq += fmt.Sprintf("Proxy-Authorization: Basic %s\r\n",
  945. encodeBasicAuth(u.User.Username(), pass))
  946. }
  947. connectReq += "\r\n"
  948. if _, err := proxyConn.Write([]byte(connectReq)); err != nil {
  949. proxyConn.Close()
  950. return nil, fmt.Errorf("write CONNECT: %w", err)
  951. }
  952. // Read HTTP response using bufio for proper line parsing
  953. br := bufio.NewReader(proxyConn)
  954. statusLine, err := br.ReadString('\n')
  955. if err != nil {
  956. proxyConn.Close()
  957. return nil, fmt.Errorf("read CONNECT status: %w", err)
  958. }
  959. // Parse status code from "HTTP/1.x NNN reason"
  960. parts := strings.SplitN(strings.TrimSpace(statusLine), " ", 3)
  961. if len(parts) < 2 || parts[1] != "200" {
  962. proxyConn.Close()
  963. return nil, fmt.Errorf("CONNECT failed: %s", strings.TrimSpace(statusLine))
  964. }
  965. // Consume remaining headers until empty line
  966. for {
  967. line, err := br.ReadString('\n')
  968. if err != nil {
  969. proxyConn.Close()
  970. return nil, fmt.Errorf("read CONNECT headers: %w", err)
  971. }
  972. if strings.TrimSpace(line) == "" {
  973. break
  974. }
  975. }
  976. // Clear deadline — gotd manages its own timeouts
  977. proxyConn.SetDeadline(time.Time{})
  978. return proxyConn, nil
  979. }, nil
  980. default:
  981. return nil, fmt.Errorf("unsupported proxy scheme: %s", u.Scheme)
  982. }
  983. }
  // encodeBasicAuth joins user and password with a colon and returns the result
  // in standard (padded) base64, i.e. the credential token that follows "Basic "
  // in a proxy authorization header.
  func encodeBasicAuth(user, password string) string {
  	credentials := make([]byte, 0, len(user)+1+len(password))
  	credentials = append(credentials, user...)
  	credentials = append(credentials, ':')
  	credentials = append(credentials, password...)
  	return base64.StdEncoding.EncodeToString(credentials)
  }
  988. // resolveInputPeer resolves a username to an InputPeer
  989. func (c *Client) resolveInputPeer(ctx context.Context, api *tg.Client, username string) (tg.InputPeerClass, error) {
  990. resolved, err := api.ContactsResolveUsername(ctx, &tg.ContactsResolveUsernameRequest{
  991. Username: username,
  992. })
  993. if err != nil {
  994. return nil, wrapFloodWait(err)
  995. }
  996. switch p := resolved.Peer.(type) {
  997. case *tg.PeerChannel:
  998. for _, ch := range resolved.Chats {
  999. if channel, ok := ch.(*tg.Channel); ok && channel.GetID() == p.ChannelID {
  1000. accessHash, _ := channel.GetAccessHash()
  1001. return &tg.InputPeerChannel{
  1002. ChannelID: p.ChannelID,
  1003. AccessHash: accessHash,
  1004. }, nil
  1005. }
  1006. }
  1007. return &tg.InputPeerChannel{ChannelID: p.ChannelID}, nil
  1008. case *tg.PeerUser:
  1009. for _, u := range resolved.Users {
  1010. if user, ok := u.(*tg.User); ok && user.GetID() == p.UserID {
  1011. accessHash, _ := user.GetAccessHash()
  1012. return &tg.InputPeerUser{
  1013. UserID: p.UserID,
  1014. AccessHash: accessHash,
  1015. }, nil
  1016. }
  1017. }
  1018. return &tg.InputPeerUser{UserID: p.UserID}, nil
  1019. case *tg.PeerChat:
  1020. return &tg.InputPeerChat{ChatID: p.ChatID}, nil
  1021. }
  1022. return &tg.InputPeerEmpty{}, nil
  1023. }
  1024. // extractMessages extracts messages from a MessagesMessagesClass
  1025. func extractMessages(result tg.MessagesMessagesClass) []Message {
  1026. var rawMsgs []tg.MessageClass
  1027. switch v := result.(type) {
  1028. case *tg.MessagesMessages:
  1029. rawMsgs = v.Messages
  1030. case *tg.MessagesMessagesSlice:
  1031. rawMsgs = v.Messages
  1032. case *tg.MessagesChannelMessages:
  1033. rawMsgs = v.Messages
  1034. case *tg.MessagesMessagesNotModified:
  1035. return nil
  1036. }
  1037. var msgs []Message
  1038. for _, raw := range rawMsgs {
  1039. switch m := raw.(type) {
  1040. case *tg.Message:
  1041. msg := Message{
  1042. ID: m.GetID(),
  1043. Text: m.GetMessage(),
  1044. IsService: false,
  1045. }
  1046. // Extract forward source channel username
  1047. if fwd, ok := m.GetFwdFrom(); ok {
  1048. if fromID, ok := fwd.GetFromID(); ok {
  1049. if peerCh, ok := fromID.(*tg.PeerChannel); ok {
  1050. _ = peerCh // We'd need channel map to resolve username; skip for now
  1051. }
  1052. }
  1053. }
  1054. // Extract t.me links from text
  1055. msg.Links = tmeRegexp.FindAllString(msg.Text, -1)
  1056. msgs = append(msgs, msg)
  1057. case *tg.MessageService:
  1058. msgs = append(msgs, Message{
  1059. ID: m.GetID(),
  1060. IsService: true,
  1061. })
  1062. }
  1063. }
  1064. // Sort by ID ascending
  1065. sort.Slice(msgs, func(i, j int) bool {
  1066. return msgs[i].ID < msgs[j].ID
  1067. })
  1068. return msgs
  1069. }
  1070. // isFloodWait 检查错误是否是 FloodWait,提取等待时间
  1071. func isFloodWait(err error) (bool, int) {
  1072. if d, ok := tgerr.AsFloodWait(err); ok {
  1073. return true, int(d.Seconds())
  1074. }
  1075. return false, 0
  1076. }
  1077. // wrapFloodWait wraps a FloodWait error into FloodWaitError
  1078. func wrapFloodWait(err error) error {
  1079. if ok, secs := isFloodWait(err); ok {
  1080. return &FloodWaitError{Seconds: secs}
  1081. }
  1082. return err
  1083. }