client.go 28 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053
  1. package telegram
  2. import (
  3. "bufio"
  4. "context"
  5. "encoding/base64"
  6. "fmt"
  7. "log"
  8. "math/rand/v2"
  9. "net"
  10. "net/url"
  11. "regexp"
  12. "sort"
  13. "strings"
  14. "sync"
  15. "time"
  16. "github.com/gotd/td/session"
  17. "github.com/gotd/td/telegram"
  18. "github.com/gotd/td/telegram/dcs"
  19. "github.com/gotd/td/tg"
  20. "github.com/gotd/td/tgerr"
  21. xproxy "golang.org/x/net/proxy"
  22. )
  23. var tmeRegexp = regexp.MustCompile(`https?://t\.me/[^\s"'<>)\]]+`)
  24. // Client TG 客户端
  25. type Client struct {
  26. account Account
  27. sessionPath string
  28. proxyURL string // SOCKS5/HTTP proxy URL
  29. mu sync.Mutex
  30. tgc *telegram.Client
  31. api *tg.Client
  32. cancel context.CancelFunc
  33. ready chan struct{} // closed when connected
  34. runErr error
  35. }
  36. // New 创建客户端(不连接,只初始化)
  37. func New(account Account) *Client {
  38. return &Client{
  39. account: account,
  40. sessionPath: account.SessionFile,
  41. ready: make(chan struct{}),
  42. }
  43. }
  44. // SetProxy sets the proxy URL for this client's connections.
  45. func (c *Client) SetProxy(proxyURL string) {
  46. c.mu.Lock()
  47. c.proxyURL = proxyURL
  48. c.mu.Unlock()
  49. }
  50. // Connect 连接并认证(从 session 文件恢复)
  51. // session 文件不存在时返回错误(不做交互式登录,session 需要预先生成)
  52. func (c *Client) Connect(ctx context.Context) error {
  53. storage := &session.FileStorage{Path: c.sessionPath}
  54. opts := telegram.Options{
  55. SessionStorage: storage,
  56. NoUpdates: true,
  57. Device: telegram.DeviceConfig{
  58. DeviceModel: c.account.Device,
  59. AppVersion: c.account.AppVersion,
  60. SystemVersion: c.account.SystemVersion,
  61. LangPack: c.account.LangPack,
  62. SystemLangCode: c.account.SystemLangCode,
  63. LangCode: c.account.LangCode,
  64. },
  65. }
  66. // Apply proxy if configured
  67. c.mu.Lock()
  68. proxyURL := c.proxyURL
  69. c.mu.Unlock()
  70. if proxyURL != "" {
  71. dialFunc, err := buildProxyDialer(proxyURL)
  72. if err != nil {
  73. log.Printf("[tg_client] failed to create proxy dialer: %v, connecting without proxy", err)
  74. } else {
  75. opts.Resolver = dcs.Plain(dcs.PlainOptions{Dial: dialFunc})
  76. log.Printf("[tg_client] connecting via proxy: %s", proxyURL)
  77. }
  78. }
  79. client := telegram.NewClient(c.account.AppID, c.account.AppHash, opts)
  80. runCtx, cancel := context.WithCancel(ctx)
  81. c.mu.Lock()
  82. c.tgc = client
  83. c.cancel = cancel
  84. c.ready = make(chan struct{})
  85. c.runErr = nil
  86. readyCh := c.ready
  87. c.mu.Unlock()
  88. errCh := make(chan error, 1)
  89. go func() {
  90. err := client.Run(runCtx, func(ctx context.Context) error {
  91. c.mu.Lock()
  92. c.api = client.API()
  93. close(readyCh)
  94. c.mu.Unlock()
  95. // Block until context is cancelled (Disconnect called)
  96. <-ctx.Done()
  97. return ctx.Err()
  98. })
  99. c.mu.Lock()
  100. c.runErr = err
  101. c.mu.Unlock()
  102. errCh <- err
  103. }()
  104. // Wait for ready or error
  105. select {
  106. case <-readyCh:
  107. return nil
  108. case err := <-errCh:
  109. if err != nil && err != context.Canceled {
  110. return err
  111. }
  112. return nil
  113. case <-ctx.Done():
  114. cancel()
  115. return ctx.Err()
  116. }
  117. }
  118. // Disconnect 断开连接
  119. func (c *Client) Disconnect() {
  120. c.mu.Lock()
  121. cancel := c.cancel
  122. c.mu.Unlock()
  123. if cancel != nil {
  124. cancel()
  125. }
  126. }
  127. // waitReady waits for the client to be connected and returns the api client
  128. func (c *Client) waitReady(ctx context.Context) (*tg.Client, error) {
  129. c.mu.Lock()
  130. readyCh := c.ready
  131. api := c.api
  132. c.mu.Unlock()
  133. if api != nil {
  134. return api, nil
  135. }
  136. select {
  137. case <-readyCh:
  138. c.mu.Lock()
  139. api = c.api
  140. c.mu.Unlock()
  141. return api, nil
  142. case <-ctx.Done():
  143. return nil, ctx.Err()
  144. }
  145. }
  146. // GetChannelInfo 获取频道/用户信息,通过用户名查找
  147. func (c *Client) GetChannelInfo(ctx context.Context, username string) (*ChannelInfo, error) {
  148. api, err := c.waitReady(ctx)
  149. if err != nil {
  150. return nil, err
  151. }
  152. username = strings.TrimPrefix(username, "@")
  153. resolved, err := api.ContactsResolveUsername(ctx, &tg.ContactsResolveUsernameRequest{
  154. Username: username,
  155. })
  156. if err != nil {
  157. return nil, wrapFloodWait(err)
  158. }
  159. info := &ChannelInfo{Username: username}
  160. // Look for channel/chat in the resolved chats
  161. for _, ch := range resolved.Chats {
  162. switch v := ch.(type) {
  163. case *tg.Channel:
  164. title := v.Title
  165. info.Title = title
  166. info.IsChannel = v.GetBroadcast()
  167. info.IsGroup = v.GetMegagroup()
  168. if count, ok := v.GetParticipantsCount(); ok {
  169. info.MemberCount = count
  170. }
  171. // Get full channel info for About
  172. accessHash, hasHash := v.GetAccessHash()
  173. if hasHash {
  174. full, ferr := api.ChannelsGetFullChannel(ctx, &tg.InputChannel{
  175. ChannelID: v.GetID(),
  176. AccessHash: accessHash,
  177. })
  178. if ferr == nil {
  179. if cf, ok := full.FullChat.(*tg.ChannelFull); ok {
  180. info.About = cf.GetAbout()
  181. if count, ok := cf.GetParticipantsCount(); ok && info.MemberCount == 0 {
  182. info.MemberCount = count
  183. }
  184. }
  185. }
  186. }
  187. return info, nil
  188. case *tg.Chat:
  189. info.Title = v.Title
  190. info.IsGroup = true
  191. info.MemberCount = v.ParticipantsCount
  192. return info, nil
  193. }
  194. }
  195. return info, nil
  196. }
  197. // GetMessages 获取频道历史消息
  198. // offsetID: 从哪条消息开始(断点续传)
  199. // limit: 最多取多少条
  200. // 返回的消息按 ID 从小到大排序
  201. func (c *Client) GetMessages(ctx context.Context, username string, offsetID, limit int) ([]Message, error) {
  202. api, err := c.waitReady(ctx)
  203. if err != nil {
  204. return nil, err
  205. }
  206. username = strings.TrimPrefix(username, "@")
  207. peer, err := c.resolveInputPeer(ctx, api, username)
  208. if err != nil {
  209. return nil, err
  210. }
  211. result, err := api.MessagesGetHistory(ctx, &tg.MessagesGetHistoryRequest{
  212. Peer: peer,
  213. OffsetID: offsetID,
  214. Limit: limit,
  215. })
  216. if err != nil {
  217. return nil, wrapFloodWait(err)
  218. }
  219. return extractMessages(result), nil
  220. }
  221. // GetPinnedMessages 获取置顶消息
  222. func (c *Client) GetPinnedMessages(ctx context.Context, username string) ([]Message, error) {
  223. api, err := c.waitReady(ctx)
  224. if err != nil {
  225. return nil, err
  226. }
  227. username = strings.TrimPrefix(username, "@")
  228. peer, err := c.resolveInputPeer(ctx, api, username)
  229. if err != nil {
  230. return nil, err
  231. }
  232. result, err := api.MessagesSearch(ctx, &tg.MessagesSearchRequest{
  233. Peer: peer,
  234. Filter: &tg.InputMessagesFilterPinned{},
  235. Limit: 100,
  236. })
  237. if err != nil {
  238. return nil, wrapFloodWait(err)
  239. }
  240. return extractMessages(result), nil
  241. }
  242. // VerifyUser 验证用户名是否存在,返回用户信息
  243. func (c *Client) VerifyUser(ctx context.Context, username string) (*UserInfo, error) {
  244. api, err := c.waitReady(ctx)
  245. if err != nil {
  246. return nil, err
  247. }
  248. username = strings.TrimPrefix(username, "@")
  249. resolved, err := api.ContactsResolveUsername(ctx, &tg.ContactsResolveUsernameRequest{
  250. Username: username,
  251. })
  252. if err != nil {
  253. if tgerr.Is(err, "USERNAME_NOT_OCCUPIED", "USERNAME_INVALID") {
  254. return &UserInfo{Username: username, Exists: false}, nil
  255. }
  256. return nil, wrapFloodWait(err)
  257. }
  258. // Check if the peer is a user
  259. if _, ok := resolved.Peer.(*tg.PeerUser); ok {
  260. for _, u := range resolved.Users {
  261. if user, ok := u.(*tg.User); ok {
  262. info := &UserInfo{
  263. ID: user.GetID(),
  264. Username: username,
  265. IsBot: user.GetBot(),
  266. IsPremium: user.GetPremium(),
  267. Exists: true,
  268. }
  269. if fn, ok := user.GetFirstName(); ok {
  270. info.FirstName = fn
  271. }
  272. if ln, ok := user.GetLastName(); ok {
  273. info.LastName = ln
  274. }
  275. if status, ok := user.GetStatus(); ok {
  276. if offline, ok := status.(*tg.UserStatusOffline); ok {
  277. t := time.Unix(int64(offline.GetWasOnline()), 0)
  278. info.LastOnline = &t
  279. }
  280. }
  281. return info, nil
  282. }
  283. }
  284. }
  285. // Check if it's a channel or group
  286. for _, ch := range resolved.Chats {
  287. switch v := ch.(type) {
  288. case *tg.Channel:
  289. return &UserInfo{
  290. ID: v.GetID(),
  291. Username: username,
  292. IsChannel: v.GetBroadcast(),
  293. IsGroup: v.GetMegagroup(),
  294. Exists: true,
  295. }, nil
  296. case *tg.Chat:
  297. return &UserInfo{
  298. ID: v.ID,
  299. IsGroup: true,
  300. Exists: true,
  301. }, nil
  302. }
  303. }
  304. return &UserInfo{Username: username, Exists: false}, nil
  305. }
  306. // ResolveGroupChannel looks up a group/channel by username and returns both
  307. // the InputChannel handle (for subsequent API calls) and the raw Channel
  308. // struct (for metadata like title and participant count). Returns
  309. // (nil, nil, error) for basic chats (no InputChannel).
  310. func (c *Client) ResolveGroupChannel(ctx context.Context, username string) (*tg.InputChannel, *tg.Channel, error) {
  311. api, err := c.waitReady(ctx)
  312. if err != nil {
  313. return nil, nil, err
  314. }
  315. username = strings.TrimPrefix(username, "@")
  316. resolved, err := api.ContactsResolveUsername(ctx, &tg.ContactsResolveUsernameRequest{Username: username})
  317. if err != nil {
  318. return nil, nil, wrapFloodWait(err)
  319. }
  320. for _, ch := range resolved.Chats {
  321. if v, ok := ch.(*tg.Channel); ok {
  322. accessHash, _ := v.GetAccessHash()
  323. return &tg.InputChannel{ChannelID: v.GetID(), AccessHash: accessHash}, v, nil
  324. }
  325. }
  326. return nil, nil, fmt.Errorf("无法解析群组为超级群组: %s", username)
  327. }
  328. // GetFullChannelTotal calls channels.getFullChannel to obtain the authoritative
  329. // participants_count (which may be much larger than what a restricted member
  330. // can see via ChannelParticipantsSearch).
  331. func (c *Client) GetFullChannelTotal(ctx context.Context, ch *tg.InputChannel) (int, error) {
  332. api, err := c.waitReady(ctx)
  333. if err != nil {
  334. return 0, err
  335. }
  336. full, err := api.ChannelsGetFullChannel(ctx, ch)
  337. if err != nil {
  338. return 0, wrapFloodWait(err)
  339. }
  340. cf, ok := full.FullChat.(*tg.ChannelFull)
  341. if !ok {
  342. return 0, fmt.Errorf("unexpected FullChat type")
  343. }
  344. if n, ok := cf.GetParticipantsCount(); ok {
  345. return n, nil
  346. }
  347. return 0, nil
  348. }
  349. // FetchRecentParticipants paginates channels.getParticipants with the Recent
  350. // filter, which returns active participants sorted by recency. This is the
  351. // preferred Phase 1 filter for large groups — empty-string Search filter is
  352. // often heavily restricted (returns 4-5 results) while Recent returns up to
  353. // ~200 and is not treated as "searching".
  354. func (c *Client) FetchRecentParticipants(ctx context.Context, channel *tg.InputChannel) ([]GroupParticipant, int, error) {
  355. api, err := c.waitReady(ctx)
  356. if err != nil {
  357. return nil, 0, err
  358. }
  359. const pageSize = 200
  360. offset := 0
  361. totalCount := 0
  362. var out []GroupParticipant
  363. for {
  364. result, err := api.ChannelsGetParticipants(ctx, &tg.ChannelsGetParticipantsRequest{
  365. Channel: channel,
  366. Filter: &tg.ChannelParticipantsRecent{},
  367. Offset: offset,
  368. Limit: pageSize,
  369. Hash: 0,
  370. })
  371. if err != nil {
  372. return out, totalCount, wrapFloodWait(err)
  373. }
  374. cp, ok := result.(*tg.ChannelsChannelParticipants)
  375. if !ok || len(cp.Users) == 0 {
  376. break
  377. }
  378. if cp.Count > totalCount {
  379. totalCount = cp.Count
  380. }
  381. for _, u := range cp.Users {
  382. user, ok := u.(*tg.User)
  383. if !ok {
  384. continue
  385. }
  386. p := GroupParticipant{
  387. ID: user.GetID(),
  388. IsBot: user.GetBot(),
  389. IsPremium: user.GetPremium(),
  390. }
  391. if un, ok := user.GetUsername(); ok {
  392. p.Username = un
  393. }
  394. if fn, ok := user.GetFirstName(); ok {
  395. p.FirstName = fn
  396. }
  397. if ln, ok := user.GetLastName(); ok {
  398. p.LastName = ln
  399. }
  400. out = append(out, p)
  401. }
  402. offset += len(cp.Users)
  403. if offset >= cp.Count {
  404. break
  405. }
  406. if err := jitterSleep(ctx, 800*time.Millisecond, 1500*time.Millisecond); err != nil {
  407. return out, totalCount, err
  408. }
  409. }
  410. return out, totalCount, nil
  411. }
  412. // JoinChannel makes the current account a member of the given channel/supergroup.
  413. // USER_ALREADY_PARTICIPANT is treated as success. FloodWait is wrapped normally.
  414. // Side effect: this account becomes visibly a member of the group — make sure
  415. // the caller actually wants that (private groups require it to see the member
  416. // list, but it leaves a trace in group join/leave activity logs).
  417. func (c *Client) JoinChannel(ctx context.Context, ch *tg.InputChannel) error {
  418. api, err := c.waitReady(ctx)
  419. if err != nil {
  420. return err
  421. }
  422. _, err = api.ChannelsJoinChannel(ctx, ch)
  423. if err != nil {
  424. if tgerr.Is(err, "USER_ALREADY_PARTICIPANT") {
  425. return nil
  426. }
  427. return wrapFloodWait(err)
  428. }
  429. return nil
  430. }
  431. // GetChatParticipantsByID fetches members of a basic (non-supergroup) chat.
  432. // Basic chats have no pagination — this returns everyone in one call.
  433. func (c *Client) GetChatParticipantsByID(ctx context.Context, chatID int64) ([]GroupParticipant, error) {
  434. api, err := c.waitReady(ctx)
  435. if err != nil {
  436. return nil, err
  437. }
  438. return c.getChatParticipants(ctx, api, chatID)
  439. }
  440. // ResolveGroupPeer is a broader resolver than ResolveGroupChannel: it also
  441. // returns a basic-chat ID when the target is not a supergroup/channel.
  442. // Exactly one of (inputCh, chatID) will be non-zero on success.
  443. func (c *Client) ResolveGroupPeer(ctx context.Context, username string) (*tg.InputChannel, *tg.Channel, int64, error) {
  444. api, err := c.waitReady(ctx)
  445. if err != nil {
  446. return nil, nil, 0, err
  447. }
  448. username = strings.TrimPrefix(username, "@")
  449. resolved, err := api.ContactsResolveUsername(ctx, &tg.ContactsResolveUsernameRequest{Username: username})
  450. if err != nil {
  451. return nil, nil, 0, wrapFloodWait(err)
  452. }
  453. for _, ch := range resolved.Chats {
  454. if v, ok := ch.(*tg.Channel); ok {
  455. accessHash, _ := v.GetAccessHash()
  456. return &tg.InputChannel{ChannelID: v.GetID(), AccessHash: accessHash}, v, 0, nil
  457. }
  458. }
  459. if p, ok := resolved.Peer.(*tg.PeerChat); ok {
  460. return nil, nil, p.ChatID, nil
  461. }
  462. return nil, nil, 0, fmt.Errorf("无法解析群组: %s", username)
  463. }
  464. // FetchParticipantsByQuery runs ChannelParticipantsSearch for one query string,
  465. // paginating through all pages. Returns the users surfaced by this query and
  466. // the total count reported by TG. On FloodWait, returns a *FloodWaitError.
  467. // The caller is responsible for deduping across queries.
  468. func (c *Client) FetchParticipantsByQuery(ctx context.Context, channel *tg.InputChannel, query string) ([]GroupParticipant, int, error) {
  469. api, err := c.waitReady(ctx)
  470. if err != nil {
  471. return nil, 0, err
  472. }
  473. const pageSize = 200
  474. offset := 0
  475. totalCount := 0
  476. var out []GroupParticipant
  477. for {
  478. result, err := api.ChannelsGetParticipants(ctx, &tg.ChannelsGetParticipantsRequest{
  479. Channel: channel,
  480. Filter: &tg.ChannelParticipantsSearch{Q: query},
  481. Offset: offset,
  482. Limit: pageSize,
  483. Hash: 0,
  484. })
  485. if err != nil {
  486. return out, totalCount, wrapFloodWait(err)
  487. }
  488. cp, ok := result.(*tg.ChannelsChannelParticipants)
  489. if !ok || len(cp.Users) == 0 {
  490. break
  491. }
  492. if cp.Count > totalCount {
  493. totalCount = cp.Count
  494. }
  495. for _, u := range cp.Users {
  496. user, ok := u.(*tg.User)
  497. if !ok {
  498. continue
  499. }
  500. p := GroupParticipant{
  501. ID: user.GetID(),
  502. IsBot: user.GetBot(),
  503. IsPremium: user.GetPremium(),
  504. }
  505. if un, ok := user.GetUsername(); ok {
  506. p.Username = un
  507. }
  508. if fn, ok := user.GetFirstName(); ok {
  509. p.FirstName = fn
  510. }
  511. if ln, ok := user.GetLastName(); ok {
  512. p.LastName = ln
  513. }
  514. out = append(out, p)
  515. }
  516. offset += len(cp.Users)
  517. if offset >= cp.Count {
  518. break
  519. }
  520. if err := jitterSleep(ctx, 800*time.Millisecond, 1500*time.Millisecond); err != nil {
  521. return out, totalCount, err
  522. }
  523. }
  524. return out, totalCount, nil
  525. }
  526. // GetGroupParticipants 获取群组/超级群组的成员列表(分页拉取全部)
  527. func (c *Client) GetGroupParticipants(ctx context.Context, username string) ([]GroupParticipant, error) {
  528. api, err := c.waitReady(ctx)
  529. if err != nil {
  530. return nil, err
  531. }
  532. username = strings.TrimPrefix(username, "@")
  533. // Resolve the channel/group
  534. resolved, err := api.ContactsResolveUsername(ctx, &tg.ContactsResolveUsernameRequest{
  535. Username: username,
  536. })
  537. if err != nil {
  538. return nil, wrapFloodWait(err)
  539. }
  540. // Find the channel in resolved chats
  541. var inputChannel *tg.InputChannel
  542. for _, ch := range resolved.Chats {
  543. switch v := ch.(type) {
  544. case *tg.Channel:
  545. accessHash, _ := v.GetAccessHash()
  546. inputChannel = &tg.InputChannel{
  547. ChannelID: v.GetID(),
  548. AccessHash: accessHash,
  549. }
  550. }
  551. }
  552. if inputChannel == nil {
  553. // Try as basic chat - get participants via MessagesGetFullChat
  554. if p, ok := resolved.Peer.(*tg.PeerChat); ok {
  555. return c.getChatParticipants(ctx, api, p.ChatID)
  556. }
  557. return nil, fmt.Errorf("无法解析群组: %s", username)
  558. }
  559. // Strategy: use ChannelParticipantsSearch with empty query (returns more than Recent),
  560. // then iterate alphabet queries to discover members beyond the 200 limit per query.
  561. seen := make(map[int64]bool)
  562. var allParticipants []GroupParticipant
  563. // Helper to extract users from a page
  564. extractUsers := func(cp *tg.ChannelsChannelParticipants) int {
  565. added := 0
  566. for _, u := range cp.Users {
  567. user, ok := u.(*tg.User)
  568. if !ok || seen[user.GetID()] {
  569. continue
  570. }
  571. seen[user.GetID()] = true
  572. p := GroupParticipant{
  573. ID: user.GetID(),
  574. IsBot: user.GetBot(),
  575. IsPremium: user.GetPremium(),
  576. }
  577. if un, ok := user.GetUsername(); ok {
  578. p.Username = un
  579. }
  580. if fn, ok := user.GetFirstName(); ok {
  581. p.FirstName = fn
  582. }
  583. if ln, ok := user.GetLastName(); ok {
  584. p.LastName = ln
  585. }
  586. allParticipants = append(allParticipants, p)
  587. added++
  588. }
  589. return added
  590. }
  591. // Phase 1: Search with empty query (gets up to ~200)
  592. totalCount := 0
  593. if err := c.fetchParticipantPages(ctx, api, inputChannel, "", seen, extractUsers, &totalCount); err != nil {
  594. if len(allParticipants) > 0 {
  595. return allParticipants, err
  596. }
  597. return nil, err
  598. }
  599. // Phase 2: If group has more members than we found, search by character sets to discover more.
  600. // We pace queries with jitter (2–4s) to avoid looking like a bot scanner and triggering FloodWait.
  601. // If FloodWait does hit, stop early and return what we already have — the calling task can
  602. // re-attempt later after the account cools down.
  603. if totalCount > len(allParticipants) && totalCount <= 10000 {
  604. queries := participantSearchQueries()
  605. for _, q := range queries {
  606. if ctx.Err() != nil {
  607. break
  608. }
  609. if len(allParticipants) >= totalCount {
  610. break // already collected everyone visible
  611. }
  612. beforeCount := len(allParticipants)
  613. err := c.fetchParticipantPages(ctx, api, inputChannel, q, seen, extractUsers, nil)
  614. if err != nil {
  615. if fwe, ok := err.(*FloodWaitError); ok {
  616. log.Printf("[tg_client] flood wait %ds during search q=%q for %s; returning %d/%d",
  617. fwe.Seconds, q, username, len(allParticipants), totalCount)
  618. } else {
  619. log.Printf("[tg_client] search q=%q for %s: %v (returning partial)", q, username, err)
  620. }
  621. break
  622. }
  623. if len(allParticipants) == beforeCount {
  624. continue // no new results; skip sleep and try next query
  625. }
  626. if err := jitterSleep(ctx, 2*time.Second, 4*time.Second); err != nil {
  627. return allParticipants, err
  628. }
  629. }
  630. }
  631. log.Printf("[tg_client] fetched %d/%d participants for %s", len(allParticipants), totalCount, username)
  632. return allParticipants, nil
  633. }
  634. // jitterSleep sleeps a random duration in [min, max) while respecting ctx.
  635. // Returns ctx.Err() if cancelled. Used to spread out TG API calls and avoid
  636. // looking like a deterministic scanner.
  637. func jitterSleep(ctx context.Context, min, max time.Duration) error {
  638. d := min + time.Duration(rand.Int64N(int64(max-min)))
  639. select {
  640. case <-ctx.Done():
  641. return ctx.Err()
  642. case <-time.After(d):
  643. return nil
  644. }
  645. }
  646. // participantSearchQueries returns search queries covering Latin, Cyrillic, Japanese,
  647. // Korean, and CJK scripts. TG's ChannelParticipantsSearch does substring matching on
  648. // first_name + last_name + username, so more starter-character coverage = more users
  649. // surfaced on groups beyond the 200-per-query cap. Total ~150 queries.
  650. func participantSearchQueries() []string {
  651. queries := make([]string, 0, 170)
  652. // Latin a-z
  653. for c := 'a'; c <= 'z'; c++ {
  654. queries = append(queries, string(c))
  655. }
  656. // Digits 0-9
  657. for c := '0'; c <= '9'; c++ {
  658. queries = append(queries, string(c))
  659. }
  660. // Cyrillic а-я
  661. for c := 'а'; c <= 'я'; c++ {
  662. queries = append(queries, string(c))
  663. }
  664. // Japanese Hiragana — common name-starter syllables
  665. queries = append(queries,
  666. "あ", "い", "う", "え", "お",
  667. "か", "さ", "た", "な", "ま",
  668. )
  669. // Korean Hangul — common initial syllables
  670. queries = append(queries,
  671. "가", "나", "다", "라", "마", "바", "사", "아", "자", "차",
  672. "카", "타", "파", "하",
  673. )
  674. // CJK: top Chinese surnames (百家姓 high frequency)
  675. surnames := []string{
  676. "王", "李", "张", "刘", "陈", "杨", "黄", "赵", "周", "吴",
  677. "徐", "孙", "马", "朱", "胡", "林", "何", "高", "郭", "罗",
  678. "谢", "宋", "唐", "许", "邓", "梁", "韩", "曹", "彭", "余",
  679. "潘", "袁", "蒋", "蔡", "卢", "田", "董", "叶", "程", "姜",
  680. }
  681. queries = append(queries, surnames...)
  682. // CJK: common given-name characters (高频二字名)
  683. given := []string{
  684. "伟", "芳", "娜", "秀", "敏", "静", "丽", "强", "磊", "军",
  685. "洋", "勇", "艳", "杰", "涛", "明", "超", "霞", "平", "刚",
  686. }
  687. queries = append(queries, given...)
  688. // CJK: common modifiers and city prefixes (covers nicknames/titles)
  689. misc := []string{
  690. "大", "小", "新", "老", "中", "天", "金", "一", "龙", "虎",
  691. "京", "沪", "深", "广", "杭", "苏",
  692. }
  693. queries = append(queries, misc...)
  694. return queries
  695. }
  696. // fetchParticipantPages paginates through ChannelParticipantsSearch results.
  697. func (c *Client) fetchParticipantPages(
  698. ctx context.Context,
  699. api *tg.Client,
  700. channel *tg.InputChannel,
  701. query string,
  702. seen map[int64]bool,
  703. extractUsers func(*tg.ChannelsChannelParticipants) int,
  704. outTotalCount *int,
  705. ) error {
  706. const pageSize = 200
  707. offset := 0
  708. for {
  709. result, err := api.ChannelsGetParticipants(ctx, &tg.ChannelsGetParticipantsRequest{
  710. Channel: channel,
  711. Filter: &tg.ChannelParticipantsSearch{Q: query},
  712. Offset: offset,
  713. Limit: pageSize,
  714. Hash: 0,
  715. })
  716. if err != nil {
  717. return wrapFloodWait(err)
  718. }
  719. cp, ok := result.(*tg.ChannelsChannelParticipants)
  720. if !ok || len(cp.Users) == 0 {
  721. break
  722. }
  723. if outTotalCount != nil && cp.Count > *outTotalCount {
  724. *outTotalCount = cp.Count
  725. }
  726. added := extractUsers(cp)
  727. offset += len(cp.Users)
  728. // If no new users were added in this page, stop
  729. if added == 0 || offset >= cp.Count {
  730. break
  731. }
  732. // Page interval: jittered to avoid a detectable request cadence.
  733. if err := jitterSleep(ctx, 800*time.Millisecond, 1500*time.Millisecond); err != nil {
  734. return err
  735. }
  736. }
  737. return nil
  738. }
  739. // getChatParticipants 获取普通群组的成员
  740. func (c *Client) getChatParticipants(ctx context.Context, api *tg.Client, chatID int64) ([]GroupParticipant, error) {
  741. full, err := api.MessagesGetFullChat(ctx, chatID)
  742. if err != nil {
  743. return nil, wrapFloodWait(err)
  744. }
  745. var participants []GroupParticipant
  746. for _, u := range full.Users {
  747. user, ok := u.(*tg.User)
  748. if !ok {
  749. continue
  750. }
  751. p := GroupParticipant{
  752. ID: user.GetID(),
  753. IsBot: user.GetBot(),
  754. IsPremium: user.GetPremium(),
  755. }
  756. if un, ok := user.GetUsername(); ok {
  757. p.Username = un
  758. }
  759. if fn, ok := user.GetFirstName(); ok {
  760. p.FirstName = fn
  761. }
  762. if ln, ok := user.GetLastName(); ok {
  763. p.LastName = ln
  764. }
  765. participants = append(participants, p)
  766. }
  767. return participants, nil
  768. }
  769. // buildProxyDialer creates a DialFunc that routes connections through the given proxy URL.
  770. // Supports socks5://, http://, https:// proxy protocols.
  771. func buildProxyDialer(rawURL string) (dcs.DialFunc, error) {
  772. u, err := url.Parse(rawURL)
  773. if err != nil {
  774. return nil, fmt.Errorf("parse proxy URL: %w", err)
  775. }
  776. switch u.Scheme {
  777. case "socks5", "socks5h":
  778. var auth *xproxy.Auth
  779. if u.User != nil {
  780. auth = &xproxy.Auth{User: u.User.Username()}
  781. if p, ok := u.User.Password(); ok {
  782. auth.Password = p
  783. }
  784. }
  785. dialer, err := xproxy.SOCKS5("tcp", u.Host, auth, xproxy.Direct)
  786. if err != nil {
  787. return nil, fmt.Errorf("create SOCKS5 dialer: %w", err)
  788. }
  789. ctxDialer, ok := dialer.(xproxy.ContextDialer)
  790. if !ok {
  791. return nil, fmt.Errorf("SOCKS5 dialer does not support DialContext")
  792. }
  793. return ctxDialer.DialContext, nil
  794. case "http", "https":
  795. // For HTTP proxies, use CONNECT tunneling
  796. return func(ctx context.Context, network, addr string) (net.Conn, error) {
  797. proxyConn, err := (&net.Dialer{Timeout: 15 * time.Second}).DialContext(ctx, "tcp", u.Host)
  798. if err != nil {
  799. return nil, fmt.Errorf("connect to proxy: %w", err)
  800. }
  801. // Set deadline for the CONNECT handshake
  802. if deadline, ok := ctx.Deadline(); ok {
  803. proxyConn.SetDeadline(deadline)
  804. } else {
  805. proxyConn.SetDeadline(time.Now().Add(15 * time.Second))
  806. }
  807. // Send CONNECT request
  808. connectReq := fmt.Sprintf("CONNECT %s HTTP/1.1\r\nHost: %s\r\n", addr, addr)
  809. if u.User != nil {
  810. pass, _ := u.User.Password()
  811. connectReq += fmt.Sprintf("Proxy-Authorization: Basic %s\r\n",
  812. encodeBasicAuth(u.User.Username(), pass))
  813. }
  814. connectReq += "\r\n"
  815. if _, err := proxyConn.Write([]byte(connectReq)); err != nil {
  816. proxyConn.Close()
  817. return nil, fmt.Errorf("write CONNECT: %w", err)
  818. }
  819. // Read HTTP response using bufio for proper line parsing
  820. br := bufio.NewReader(proxyConn)
  821. statusLine, err := br.ReadString('\n')
  822. if err != nil {
  823. proxyConn.Close()
  824. return nil, fmt.Errorf("read CONNECT status: %w", err)
  825. }
  826. // Parse status code from "HTTP/1.x NNN reason"
  827. parts := strings.SplitN(strings.TrimSpace(statusLine), " ", 3)
  828. if len(parts) < 2 || parts[1] != "200" {
  829. proxyConn.Close()
  830. return nil, fmt.Errorf("CONNECT failed: %s", strings.TrimSpace(statusLine))
  831. }
  832. // Consume remaining headers until empty line
  833. for {
  834. line, err := br.ReadString('\n')
  835. if err != nil {
  836. proxyConn.Close()
  837. return nil, fmt.Errorf("read CONNECT headers: %w", err)
  838. }
  839. if strings.TrimSpace(line) == "" {
  840. break
  841. }
  842. }
  843. // Clear deadline — gotd manages its own timeouts
  844. proxyConn.SetDeadline(time.Time{})
  845. return proxyConn, nil
  846. }, nil
  847. default:
  848. return nil, fmt.Errorf("unsupported proxy scheme: %s", u.Scheme)
  849. }
  850. }
  851. // encodeBasicAuth returns base64-encoded "user:password" for proxy auth.
  852. func encodeBasicAuth(user, password string) string {
  853. return base64.StdEncoding.EncodeToString([]byte(user + ":" + password))
  854. }
  855. // resolveInputPeer resolves a username to an InputPeer
  856. func (c *Client) resolveInputPeer(ctx context.Context, api *tg.Client, username string) (tg.InputPeerClass, error) {
  857. resolved, err := api.ContactsResolveUsername(ctx, &tg.ContactsResolveUsernameRequest{
  858. Username: username,
  859. })
  860. if err != nil {
  861. return nil, wrapFloodWait(err)
  862. }
  863. switch p := resolved.Peer.(type) {
  864. case *tg.PeerChannel:
  865. for _, ch := range resolved.Chats {
  866. if channel, ok := ch.(*tg.Channel); ok && channel.GetID() == p.ChannelID {
  867. accessHash, _ := channel.GetAccessHash()
  868. return &tg.InputPeerChannel{
  869. ChannelID: p.ChannelID,
  870. AccessHash: accessHash,
  871. }, nil
  872. }
  873. }
  874. return &tg.InputPeerChannel{ChannelID: p.ChannelID}, nil
  875. case *tg.PeerUser:
  876. for _, u := range resolved.Users {
  877. if user, ok := u.(*tg.User); ok && user.GetID() == p.UserID {
  878. accessHash, _ := user.GetAccessHash()
  879. return &tg.InputPeerUser{
  880. UserID: p.UserID,
  881. AccessHash: accessHash,
  882. }, nil
  883. }
  884. }
  885. return &tg.InputPeerUser{UserID: p.UserID}, nil
  886. case *tg.PeerChat:
  887. return &tg.InputPeerChat{ChatID: p.ChatID}, nil
  888. }
  889. return &tg.InputPeerEmpty{}, nil
  890. }
  891. // extractMessages extracts messages from a MessagesMessagesClass
  892. func extractMessages(result tg.MessagesMessagesClass) []Message {
  893. var rawMsgs []tg.MessageClass
  894. switch v := result.(type) {
  895. case *tg.MessagesMessages:
  896. rawMsgs = v.Messages
  897. case *tg.MessagesMessagesSlice:
  898. rawMsgs = v.Messages
  899. case *tg.MessagesChannelMessages:
  900. rawMsgs = v.Messages
  901. case *tg.MessagesMessagesNotModified:
  902. return nil
  903. }
  904. var msgs []Message
  905. for _, raw := range rawMsgs {
  906. switch m := raw.(type) {
  907. case *tg.Message:
  908. msg := Message{
  909. ID: m.GetID(),
  910. Text: m.GetMessage(),
  911. IsService: false,
  912. }
  913. // Extract forward source channel username
  914. if fwd, ok := m.GetFwdFrom(); ok {
  915. if fromID, ok := fwd.GetFromID(); ok {
  916. if peerCh, ok := fromID.(*tg.PeerChannel); ok {
  917. _ = peerCh // We'd need channel map to resolve username; skip for now
  918. }
  919. }
  920. }
  921. // Extract t.me links from text
  922. msg.Links = tmeRegexp.FindAllString(msg.Text, -1)
  923. msgs = append(msgs, msg)
  924. case *tg.MessageService:
  925. msgs = append(msgs, Message{
  926. ID: m.GetID(),
  927. IsService: true,
  928. })
  929. }
  930. }
  931. // Sort by ID ascending
  932. sort.Slice(msgs, func(i, j int) bool {
  933. return msgs[i].ID < msgs[j].ID
  934. })
  935. return msgs
  936. }
  937. // isFloodWait 检查错误是否是 FloodWait,提取等待时间
  938. func isFloodWait(err error) (bool, int) {
  939. if d, ok := tgerr.AsFloodWait(err); ok {
  940. return true, int(d.Seconds())
  941. }
  942. return false, 0
  943. }
  944. // wrapFloodWait wraps a FloodWait error into FloodWaitError
  945. func wrapFloodWait(err error) error {
  946. if ok, secs := isFloodWait(err); ok {
  947. return &FloodWaitError{Seconds: secs}
  948. }
  949. return err
  950. }