phase2_search.go 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129
  1. package pipeline
import (
	"context"
	"fmt"
	"log"
	"net/url"
	"regexp"
	"time"

	"gorm.io/gorm"
	"spider/internal/model"
	"spider/internal/search"
)
  12. // SearchPhase Phase 2: 搜索引擎采集
  13. type SearchPhase struct {
  14. db *gorm.DB
  15. serper *search.SerperClient
  16. settings Settings
  17. reporter ProgressReporter
  18. }
  19. // NewSearchPhase creates a new SearchPhase.
  20. func NewSearchPhase(db *gorm.DB, serper *search.SerperClient, settings Settings) *SearchPhase {
  21. return &SearchPhase{
  22. db: db,
  23. serper: serper,
  24. settings: settings,
  25. }
  26. }
  27. func (p *SearchPhase) Name() string { return "search" }
  28. func (p *SearchPhase) Run(ctx context.Context, task *model.Task, opts *Options) error {
  29. if p.serper == nil {
  30. log.Println("[search] no serper client configured, skipping")
  31. return nil
  32. }
  33. // 取 active 关键词
  34. var keywords []model.ManagedKeyword
  35. q := p.db.Where("status = ?", "active")
  36. if opts.TestRun != nil && opts.TestRun.ItemLimit > 0 {
  37. q = q.Limit(opts.TestRun.ItemLimit)
  38. }
  39. q.Find(&keywords)
  40. total := len(keywords)
  41. channelCount, navCount := 0, 0
  42. for i, kw := range keywords {
  43. if isContextDone(ctx) {
  44. break
  45. }
  46. if p.reporter != nil {
  47. p.reporter("search", i+1, total, "搜索: "+kw.Keyword)
  48. }
  49. results, err := p.serper.Search(ctx, kw.Keyword)
  50. if err != nil {
  51. log.Printf("[search] keyword=%s err=%v", kw.Keyword, err)
  52. continue
  53. }
  54. for _, r := range results {
  55. switch search.ClassifyURL(r.URL) {
  56. case "tg_channel":
  57. username := extractTGUsername(r.URL)
  58. if username == "" {
  59. continue
  60. }
  61. ch := &model.Channel{
  62. Username: username,
  63. Source: "search",
  64. SourceDetail: kw.Keyword,
  65. Status: "pending",
  66. }
  67. result := p.db.Where(model.Channel{Username: username}).FirstOrCreate(ch)
  68. if result.RowsAffected > 0 {
  69. channelCount++
  70. }
  71. case "nav_site":
  72. domain := extractDomain(r.URL)
  73. site := &model.NavSite{
  74. URL: r.URL,
  75. Domain: domain,
  76. Source: kw.Keyword,
  77. Status: "pending",
  78. }
  79. result := p.db.Where("url = ?", r.URL).FirstOrCreate(site)
  80. if result.RowsAffected > 0 {
  81. navCount++
  82. }
  83. }
  84. }
  85. // 关键词间 sleep 2s
  86. select {
  87. case <-ctx.Done():
  88. return nil
  89. case <-time.After(2 * time.Second):
  90. }
  91. }
  92. log.Printf("[search] done: %d channels, %d nav_sites found", channelCount, navCount)
  93. return nil
  94. }
  // tgUsernameRE matches a t.me / telegram.me link and captures the username
// (a letter followed by 4-31 letters, digits or underscores).
//
// The host must start the string or follow "/" or ".", so hosts that merely
// end in "t.me" (e.g. bit.me) are rejected. An optional "s/" segment accepts
// the t.me/s/<name> web-preview form. The trailing \b rejects names longer
// than 32 characters instead of silently truncating them to a different
// name. The expression is compiled once at package scope rather than on
// every call.
var tgUsernameRE = regexp.MustCompile(`(?:^|[/.])t(?:elegram)?\.me/(?:s/)?([a-zA-Z][a-zA-Z0-9_]{4,31})\b`)

// extractTGUsername extracts the username from a t.me/<name>,
// telegram.me/<name> or t.me/s/<name> URL. It returns "" when rawURL holds
// no syntactically valid username.
//
// NOTE(review): path words such as "joinchat" also satisfy the pattern;
// presumably search.ClassifyURL filters those out beforehand — confirm.
func extractTGUsername(rawURL string) string {
	if m := tgUsernameRE.FindStringSubmatch(rawURL); len(m) > 1 {
		return m[1]
	}
	return ""
}
  // extractDomain returns the host of rawURL with any port and IPv6 brackets
// stripped (url.URL.Hostname). It returns "" when rawURL fails to parse or
// has no host component.
func extractDomain(rawURL string) string {
	if parsed, err := url.Parse(rawURL); err == nil {
		return parsed.Hostname()
	}
	return ""
}