collector.go

package webcollector

import (
	"context"
	"log"
	"net/url"
	"regexp"
	"strings"
	"sync/atomic"
	"time"

	"spider/internal/crawler"
	"spider/internal/extractor"
	"spider/internal/plugin"
	"spider/internal/search"
)
// Collector implements plugin.Collector for web-based merchant collection.
// It combines search (Google via Serper), page crawling, and contact
// extraction. No AI: pure regex and rule-based filtering.
type Collector struct {
	serper       *search.SerperClient
	static       *crawler.StaticCrawler
	dynamic      *crawler.DynamicCrawler
	tmeValidator *crawler.TMeValidator
	stopped      atomic.Bool
}
// New creates a new web collector.
func New(serper *search.SerperClient) *Collector {
	return &Collector{
		serper:       serper,
		static:       crawler.NewStaticCrawler(),
		dynamic:      crawler.NewDynamicCrawler(),
		tmeValidator: crawler.NewTMeValidator(),
	}
}
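
// A minimal construction sketch (hypothetical wiring: the exact
// search.SerperClient constructor lives in spider/internal/search and may
// differ from the NewSerperClient shown here):
//
//	serper := search.NewSerperClient(os.Getenv("SERPER_API_KEY"))
//	c := webcollector.New(serper)
//	defer c.Stop()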

func (c *Collector) Name() string { return "web_collector" }

func (c *Collector) Stop() error {
	c.stopped.Store(true)
	return nil
}

// Run executes the web collection pipeline:
//  1. For each keyword, search via the Serper API.
//  2. Classify results: t.me links are extracted directly, web pages are crawled.
//  3. Crawl pages, extracting TG usernames and contact info.
//  4. Invoke the callback for each merchant found.
//
// No cfg keys are required from callers; keywords normally come from the DB
// via the task manager, which supplies them in cfg. Recognized keys:
//   - "keywords": []string, the search keywords (optional override)
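//
// Sketch of a direct call (the keyword value is purely illustrative):
//
//	err := c.Run(ctx, map[string]any{
//		"keywords": []string{"telegram merchants"},
//	}, func(m plugin.MerchantData) {
//		log.Printf("found %s at %s", m.TgUsername, m.SourceURL)
//	})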
func (c *Collector) Run(ctx context.Context, cfg map[string]any, callback func(plugin.MerchantData)) error {
	c.stopped.Store(false)
	if c.serper == nil {
		log.Println("[web_collector] no serper client configured, skipping")
		return nil
	}
	keywords, _ := cfg["keywords"].([]string)
	if len(keywords) == 0 {
		log.Println("[web_collector] no keywords provided")
		return nil
	}
	for _, kw := range keywords {
		if c.stopped.Load() || ctx.Err() != nil {
			break
		}
		log.Printf("[web_collector] searching: %s", kw)
		results, err := c.serper.Search(ctx, kw)
		if err != nil {
			log.Printf("[web_collector] search error for %q: %v", kw, err)
			continue
		}
		for _, r := range results {
			if c.stopped.Load() || ctx.Err() != nil {
				break
			}
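			// ClassifyURL is assumed to return "tg_channel", "nav_site",
			// or "discard"; anything unrecognized falls through to the
			// default branch below.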
			classification := search.ClassifyURL(r.URL)
			switch classification {
			case "tg_channel":
				// Direct t.me link: extract the username immediately.
				username := extractTGUsername(r.URL)
				if username == "" {
					continue
				}
				callback(plugin.MerchantData{
					TgUsername: username,
					TgLink:     "https://t.me/" + username,
					SourceType: "web",
					SourceName: r.Title,
					SourceURL:  r.URL,
				})
			case "nav_site":
				// Crawl the page for TG links and contacts.
				c.crawlPage(ctx, r.URL, r.Title, callback)
			default:
				// "discard" or unknown: still apply the rule filter and
				// crawl whatever is not blacklisted.
				if crawler.RuleFilter(r.URL) != crawler.FilterDiscard {
					c.crawlPage(ctx, r.URL, r.Title, callback)
				}
			}
		}
		// Delay between keywords.
		select {
		case <-ctx.Done():
			return nil
		case <-time.After(2 * time.Second):
		}
	}
	log.Println("[web_collector] done")
	return nil
}

// crawlPage fetches a page and extracts merchants from it.
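// No deduplication happens here: the same username can be emitted more than
// once across pages, so the callback is assumed to de-duplicate downstream.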
func (c *Collector) crawlPage(ctx context.Context, pageURL, title string, callback func(plugin.MerchantData)) {
	// Rule-based filter (no LLM): only FilterDiscard stops the crawl here.
	// FilterUncertain proceeds as well, since per requirements no AI is
	// consulted to resolve it; FilterValid proceeds as usual.
	filterResult := crawler.RuleFilter(pageURL)
	if filterResult == crawler.FilterDiscard {
		return
	}

	// Try static first, fall back to dynamic.
	result := c.static.Crawl(ctx, pageURL)
	if result.Error != nil || result.HTML == "" {
		result = c.dynamic.Crawl(ctx, pageURL)
	}
	if result.Error != nil || result.HTML == "" {
		return
	}

	// Chinese content filter.
	snippet := result.HTML
	if len(snippet) > 5000 {
		snippet = snippet[:5000]
	}
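	// ContainsChinese's second argument is assumed to be a minimum match
	// count (0 = any CJK rune qualifies). The byte-based truncation above
	// may split a trailing multi-byte rune, which is harmless for a
	// presence check.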
	if !extractor.ContainsChinese(snippet, 0) {
		return
	}

	// Process t.me links found on the page.
	for _, tgLink := range result.TgLinks {
		username := crawler.ExtractTGUsername(tgLink)
		if username == "" {
			continue
		}
		// t.me dead check (free, unlimited).
		if !c.tmeValidator.IsAlive(ctx, username) {
			continue
		}
		callback(plugin.MerchantData{
			TgUsername: username,
			TgLink:     "https://t.me/" + username,
			SourceType: "web",
			SourceName: title,
			SourceURL:  pageURL,
		})
	}

	// Process other links: crawl merchant sub-pages for contact info.
	for _, link := range result.Links {
		if c.stopped.Load() || ctx.Err() != nil {
			break
		}
		// Skip TG links (already processed) and blacklisted URLs.
		if strings.Contains(link, "t.me") || strings.Contains(link, "telegram.me") {
			continue
		}
		if crawler.RuleFilter(link) == crawler.FilterDiscard {
			continue
		}
		c.crawlMerchantSite(ctx, link, pageURL, callback)
	}
}

// crawlMerchantSite crawls a merchant's website for contact info.
func (c *Collector) crawlMerchantSite(ctx context.Context, siteURL, sourceURL string, callback func(plugin.MerchantData)) {
	base := strings.TrimSuffix(siteURL, "/") // avoid "//contact" when siteURL has a trailing slash
	subPages := []string{siteURL, base + "/contact", base + "/about"}
	for _, page := range subPages {
		if ctx.Err() != nil {
			break
		}
		result := c.static.Crawl(ctx, page)
		if result.Error != nil || result.HTML == "" {
			continue
		}
		info := extractor.Extract(result.HTML)
		if !info.HasContact {
			continue
		}
		if info.TgUsername == "" {
			continue // per requirements: no tg_username = don't insert
		}
		callback(plugin.MerchantData{
			TgUsername:   info.TgUsername,
			TgLink:       "https://t.me/" + info.TgUsername,
			Website:      siteURL,
			Email:        info.Email,
			Phone:        info.Phone,
			SourceType:   "web",
			SourceName:   extractDomain(siteURL),
			SourceURL:    sourceURL,
			OriginalText: "",
		})
		break // found contact info, stop
	}
}
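
// reTGUsername captures the username segment of t.me / telegram.me URLs,
// e.g. "https://t.me/somebot" yields "somebot". Invite links of the form
// t.me/joinchat/... would otherwise match as the literal username
// "joinchat", so that segment is rejected explicitly below.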
var reTGUsername = regexp.MustCompile(`(?:t(?:elegram)?\.me)/([a-zA-Z][a-zA-Z0-9_]{4,31})`)

func extractTGUsername(rawURL string) string {
	m := reTGUsername.FindStringSubmatch(rawURL)
	if len(m) > 1 && !strings.EqualFold(m[1], "joinchat") {
		return m[1]
	}
	return ""
}
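
// extractDomain returns the bare hostname of rawURL, e.g.
// "https://shop.example.com/about" yields "shop.example.com".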
func extractDomain(rawURL string) string {
	u, err := url.Parse(rawURL)
	if err != nil {
		return ""
	}
	return u.Hostname()
}