// collector.go

package webcollector

import (
	"context"
	"log"
	"net/url"
	"regexp"
	"strings"
	"sync/atomic"
	"time"

	"spider/internal/crawler"
	"spider/internal/extractor"
	"spider/internal/plugin"
	proxypool "spider/internal/proxy"
	"spider/internal/search"
)

// Collector implements plugin.Collector for web-based merchant collection.
type Collector struct {
	serper       *search.SerperClient
	static       *crawler.StaticCrawler
	dynamic      *crawler.DynamicCrawler
	tmeValidator *crawler.TMeValidator
	stopped      atomic.Bool
	logger       plugin.TaskLogger
	proxyPool    *proxypool.Pool
}

// New constructs a web Collector around the given Serper search client.
func New(serper *search.SerperClient) *Collector {
	return &Collector{
		serper:       serper,
		static:       crawler.NewStaticCrawler(),
		dynamic:      crawler.NewDynamicCrawler(),
		tmeValidator: crawler.NewTMeValidator(),
		logger:       plugin.NopLogger(),
	}
}

func (c *Collector) Name() string { return "web_collector" }

func (c *Collector) SetLogger(l plugin.TaskLogger) { c.logger = l }

func (c *Collector) Stop() error {
	c.stopped.Store(true)
	return nil
}
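
// Run executes one collection pass: it expands the configured keywords into
// search queries, runs organic and video searches through the Serper client,
// and feeds every result to processResults, which reports each discovered
// merchant through callback.
//
// Recognized cfg keys (as read below): "keywords" ([]string, required),
// "proxy_pool" (*proxypool.Pool), and "proxy_url" (string).
//
// A minimal wiring sketch; search.NewSerperClient is an assumed constructor
// and may not match the real package API:
//
//	c := webcollector.New(search.NewSerperClient(apiKey))
//	cfg := map[string]any{"keywords": []string{"some keyword"}}
//	_ = c.Run(context.Background(), cfg, func(md plugin.MerchantData) {
//		log.Println(md.TgUsername, md.SourceURL)
//	})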
func (c *Collector) Run(ctx context.Context, cfg map[string]any, callback func(plugin.MerchantData)) error {
	c.stopped.Store(false)
	// Apply proxy pool or single proxy
	if pool, ok := cfg["proxy_pool"].(*proxypool.Pool); ok && pool != nil {
		c.proxyPool = pool
		log.Printf("[web_collector] using proxy pool with %d proxies", pool.Size())
	} else if proxyURL, ok := cfg["proxy_url"].(string); ok && proxyURL != "" {
		c.static.SetProxy(proxyURL)
		log.Printf("[web_collector] using proxy: %s", proxyURL)
	}
	if c.serper == nil {
		log.Println("[web_collector] no serper client configured, skipping")
		return nil
	}
	keywords, _ := cfg["keywords"].([]string)
	if len(keywords) == 0 {
		log.Println("[web_collector] no keywords provided")
		return nil
	}
	queries := expandSearchQueries(keywords)
	for _, q := range queries {
		if c.stopped.Load() || ctx.Err() != nil {
			break
		}
		// Rotate proxy for each query if using pool
		c.rotateProxy()
		log.Printf("[web_collector] searching: %s", q)

		// ── Organic search ──
		t0 := time.Now()
		results, err := c.serper.Search(ctx, q)
		if err != nil {
			log.Printf("[web_collector] search error: %v", err)
			c.logger.LogError("search", "", err.Error())
		} else {
			log.Printf("[web_collector] organic search %q: %d results in %dms", q, len(results), time.Since(t0).Milliseconds())
			for i, r := range results {
				c.logger.LogSearchResult(q+" [organic]", i+1, r.Title, r.URL, r.Snippet)
			}
			c.processResults(ctx, results, q, callback)
		}
		time.Sleep(1 * time.Second)

		// ── Video search ──
		if c.stopped.Load() || ctx.Err() != nil {
			break
		}
		videoResults, err := c.serper.SearchVideos(ctx, q)
		if err != nil {
			c.logger.LogError("search_videos", "", err.Error())
		} else {
			for i, r := range videoResults {
				c.logger.LogSearchResult(q+" [video]", i+1, r.Title, r.URL, r.Snippet)
			}
			c.processResults(ctx, videoResults, q, callback)
		}
		select {
		case <-ctx.Done():
			return nil
		case <-time.After(2 * time.Second):
		}
	}
	log.Println("[web_collector] done")
	return nil
}

// processResults handles search results with full logging at every node.
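// Each result is mined at three nodes: the snippet text itself, any URLs
// embedded in the snippet, and the result URL after classification.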
func (c *Collector) processResults(ctx context.Context, results []search.SearchResult, query string, callback func(plugin.MerchantData)) {
	for _, r := range results {
		if c.stopped.Load() || ctx.Err() != nil {
			break
		}
		// ── Node 1: Extract from snippet text ──
		snippetText := r.Title + " " + r.Snippet
		c.extractFromSnippet(snippetText, r.Title, r.URL, callback)

		// ── Node 2: Extract URLs from snippet → crawl them ──
		snippetURLs := reURL.FindAllString(r.Snippet, -1)
		for _, sURL := range snippetURLs {
			if c.stopped.Load() || ctx.Err() != nil {
				break
			}
			sURL = strings.TrimRight(sURL, ".,;)\"'")
			if strings.Contains(sURL, "t.me/") || strings.Contains(sURL, "telegram.me/") {
				username := extractTGUsername(sURL)
				if username != "" {
					md := plugin.MerchantData{
						TgUsername: username, TgLink: "https://t.me/" + username,
						SourceType: "web", SourceName: r.Title, SourceURL: r.URL,
					}
					c.logger.LogMerchantFound(md, "snippet_tme_url", 0, r.URL)
					callback(md)
				}
				continue
			}
			if isBlacklistDomain(sURL) {
				c.logger.LogSkip("crawl_snippet_url", sURL, "blacklisted_domain")
				continue
			}
			// Crawl URLs found inside snippets: depth=1, parent is the serper result
			c.crawlAndExtract(ctx, sURL, r.URL, 1, r.Title, callback)
		}

		// ── Node 3: Crawl the result URL itself ──
		classification := search.ClassifyURL(r.URL)
		c.logger.LogSkip("classify", r.URL, classification) // log classification decision
		switch classification {
		case "tg_channel":
			username := extractTGUsername(r.URL)
			if username != "" {
				md := plugin.MerchantData{
					TgUsername: username, TgLink: "https://t.me/" + username,
					SourceType: "web", SourceName: r.Title, SourceURL: r.URL,
				}
				c.logger.LogMerchantFound(md, "direct_tme_link", 0, "")
				callback(md)
			}
		case "nav_site", "web_page":
			if crawler.RuleFilter(r.URL) != crawler.FilterDiscard {
				c.crawlAndExtract(ctx, r.URL, "", 0, r.Title, callback)
			} else {
				c.logger.LogSkip("crawl", r.URL, "rule_filter_discard")
			}
		default:
			c.logger.LogSkip("crawl", r.URL, "classification_discard")
		}
	}
}

// extractFromSnippet extracts contacts from snippet/title text and logs everything.
func (c *Collector) extractFromSnippet(text, title, sourceURL string, callback func(plugin.MerchantData)) {
	contacts := extractor.ExtractAll(text)
	var usernames []string
	for _, info := range contacts {
		if info.TgUsername == "" {
			continue
		}
		usernames = append(usernames, info.TgUsername)
		md := plugin.MerchantData{
			TgUsername: info.TgUsername, TgLink: "https://t.me/" + info.TgUsername,
			Website: info.Website, Email: info.Email, Phone: info.Phone,
			SourceType: "web", SourceName: title, SourceURL: sourceURL,
			OriginalText: text,
		}
		c.logger.LogMerchantFound(md, "snippet_regex", 0, "")
		callback(md)
	}
	// Always log snippet extraction, even if empty (for audit: "we looked, nothing found")
	c.logger.LogSnippetExtract(sourceURL, text, usernames)
}

// rotateProxy switches to the next proxy in the pool (if pool mode).
// Returns the proxy URL being used (for health reporting).
func (c *Collector) rotateProxy() string {
	if c.proxyPool == nil {
		return ""
	}
	nextURL := c.proxyPool.Next()
	if nextURL != "" {
		c.static.SetProxy(nextURL)
		log.Printf("[web_collector] rotated to proxy: %s", nextURL)
	}
	return nextURL
}

// reportProxyResult reports success/failure to the proxy pool for a specific proxy.
func (c *Collector) reportProxyResult(proxyURL string, err error) {
	if c.proxyPool == nil || proxyURL == "" {
		return
	}
	if err != nil {
		c.proxyPool.ReportFailure(proxyURL)
	} else {
		c.proxyPool.ReportSuccess(proxyURL)
	}
}

// crawlAndExtract fetches a page, extracts contacts, and follows sub-links.
// depth tracks how deep we are from the original serper result.
// parentURL tracks which page led us here.
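// The fetch falls back in stages: a static crawl through the current proxy,
// one retry on a rotated proxy when a pool is configured and the first attempt
// errored, and finally the dynamic crawler.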
func (c *Collector) crawlAndExtract(ctx context.Context, pageURL, parentURL string, depth int, title string, callback func(plugin.MerchantData)) {
	if depth > 2 {
		c.logger.LogSkip("crawl", pageURL, "max_depth_exceeded")
		return
	}
	// Rotate proxy and capture which proxy is being used
	usedProxy := c.rotateProxy()

	// ── Fetch page ──
	t0 := time.Now()
	result := c.static.Crawl(ctx, pageURL)
	c.reportProxyResult(usedProxy, result.Error)
	if result.Error != nil || result.HTML == "" {
		// On failure with pool, try once more with next proxy
		if c.proxyPool != nil && result.Error != nil {
			usedProxy = c.rotateProxy()
			result = c.static.Crawl(ctx, pageURL)
			c.reportProxyResult(usedProxy, result.Error)
		}
		if result.Error != nil || result.HTML == "" {
			result = c.dynamic.Crawl(ctx, pageURL)
		}
	}
	dur := time.Since(t0)
	if result.Error != nil || result.HTML == "" {
		c.logger.LogCrawlPage(pageURL, parentURL, depth, "", nil, 0, result.Error, dur)
		return
	}

	// Content filter
	hasTgLinks := len(result.TgLinks) > 0
	if !hasTgLinks {
		snippet := result.HTML
		if len(snippet) > 5000 {
			snippet = snippet[:5000]
		}
		if !extractor.ContainsChinese(snippet, 0) && !extractor.HasContact(snippet) {
			c.logger.LogCrawlPage(pageURL, parentURL, depth, snippet, nil, len(result.Links), nil, dur)
			c.logger.LogSkip("crawl", pageURL, "no_chinese_no_contact")
			return
		}
	}

	// ── Log crawl with content summary ──
	htmlSummary := result.HTML
	if len(htmlSummary) > 2000 {
		htmlSummary = htmlSummary[:2000]
	}
	c.logger.LogCrawlPage(pageURL, parentURL, depth, htmlSummary, result.TgLinks, len(result.Links), nil, dur)

	// ── Extract from t.me links in <a href> ──
	seenUsernames := map[string]bool{}
	for _, tgLink := range result.TgLinks {
		username := crawler.ExtractTGUsername(tgLink)
		if username == "" || seenUsernames[strings.ToLower(username)] {
			continue
		}
		seenUsernames[strings.ToLower(username)] = true
		md := plugin.MerchantData{
			TgUsername: username, TgLink: "https://t.me/" + username,
			SourceType: "web", SourceName: title, SourceURL: pageURL,
		}
		c.logger.LogMerchantFound(md, "crawl_href", depth, parentURL)
		callback(md)
	}

	// ── Extract from page text ──
	allContacts := extractor.ExtractAll(result.HTML)
	var extractedNames []string
	for _, info := range allContacts {
		if info.TgUsername == "" || seenUsernames[strings.ToLower(info.TgUsername)] {
			continue
		}
		seenUsernames[strings.ToLower(info.TgUsername)] = true
		extractedNames = append(extractedNames, info.TgUsername)
		md := plugin.MerchantData{
			TgUsername: info.TgUsername, TgLink: "https://t.me/" + info.TgUsername,
			Website: info.Website, Email: info.Email, Phone: info.Phone,
			SourceType: "web", SourceName: title, SourceURL: pageURL,
		}
		c.logger.LogMerchantFound(md, "crawl_text", depth, parentURL)
		callback(md)
	}

	// Log page extraction results
	contentSample := result.HTML
	if len(contentSample) > 1000 {
		contentSample = contentSample[:1000]
	}
	c.logger.LogPageExtract(pageURL, parentURL, depth, contentSample, extractedNames)

	// ── Follow sub-links to deeper pages (depth+1) ──
	if depth < 2 {
		subPages := collectSubPages(pageURL, result.Links)
		for _, link := range subPages {
			if c.stopped.Load() || ctx.Err() != nil {
				break
			}
			if strings.Contains(link, "t.me") || strings.Contains(link, "telegram.me") {
				continue
			}
			if crawler.RuleFilter(link) == crawler.FilterDiscard {
				continue
			}
			c.crawlAndExtract(ctx, link, pageURL, depth+1, title, callback)
		}
	}
}

// collectSubPages picks sub-pages worth crawling from a page's links.
// Prioritizes contact/about/support pages plus same-domain internal links.
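// For example (hypothetical links): with baseURL "https://example.com", links
// to https://example.com/contact-us and https://example.com/about land in the
// priority list, up to five other same-domain links are kept as-is, off-domain
// links are dropped, and the combined result is capped at ten entries.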
func collectSubPages(baseURL string, links []string) []string {
	baseDomain := extractDomain(baseURL)
	if baseDomain == "" {
		return nil
	}
	// Priority paths
	contactPaths := []string{"/contact", "/contact-us", "/about", "/about-us", "/support", "/faq", "/help"}
	var priority, sameDomain []string
	seen := map[string]bool{baseURL: true}
	for _, link := range links {
		if seen[link] {
			continue
		}
		seen[link] = true
		linkDomain := extractDomain(link)
		if linkDomain != baseDomain {
			continue
		}
		lower := strings.ToLower(link)
		isPriority := false
		for _, p := range contactPaths {
			if strings.Contains(lower, p) {
				priority = append(priority, link)
				isPriority = true
				break
			}
		}
		if !isPriority && len(sameDomain) < 5 {
			sameDomain = append(sameDomain, link)
		}
	}
	result := append(priority, sameDomain...)
	if len(result) > 10 {
		result = result[:10]
	}
	return result
}
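
// expandSearchQueries crosses every keyword with a fixed set of Telegram-oriented
// suffixes and de-duplicates the resulting queries. For a single hypothetical
// keyword "kw", it yields: "kw", "kw telegram", "kw t.me", "kw 电报" ("Telegram"
// in Chinese), and "kw 联系方式 telegram" ("contact info telegram" in Chinese).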
func expandSearchQueries(keywords []string) []string {
	suffixes := []string{
		"",
		" telegram",
		" t.me",
		" 电报",
		" 联系方式 telegram",
	}
	seen := map[string]bool{}
	var queries []string
	for _, kw := range keywords {
		for _, suffix := range suffixes {
			q := kw + suffix
			if !seen[q] {
				seen[q] = true
				queries = append(queries, q)
			}
		}
	}
	return queries
}

// isBlacklistDomain reports whether the URL points at a large platform or link
// shortener that is not worth crawling for merchant contacts.
func isBlacklistDomain(u string) bool {
	bl := []string{"youtube.com", "google.com", "twitter.com", "facebook.com",
		"instagram.com", "bit.ly", "gstatic.com", "wikipedia.org", "x.com"}
	lower := strings.ToLower(u)
	for _, b := range bl {
		if strings.Contains(lower, b) {
			return true
		}
	}
	return false
}
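
// reTGUsername captures the username from t.me / telegram.me links: a letter
// followed by 4-31 letters, digits, or underscores (5-32 characters total).
// reURL pulls http(s) URLs out of free text, stopping at whitespace, quotes,
// angle brackets, or CJK characters.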
var reTGUsername = regexp.MustCompile(`(?:t(?:elegram)?\.me)/([a-zA-Z][a-zA-Z0-9_]{4,31})`)
var reURL = regexp.MustCompile(`https?://[^\s<>"'\x{4e00}-\x{9fa5}]+`)

// extractTGUsername pulls the Telegram username out of a t.me/telegram.me URL,
// or returns "" if none is present.
func extractTGUsername(rawURL string) string {
	m := reTGUsername.FindStringSubmatch(rawURL)
	if len(m) > 1 {
		return m[1]
	}
	return ""
}

// extractDomain returns the hostname of rawURL, or "" if it cannot be parsed.
func extractDomain(rawURL string) string {
	u, err := url.Parse(rawURL)
	if err != nil {
		return ""
	}
	return u.Hostname()
}