- package pipeline
import (
	"context"
	"fmt"
	"log"
	"strings"

	"gorm.io/gorm"

	"spider/internal/crawler"
	"spider/internal/extractor"
	"spider/internal/llm"
	"spider/internal/model"
)
// CrawlPhase implements phase 5 of the pipeline: web crawling.
// It loads pending NavSite rows, filters them (rule-based first, then LLM
// for uncertain URLs), crawls each site with a static-then-dynamic strategy,
// and records discovered Telegram accounts and merchant contacts as
// MerchantRaw rows.
type CrawlPhase struct {
	db            *gorm.DB                // persistence for NavSite / MerchantRaw rows
	staticCrawler *crawler.StaticCrawler  // plain-HTTP fetcher, always tried first
	dynCrawler    *crawler.DynamicCrawler // fallback when the static crawl fails or returns empty HTML
	tmeValidator  *crawler.TMeValidator   // t.me dead-account pre-check
	llmClient     *llm.Client             // optional; classifies uncertain URLs (nil-checked in Run)
	settings      Settings                // runtime configuration; nil-checked before use
	reporter      ProgressReporter        // optional progress callback; may be nil
	// NOTE(review): NewCrawlPhase never sets reporter — presumably assigned by
	// a setter or the pipeline assembler elsewhere; confirm.
}
- // NewCrawlPhase creates a new CrawlPhase.
- func NewCrawlPhase(db *gorm.DB, llmClient *llm.Client, settings Settings) *CrawlPhase {
- return &CrawlPhase{
- db: db,
- staticCrawler: crawler.NewStaticCrawler(),
- dynCrawler: crawler.NewDynamicCrawler(),
- tmeValidator: crawler.NewTMeValidator(),
- llmClient: llmClient,
- settings: settings,
- }
- }
- func (p *CrawlPhase) Name() string { return "crawl" }
- func (p *CrawlPhase) Run(ctx context.Context, task *model.Task, opts *Options) error {
- log.Printf("[crawl] starting, task_id=%d", task.ID)
- tmeEnabled := true
- if p.settings != nil {
- tmeEnabled = p.settings.GetBool(ctx, "tme_validator.enabled", true)
- }
- var navSites []model.NavSite
- q := p.db.Where("status = ?", "pending")
- if opts.TestRun != nil && opts.TestRun.ItemLimit > 0 {
- q = q.Limit(opts.TestRun.ItemLimit)
- }
- q.Find(&navSites)
- total := len(navSites)
- merchantCount := 0
- for i, site := range navSites {
- if isContextDone(ctx) {
- break
- }
- if p.reporter != nil {
- p.reporter("crawl", i+1, total, "爬取: "+site.URL)
- }
- // 预过滤
- filterResult := crawler.RuleFilter(site.URL)
- if filterResult == crawler.FilterDiscard {
- p.db.Model(&site).Updates(map[string]interface{}{
- "status": "filtered",
- "filter_reason": "blacklist",
- })
- continue
- }
- // 不确定的 URL,交 LLM 判断
- if filterResult == crawler.FilterUncertain && p.llmClient != nil {
- isNav, confidence, err := p.llmClient.IsNavSite(ctx, site.URL)
- if err != nil || !isNav || confidence < 0.6 {
- p.db.Model(&site).Updates(map[string]interface{}{
- "status": "filtered",
- "filter_reason": "llm_reject",
- })
- continue
- }
- }
- // 爬取:先尝试静态,失败则动态
- result := p.staticCrawler.Crawl(ctx, site.URL)
- if result.Error != nil || result.HTML == "" {
- log.Printf("[crawl] static failed for %s, trying dynamic", site.URL)
- result = p.dynCrawler.Crawl(ctx, site.URL)
- }
- if result.Error != nil {
- p.db.Model(&site).Update("status", "failed")
- continue
- }
- // 过滤非中文页面
- snippet := result.HTML
- if len(snippet) > 5000 {
- snippet = snippet[:5000]
- }
- if !extractor.ContainsChinese(snippet, 0) {
- p.db.Model(&site).Updates(map[string]interface{}{
- "status": "filtered",
- "filter_reason": "non_chinese",
- })
- continue
- }
- // 处理发现的 TG 链接
- for _, tgLink := range result.TgLinks {
- username := crawler.ExtractTGUsername(tgLink)
- if username == "" {
- continue
- }
- // t.me 死号预检
- if tmeEnabled {
- if !p.tmeValidator.IsAlive(ctx, username) {
- log.Printf("[crawl] dead account: %s", username)
- continue
- }
- }
- raw := &model.MerchantRaw{
- TgUsername: username,
- SourceType: "web_crawl",
- SourceID: site.URL,
- Status: "raw",
- }
- p.db.Create(raw)
- merchantCount++
- }
- // 处理普通链接(商户官网子页)
- for _, link := range result.Links {
- if isContextDone(ctx) {
- break
- }
- // 排除 TG 链接(已处理)和无效链接
- if strings.Contains(link, "t.me") || strings.Contains(link, "telegram.me") {
- continue
- }
- if crawler.RuleFilter(link) == crawler.FilterDiscard {
- continue
- }
- // 爬商户官网子页提取联系方式
- p.crawlMerchantSite(ctx, link, site.URL)
- }
- p.db.Model(&site).Updates(map[string]interface{}{
- "status": "scraped",
- "merchant_count": merchantCount,
- })
- }
- log.Printf("[crawl] done: %d merchants found", merchantCount)
- return nil
- }
- // crawlMerchantSite 爬取商户官网,提取联系方式
- func (p *CrawlPhase) crawlMerchantSite(ctx context.Context, siteURL, sourceURL string) {
- subPages := []string{siteURL, siteURL + "/contact", siteURL + "/about", siteURL + "/关于我们"}
- for _, page := range subPages {
- if isContextDone(ctx) {
- break
- }
- result := p.staticCrawler.Crawl(ctx, page)
- if result.Error != nil || result.HTML == "" {
- continue
- }
- info := extractor.Extract(result.HTML)
- if !info.HasContact {
- continue
- }
- raw := &model.MerchantRaw{
- TgUsername: info.TgUsername,
- Website: siteURL,
- Email: info.Email,
- Phone: info.Phone,
- SourceType: "web_crawl",
- SourceID: sourceURL,
- Status: "raw",
- }
- if raw.TgUsername != "" || raw.Email != "" || raw.Phone != "" {
- p.db.Create(raw)
- }
- break // 找到联系方式就停止
- }
- }