// phase5_crawl.go — Phase 5: 网页爬取 (web-crawl pipeline phase).
  1. package pipeline
  2. import (
  3. "context"
  4. "log"
  5. "strings"
  6. "gorm.io/gorm"
  7. "spider/internal/crawler"
  8. "spider/internal/extractor"
  9. "spider/internal/llm"
  10. "spider/internal/model"
  11. )
// CrawlPhase Phase 5: 网页爬取.
// It drains pending NavSite rows, filters them (rule-based, then LLM for
// uncertain URLs), crawls each page, and records discovered merchants.
type CrawlPhase struct {
	db            *gorm.DB                // backing store for NavSite / MerchantRaw rows
	staticCrawler *crawler.StaticCrawler  // plain HTTP fetch, tried first
	dynCrawler    *crawler.DynamicCrawler // headless/dynamic fallback when static fails
	tmeValidator  *crawler.TMeValidator   // t.me dead-account pre-check
	llmClient     *llm.Client             // optional; nil disables LLM filtering
	settings      Settings                // optional runtime settings; nil means defaults
	reporter      ProgressReporter        // optional progress callback; never set by NewCrawlPhase — TODO confirm who assigns it
}
  22. // NewCrawlPhase creates a new CrawlPhase.
  23. func NewCrawlPhase(db *gorm.DB, llmClient *llm.Client, settings Settings) *CrawlPhase {
  24. return &CrawlPhase{
  25. db: db,
  26. staticCrawler: crawler.NewStaticCrawler(),
  27. dynCrawler: crawler.NewDynamicCrawler(),
  28. tmeValidator: crawler.NewTMeValidator(),
  29. llmClient: llmClient,
  30. settings: settings,
  31. }
  32. }
  33. func (p *CrawlPhase) Name() string { return "crawl" }
  34. func (p *CrawlPhase) Run(ctx context.Context, task *model.Task, opts *Options) error {
  35. log.Printf("[crawl] starting, task_id=%d", task.ID)
  36. tmeEnabled := true
  37. if p.settings != nil {
  38. tmeEnabled = p.settings.GetBool(ctx, "tme_validator.enabled", true)
  39. }
  40. var navSites []model.NavSite
  41. q := p.db.Where("status = ?", "pending")
  42. if opts.TestRun != nil && opts.TestRun.ItemLimit > 0 {
  43. q = q.Limit(opts.TestRun.ItemLimit)
  44. }
  45. q.Find(&navSites)
  46. total := len(navSites)
  47. merchantCount := 0
  48. for i, site := range navSites {
  49. if isContextDone(ctx) {
  50. break
  51. }
  52. if p.reporter != nil {
  53. p.reporter("crawl", i+1, total, "爬取: "+site.URL)
  54. }
  55. // 预过滤
  56. filterResult := crawler.RuleFilter(site.URL)
  57. if filterResult == crawler.FilterDiscard {
  58. p.db.Model(&site).Updates(map[string]interface{}{
  59. "status": "filtered",
  60. "filter_reason": "blacklist",
  61. })
  62. continue
  63. }
  64. // 不确定的 URL,交 LLM 判断
  65. if filterResult == crawler.FilterUncertain && p.llmClient != nil {
  66. isNav, confidence, err := p.llmClient.IsNavSite(ctx, site.URL)
  67. if err != nil || !isNav || confidence < 0.6 {
  68. p.db.Model(&site).Updates(map[string]interface{}{
  69. "status": "filtered",
  70. "filter_reason": "llm_reject",
  71. })
  72. continue
  73. }
  74. }
  75. // 爬取:先尝试静态,失败则动态
  76. result := p.staticCrawler.Crawl(ctx, site.URL)
  77. if result.Error != nil || result.HTML == "" {
  78. log.Printf("[crawl] static failed for %s, trying dynamic", site.URL)
  79. result = p.dynCrawler.Crawl(ctx, site.URL)
  80. }
  81. if result.Error != nil {
  82. p.db.Model(&site).Update("status", "failed")
  83. continue
  84. }
  85. // 过滤非中文页面
  86. snippet := result.HTML
  87. if len(snippet) > 5000 {
  88. snippet = snippet[:5000]
  89. }
  90. if !extractor.ContainsChinese(snippet, 0) {
  91. p.db.Model(&site).Updates(map[string]interface{}{
  92. "status": "filtered",
  93. "filter_reason": "non_chinese",
  94. })
  95. continue
  96. }
  97. // 处理发现的 TG 链接
  98. for _, tgLink := range result.TgLinks {
  99. username := crawler.ExtractTGUsername(tgLink)
  100. if username == "" {
  101. continue
  102. }
  103. // t.me 死号预检
  104. if tmeEnabled {
  105. if !p.tmeValidator.IsAlive(ctx, username) {
  106. log.Printf("[crawl] dead account: %s", username)
  107. continue
  108. }
  109. }
  110. raw := &model.MerchantRaw{
  111. TgUsername: username,
  112. SourceType: "web_crawl",
  113. SourceID: site.URL,
  114. Status: "raw",
  115. }
  116. p.db.Create(raw)
  117. merchantCount++
  118. }
  119. // 处理普通链接(商户官网子页)
  120. for _, link := range result.Links {
  121. if isContextDone(ctx) {
  122. break
  123. }
  124. // 排除 TG 链接(已处理)和无效链接
  125. if strings.Contains(link, "t.me") || strings.Contains(link, "telegram.me") {
  126. continue
  127. }
  128. if crawler.RuleFilter(link) == crawler.FilterDiscard {
  129. continue
  130. }
  131. // 爬商户官网子页提取联系方式
  132. p.crawlMerchantSite(ctx, link, site.URL)
  133. }
  134. p.db.Model(&site).Updates(map[string]interface{}{
  135. "status": "scraped",
  136. "merchant_count": merchantCount,
  137. })
  138. }
  139. log.Printf("[crawl] done: %d merchants found", merchantCount)
  140. return nil
  141. }
  142. // crawlMerchantSite 爬取商户官网,提取联系方式
  143. func (p *CrawlPhase) crawlMerchantSite(ctx context.Context, siteURL, sourceURL string) {
  144. subPages := []string{siteURL, siteURL + "/contact", siteURL + "/about", siteURL + "/关于我们"}
  145. for _, page := range subPages {
  146. if isContextDone(ctx) {
  147. break
  148. }
  149. result := p.staticCrawler.Crawl(ctx, page)
  150. if result.Error != nil || result.HTML == "" {
  151. continue
  152. }
  153. info := extractor.Extract(result.HTML)
  154. if !info.HasContact {
  155. continue
  156. }
  157. raw := &model.MerchantRaw{
  158. TgUsername: info.TgUsername,
  159. Website: siteURL,
  160. Email: info.Email,
  161. Phone: info.Phone,
  162. SourceType: "web_crawl",
  163. SourceID: sourceURL,
  164. Status: "raw",
  165. }
  166. if raw.TgUsername != "" || raw.Email != "" || raw.Phone != "" {
  167. p.db.Create(raw)
  168. }
  169. break // 找到联系方式就停止
  170. }
  171. }