package pipeline

import (
	"context"
	"fmt"
	"log"
	"net/url"
	"regexp"
	"time"

	"spider/internal/model"
	"spider/internal/search"

	"gorm.io/gorm"
)

// tgUsernamePattern captures the path segment that follows a t.me/ or
// telegram.me/ host: a letter followed by 4 to 31 letters, digits or
// underscores. It is compiled once at package scope because
// extractTGUsername is called for every search result.
//
// NOTE(review): the pattern is unanchored, so it matches anywhere in the
// input and will also capture reserved path words of five or more characters
// (for example "joinchat"). Presumably search.ClassifyURL filters those out
// before this is reached; confirm against that function.
var tgUsernamePattern = regexp.MustCompile(`(?:t(?:elegram)?\.me)/([a-zA-Z][a-zA-Z0-9_]{4,31})`)

// SearchPhase is pipeline phase 2: search-engine collection.
type SearchPhase struct {
	db       *gorm.DB
	serper   *search.SerperClient
	settings Settings
	reporter ProgressReporter
}

// NewSearchPhase creates a new SearchPhase.
//
// The reporter field is left nil here; Run skips progress reporting while it
// is nil.
func NewSearchPhase(db *gorm.DB, serper *search.SerperClient, settings Settings) *SearchPhase {
	return &SearchPhase{
		db:       db,
		serper:   serper,
		settings: settings,
	}
}

// Name returns the identifier of this phase, "search".
func (p *SearchPhase) Name() string { return "search" }

// Run searches every active managed keyword through the Serper client and
// stores the classified results: "tg_channel" URLs become pending
// model.Channel rows and "nav_site" URLs become pending model.NavSite rows.
// Rows that already exist are left untouched and are not counted.
//
// When opts.TestRun.ItemLimit is positive, only that many keywords are
// loaded. A nil opts is treated as "no limit".
//
// Run returns nil when no Serper client is configured and when ctx is
// cancelled part-way through. It returns an error only when the keyword list
// cannot be loaded; failures for a single keyword or a single row are logged
// and skipped so that one bad item does not abort the phase.
func (p *SearchPhase) Run(ctx context.Context, task *model.Task, opts *Options) error {
	if p.serper == nil {
		log.Println("[search] no serper client configured, skipping")
		return nil
	}

	// Pause inserted between two consecutive keyword searches.
	const keywordInterval = 2 * time.Second

	// Load the active keywords.
	var keywords []model.ManagedKeyword
	q := p.db.Where("status = ?", "active")
	if opts != nil && opts.TestRun != nil && opts.TestRun.ItemLimit > 0 {
		q = q.Limit(opts.TestRun.ItemLimit)
	}
	if err := q.Find(&keywords).Error; err != nil {
		// Without the keyword list there is nothing to do; surface the
		// failure instead of reporting a successful run with zero results.
		return fmt.Errorf("search: loading active keywords: %w", err)
	}

	total := len(keywords)
	channelCount, navCount := 0, 0

	for i, kw := range keywords {
		if isContextDone(ctx) {
			break
		}
		if p.reporter != nil {
			p.reporter("search", i+1, total, "搜索: "+kw.Keyword)
		}

		results, err := p.serper.Search(ctx, kw.Keyword)
		if err != nil {
			log.Printf("[search] keyword=%s err=%v", kw.Keyword, err)
			continue
		}

		for _, r := range results {
			switch search.ClassifyURL(r.URL) {
			case "tg_channel":
				username := extractTGUsername(r.URL)
				if username == "" {
					continue
				}
				ch := &model.Channel{
					Username:     username,
					Source:       "search",
					SourceDetail: kw.Keyword,
					Status:       "pending",
				}
				result := p.db.Where(model.Channel{Username: username}).FirstOrCreate(ch)
				if result.Error != nil {
					log.Printf("[search] save channel=%s err=%v", username, result.Error)
					continue
				}
				// RowsAffected is non-zero only when a new row was inserted.
				if result.RowsAffected > 0 {
					channelCount++
				}
			case "nav_site":
				site := &model.NavSite{
					URL:    r.URL,
					Domain: extractDomain(r.URL),
					Source: kw.Keyword,
					Status: "pending",
				}
				result := p.db.Where("url = ?", r.URL).FirstOrCreate(site)
				if result.Error != nil {
					log.Printf("[search] save nav_site=%s err=%v", r.URL, result.Error)
					continue
				}
				if result.RowsAffected > 0 {
					navCount++
				}
			}
		}

		// The pause only exists to space out consecutive searches, so it is
		// skipped once the last keyword has been processed.
		if i == total-1 {
			break
		}
		select {
		case <-ctx.Done():
			return nil
		case <-time.After(keywordInterval):
		}
	}

	log.Printf("[search] done: %d channels, %d nav_sites found", channelCount, navCount)
	return nil
}

// extractTGUsername extracts the username from a t.me/username or
// telegram.me/username URL. It returns "" when rawURL contains no match.
func extractTGUsername(rawURL string) string {
	m := tgUsernamePattern.FindStringSubmatch(rawURL)
	if len(m) > 1 {
		return m[1]
	}
	return ""
}

// extractDomain returns the host name of rawURL without any port, or "" when
// rawURL cannot be parsed.
func extractDomain(rawURL string) string {
	u, err := url.Parse(rawURL)
	if err != nil {
		return ""
	}
	return u.Hostname()
}