package webcollector

import (
	"context"
	"log"
	"net/url"
	"regexp"
	"strings"
	"sync/atomic"
	"time"

	"spider/internal/crawler"
	"spider/internal/extractor"
	"spider/internal/plugin"
	"spider/internal/search"
)

// Collector implements plugin.Collector for web-based merchant collection.
// It combines search (Google via Serper), page crawling, and contact
// extraction. No AI is involved: filtering is pure regex and rules.
type Collector struct {
	serper       *search.SerperClient
	static       *crawler.StaticCrawler
	dynamic      *crawler.DynamicCrawler
	tmeValidator *crawler.TMeValidator
	stopped      atomic.Bool
}

// New creates a new web collector.
func New(serper *search.SerperClient) *Collector {
	return &Collector{
		serper:       serper,
		static:       crawler.NewStaticCrawler(),
		dynamic:      crawler.NewDynamicCrawler(),
		tmeValidator: crawler.NewTMeValidator(),
	}
}

func (c *Collector) Name() string { return "web_collector" }

func (c *Collector) Stop() error {
	c.stopped.Store(true)
	return nil
}

// Run executes the web collection pipeline:
//  1. For each keyword, search via the Serper API.
//  2. Classify results: t.me links are extracted directly, web pages are crawled.
//  3. Crawl pages, extracting TG usernames and contact info.
//  4. Invoke callback for each merchant found.
//
// cfg keys:
//   - "keywords": []string — search keywords. The task manager normally
//     supplies these from the DB; Run is a no-op if they are absent.
func (c *Collector) Run(ctx context.Context, cfg map[string]any, callback func(plugin.MerchantData)) error {
	c.stopped.Store(false)

	if c.serper == nil {
		log.Println("[web_collector] no serper client configured, skipping")
		return nil
	}

	keywords, _ := cfg["keywords"].([]string)
	if len(keywords) == 0 {
		log.Println("[web_collector] no keywords provided")
		return nil
	}

	for _, kw := range keywords {
		if c.stopped.Load() || ctx.Err() != nil {
			break
		}

		log.Printf("[web_collector] searching: %s", kw)
		results, err := c.serper.Search(ctx, kw)
		if err != nil {
			log.Printf("[web_collector] search error for %q: %v", kw, err)
			continue
		}

		for _, r := range results {
			if c.stopped.Load() || ctx.Err() != nil {
				break
			}

			switch search.ClassifyURL(r.URL) {
			case "tg_channel":
				// Direct t.me link: extract the username immediately.
				username := extractTGUsername(r.URL)
				if username == "" {
					continue
				}
				callback(plugin.MerchantData{
					TgUsername: username,
					TgLink:     "https://t.me/" + username,
					SourceType: "web",
					SourceName: r.Title,
					SourceURL:  r.URL,
				})
			case "nav_site":
				// Crawl the page for TG links and contacts.
				c.crawlPage(ctx, r.URL, r.Title, callback)
			default:
				// "discard" or unknown: still crawl anything the rule
				// filter does not blacklist.
				if crawler.RuleFilter(r.URL) != crawler.FilterDiscard {
					c.crawlPage(ctx, r.URL, r.Title, callback)
				}
			}
		}

		// Delay between keywords.
		select {
		case <-ctx.Done():
			return nil
		case <-time.After(2 * time.Second):
		}
	}

	log.Println("[web_collector] done")
	return nil
}
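// Example wiring (a minimal sketch, not part of the collector). It assumes a
// hypothetical search.NewSerperClient(apiKey string) constructor; the real
// constructor for search.SerperClient lives elsewhere in the repo and may
// differ. The callback here simply logs each merchant the pipeline emits:
//
//	func runOnce(ctx context.Context, apiKey string, keywords []string) error {
//		c := New(search.NewSerperClient(apiKey))
//		return c.Run(ctx, map[string]any{"keywords": keywords},
//			func(m plugin.MerchantData) {
//				log.Printf("merchant: %s (%s)", m.TgUsername, m.SourceURL)
//			})
//	}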
// crawlPage fetches a page and extracts merchants from it.
func (c *Collector) crawlPage(ctx context.Context, pageURL, title string, callback func(plugin.MerchantData)) {
	// Rule-based filter (no LLM):
	//   FilterDiscard:   blacklisted, skip.
	//   FilterUncertain: per requirements, discard rather than escalate to AI.
	//   FilterValid:     proceed.
	if crawler.RuleFilter(pageURL) != crawler.FilterValid {
		return
	}

	// Try the static crawler first, fall back to the dynamic one.
	result := c.static.Crawl(ctx, pageURL)
	if result.Error != nil || result.HTML == "" {
		result = c.dynamic.Crawl(ctx, pageURL)
	}
	if result.Error != nil || result.HTML == "" {
		return
	}

	// Chinese content filter: inspect only the leading portion of the page.
	snippet := result.HTML
	if len(snippet) > 5000 {
		snippet = snippet[:5000]
	}
	if !extractor.ContainsChinese(snippet, 0) {
		return
	}

	// Process t.me links found on the page.
	for _, tgLink := range result.TgLinks {
		username := crawler.ExtractTGUsername(tgLink)
		if username == "" {
			continue
		}
		// t.me dead check (free, unlimited).
		if !c.tmeValidator.IsAlive(ctx, username) {
			continue
		}
		callback(plugin.MerchantData{
			TgUsername: username,
			TgLink:     "https://t.me/" + username,
			SourceType: "web",
			SourceName: title,
			SourceURL:  pageURL,
		})
	}

	// Process the remaining links: crawl merchant sub-pages for contact info.
	for _, link := range result.Links {
		if c.stopped.Load() || ctx.Err() != nil {
			break
		}
		// Skip TG links (already processed above) and blacklisted URLs.
		if strings.Contains(link, "t.me") || strings.Contains(link, "telegram.me") {
			continue
		}
		if crawler.RuleFilter(link) == crawler.FilterDiscard {
			continue
		}
		c.crawlMerchantSite(ctx, link, pageURL, callback)
	}
}

// crawlMerchantSite crawls a merchant's website for contact info.
func (c *Collector) crawlMerchantSite(ctx context.Context, siteURL, sourceURL string, callback func(plugin.MerchantData)) {
	base := strings.TrimSuffix(siteURL, "/")
	subPages := []string{siteURL, base + "/contact", base + "/about"}
	for _, page := range subPages {
		if ctx.Err() != nil {
			break
		}
		result := c.static.Crawl(ctx, page)
		if result.Error != nil || result.HTML == "" {
			continue
		}
		info := extractor.Extract(result.HTML)
		if !info.HasContact {
			continue
		}
		if info.TgUsername == "" {
			continue // per requirements: no tg_username means don't insert
		}
		callback(plugin.MerchantData{
			TgUsername:   info.TgUsername,
			TgLink:       "https://t.me/" + info.TgUsername,
			Website:      siteURL,
			Email:        info.Email,
			Phone:        info.Phone,
			SourceType:   "web",
			SourceName:   extractDomain(siteURL),
			SourceURL:    sourceURL,
			OriginalText: "",
		})
		break // found contact info, stop
	}
}

// reTGUsername matches t.me/<username> and telegram.me/<username>.
// Telegram usernames are 5-32 characters, start with a letter, and contain
// only letters, digits, and underscores.
var reTGUsername = regexp.MustCompile(`(?:t(?:elegram)?\.me)/([a-zA-Z][a-zA-Z0-9_]{4,31})`)

func extractTGUsername(rawURL string) string {
	m := reTGUsername.FindStringSubmatch(rawURL)
	if len(m) > 1 {
		return m[1]
	}
	return ""
}

func extractDomain(rawURL string) string {
	u, err := url.Parse(rawURL)
	if err != nil {
		return ""
	}
	return u.Hostname()
}
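// Expected helper behavior (a sketch; the URLs are made-up examples):
//
//	extractTGUsername("https://t.me/some_channel")    // "some_channel"
//	extractTGUsername("https://t.me/ab")              // "" (usernames are at least 5 chars)
//	extractDomain("https://shop.example.com/contact") // "shop.example.com"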