package webcollector

import (
	"context"
	"log"
	"net/url"
	"regexp"
	"strings"
	"sync/atomic"
	"time"

	"spider/internal/crawler"
	"spider/internal/extractor"
	"spider/internal/plugin"
	"spider/internal/search"
)

// Collector implements plugin.Collector for web-based merchant collection.
// It combines search (Google via Serper), page crawling, and contact
// extraction. NO AI: filtering is pure regex and rule-based.
type Collector struct {
	serper       *search.SerperClient
	static       *crawler.StaticCrawler
	dynamic      *crawler.DynamicCrawler
	tmeValidator *crawler.TMeValidator
	stopped      atomic.Bool
}

// New creates a new web collector.
func New(serper *search.SerperClient) *Collector {
	return &Collector{
		serper:       serper,
		static:       crawler.NewStaticCrawler(),
		dynamic:      crawler.NewDynamicCrawler(),
		tmeValidator: crawler.NewTMeValidator(),
	}
}

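// A minimal construction sketch. The search.NewSerperClient constructor and
// its apiKey parameter are assumptions (only the *search.SerperClient type
// appears in this file); substitute the real constructor:
//
//	c := New(search.NewSerperClient(apiKey))
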
func (c *Collector) Name() string { return "web_collector" }

// Stop signals a running Run to exit at its next checkpoint.
func (c *Collector) Stop() error {
	c.stopped.Store(true)
	return nil
}

// Run executes the web collection pipeline:
//  1. For each keyword, search via the Serper API.
//  2. Classify results: t.me links are extracted directly, web pages are crawled.
//  3. Crawl pages, extract TG usernames and contact info.
//  4. Invoke callback for each merchant found.
//
// cfg keys:
//   - "keywords": []string, the search keywords. The task manager normally
//     injects these from the DB; Run is a no-op when the list is empty.
func (c *Collector) Run(ctx context.Context, cfg map[string]any, callback func(plugin.MerchantData)) error {
	c.stopped.Store(false)
	if c.serper == nil {
		log.Println("[web_collector] no serper client configured, skipping")
		return nil
	}
	keywords, _ := cfg["keywords"].([]string)
	if len(keywords) == 0 {
		log.Println("[web_collector] no keywords provided")
		return nil
	}
	for _, kw := range keywords {
		if c.stopped.Load() || ctx.Err() != nil {
			break
		}
		log.Printf("[web_collector] searching: %s", kw)
		results, err := c.serper.Search(ctx, kw)
		if err != nil {
			log.Printf("[web_collector] search error for %q: %v", kw, err)
			continue
		}
		for _, r := range results {
			if c.stopped.Load() || ctx.Err() != nil {
				break
			}
			switch search.ClassifyURL(r.URL) {
			case "tg_channel":
				// Direct t.me link: extract the username immediately.
				username := extractTGUsername(r.URL)
				if username == "" {
					continue
				}
				callback(plugin.MerchantData{
					TgUsername: username,
					TgLink:     "https://t.me/" + username,
					SourceType: "web",
					SourceName: r.Title,
					SourceURL:  r.URL,
				})
			case "nav_site":
				// Crawl the page for TG links and contacts.
				c.crawlPage(ctx, r.URL, r.Title, callback)
			default:
				// "discard" or unknown: still try non-blacklisted URLs.
				if crawler.RuleFilter(r.URL) != crawler.FilterDiscard {
					c.crawlPage(ctx, r.URL, r.Title, callback)
				}
			}
		}
		// Delay between keywords.
		select {
		case <-ctx.Done():
			return nil
		case <-time.After(2 * time.Second):
		}
	}
	log.Println("[web_collector] done")
	return nil
}

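// A hypothetical invocation, with a cancellable context and a logging
// callback (the keyword is a placeholder):
//
//	ctx, cancel := context.WithCancel(context.Background())
//	defer cancel()
//	err := c.Run(ctx, map[string]any{"keywords": []string{"example keyword"}},
//		func(m plugin.MerchantData) {
//			log.Printf("found @%s via %s", m.TgUsername, m.SourceURL)
//		})
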
// crawlPage fetches a page and extracts merchants from it.
func (c *Collector) crawlPage(ctx context.Context, pageURL, title string, callback func(plugin.MerchantData)) {
	// Rule-based filter (no LLM). FilterValid proceeds; FilterDiscard is
	// dropped; FilterUncertain is, per requirements, also dropped rather
	// than resolved with AI.
	if crawler.RuleFilter(pageURL) != crawler.FilterValid {
		return
	}

	// Try static first, fall back to dynamic.
	result := c.static.Crawl(ctx, pageURL)
	if result.Error != nil || result.HTML == "" {
		result = c.dynamic.Crawl(ctx, pageURL)
	}
	if result.Error != nil || result.HTML == "" {
		return
	}

	// Chinese-content filter on the first 5000 bytes of HTML.
	snippet := result.HTML
	if len(snippet) > 5000 {
		snippet = snippet[:5000]
	}
	if !extractor.ContainsChinese(snippet, 0) {
		return
	}

	// Process t.me links found on the page.
	for _, tgLink := range result.TgLinks {
		username := crawler.ExtractTGUsername(tgLink)
		if username == "" {
			continue
		}
		// t.me liveness check (free, unlimited).
		if !c.tmeValidator.IsAlive(ctx, username) {
			continue
		}
		callback(plugin.MerchantData{
			TgUsername: username,
			TgLink:     "https://t.me/" + username,
			SourceType: "web",
			SourceName: title,
			SourceURL:  pageURL,
		})
	}

	// Process other links: crawl merchant sub-pages for contact info.
	for _, link := range result.Links {
		if c.stopped.Load() || ctx.Err() != nil {
			break
		}
		// Skip TG links (already processed above).
		if strings.Contains(link, "t.me") || strings.Contains(link, "telegram.me") {
			continue
		}
		// Skip blacklisted URLs.
		if crawler.RuleFilter(link) == crawler.FilterDiscard {
			continue
		}
		c.crawlMerchantSite(ctx, link, pageURL, callback)
	}
}

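// The static-then-dynamic fallback above could be factored into a helper.
// A sketch; the crawl result type name crawler.Result is an assumption
// (only its Error, HTML, TgLinks, and Links fields are used in this file):
//
//	func (c *Collector) fetch(ctx context.Context, u string) (crawler.Result, bool) {
//		res := c.static.Crawl(ctx, u)
//		if res.Error != nil || res.HTML == "" {
//			res = c.dynamic.Crawl(ctx, u)
//		}
//		return res, res.Error == nil && res.HTML != ""
//	}
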
// crawlMerchantSite crawls a merchant's website for contact info.
func (c *Collector) crawlMerchantSite(ctx context.Context, siteURL, sourceURL string, callback func(plugin.MerchantData)) {
	subPages := []string{siteURL, siteURL + "/contact", siteURL + "/about"}
	for _, page := range subPages {
		if ctx.Err() != nil {
			break
		}
		result := c.static.Crawl(ctx, page)
		if result.Error != nil || result.HTML == "" {
			continue
		}
		info := extractor.Extract(result.HTML)
		if !info.HasContact {
			continue
		}
		if info.TgUsername == "" {
			continue // per requirements: no tg_username, no insert
		}
		callback(plugin.MerchantData{
			TgUsername:   info.TgUsername,
			TgLink:       "https://t.me/" + info.TgUsername,
			Website:      siteURL,
			Email:        info.Email,
			Phone:        info.Phone,
			SourceType:   "web",
			SourceName:   extractDomain(siteURL),
			SourceURL:    sourceURL,
			OriginalText: "",
		})
		break // found contact info, stop probing sub-pages
	}
}

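// Note: siteURL+"/contact" yields a double slash when siteURL ends in "/".
// A hedged alternative using the standard library (Go 1.19+):
//
//	if p, err := url.JoinPath(siteURL, "contact"); err == nil {
//		subPages = append(subPages, p)
//	}
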
// reTGUsername matches a t.me or telegram.me link and captures the username:
// 5-32 characters, starting with a letter, then letters, digits, or underscores.
var reTGUsername = regexp.MustCompile(`(?:t(?:elegram)?\.me)/([a-zA-Z][a-zA-Z0-9_]{4,31})`)

// extractTGUsername pulls the username out of a t.me / telegram.me URL,
// returning "" when the URL does not contain one.
func extractTGUsername(rawURL string) string {
	m := reTGUsername.FindStringSubmatch(rawURL)
	if len(m) > 1 {
		return m[1]
	}
	return ""
}

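// Worked examples (hypothetical URLs):
//
//	extractTGUsername("https://t.me/some_shop")        // "some_shop"
//	extractTGUsername("https://telegram.me/some_shop") // "some_shop"
//	extractTGUsername("https://t.me/abc")              // "" (below the 5-char minimum)
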
// extractDomain returns the hostname of rawURL, or "" if it cannot be parsed.
func extractDomain(rawURL string) string {
	u, err := url.Parse(rawURL)
	if err != nil {
		return ""
	}
	return u.Hostname()
}

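// Worked examples (hypothetical URLs). url.Parse treats a scheme-less string
// as a path, so Hostname() and therefore extractDomain return "" for it:
//
//	extractDomain("https://shop.example.com/contact") // "shop.example.com"
//	extractDomain("shop.example.com")                 // ""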