package pipeline

import (
	"context"
	"fmt"
	"log"
	"net/url"
	"strings"
	"unicode/utf8"

	"gorm.io/gorm"

	"spider/internal/crawler"
	"spider/internal/extractor"
	"spider/internal/llm"
	"spider/internal/model"
)

const (
	// crawlMinNavConfidence is the lowest LLM confidence at which a URL that
	// the rule filter marked as uncertain is still accepted as a nav site.
	crawlMinNavConfidence = 0.6
	// crawlLangSnippetBytes bounds how many bytes of a page are inspected
	// when deciding whether the page contains Chinese text.
	crawlLangSnippetBytes = 5000
)

// CrawlPhase is pipeline phase 5: web crawling. It visits pending nav sites,
// collects Telegram usernames and merchant contact details from them, and
// stores the findings as MerchantRaw rows.
type CrawlPhase struct {
	db            *gorm.DB
	staticCrawler *crawler.StaticCrawler
	dynCrawler    *crawler.DynamicCrawler
	tmeValidator  *crawler.TMeValidator
	llmClient     *llm.Client // optional; nil disables the LLM nav-site check
	settings      Settings    // optional; nil means built-in defaults are used
	reporter      ProgressReporter // optional; nil disables progress reporting
}

// NewCrawlPhase creates a new CrawlPhase backed by db. llmClient and settings
// may be nil. The progress reporter is left unset.
func NewCrawlPhase(db *gorm.DB, llmClient *llm.Client, settings Settings) *CrawlPhase {
	return &CrawlPhase{
		db:            db,
		staticCrawler: crawler.NewStaticCrawler(),
		dynCrawler:    crawler.NewDynamicCrawler(),
		tmeValidator:  crawler.NewTMeValidator(),
		llmClient:     llmClient,
		settings:      settings,
	}
}

// Name returns the phase identifier, "crawl".
func (p *CrawlPhase) Name() string { return "crawl" }

// Run crawls every nav site whose status is "pending" and records merchant
// contact information found on it.
//
// When opts.TestRun.ItemLimit is positive, at most that many sites are loaded.
// The loop stops early (still returning nil) once ctx is done. Sites not yet
// reached keep their "pending" status. Run returns an error only if the
// pending sites cannot be loaded. Per-row write failures are logged and
// skipped so one bad row does not abort the phase.
func (p *CrawlPhase) Run(ctx context.Context, task *model.Task, opts *Options) error {
	log.Printf("[crawl] starting, task_id=%d", task.ID)

	tmeEnabled := true
	if p.settings != nil {
		tmeEnabled = p.settings.GetBool(ctx, "tme_validator.enabled", true)
	}

	var navSites []model.NavSite
	q := p.db.Where("status = ?", "pending")
	if opts != nil && opts.TestRun != nil && opts.TestRun.ItemLimit > 0 {
		q = q.Limit(opts.TestRun.ItemLimit)
	}
	if err := q.Find(&navSites).Error; err != nil {
		return fmt.Errorf("loading pending nav sites: %w", err)
	}

	total := len(navSites)
	merchantCount := 0
	for i := range navSites {
		if isContextDone(ctx) {
			break
		}
		site := &navSites[i]
		if p.reporter != nil {
			p.reporter("crawl", i+1, total, "爬取: "+site.URL)
		}
		merchantCount += p.crawlNavSite(ctx, site, tmeEnabled)
	}

	log.Printf("[crawl] done: %d merchants found", merchantCount)
	return nil
}

// crawlNavSite processes a single nav site and returns how many MerchantRaw
// rows were stored for it. That count includes Telegram usernames found on
// the page and contact records from linked merchant sites.
//
// The site ends up "filtered", "failed" or "scraped". The exception is an
// LLM check that returns an error: the site is then left "pending" so a later
// run can retry it.
func (p *CrawlPhase) crawlNavSite(ctx context.Context, site *model.NavSite, tmeEnabled bool) int {
	// Pre-filter with the cheap rule-based URL check.
	filterResult := crawler.RuleFilter(site.URL)
	if filterResult == crawler.FilterDiscard {
		p.updateNavSite(site, map[string]interface{}{
			"status":        "filtered",
			"filter_reason": "blacklist",
		})
		return 0
	}

	// URLs the rules cannot decide are judged by the LLM, when one is configured.
	if filterResult == crawler.FilterUncertain && p.llmClient != nil {
		isNav, confidence, err := p.llmClient.IsNavSite(ctx, site.URL)
		if err != nil {
			// A failed LLM call says nothing about the site itself (it may be
			// a transient outage or a cancelled ctx). Leave it pending rather
			// than filtering it permanently.
			log.Printf("[crawl] llm check failed for %s: %v", site.URL, err)
			return 0
		}
		if !isNav || confidence < crawlMinNavConfidence {
			p.updateNavSite(site, map[string]interface{}{
				"status":        "filtered",
				"filter_reason": "llm_reject",
			})
			return 0
		}
	}

	// Fetch: try the static crawler first and fall back to the dynamic one.
	result := p.staticCrawler.Crawl(ctx, site.URL)
	if result.Error != nil || result.HTML == "" {
		log.Printf("[crawl] static failed for %s, trying dynamic", site.URL)
		result = p.dynCrawler.Crawl(ctx, site.URL)
	}
	if result.Error != nil {
		log.Printf("[crawl] crawl failed for %s: %v", site.URL, result.Error)
		p.updateNavSite(site, map[string]interface{}{"status": "failed"})
		return 0
	}

	// Filter out non-Chinese pages. Only a bounded prefix is inspected, cut on
	// a rune boundary so a multi-byte character is never split.
	snippet := truncateAtRuneBoundary(result.HTML, crawlLangSnippetBytes)
	if !extractor.ContainsChinese(snippet, 0) {
		p.updateNavSite(site, map[string]interface{}{
			"status":        "filtered",
			"filter_reason": "non_chinese",
		})
		return 0
	}

	found := 0

	// Telegram links found directly on the nav page.
	seenUsers := make(map[string]struct{})
	for _, tgLink := range result.TgLinks {
		username := crawler.ExtractTGUsername(tgLink)
		if username == "" {
			continue
		}
		// The same account is often linked several times on one page.
		if _, dup := seenUsers[username]; dup {
			continue
		}
		seenUsers[username] = struct{}{}

		// Pre-check for dead t.me accounts.
		if tmeEnabled && !p.tmeValidator.IsAlive(ctx, username) {
			log.Printf("[crawl] dead account: %s", username)
			continue
		}

		raw := &model.MerchantRaw{
			TgUsername: username,
			SourceType: "web_crawl",
			SourceID:   site.URL,
			Status:     "raw",
		}
		if p.saveMerchantRaw(raw) {
			found++
		}
	}

	// Ordinary links are treated as merchant sites and crawled for contacts.
	seenLinks := make(map[string]struct{})
	for _, link := range result.Links {
		if isContextDone(ctx) {
			break
		}
		if _, dup := seenLinks[link]; dup {
			continue
		}
		seenLinks[link] = struct{}{}

		// Telegram links are handled through TgLinks above. Blacklisted links
		// are not worth fetching.
		if isTelegramHostLink(link) || crawler.RuleFilter(link) == crawler.FilterDiscard {
			continue
		}
		found += p.crawlMerchantSite(ctx, link, site.URL)
	}

	// merchant_count is per site, not the running total across sites.
	p.updateNavSite(site, map[string]interface{}{
		"status":         "scraped",
		"merchant_count": found,
	})
	return found
}

