| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129 |
- package pipeline
- import (
- "context"
- "log"
- "net/url"
- "regexp"
- "time"
- "spider/internal/model"
- "spider/internal/search"
- "gorm.io/gorm"
- )
- // SearchPhase Phase 2: 搜索引擎采集
- type SearchPhase struct {
- db *gorm.DB
- serper *search.SerperClient
- settings Settings
- reporter ProgressReporter
- }
- // NewSearchPhase creates a new SearchPhase.
- func NewSearchPhase(db *gorm.DB, serper *search.SerperClient, settings Settings) *SearchPhase {
- return &SearchPhase{
- db: db,
- serper: serper,
- settings: settings,
- }
- }
- func (p *SearchPhase) Name() string { return "search" }
- func (p *SearchPhase) Run(ctx context.Context, task *model.Task, opts *Options) error {
- if p.serper == nil {
- log.Println("[search] no serper client configured, skipping")
- return nil
- }
- // 取 active 关键词
- var keywords []model.ManagedKeyword
- q := p.db.Where("status = ?", "active")
- if opts.TestRun != nil && opts.TestRun.ItemLimit > 0 {
- q = q.Limit(opts.TestRun.ItemLimit)
- }
- q.Find(&keywords)
- total := len(keywords)
- channelCount, navCount := 0, 0
- for i, kw := range keywords {
- if isContextDone(ctx) {
- break
- }
- if p.reporter != nil {
- p.reporter("search", i+1, total, "搜索: "+kw.Keyword)
- }
- results, err := p.serper.Search(ctx, kw.Keyword)
- if err != nil {
- log.Printf("[search] keyword=%s err=%v", kw.Keyword, err)
- continue
- }
- for _, r := range results {
- switch search.ClassifyURL(r.URL) {
- case "tg_channel":
- username := extractTGUsername(r.URL)
- if username == "" {
- continue
- }
- ch := &model.Channel{
- Username: username,
- Source: "search",
- SourceDetail: kw.Keyword,
- Status: "pending",
- }
- result := p.db.Where(model.Channel{Username: username}).FirstOrCreate(ch)
- if result.RowsAffected > 0 {
- channelCount++
- }
- case "nav_site":
- domain := extractDomain(r.URL)
- site := &model.NavSite{
- URL: r.URL,
- Domain: domain,
- Source: kw.Keyword,
- Status: "pending",
- }
- result := p.db.Where("url = ?", r.URL).FirstOrCreate(site)
- if result.RowsAffected > 0 {
- navCount++
- }
- }
- }
- // 关键词间 sleep 2s
- select {
- case <-ctx.Done():
- return nil
- case <-time.After(2 * time.Second):
- }
- }
- log.Printf("[search] done: %d channels, %d nav_sites found", channelCount, navCount)
- return nil
- }
// tgUsernamePattern matches a Telegram link on the t.me or telegram.me host
// and captures the username path segment. It is compiled once at package
// initialisation instead of on every call.
//
// The leading \b keeps hosts that merely end in "t.me" (for example
// "chat.me") from matching. The trailing \b rejects a segment longer than 32
// characters instead of silently truncating it. An optional "s/" prefix
// accepts the t.me/s/<username> preview form.
var tgUsernamePattern = regexp.MustCompile(`\bt(?:elegram)?\.me/(?:s/)?([a-zA-Z][a-zA-Z0-9_]{4,31})\b`)

// extractTGUsername extracts the username from a t.me/username or
// telegram.me/username link (optionally in the t.me/s/username form).
//
// A username must start with a letter and consist of 5 to 32 letters, digits
// or underscores. The empty string is returned when rawURL contains no such
// link.
func extractTGUsername(rawURL string) string {
	m := tgUsernamePattern.FindStringSubmatch(rawURL)
	if len(m) < 2 {
		return ""
	}
	return m[1]
}
// extractDomain returns the host name of rawURL with any port removed.
// The empty string is returned when rawURL cannot be parsed.
func extractDomain(rawURL string) string {
	if parsed, parseErr := url.Parse(rawURL); parseErr == nil {
		return parsed.Hostname()
	}
	return ""
}
|