package webcollector

import (
	"context"
	"encoding/json"
	"log"
	"net/url"
	"regexp"
	"strings"
	"sync/atomic"
	"time"

	"github.com/redis/go-redis/v9"

	"spider/internal/crawler"
	"spider/internal/extractor"
	"spider/internal/plugin"
	proxypool "spider/internal/proxy"
	"spider/internal/search"
)

const snapshotKey = "spider:webcollector:snapshot"

// Collector implements plugin.Collector for web-based merchant collection.
type Collector struct {
	serper       *search.SerperClient
	static       *crawler.StaticCrawler
	dynamic      *crawler.DynamicCrawler
	tmeValidator *crawler.TMeValidator
	stopped      atomic.Bool
	logger       plugin.TaskLogger
	proxyPool    *proxypool.Pool
	rdb          *redis.Client
}

// New builds a Collector with default static/dynamic crawlers and a no-op logger.
func New(serper *search.SerperClient, rdb *redis.Client) *Collector {
	return &Collector{
		serper:       serper,
		static:       crawler.NewStaticCrawler(),
		dynamic:      crawler.NewDynamicCrawler(),
		tmeValidator: crawler.NewTMeValidator(),
		logger:       plugin.NopLogger(),
		rdb:          rdb,
	}
}

// Name returns the plugin identifier.
func (c *Collector) Name() string { return "web_collector" }

// SetLogger installs the task logger used for audit logging.
func (c *Collector) SetLogger(l plugin.TaskLogger) { c.logger = l }

// Stop signals Run to exit at the next checkpoint.
func (c *Collector) Stop() error {
	c.stopped.Store(true)
	return nil
}

// Run executes one collection pass and reports each merchant through callback.
// Recognized cfg keys: "proxy_pool" (*proxypool.Pool), "proxy_url" (string),
// "keywords" ([]string), "max_merchants" (int), "max_duration_mins" (int),
// and "resume_snapshot" (bool).
func (c *Collector) Run(ctx context.Context, cfg map[string]any, callback func(plugin.MerchantData)) error {
	c.stopped.Store(false)

	// Apply proxy pool or single proxy
	if pool, ok := cfg["proxy_pool"].(*proxypool.Pool); ok && pool != nil {
		c.proxyPool = pool
		log.Printf("[web_collector] using proxy pool with %d proxies", pool.Size())
	} else if proxyURL, ok := cfg["proxy_url"].(string); ok && proxyURL != "" {
		c.static.SetProxy(proxyURL)
		log.Printf("[web_collector] using proxy: %s", proxyURL)
	}

	if c.serper == nil {
		log.Println("[web_collector] no serper client configured, skipping")
		return nil
	}

	keywords, _ := cfg["keywords"].([]string)
	if len(keywords) == 0 {
		log.Println("[web_collector] no keywords provided")
		return nil
	}

	// Stop conditions
	maxMerchants, _ := cfg["max_merchants"].(int)
	maxDurationMins, _ := cfg["max_duration_mins"].(int)
	resumeSnapshot, _ := cfg["resume_snapshot"].(bool)

	var deadline time.Time
	if maxDurationMins > 0 {
		deadline = time.Now().Add(time.Duration(maxDurationMins) * time.Minute)
		log.Printf("[web_collector] will stop after %d minutes", maxDurationMins)
	}

	// Load or init snapshot
	snapshot := c.loadSnapshot(ctx)
	if !resumeSnapshot {
		snapshot = map[string]bool{}
		log.Println("[web_collector] starting fresh (snapshot cleared)")
	} else if len(snapshot) > 0 {
		log.Printf("[web_collector] resuming from snapshot, %d queries already done", len(snapshot))
	}

	merchantCount := 0
	wrappedCallback := func(md plugin.MerchantData) {
		callback(md)
		merchantCount++
	}

	queries := expandSearchQueries(keywords)
	for _, q := range queries {
		if c.stopped.Load() || ctx.Err() != nil {
			break
		}
		if maxMerchants > 0 && merchantCount >= maxMerchants {
			log.Printf("[web_collector] reached max_merchants limit (%d), stopping", maxMerchants)
			break
		}
		if !deadline.IsZero() && time.Now().After(deadline) {
			log.Printf("[web_collector] reached max_duration limit, stopping")
			break
		}
		if snapshot[q] {
			log.Printf("[web_collector] skipping (snapshot): %s", q)
			continue
		}

		// Rotate proxy for each query if using pool
		c.rotateProxy()

		log.Printf("[web_collector] searching: %s", q)

		// ── Organic search ──
		t0 := time.Now()
		results, err := c.serper.Search(ctx, q)
		if err != nil {
			log.Printf("[web_collector] search error: %v", err)
			c.logger.LogError("search", "", err.Error())
		} else {
			log.Printf("[web_collector] organic search %q: %d results in %dms", q, len(results), time.Since(t0).Milliseconds())
			for i, r := range results {
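				// Log every organic hit (1-based rank, title, URL, snippet) so the audit trail records exactly what the query returned.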
c.logger.LogSearchResult(q+" [organic]", i+1, r.Title, r.URL, r.Snippet) } c.processResults(ctx, results, q, wrappedCallback) } time.Sleep(1 * time.Second) // ── Video search ── if c.stopped.Load() || ctx.Err() != nil { break } videoResults, err := c.serper.SearchVideos(ctx, q) if err != nil { c.logger.LogError("search_videos", "", err.Error()) } else { for i, r := range videoResults { c.logger.LogSearchResult(q+" [video]", i+1, r.Title, r.URL, r.Snippet) } c.processResults(ctx, videoResults, q, wrappedCallback) } // Mark query as done in snapshot snapshot[q] = true c.saveSnapshot(ctx, snapshot) select { case <-ctx.Done(): return nil case <-time.After(2 * time.Second): } } // If all queries done naturally, clear snapshot for next full run allDone := true for _, q := range queries { if !snapshot[q] { allDone = false break } } if allDone && c.rdb != nil { c.rdb.Del(ctx, snapshotKey) log.Println("[web_collector] all queries done, snapshot cleared") } log.Printf("[web_collector] done, collected %d merchants", merchantCount) return nil } func (c *Collector) loadSnapshot(ctx context.Context) map[string]bool { if c.rdb == nil { return map[string]bool{} } data, err := c.rdb.Get(ctx, snapshotKey).Bytes() if err != nil { return map[string]bool{} } var snapshot map[string]bool if err := json.Unmarshal(data, &snapshot); err != nil { return map[string]bool{} } return snapshot } func (c *Collector) saveSnapshot(ctx context.Context, snapshot map[string]bool) { if c.rdb == nil { return } data, err := json.Marshal(snapshot) if err != nil { return } // Keep snapshot for 7 days c.rdb.Set(ctx, snapshotKey, data, 7*24*time.Hour) } // processResults handles search results with full logging at every node. func (c *Collector) processResults(ctx context.Context, results []search.SearchResult, query string, callback func(plugin.MerchantData)) { for _, r := range results { if c.stopped.Load() || ctx.Err() != nil { break } // ── Node 1: Extract from snippet text ── snippetText := r.Title + " " + r.Snippet c.extractFromSnippet(snippetText, r.Title, r.URL, callback) // ── Node 2: Extract URLs from snippet → crawl them ── snippetURLs := reURL.FindAllString(r.Snippet, -1) for _, sURL := range snippetURLs { if c.stopped.Load() || ctx.Err() != nil { break } sURL = strings.TrimRight(sURL, ".,;)\"'") if strings.Contains(sURL, "t.me/") || strings.Contains(sURL, "telegram.me/") { username := extractTGUsername(sURL) if username != "" { md := plugin.MerchantData{ TgUsername: username, TgLink: "https://t.me/" + username, SourceType: "web", SourceName: r.Title, SourceURL: r.URL, } c.logger.LogMerchantFound(md, "snippet_tme_url", 0, r.URL) callback(md) } continue } if isBlacklistDomain(sURL) { c.logger.LogSkip("crawl_snippet_url", sURL, "blacklisted_domain") continue } // Crawl URLs found inside snippets — depth=1, parent is the serper result c.crawlAndExtract(ctx, sURL, r.URL, 1, r.Title, callback) } // ── Node 3: Crawl the result URL itself ── classification := search.ClassifyURL(r.URL) c.logger.LogSkip("classify", r.URL, classification) // log classification decision switch classification { case "tg_channel": username := extractTGUsername(r.URL) if username != "" { md := plugin.MerchantData{ TgUsername: username, TgLink: "https://t.me/" + username, SourceType: "web", SourceName: r.Title, SourceURL: r.URL, } c.logger.LogMerchantFound(md, "direct_tme_link", 0, "") callback(md) } case "nav_site", "web_page": if crawler.RuleFilter(r.URL) != crawler.FilterDiscard { c.crawlAndExtract(ctx, r.URL, "", 0, r.Title, callback) } else { 
c.logger.LogSkip("crawl", r.URL, "rule_filter_discard") } default: c.logger.LogSkip("crawl", r.URL, "classification_discard") } } } // extractFromSnippet extracts contacts from snippet/title text and logs everything. func (c *Collector) extractFromSnippet(text, title, sourceURL string, callback func(plugin.MerchantData)) { contacts := extractor.ExtractAll(text) var usernames []string for _, info := range contacts { if info.TgUsername == "" { continue } usernames = append(usernames, info.TgUsername) md := plugin.MerchantData{ TgUsername: info.TgUsername, TgLink: "https://t.me/" + info.TgUsername, Website: info.Website, Email: info.Email, Phone: info.Phone, SourceType: "web", SourceName: title, SourceURL: sourceURL, OriginalText: text, } c.logger.LogMerchantFound(md, "snippet_regex", 0, "") callback(md) } // Always log snippet extraction — even if empty (for audit: "we looked, nothing found") c.logger.LogSnippetExtract(sourceURL, text, usernames) } // rotateProxy switches to the next proxy in the pool (if pool mode). // Returns the proxy URL being used (for health reporting). func (c *Collector) rotateProxy() string { if c.proxyPool == nil { return "" } nextURL := c.proxyPool.Next() if nextURL != "" { c.static.SetProxy(nextURL) log.Printf("[web_collector] rotated to proxy: %s", nextURL) } return nextURL } // reportProxyResult reports success/failure to the proxy pool for a specific proxy. func (c *Collector) reportProxyResult(proxyURL string, err error) { if c.proxyPool == nil || proxyURL == "" { return } if err != nil { c.proxyPool.ReportFailure(proxyURL) } else { c.proxyPool.ReportSuccess(proxyURL) } } // crawlAndExtract fetches a page, extracts contacts, and follows sub-links. // depth tracks how deep we are from the original serper result. // parentURL tracks which page led us here. 
func (c *Collector) crawlAndExtract(ctx context.Context, pageURL, parentURL string, depth int, title string, callback func(plugin.MerchantData)) {
	if depth > 2 {
		c.logger.LogSkip("crawl", pageURL, "max_depth_exceeded")
		return
	}

	// Rotate proxy and capture which proxy is being used
	usedProxy := c.rotateProxy()

	// ── Fetch page ──
	t0 := time.Now()
	result := c.static.Crawl(ctx, pageURL)
	c.reportProxyResult(usedProxy, result.Error)
	if result.Error != nil || result.HTML == "" {
		// On failure with pool, try once more with next proxy
		if c.proxyPool != nil && result.Error != nil {
			usedProxy = c.rotateProxy()
			result = c.static.Crawl(ctx, pageURL)
			c.reportProxyResult(usedProxy, result.Error)
		}
		if result.Error != nil || result.HTML == "" {
			result = c.dynamic.Crawl(ctx, pageURL)
		}
	}
	dur := time.Since(t0)

	if result.Error != nil || result.HTML == "" {
		c.logger.LogCrawlPage(pageURL, parentURL, depth, "", nil, 0, result.Error, dur)
		return
	}

	// Content filter
	hasTgLinks := len(result.TgLinks) > 0
	if !hasTgLinks {
		snippet := result.HTML
		if len(snippet) > 5000 {
			snippet = snippet[:5000]
		}
		if !extractor.ContainsChinese(snippet, 0) && !extractor.HasContact(snippet) {
			c.logger.LogCrawlPage(pageURL, parentURL, depth, snippet, nil, len(result.Links), nil, dur)
			c.logger.LogSkip("crawl", pageURL, "no_chinese_no_contact")
			return
		}
	}

	// ── Log crawl with content summary ──
	htmlSummary := result.HTML
	if len(htmlSummary) > 2000 {
		htmlSummary = htmlSummary[:2000]
	}
	c.logger.LogCrawlPage(pageURL, parentURL, depth, htmlSummary, result.TgLinks, len(result.Links), nil, dur)

	// ── Extract from t.me links in href attributes ──
	seenUsernames := map[string]bool{}
	for _, tgLink := range result.TgLinks {
		username := crawler.ExtractTGUsername(tgLink)
		if username == "" || seenUsernames[strings.ToLower(username)] {
			continue
		}
		seenUsernames[strings.ToLower(username)] = true
		md := plugin.MerchantData{
			TgUsername: username,
			TgLink:     "https://t.me/" + username,
			SourceType: "web",
			SourceName: title,
			SourceURL:  pageURL,
		}
		c.logger.LogMerchantFound(md, "crawl_href", depth, parentURL)
		callback(md)
	}

	// ── Extract from page text ──
	allContacts := extractor.ExtractAll(result.HTML)
	var extractedNames []string
	for _, info := range allContacts {
		if info.TgUsername == "" || seenUsernames[strings.ToLower(info.TgUsername)] {
			continue
		}
		seenUsernames[strings.ToLower(info.TgUsername)] = true
		extractedNames = append(extractedNames, info.TgUsername)
		md := plugin.MerchantData{
			TgUsername: info.TgUsername,
			TgLink:     "https://t.me/" + info.TgUsername,
			Website:    info.Website,
			Email:      info.Email,
			Phone:      info.Phone,
			SourceType: "web",
			SourceName: title,
			SourceURL:  pageURL,
		}
		c.logger.LogMerchantFound(md, "crawl_text", depth, parentURL)
		callback(md)
	}

	// Log page extraction results
	contentSample := result.HTML
	if len(contentSample) > 1000 {
		contentSample = contentSample[:1000]
	}
	c.logger.LogPageExtract(pageURL, parentURL, depth, contentSample, extractedNames)

	// ── Follow sub-links to deeper pages (depth+1) ──
	if depth < 2 {
		subPages := collectSubPages(pageURL, result.Links)
		for _, link := range subPages {
			if c.stopped.Load() || ctx.Err() != nil {
				break
			}
			if strings.Contains(link, "t.me") || strings.Contains(link, "telegram.me") {
				continue
			}
			if crawler.RuleFilter(link) == crawler.FilterDiscard {
				continue
			}
			c.crawlAndExtract(ctx, link, pageURL, depth+1, title, callback)
		}
	}
}

// collectSubPages picks sub-pages worth crawling from a page's links.
// Prioritizes contact/about/support pages plus same-domain internal links.
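// Illustrative example (hypothetical URLs): with baseURL "https://example.com" and links
// ["https://example.com/contact", "https://example.com/pricing", "https://other.com/x"],
// the /contact link is returned first, then up to 5 other same-domain links, with the
// combined result capped at 10 entries; the off-domain link is dropped.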
func collectSubPages(baseURL string, links []string) []string {
	baseDomain := extractDomain(baseURL)
	if baseDomain == "" {
		return nil
	}

	// Priority paths
	contactPaths := []string{"/contact", "/contact-us", "/about", "/about-us", "/support", "/faq", "/help"}

	var priority, sameDomain []string
	seen := map[string]bool{baseURL: true}
	for _, link := range links {
		if seen[link] {
			continue
		}
		seen[link] = true
		linkDomain := extractDomain(link)
		if linkDomain != baseDomain {
			continue
		}
		lower := strings.ToLower(link)
		isPriority := false
		for _, p := range contactPaths {
			if strings.Contains(lower, p) {
				priority = append(priority, link)
				isPriority = true
				break
			}
		}
		if !isPriority && len(sameDomain) < 5 {
			sameDomain = append(sameDomain, link)
		}
	}

	result := append(priority, sameDomain...)
	if len(result) > 10 {
		result = result[:10]
	}
	return result
}

// expandSearchQueries fans each keyword out into multiple queries by appending
// Telegram-related suffixes, de-duplicating along the way.
func expandSearchQueries(keywords []string) []string {
	suffixes := []string{
		"",
		" telegram",
		" t.me",
		" 电报",
		" 联系方式 telegram",
	}
	seen := map[string]bool{}
	var queries []string
	for _, kw := range keywords {
		for _, suffix := range suffixes {
			q := kw + suffix
			if !seen[q] {
				seen[q] = true
				queries = append(queries, q)
			}
		}
	}
	return queries
}

// isBlacklistDomain reports whether a URL belongs to a domain not worth crawling.
func isBlacklistDomain(u string) bool {
	bl := []string{"youtube.com", "google.com", "twitter.com", "facebook.com", "instagram.com", "bit.ly", "gstatic.com", "wikipedia.org", "x.com"}
	lower := strings.ToLower(u)
	for _, b := range bl {
		if strings.Contains(lower, b) {
			return true
		}
	}
	return false
}

// reTGUsername captures the username from t.me / telegram.me URLs.
var reTGUsername = regexp.MustCompile(`(?:t(?:elegram)?\.me)/([a-zA-Z][a-zA-Z0-9_]{4,31})`)

// reURL matches http(s) URLs, stopping at whitespace, quotes, angle brackets, or CJK characters.
var reURL = regexp.MustCompile(`https?://[^\s<>"'\x{4e00}-\x{9fa5}]+`)

// extractTGUsername returns the Telegram username embedded in a t.me/telegram.me URL, or "".
func extractTGUsername(rawURL string) string {
	m := reTGUsername.FindStringSubmatch(rawURL)
	if len(m) > 1 {
		return m[1]
	}
	return ""
}

// extractDomain returns the hostname of a URL, or "" if it cannot be parsed.
func extractDomain(rawURL string) string {
	u, err := url.Parse(rawURL)
	if err != nil {
		return ""
	}
	return u.Hostname()
}
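
// The function below is an illustrative sketch only and is not called by the collector:
// it shows how one keyword fans out into queries via expandSearchQueries and how
// extractTGUsername pulls a username out of a t.me URL. The keyword and URL are
// hypothetical placeholders.
func exampleQueryExpansion() ([]string, string) {
	// For a single keyword, expandSearchQueries yields five queries: the keyword itself
	// plus the " telegram", " t.me", " 电报", and " 联系方式 telegram" variants.
	queries := expandSearchQueries([]string{"换汇"})

	// extractTGUsername accepts both t.me and telegram.me forms.
	username := extractTGUsername("https://t.me/example_shop")

	return queries, username
}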