// crawlMerchantSite fetches siteURL and a few likely contact pages (/contact,
// /about, /关于我们) with the static crawler. It stores the contact details
// from the first page whose extraction reports HasContact as a MerchantRaw
// attributed to sourceURL, then stops. It returns the number of rows stored
// (0 or 1).
func (p *CrawlPhase) crawlMerchantSite(ctx context.Context, siteURL, sourceURL string) int {
	subPages := []string{
		siteURL,
		merchantSubPageURL(siteURL, "/contact"),
		merchantSubPageURL(siteURL, "/about"),
		merchantSubPageURL(siteURL, "/关于我们"),
	}
	for _, page := range subPages {
		if isContextDone(ctx) {
			break
		}
		result := p.staticCrawler.Crawl(ctx, page)
		if result.Error != nil || result.HTML == "" {
			continue
		}
		info := extractor.Extract(result.HTML)
		if !info.HasContact {
			continue
		}

		// Stop at the first page that reports contact info. Only a row with
		// at least one stored field is written.
		if info.TgUsername == "" && info.Email == "" && info.Phone == "" {
			return 0
		}
		raw := &model.MerchantRaw{
			TgUsername: info.TgUsername,
			Website:    siteURL,
			Email:      info.Email,
			Phone:      info.Phone,
			SourceType: "web_crawl",
			SourceID:   sourceURL,
			Status:     "raw",
		}
		if p.saveMerchantRaw(raw) {
			return 1
		}
		return 0
	}
	return 0
}

// updateNavSite applies fields to site. A failed write is logged instead of
// aborting the phase.
func (p *CrawlPhase) updateNavSite(site *model.NavSite, fields map[string]interface{}) {
	if err := p.db.Model(site).Updates(fields).Error; err != nil {
		log.Printf("[crawl] update nav site %s: %v", site.URL, err)
	}
}

// saveMerchantRaw inserts raw and reports whether the insert succeeded.
// Failures are logged so the caller can keep going.
func (p *CrawlPhase) saveMerchantRaw(raw *model.MerchantRaw) bool {
	if err := p.db.Create(raw).Error; err != nil {
		log.Printf("[crawl] save merchant from %s: %v", raw.SourceID, err)
		return false
	}
	return true
}

// isTelegramHostLink reports whether link points at t.me or telegram.me
// (including subdomains). Host matching avoids false positives such as
// "chat.meituan.com", which merely contains the substring "t.me". Links
// without a parseable host fall back to the substring test.
func isTelegramHostLink(link string) bool {
	u, err := url.Parse(link)
	if err != nil || u.Host == "" {
		return strings.Contains(link, "t.me") || strings.Contains(link, "telegram.me")
	}
	host := strings.ToLower(u.Hostname())
	for _, tg := range []string{"t.me", "telegram.me"} {
		if host == tg || strings.HasSuffix(host, "."+tg) {
			return true
		}
	}
	return false
}

// merchantSubPageURL appends suffix (e.g. "/contact") to base's path,
// avoiding a doubled slash and dropping any query or fragment. If base has
// no parseable host, it falls back to plain concatenation after trimming
// trailing slashes.
func merchantSubPageURL(base, suffix string) string {
	u, err := url.Parse(base)
	if err != nil || u.Host == "" {
		return strings.TrimRight(base, "/") + suffix
	}
	u.Path = strings.TrimRight(u.Path, "/") + suffix
	u.RawPath = "" // let String() re-escape the new path (e.g. non-ASCII segments)
	u.RawQuery = ""
	u.Fragment = ""
	return u.String()
}

// truncateAtRuneBoundary returns at most n bytes of s, backing off so the
// result never ends in the middle of a UTF-8 encoded rune.
func truncateAtRuneBoundary(s string, n int) string {
	if len(s) <= n {
		return s
	}
	for n > 0 && !utf8.RuneStart(s[n]) {
		n--
	}
	return s[:n]
}