package webcollector

import (
	"context"
	"log"
	"net/url"
	"regexp"
	"strings"
	"sync/atomic"
	"time"

	"spider/internal/crawler"
	"spider/internal/extractor"
	"spider/internal/plugin"
	proxypool "spider/internal/proxy"
	"spider/internal/search"
)

// Collector implements plugin.Collector for web-based merchant collection.
type Collector struct {
	serper       *search.SerperClient
	static       *crawler.StaticCrawler
	dynamic      *crawler.DynamicCrawler
	tmeValidator *crawler.TMeValidator
	stopped      atomic.Bool
	logger       plugin.TaskLogger
	proxyPool    *proxypool.Pool
}

// New builds a Collector around the given Serper search client, with static and
// dynamic crawlers, a t.me validator, and a no-op logger until SetLogger is called.
func New(serper *search.SerperClient) *Collector {
	return &Collector{
		serper:       serper,
		static:       crawler.NewStaticCrawler(),
		dynamic:      crawler.NewDynamicCrawler(),
		tmeValidator: crawler.NewTMeValidator(),
		logger:       plugin.NopLogger(),
	}
}

// Name returns the plugin identifier for this collector.
func (c *Collector) Name() string { return "web_collector" }

// SetLogger replaces the no-op default with a task-scoped audit logger.
func (c *Collector) SetLogger(l plugin.TaskLogger) { c.logger = l }

// Stop asks the collector to halt; Run checks the flag between units of work.
func (c *Collector) Stop() error {
	c.stopped.Store(true)
	return nil
}
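
// Usage sketch (illustrative only, not invoked anywhere in this package): given a
// context ctx and an already-constructed *search.SerperClient named serper, the
// collector is driven as shown below. The keyword and proxy values are placeholders;
// the cfg keys are exactly the ones Run reads ("proxy_pool", "proxy_url", "keywords").
// Qualify New with the package name when calling from another package.
//
//	c := New(serper)
//	err := c.Run(ctx, map[string]any{
//		"keywords":  []string{"example shop"},
//		"proxy_url": "http://127.0.0.1:7890",
//	}, func(md plugin.MerchantData) {
//		log.Printf("merchant: %s from %s", md.TgUsername, md.SourceURL)
//	})
//	if err != nil {
//		log.Printf("web_collector run failed: %v", err)
//	}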

// Run executes one collection pass: it expands the configured keywords into search
// queries, runs Serper organic and video searches for each, and reports every
// discovered merchant through callback. Recognized cfg keys: "proxy_pool"
// (*proxypool.Pool), "proxy_url" (string), and "keywords" ([]string).
func (c *Collector) Run(ctx context.Context, cfg map[string]any, callback func(plugin.MerchantData)) error {
	c.stopped.Store(false)

	// Apply proxy pool or single proxy.
	if pool, ok := cfg["proxy_pool"].(*proxypool.Pool); ok && pool != nil {
		c.proxyPool = pool
		log.Printf("[web_collector] using proxy pool with %d proxies", pool.Size())
	} else if proxyURL, ok := cfg["proxy_url"].(string); ok && proxyURL != "" {
		c.static.SetProxy(proxyURL)
		log.Printf("[web_collector] using proxy: %s", proxyURL)
	}

	if c.serper == nil {
		log.Println("[web_collector] no serper client configured, skipping")
		return nil
	}

	keywords, _ := cfg["keywords"].([]string)
	if len(keywords) == 0 {
		log.Println("[web_collector] no keywords provided")
		return nil
	}

	queries := expandSearchQueries(keywords)
	for _, q := range queries {
		if c.stopped.Load() || ctx.Err() != nil {
			break
		}
		// Rotate proxy for each query if using a pool.
		c.rotateProxy()
		log.Printf("[web_collector] searching: %s", q)

		// ── Organic search ──
		t0 := time.Now()
		results, err := c.serper.Search(ctx, q)
		if err != nil {
			log.Printf("[web_collector] search error: %v", err)
			c.logger.LogError("search", "", err.Error())
		} else {
			log.Printf("[web_collector] organic search %q: %d results in %dms", q, len(results), time.Since(t0).Milliseconds())
			for i, r := range results {
				c.logger.LogSearchResult(q+" [organic]", i+1, r.Title, r.URL, r.Snippet)
			}
			c.processResults(ctx, results, q, callback)
		}
		time.Sleep(1 * time.Second)

		// ── Video search ──
		if c.stopped.Load() || ctx.Err() != nil {
			break
		}
		videoResults, err := c.serper.SearchVideos(ctx, q)
		if err != nil {
			c.logger.LogError("search_videos", "", err.Error())
		} else {
			for i, r := range videoResults {
				c.logger.LogSearchResult(q+" [video]", i+1, r.Title, r.URL, r.Snippet)
			}
			c.processResults(ctx, videoResults, q, callback)
		}

		// Pause between queries; return promptly if the context is cancelled.
		select {
		case <-ctx.Done():
			return nil
		case <-time.After(2 * time.Second):
		}
	}
	log.Println("[web_collector] done")
	return nil
}

// processResults handles search results with full logging at every node.
func (c *Collector) processResults(ctx context.Context, results []search.SearchResult, query string, callback func(plugin.MerchantData)) {
	for _, r := range results {
		if c.stopped.Load() || ctx.Err() != nil {
			break
		}

		// ── Node 1: Extract from snippet text ──
		snippetText := r.Title + " " + r.Snippet
		c.extractFromSnippet(snippetText, r.Title, r.URL, callback)

		// ── Node 2: Extract URLs from snippet, then crawl them ──
		snippetURLs := reURL.FindAllString(r.Snippet, -1)
		for _, sURL := range snippetURLs {
			if c.stopped.Load() || ctx.Err() != nil {
				break
			}
			sURL = strings.TrimRight(sURL, ".,;)\"'")
			if strings.Contains(sURL, "t.me/") || strings.Contains(sURL, "telegram.me/") {
				username := extractTGUsername(sURL)
				if username != "" {
					md := plugin.MerchantData{
						TgUsername: username, TgLink: "https://t.me/" + username,
						SourceType: "web", SourceName: r.Title, SourceURL: r.URL,
					}
					c.logger.LogMerchantFound(md, "snippet_tme_url", 0, r.URL)
					callback(md)
				}
				continue
			}
			if isBlacklistDomain(sURL) {
				c.logger.LogSkip("crawl_snippet_url", sURL, "blacklisted_domain")
				continue
			}
			// Crawl URLs found inside snippets: depth=1, parent is the serper result.
			c.crawlAndExtract(ctx, sURL, r.URL, 1, r.Title, callback)
		}

		// ── Node 3: Crawl the result URL itself ──
		classification := search.ClassifyURL(r.URL)
		c.logger.LogSkip("classify", r.URL, classification) // record the classification decision in the audit trail
		switch classification {
		case "tg_channel":
			username := extractTGUsername(r.URL)
			if username != "" {
				md := plugin.MerchantData{
					TgUsername: username, TgLink: "https://t.me/" + username,
					SourceType: "web", SourceName: r.Title, SourceURL: r.URL,
				}
				c.logger.LogMerchantFound(md, "direct_tme_link", 0, "")
				callback(md)
			}
		case "nav_site", "web_page":
			if crawler.RuleFilter(r.URL) != crawler.FilterDiscard {
				c.crawlAndExtract(ctx, r.URL, "", 0, r.Title, callback)
			} else {
				c.logger.LogSkip("crawl", r.URL, "rule_filter_discard")
			}
		default:
			c.logger.LogSkip("crawl", r.URL, "classification_discard")
		}
	}
}

// extractFromSnippet extracts contacts from snippet/title text and logs everything.
func (c *Collector) extractFromSnippet(text, title, sourceURL string, callback func(plugin.MerchantData)) {
	contacts := extractor.ExtractAll(text)
	var usernames []string
	for _, info := range contacts {
		if info.TgUsername == "" {
			continue
		}
		usernames = append(usernames, info.TgUsername)
		md := plugin.MerchantData{
			TgUsername: info.TgUsername, TgLink: "https://t.me/" + info.TgUsername,
			Website: info.Website, Email: info.Email, Phone: info.Phone,
			SourceType: "web", SourceName: title, SourceURL: sourceURL,
			OriginalText: text,
		}
		c.logger.LogMerchantFound(md, "snippet_regex", 0, "")
		callback(md)
	}
	// Always log snippet extraction, even when empty (audit trail: "we looked, nothing found").
	c.logger.LogSnippetExtract(sourceURL, text, usernames)
}

// rotateProxy switches to the next proxy in the pool (if pool mode).
// It returns the proxy URL now in use, so callers can report its health.
func (c *Collector) rotateProxy() string {
	if c.proxyPool == nil {
		return ""
	}
	nextURL := c.proxyPool.Next()
	if nextURL != "" {
		c.static.SetProxy(nextURL)
		log.Printf("[web_collector] rotated to proxy: %s", nextURL)
	}
	return nextURL
}

// reportProxyResult reports success or failure to the proxy pool for a specific proxy.
func (c *Collector) reportProxyResult(proxyURL string, err error) {
	if c.proxyPool == nil || proxyURL == "" {
		return
	}
	if err != nil {
		c.proxyPool.ReportFailure(proxyURL)
	} else {
		c.proxyPool.ReportSuccess(proxyURL)
	}
}
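
// This collector relies on only four methods of proxypool.Pool: Size, Next,
// ReportSuccess, and ReportFailure. As an illustration of that contract (a hedged
// sketch, not the actual spider/internal/proxy implementation), a minimal
// round-robin pool could look like:
//
//	type roundRobinPool struct {
//		mu      sync.Mutex
//		proxies []string
//		idx     int
//	}
//
//	func (p *roundRobinPool) Size() int { return len(p.proxies) }
//
//	func (p *roundRobinPool) Next() string {
//		p.mu.Lock()
//		defer p.mu.Unlock()
//		if len(p.proxies) == 0 {
//			return ""
//		}
//		u := p.proxies[p.idx%len(p.proxies)]
//		p.idx++
//		return u
//	}
//
//	func (p *roundRobinPool) ReportSuccess(proxyURL string) {} // a real pool could track health here
//	func (p *roundRobinPool) ReportFailure(proxyURL string) {} // e.g. demote or evict failing proxies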

// crawlAndExtract fetches a page, extracts contacts, and follows sub-links.
// depth tracks how deep we are from the original serper result.
// parentURL tracks which page led us here.
func (c *Collector) crawlAndExtract(ctx context.Context, pageURL, parentURL string, depth int, title string, callback func(plugin.MerchantData)) {
	if depth > 2 {
		c.logger.LogSkip("crawl", pageURL, "max_depth_exceeded")
		return
	}

	// Rotate proxy and capture which proxy is being used.
	usedProxy := c.rotateProxy()

	// ── Fetch page ──
	t0 := time.Now()
	result := c.static.Crawl(ctx, pageURL)
	c.reportProxyResult(usedProxy, result.Error)
	if result.Error != nil || result.HTML == "" {
		// On failure with a pool, try once more with the next proxy.
		if c.proxyPool != nil && result.Error != nil {
			usedProxy = c.rotateProxy()
			result = c.static.Crawl(ctx, pageURL)
			c.reportProxyResult(usedProxy, result.Error)
		}
		// Fall back to the dynamic (headless) crawler if static fetching still failed.
		if result.Error != nil || result.HTML == "" {
			result = c.dynamic.Crawl(ctx, pageURL)
		}
	}
	dur := time.Since(t0)
	if result.Error != nil || result.HTML == "" {
		c.logger.LogCrawlPage(pageURL, parentURL, depth, "", nil, 0, result.Error, dur)
		return
	}

	// Content filter: keep the page only if it links to Telegram, contains Chinese
	// text, or shows some other contact signal.
	hasTgLinks := len(result.TgLinks) > 0
	if !hasTgLinks {
		snippet := result.HTML
		if len(snippet) > 5000 {
			snippet = snippet[:5000]
		}
		if !extractor.ContainsChinese(snippet, 0) && !extractor.HasContact(snippet) {
			c.logger.LogCrawlPage(pageURL, parentURL, depth, snippet, nil, len(result.Links), nil, dur)
			c.logger.LogSkip("crawl", pageURL, "no_chinese_no_contact")
			return
		}
	}

	// ── Log crawl with content summary ──
	htmlSummary := result.HTML
	if len(htmlSummary) > 2000 {
		htmlSummary = htmlSummary[:2000]
	}
	c.logger.LogCrawlPage(pageURL, parentURL, depth, htmlSummary, result.TgLinks, len(result.Links), nil, dur)

	// ── Extract from t.me links in <a href> ──
	seenUsernames := map[string]bool{}
	for _, tgLink := range result.TgLinks {
		username := crawler.ExtractTGUsername(tgLink)
		if username == "" || seenUsernames[strings.ToLower(username)] {
			continue
		}
		seenUsernames[strings.ToLower(username)] = true
		md := plugin.MerchantData{
			TgUsername: username, TgLink: "https://t.me/" + username,
			SourceType: "web", SourceName: title, SourceURL: pageURL,
		}
		c.logger.LogMerchantFound(md, "crawl_href", depth, parentURL)
		callback(md)
	}

	// ── Extract from page text ──
	allContacts := extractor.ExtractAll(result.HTML)
	var extractedNames []string
	for _, info := range allContacts {
		if info.TgUsername == "" || seenUsernames[strings.ToLower(info.TgUsername)] {
			continue
		}
		seenUsernames[strings.ToLower(info.TgUsername)] = true
		extractedNames = append(extractedNames, info.TgUsername)
		md := plugin.MerchantData{
			TgUsername: info.TgUsername, TgLink: "https://t.me/" + info.TgUsername,
			Website: info.Website, Email: info.Email, Phone: info.Phone,
			SourceType: "web", SourceName: title, SourceURL: pageURL,
		}
		c.logger.LogMerchantFound(md, "crawl_text", depth, parentURL)
		callback(md)
	}

	// Log page extraction results.
	contentSample := result.HTML
	if len(contentSample) > 1000 {
		contentSample = contentSample[:1000]
	}
	c.logger.LogPageExtract(pageURL, parentURL, depth, contentSample, extractedNames)

	// ── Follow sub-links to deeper pages (depth+1) ──
	if depth < 2 {
		subPages := collectSubPages(pageURL, result.Links)
		for _, link := range subPages {
			if c.stopped.Load() || ctx.Err() != nil {
				break
			}
			if strings.Contains(link, "t.me") || strings.Contains(link, "telegram.me") {
				continue
			}
			if crawler.RuleFilter(link) == crawler.FilterDiscard {
				continue
			}
			c.crawlAndExtract(ctx, link, pageURL, depth+1, title, callback)
		}
	}
}
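
// Depth budget, traced from the call sites above: a serper result URL is crawled at
// depth 0 (processResults, Node 3), a URL pulled out of a snippet starts at depth 1
// (Node 2), and sub-links found on a page are crawled at depth+1, so the deepest
// fetch is depth 2; anything beyond that is skipped with "max_depth_exceeded".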

// collectSubPages picks sub-pages worth crawling from a page's links.
// Contact/about/support pages come first, followed by other same-domain internal links.
func collectSubPages(baseURL string, links []string) []string {
	baseDomain := extractDomain(baseURL)
	if baseDomain == "" {
		return nil
	}
	// Priority paths.
	contactPaths := []string{"/contact", "/contact-us", "/about", "/about-us", "/support", "/faq", "/help"}
	var priority, sameDomain []string
	seen := map[string]bool{baseURL: true}
	for _, link := range links {
		if seen[link] {
			continue
		}
		seen[link] = true
		linkDomain := extractDomain(link)
		if linkDomain != baseDomain {
			continue
		}
		lower := strings.ToLower(link)
		isPriority := false
		for _, p := range contactPaths {
			if strings.Contains(lower, p) {
				priority = append(priority, link)
				isPriority = true
				break
			}
		}
		if !isPriority && len(sameDomain) < 5 {
			sameDomain = append(sameDomain, link)
		}
	}
	result := append(priority, sameDomain...)
	if len(result) > 10 {
		result = result[:10]
	}
	return result
}
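
// Example of the selection above, using made-up links for https://example.com:
//
//	links := []string{
//		"https://example.com/contact-us",
//		"https://example.com/pricing",
//		"https://other.com/about", // dropped: different domain
//		"https://example.com/blog/post-1",
//	}
//	// collectSubPages("https://example.com", links) puts the contact page first,
//	// then up to 5 other same-domain links, capped at 10 entries overall:
//	// ["https://example.com/contact-us", "https://example.com/pricing", "https://example.com/blog/post-1"]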

// expandSearchQueries expands each keyword with Telegram-oriented suffixes,
// deduplicating the resulting queries while preserving order.
func expandSearchQueries(keywords []string) []string {
	suffixes := []string{
		"",
		" telegram",
		" t.me",
		" 电报",           // "telegram" in Chinese
		" 联系方式 telegram", // "contact info telegram"
	}
	seen := map[string]bool{}
	var queries []string
	for _, kw := range keywords {
		for _, suffix := range suffixes {
			q := kw + suffix
			if !seen[q] {
				seen[q] = true
				queries = append(queries, q)
			}
		}
	}
	return queries
}
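
// Each keyword yields five queries; for an illustrative keyword chosen purely as an
// example, expandSearchQueries([]string{"usdt exchange"}) returns:
//
//	["usdt exchange", "usdt exchange telegram", "usdt exchange t.me",
//	 "usdt exchange 电报", "usdt exchange 联系方式 telegram"]
//
// Duplicate queries across keywords are emitted only once.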

// isBlacklistDomain reports whether the URL points at a site we never crawl
// (search engines, social networks, link shorteners, and similar).
// Note: this is a plain substring match on the whole URL, not a hostname check.
func isBlacklistDomain(u string) bool {
	bl := []string{"youtube.com", "google.com", "twitter.com", "facebook.com",
		"instagram.com", "bit.ly", "gstatic.com", "wikipedia.org", "x.com"}
	lower := strings.ToLower(u)
	for _, b := range bl {
		if strings.Contains(lower, b) {
			return true
		}
	}
	return false
}

// reTGUsername matches t.me/ or telegram.me/ links and captures the username
// (Telegram usernames: 5-32 characters, starting with a letter).
var reTGUsername = regexp.MustCompile(`(?:t(?:elegram)?\.me)/([a-zA-Z][a-zA-Z0-9_]{4,31})`)

// reURL matches http(s) URLs embedded in text, stopping at whitespace, quotes,
// angle brackets, or common CJK ideographs.
var reURL = regexp.MustCompile(`https?://[^\s<>"'\x{4e00}-\x{9fa5}]+`)

// extractTGUsername pulls the Telegram username out of a t.me/telegram.me URL,
// returning "" when the URL does not match.
func extractTGUsername(rawURL string) string {
	m := reTGUsername.FindStringSubmatch(rawURL)
	if len(m) > 1 {
		return m[1]
	}
	return ""
}

// extractDomain returns the hostname of rawURL, or "" if it cannot be parsed.
func extractDomain(rawURL string) string {
	u, err := url.Parse(rawURL)
	if err != nil {
		return ""
	}
	return u.Hostname()
}
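
// Illustrative results for the helpers above (the URLs are made up):
//
//	extractTGUsername("https://t.me/shop_owner88")        // "shop_owner88"
//	extractTGUsername("https://telegram.me/shop_owner88") // "shop_owner88"
//	extractTGUsername("https://t.me/+AbCdEfGh")           // "" (invite links do not match)
//	extractDomain("https://sub.example.com/contact")      // "sub.example.com"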