// phase4_scrape.go — Phase 4 of the spider pipeline: Telegram message scraping.
  1. package pipeline
  2. import (
  3. "context"
  4. "log"
  5. "strings"
  6. "time"
  7. "github.com/redis/go-redis/v9"
  8. "gorm.io/gorm"
  9. "spider/internal/extractor"
  10. "spider/internal/llm"
  11. "spider/internal/model"
  12. "spider/internal/telegram"
  13. )
// ScrapePhase is Phase 4 of the pipeline: Telegram message scraping.
//
// It loads channels in "pending" status, fetches their pinned and historical
// messages through a pool of Telegram accounts, and writes extracted merchant
// records into merchants_raw via processMessages.
type ScrapePhase struct {
	db        *gorm.DB                 // channel state + merchants_raw persistence
	tgManager *telegram.AccountManager // account pool; nil disables the phase entirely
	llmClient *llm.Client              // optional: channel relevance scoring and merchant parsing
	settings  Settings                 // runtime-tunable limits/delays (tg_scraper.*)
	redis     *redis.Client            // optional: fast merchant dedup via SETNX with TTL
	reporter  ProgressReporter         // optional progress callback; NOTE(review): never assigned by NewScrapePhase — confirm it is set elsewhere
}
  23. // NewScrapePhase creates a new ScrapePhase.
  24. func NewScrapePhase(db *gorm.DB, tgManager *telegram.AccountManager, llmClient *llm.Client, settings Settings, rdb *redis.Client) *ScrapePhase {
  25. return &ScrapePhase{
  26. db: db,
  27. tgManager: tgManager,
  28. llmClient: llmClient,
  29. settings: settings,
  30. redis: rdb,
  31. }
  32. }
  33. func (p *ScrapePhase) Name() string { return "scrape" }
  34. func (p *ScrapePhase) Run(ctx context.Context, task *model.Task, opts *Options) error {
  35. log.Printf("[scrape] starting, task_id=%d", task.ID)
  36. if p.tgManager == nil {
  37. log.Printf("[scrape] tgManager is nil, skipping")
  38. return nil
  39. }
  40. msgLimit := p.settings.GetInt(ctx, "tg_scraper.message_limit_per_channel", 500)
  41. delayMsg := p.settings.GetFloat(ctx, "tg_scraper.delay_per_message", 1.0)
  42. delayChannel := p.settings.GetFloat(ctx, "tg_scraper.delay_per_channel", 5.0)
  43. if opts.TestRun != nil && opts.TestRun.MessageLimit > 0 {
  44. msgLimit = opts.TestRun.MessageLimit
  45. }
  46. // 取 pending 频道
  47. var channels []model.Channel
  48. q := p.db.Where("status = ?", "pending")
  49. if opts.TestRun != nil && opts.TestRun.ItemLimit > 0 {
  50. q = q.Limit(opts.TestRun.ItemLimit)
  51. }
  52. q.Find(&channels)
  53. total := len(channels)
  54. log.Printf("[scrape] found %d pending channels", total)
  55. for i, ch := range channels {
  56. if isContextDone(ctx) {
  57. break
  58. }
  59. if p.reporter != nil {
  60. p.reporter("scrape", i+1, total, "采集频道: @"+ch.Username)
  61. }
  62. acc, err := p.tgManager.Acquire(ctx)
  63. if err != nil {
  64. log.Printf("[scrape] no available account: %v", err)
  65. break
  66. }
  67. if err := acc.Client.Connect(ctx); err != nil {
  68. log.Printf("[scrape] connect failed: %v", err)
  69. p.tgManager.Release(acc, 0)
  70. p.db.Model(&ch).Update("status", "failed")
  71. continue
  72. }
  73. // LLM 相关性评估
  74. if p.llmClient != nil {
  75. score, err := p.llmClient.EvalChannelRelevance(ctx, ch.Title, ch.About, ch.MemberCount)
  76. if err == nil && score < 0.5 {
  77. log.Printf("[scrape] skipping @%s, relevance score=%.2f", ch.Username, score)
  78. p.tgManager.Release(acc, 0)
  79. p.db.Model(&ch).Update("status", "skipped")
  80. continue
  81. }
  82. }
  83. // 读置顶消息
  84. pinnedMsgs, _ := acc.Client.GetPinnedMessages(ctx, ch.Username)
  85. p.processMessages(ctx, pinnedMsgs, &ch, delayMsg)
  86. // 读历史消息(断点续传)
  87. offsetID := ch.LastMessageID
  88. fetched := 0
  89. for fetched < msgLimit {
  90. if isContextDone(ctx) {
  91. break
  92. }
  93. batchSize := 100
  94. if msgLimit-fetched < batchSize {
  95. batchSize = msgLimit - fetched
  96. }
  97. msgs, err := acc.Client.GetMessages(ctx, ch.Username, offsetID, batchSize)
  98. if err != nil {
  99. if fw, ok := err.(*telegram.FloodWaitError); ok {
  100. log.Printf("[scrape] FloodWait %ds on @%s", fw.Seconds, ch.Username)
  101. p.tgManager.HandleFloodWait(acc, fw.Seconds)
  102. acc = nil
  103. } else {
  104. log.Printf("[scrape] GetMessages @%s: %v", ch.Username, err)
  105. }
  106. break
  107. }
  108. if len(msgs) == 0 {
  109. break
  110. }
  111. p.processMessages(ctx, msgs, &ch, delayMsg)
  112. // 更新断点
  113. lastID := msgs[len(msgs)-1].ID
  114. p.db.Model(&ch).Update("last_message_id", lastID)
  115. offsetID = lastID
  116. fetched += len(msgs)
  117. }
  118. if acc != nil {
  119. p.tgManager.Release(acc, 0)
  120. }
  121. p.db.Model(&ch).Update("status", "scraped")
  122. select {
  123. case <-ctx.Done():
  124. return nil
  125. case <-time.After(time.Duration(float64(time.Second) * delayChannel)):
  126. }
  127. }
  128. log.Printf("[scrape] done")
  129. return nil
  130. }
  131. // processMessages 处理一批消息,提取商户写入 merchants_raw
  132. func (p *ScrapePhase) processMessages(ctx context.Context, msgs []telegram.Message, ch *model.Channel, delayMsg float64) {
  133. for _, msg := range msgs {
  134. if msg.IsService || msg.Text == "" {
  135. continue
  136. }
  137. if !extractor.ContainsChinese(msg.Text, 0) {
  138. continue
  139. }
  140. if !extractor.HasContact(msg.Text) {
  141. continue
  142. }
  143. // 快速去重(Redis SET NX key)
  144. if p.redis != nil {
  145. info := extractor.Extract(msg.Text)
  146. if info.TgUsername != "" {
  147. dedupKey := "spider:dedup:merchant:" + info.TgUsername
  148. set, _ := p.redis.SetNX(ctx, dedupKey, "1", 7*24*time.Hour).Result()
  149. if !set {
  150. continue // 已存在,跳过
  151. }
  152. }
  153. }
  154. // LLM 精准解析
  155. var merchantInfo *extractor.MerchantInfo
  156. if p.llmClient != nil {
  157. merchantInfo, _ = p.llmClient.ParseMerchant(ctx, msg.Text)
  158. }
  159. // Fallback 到正则
  160. if merchantInfo == nil || merchantInfo.TgUsername == "" {
  161. info := extractor.Extract(msg.Text)
  162. merchantInfo = &extractor.MerchantInfo{
  163. TgUsername: info.TgUsername,
  164. Website: info.Website,
  165. Email: info.Email,
  166. Phone: info.Phone,
  167. }
  168. }
  169. if merchantInfo.TgUsername == "" && merchantInfo.Website == "" {
  170. continue
  171. }
  172. raw := &model.MerchantRaw{
  173. MerchantName: extractor.CleanMerchantName(merchantInfo.MerchantName),
  174. TgUsername: strings.TrimPrefix(merchantInfo.TgUsername, "@"),
  175. Website: merchantInfo.Website,
  176. Email: merchantInfo.Email,
  177. Phone: merchantInfo.Phone,
  178. Industry: merchantInfo.Industry,
  179. SourceType: "tg_scrape",
  180. SourceID: ch.Username,
  181. OriginalMessage: msg.Text,
  182. Status: "raw",
  183. }
  184. p.db.Create(raw)
  185. time.Sleep(time.Duration(float64(time.Second) * delayMsg))
  186. }
  187. }