client.go 9.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418
  1. package telegram
  2. import (
  3. "context"
  4. "regexp"
  5. "sort"
  6. "strings"
  7. "sync"
  8. "time"
  9. "github.com/gotd/td/session"
  10. "github.com/gotd/td/telegram"
  11. "github.com/gotd/td/tg"
  12. "github.com/gotd/td/tgerr"
  13. )
  14. var tmeRegexp = regexp.MustCompile(`https?://t\.me/[^\s"'<>)\]]+`)
  15. // Client TG 客户端
  16. type Client struct {
  17. account Account
  18. sessionPath string
  19. mu sync.Mutex
  20. tgc *telegram.Client
  21. api *tg.Client
  22. cancel context.CancelFunc
  23. ready chan struct{} // closed when connected
  24. runErr error
  25. }
  26. // New 创建客户端(不连接,只初始化)
  27. func New(account Account) *Client {
  28. return &Client{
  29. account: account,
  30. sessionPath: account.SessionFile,
  31. ready: make(chan struct{}),
  32. }
  33. }
  34. // Connect 连接并认证(从 session 文件恢复)
  35. // session 文件不存在时返回错误(不做交互式登录,session 需要预先生成)
  36. func (c *Client) Connect(ctx context.Context) error {
  37. storage := &session.FileStorage{Path: c.sessionPath}
  38. opts := telegram.Options{
  39. SessionStorage: storage,
  40. NoUpdates: true,
  41. }
  42. client := telegram.NewClient(c.account.AppID, c.account.AppHash, opts)
  43. runCtx, cancel := context.WithCancel(ctx)
  44. c.mu.Lock()
  45. c.tgc = client
  46. c.cancel = cancel
  47. c.ready = make(chan struct{})
  48. c.runErr = nil
  49. readyCh := c.ready
  50. c.mu.Unlock()
  51. errCh := make(chan error, 1)
  52. go func() {
  53. err := client.Run(runCtx, func(ctx context.Context) error {
  54. c.mu.Lock()
  55. c.api = client.API()
  56. close(readyCh)
  57. c.mu.Unlock()
  58. // Block until context is cancelled (Disconnect called)
  59. <-ctx.Done()
  60. return ctx.Err()
  61. })
  62. c.mu.Lock()
  63. c.runErr = err
  64. c.mu.Unlock()
  65. errCh <- err
  66. }()
  67. // Wait for ready or error
  68. select {
  69. case <-readyCh:
  70. return nil
  71. case err := <-errCh:
  72. if err != nil && err != context.Canceled {
  73. return err
  74. }
  75. return nil
  76. case <-ctx.Done():
  77. cancel()
  78. return ctx.Err()
  79. }
  80. }
  81. // Disconnect 断开连接
  82. func (c *Client) Disconnect() {
  83. c.mu.Lock()
  84. cancel := c.cancel
  85. c.mu.Unlock()
  86. if cancel != nil {
  87. cancel()
  88. }
  89. }
  90. // waitReady waits for the client to be connected and returns the api client
  91. func (c *Client) waitReady(ctx context.Context) (*tg.Client, error) {
  92. c.mu.Lock()
  93. readyCh := c.ready
  94. api := c.api
  95. c.mu.Unlock()
  96. if api != nil {
  97. return api, nil
  98. }
  99. select {
  100. case <-readyCh:
  101. c.mu.Lock()
  102. api = c.api
  103. c.mu.Unlock()
  104. return api, nil
  105. case <-ctx.Done():
  106. return nil, ctx.Err()
  107. }
  108. }
  109. // GetChannelInfo 获取频道/用户信息,通过用户名查找
  110. func (c *Client) GetChannelInfo(ctx context.Context, username string) (*ChannelInfo, error) {
  111. api, err := c.waitReady(ctx)
  112. if err != nil {
  113. return nil, err
  114. }
  115. username = strings.TrimPrefix(username, "@")
  116. resolved, err := api.ContactsResolveUsername(ctx, &tg.ContactsResolveUsernameRequest{
  117. Username: username,
  118. })
  119. if err != nil {
  120. return nil, wrapFloodWait(err)
  121. }
  122. info := &ChannelInfo{Username: username}
  123. // Look for channel/chat in the resolved chats
  124. for _, ch := range resolved.Chats {
  125. switch v := ch.(type) {
  126. case *tg.Channel:
  127. title := v.Title
  128. info.Title = title
  129. info.IsChannel = v.GetBroadcast()
  130. info.IsGroup = v.GetMegagroup()
  131. if count, ok := v.GetParticipantsCount(); ok {
  132. info.MemberCount = count
  133. }
  134. // Get full channel info for About
  135. accessHash, hasHash := v.GetAccessHash()
  136. if hasHash {
  137. full, ferr := api.ChannelsGetFullChannel(ctx, &tg.InputChannel{
  138. ChannelID: v.GetID(),
  139. AccessHash: accessHash,
  140. })
  141. if ferr == nil {
  142. if cf, ok := full.FullChat.(*tg.ChannelFull); ok {
  143. info.About = cf.GetAbout()
  144. if count, ok := cf.GetParticipantsCount(); ok && info.MemberCount == 0 {
  145. info.MemberCount = count
  146. }
  147. }
  148. }
  149. }
  150. return info, nil
  151. case *tg.Chat:
  152. info.Title = v.Title
  153. info.IsGroup = true
  154. info.MemberCount = v.ParticipantsCount
  155. return info, nil
  156. }
  157. }
  158. return info, nil
  159. }
  160. // GetMessages 获取频道历史消息
  161. // offsetID: 从哪条消息开始(断点续传)
  162. // limit: 最多取多少条
  163. // 返回的消息按 ID 从小到大排序
  164. func (c *Client) GetMessages(ctx context.Context, username string, offsetID, limit int) ([]Message, error) {
  165. api, err := c.waitReady(ctx)
  166. if err != nil {
  167. return nil, err
  168. }
  169. username = strings.TrimPrefix(username, "@")
  170. peer, err := c.resolveInputPeer(ctx, api, username)
  171. if err != nil {
  172. return nil, err
  173. }
  174. result, err := api.MessagesGetHistory(ctx, &tg.MessagesGetHistoryRequest{
  175. Peer: peer,
  176. OffsetID: offsetID,
  177. Limit: limit,
  178. })
  179. if err != nil {
  180. return nil, wrapFloodWait(err)
  181. }
  182. return extractMessages(result), nil
  183. }
  184. // GetPinnedMessages 获取置顶消息
  185. func (c *Client) GetPinnedMessages(ctx context.Context, username string) ([]Message, error) {
  186. api, err := c.waitReady(ctx)
  187. if err != nil {
  188. return nil, err
  189. }
  190. username = strings.TrimPrefix(username, "@")
  191. peer, err := c.resolveInputPeer(ctx, api, username)
  192. if err != nil {
  193. return nil, err
  194. }
  195. result, err := api.MessagesSearch(ctx, &tg.MessagesSearchRequest{
  196. Peer: peer,
  197. Filter: &tg.InputMessagesFilterPinned{},
  198. Limit: 100,
  199. })
  200. if err != nil {
  201. return nil, wrapFloodWait(err)
  202. }
  203. return extractMessages(result), nil
  204. }
  205. // VerifyUser 验证用户名是否存在,返回用户信息
  206. func (c *Client) VerifyUser(ctx context.Context, username string) (*UserInfo, error) {
  207. api, err := c.waitReady(ctx)
  208. if err != nil {
  209. return nil, err
  210. }
  211. username = strings.TrimPrefix(username, "@")
  212. resolved, err := api.ContactsResolveUsername(ctx, &tg.ContactsResolveUsernameRequest{
  213. Username: username,
  214. })
  215. if err != nil {
  216. if tgerr.Is(err, "USERNAME_NOT_OCCUPIED", "USERNAME_INVALID") {
  217. return &UserInfo{Username: username, Exists: false}, nil
  218. }
  219. return nil, wrapFloodWait(err)
  220. }
  221. // Check if the peer is a user
  222. if _, ok := resolved.Peer.(*tg.PeerUser); ok {
  223. for _, u := range resolved.Users {
  224. if user, ok := u.(*tg.User); ok {
  225. info := &UserInfo{
  226. ID: user.GetID(),
  227. Username: username,
  228. IsBot: user.GetBot(),
  229. IsPremium: user.GetPremium(),
  230. Exists: true,
  231. }
  232. if fn, ok := user.GetFirstName(); ok {
  233. info.FirstName = fn
  234. }
  235. if ln, ok := user.GetLastName(); ok {
  236. info.LastName = ln
  237. }
  238. if status, ok := user.GetStatus(); ok {
  239. if offline, ok := status.(*tg.UserStatusOffline); ok {
  240. t := time.Unix(int64(offline.GetWasOnline()), 0)
  241. info.LastOnline = &t
  242. }
  243. }
  244. return info, nil
  245. }
  246. }
  247. }
  248. // Check if it's a channel or group
  249. for _, ch := range resolved.Chats {
  250. switch v := ch.(type) {
  251. case *tg.Channel:
  252. return &UserInfo{
  253. ID: v.GetID(),
  254. Username: username,
  255. IsChannel: v.GetBroadcast(),
  256. IsGroup: v.GetMegagroup(),
  257. Exists: true,
  258. }, nil
  259. case *tg.Chat:
  260. return &UserInfo{
  261. ID: v.ID,
  262. IsGroup: true,
  263. Exists: true,
  264. }, nil
  265. }
  266. }
  267. return &UserInfo{Username: username, Exists: false}, nil
  268. }
  269. // resolveInputPeer resolves a username to an InputPeer
  270. func (c *Client) resolveInputPeer(ctx context.Context, api *tg.Client, username string) (tg.InputPeerClass, error) {
  271. resolved, err := api.ContactsResolveUsername(ctx, &tg.ContactsResolveUsernameRequest{
  272. Username: username,
  273. })
  274. if err != nil {
  275. return nil, wrapFloodWait(err)
  276. }
  277. switch p := resolved.Peer.(type) {
  278. case *tg.PeerChannel:
  279. for _, ch := range resolved.Chats {
  280. if channel, ok := ch.(*tg.Channel); ok && channel.GetID() == p.ChannelID {
  281. accessHash, _ := channel.GetAccessHash()
  282. return &tg.InputPeerChannel{
  283. ChannelID: p.ChannelID,
  284. AccessHash: accessHash,
  285. }, nil
  286. }
  287. }
  288. return &tg.InputPeerChannel{ChannelID: p.ChannelID}, nil
  289. case *tg.PeerUser:
  290. for _, u := range resolved.Users {
  291. if user, ok := u.(*tg.User); ok && user.GetID() == p.UserID {
  292. accessHash, _ := user.GetAccessHash()
  293. return &tg.InputPeerUser{
  294. UserID: p.UserID,
  295. AccessHash: accessHash,
  296. }, nil
  297. }
  298. }
  299. return &tg.InputPeerUser{UserID: p.UserID}, nil
  300. case *tg.PeerChat:
  301. return &tg.InputPeerChat{ChatID: p.ChatID}, nil
  302. }
  303. return &tg.InputPeerEmpty{}, nil
  304. }
  305. // extractMessages extracts messages from a MessagesMessagesClass
  306. func extractMessages(result tg.MessagesMessagesClass) []Message {
  307. var rawMsgs []tg.MessageClass
  308. switch v := result.(type) {
  309. case *tg.MessagesMessages:
  310. rawMsgs = v.Messages
  311. case *tg.MessagesMessagesSlice:
  312. rawMsgs = v.Messages
  313. case *tg.MessagesChannelMessages:
  314. rawMsgs = v.Messages
  315. case *tg.MessagesMessagesNotModified:
  316. return nil
  317. }
  318. var msgs []Message
  319. for _, raw := range rawMsgs {
  320. switch m := raw.(type) {
  321. case *tg.Message:
  322. msg := Message{
  323. ID: m.GetID(),
  324. Text: m.GetMessage(),
  325. IsService: false,
  326. }
  327. // Extract forward source channel username
  328. if fwd, ok := m.GetFwdFrom(); ok {
  329. if fromID, ok := fwd.GetFromID(); ok {
  330. if peerCh, ok := fromID.(*tg.PeerChannel); ok {
  331. _ = peerCh // We'd need channel map to resolve username; skip for now
  332. }
  333. }
  334. }
  335. // Extract t.me links from text
  336. msg.Links = tmeRegexp.FindAllString(msg.Text, -1)
  337. msgs = append(msgs, msg)
  338. case *tg.MessageService:
  339. msgs = append(msgs, Message{
  340. ID: m.GetID(),
  341. IsService: true,
  342. })
  343. }
  344. }
  345. // Sort by ID ascending
  346. sort.Slice(msgs, func(i, j int) bool {
  347. return msgs[i].ID < msgs[j].ID
  348. })
  349. return msgs
  350. }
  351. // isFloodWait 检查错误是否是 FloodWait,提取等待时间
  352. func isFloodWait(err error) (bool, int) {
  353. if d, ok := tgerr.AsFloodWait(err); ok {
  354. return true, int(d.Seconds())
  355. }
  356. return false, 0
  357. }
  358. // wrapFloodWait wraps a FloodWait error into FloodWaitError
  359. func wrapFloodWait(err error) error {
  360. if ok, secs := isFloodWait(err); ok {
  361. return &FloodWaitError{Seconds: secs}
  362. }
  363. return err
  364. }