// Package tgcollector implements a collector plugin that discovers Telegram
// channels starting from seed usernames and extracts merchant contacts from
// the messages of the discovered channels.
package tgcollector

import (
	"context"
	"fmt"
	"log"
	"regexp"
	"strings"
	"sync/atomic"
	"time"

	"spider/internal/extractor"
	"spider/internal/llm"
	"spider/internal/model"
	"spider/internal/plugin"
	proxypool "spider/internal/proxy"
	"spider/internal/store"
	"spider/internal/telegram"
)

// Collector implements plugin.Collector for TG channel collection.
// It combines BFS channel discovery (from seeds) with message scraping.
// Contact extraction is regex first; the LLM is only a fallback for
// non-standard contact formats.
type Collector struct {
	tgManager *telegram.AccountManager // source of TG accounts; Run is a no-op when nil
	llmClient *llm.Client              // optional; nil disables the LLM fallback
	store     *store.Store             // persistence for discovered channels
	stopped   atomic.Bool              // set by Stop, cleared at the start of Run
	logger    plugin.TaskLogger        // task event sink; SetLogger never stores nil
	proxyPool *proxypool.Pool          // taken from cfg["proxy_pool"] on each Run; may be nil
}

// New creates a new TG collector. llmClient may be nil, in which case only
// regex extraction is used.
func New(tgManager *telegram.AccountManager, llmClient *llm.Client, s *store.Store) *Collector {
	return &Collector{
		tgManager: tgManager,
		llmClient: llmClient,
		store:     s,
		logger:    plugin.NopLogger(),
	}
}

// Name returns the plugin identifier of this collector.
func (c *Collector) Name() string { return "tg_collector" }

// SetLogger replaces the task logger. A nil logger is replaced by the no-op
// logger so later logging calls never hit a nil value.
func (c *Collector) SetLogger(l plugin.TaskLogger) {
	if l == nil {
		l = plugin.NopLogger()
	}
	c.logger = l
}

// Stop asks a running Run to finish; the flag is polled between channels and
// between message batches. It always returns nil.
func (c *Collector) Stop() error {
	c.stopped.Store(true)
	return nil
}

// Run executes the TG collection pipeline:
//  1. BFS discover channels from seeds
//  2. Scrape messages from discovered channels
//  3. Extract merchants via regex (+ LLM fallback)
//
// cfg keys:
//   - "seeds": []string (or []any of strings, as produced by JSON decoding) — seed channel names
//   - "max_depth": int — BFS max depth (default 3)
//   - "max_channels": int — max channels to discover (default 500)
//   - "message_limit": int — messages per channel (default 500)
//   - "proxy_pool": *proxypool.Pool — optional pool rotated before each channel
//   - "proxy_url": string — optional fixed proxy, used when no pool is given
//
// Every extracted merchant is passed to callback. Run returns nil in all
// cases, including cancellation of ctx.
func (c *Collector) Run(ctx context.Context, cfg map[string]any, callback func(plugin.MerchantData)) error {
	c.stopped.Store(false)

	// Check the manager before any proxy setup: the setup below calls
	// methods on it.
	if c.tgManager == nil {
		log.Println("[tg_collector] no TG account manager, skipping")
		return nil
	}

	// Proxy pool rotation: rotate proxy on each account acquire.
	var pool *proxypool.Pool
	if p, ok := cfg["proxy_pool"].(*proxypool.Pool); ok && p != nil {
		pool = p
		log.Printf("[tg_collector] using proxy pool with %d proxies", pool.Size())
		// Set initial proxy.
		if next := pool.Next(); next != "" {
			c.tgManager.SetProxy(next)
		}
	} else if proxyURL, ok := cfg["proxy_url"].(string); ok && proxyURL != "" {
		log.Printf("[tg_collector] proxy configured: %s (pass to TG account manager)", proxyURL)
		c.tgManager.SetProxy(proxyURL)
	}
	c.proxyPool = pool

	// Accept both a native []string and the []any shape that generic
	// decoders (e.g. JSON into map[string]any) produce.
	var seeds []string
	switch v := cfg["seeds"].(type) {
	case []string:
		seeds = v
	case []any:
		seeds = make([]string, 0, len(v))
		for _, e := range v {
			if s, ok := e.(string); ok {
				seeds = append(seeds, s)
			}
		}
	}
	if len(seeds) == 0 {
		log.Println("[tg_collector] no seeds provided")
		return nil
	}

	maxDepth := getIntCfg(cfg, "max_depth", 3)
	maxChannels := getIntCfg(cfg, "max_channels", 500)
	msgLimit := getIntCfg(cfg, "message_limit", 500)

	// Phase 1: BFS channel discovery.
	t0 := time.Now()
	channels := c.discover(ctx, seeds, maxDepth, maxChannels)
	log.Printf("[tg_collector] discovered %d channels", len(channels))
	c.logger.LogSearchResult("BFS discover", 0, fmt.Sprintf("discovered %d channels from seeds", len(channels)), strings.Join(seeds, ","), fmt.Sprintf("duration: %v", time.Since(t0)))

	// Phase 2: scrape each channel.
	for i, ch := range channels {
		if c.stopped.Load() || ctx.Err() != nil {
			break
		}
		log.Printf("[tg_collector] scraping %d/%d: @%s", i+1, len(channels), ch)
		c.logger.LogCrawlPage("tg://"+ch, "", 0, fmt.Sprintf("scraping channel %d/%d", i+1, len(channels)), nil, 0, nil, 0)
		c.scrapeChannel(ctx, ch, msgLimit, callback)

		// Delay between channels; nothing follows the last one, so do not
		// hold the caller for a pointless extra wait.
		if i == len(channels)-1 {
			break
		}
		select {
		case <-ctx.Done():
			return nil
		case <-time.After(5 * time.Second):
		}
	}

	log.Println("[tg_collector] done")
	return nil
}

// queueItem is one pending entry of the BFS discovery queue.
type queueItem struct {
	Username string // channel username as queued
	Depth    int    // BFS distance from the seed that led here
	Source   string // "seed" or "snowball"
}

// discover walks channels breadth-first starting from seeds and returns the
// usernames whose channel info could be fetched, in discovery order.
//
// Channels at depth < maxDepth are expanded by reading their latest 100
// messages and queueing forwarded-from channels and linked usernames.
// Discovery ends when the queue is empty, maxTotal channels were found, Stop
// was called, ctx is cancelled, or no account is available after one retry.
func (c *Collector) discover(ctx context.Context, seeds []string, maxDepth, maxTotal int) []string {
	queue := make([]queueItem, 0, len(seeds))
	// Both sets are keyed by lower-cased username so that names differing only
	// in case are handled once. queued keeps the queue free of duplicates
	// (popular channels are linked from many messages); visited records what
	// was actually dequeued.
	queued := make(map[string]bool, len(seeds))
	visited := make(map[string]bool, len(seeds))

	enqueue := func(name string, depth int, source string) {
		if name == "" {
			return
		}
		key := strings.ToLower(name)
		if visited[key] || queued[key] {
			return
		}
		queued[key] = true
		queue = append(queue, queueItem{Username: name, Depth: depth, Source: source})
	}

	for _, s := range seeds {
		enqueue(cleanUsername(s), 0, "seed")
	}

	var result []string
	for len(queue) > 0 && len(result) < maxTotal {
		if c.stopped.Load() || ctx.Err() != nil {
			break
		}

		item := queue[0]
		queue = queue[1:]

		username := cleanUsername(item.Username)
		key := strings.ToLower(username)
		if username == "" || visited[key] {
			continue
		}
		visited[key] = true

		// Rotate proxy before each channel in BFS.
		c.rotateProxy()

		acc, err := c.tgManager.Acquire(ctx)
		if err != nil {
			log.Printf("[tg_collector] no available TG account: %v, waiting 30s before retry", err)
			select {
			case <-ctx.Done():
				return result
			case <-time.After(30 * time.Second):
			}
			acc, err = c.tgManager.Acquire(ctx)
			if err != nil {
				log.Printf("[tg_collector] still no available TG account after retry: %v, stopping discovery", err)
				return result
			}
		}

		if err := acc.Client.Connect(ctx); err != nil {
			log.Printf("[tg_collector] connect failed: %v", err)
			c.tgManager.Release(acc, 0)
			continue
		}

		if _, err := acc.Client.GetChannelInfo(ctx, username); err != nil {
			// HandleFloodWait takes the place of Release for a rate-limited
			// account.
			if fw, ok := err.(*telegram.FloodWaitError); ok {
				c.tgManager.HandleFloodWait(acc, fw.Seconds)
			} else {
				c.tgManager.Release(acc, 0)
			}
			continue
		}

		// Save channel to DB.
		c.store.UpsertChannel(&model.Channel{
			Username: username,
			Source:   item.Source,
			Status:   "pending",
		})
		result = append(result, username)

		// BFS: read messages to find more channels.
		handedOff := false // true once HandleFloodWait has taken the account
		if item.Depth < maxDepth {
			msgs, err := acc.Client.GetMessages(ctx, username, 0, 100)
			switch {
			case err == nil:
				for _, msg := range msgs {
					enqueue(cleanUsername(msg.ForwardFromChannel), item.Depth+1, "snowball")
					for _, link := range msg.Links {
						enqueue(extractUsernameFromLink(link), item.Depth+1, "snowball")
					}
				}
			default:
				// Treat a flood wait the same way scrapeChannel does instead
				// of returning a rate-limited account as healthy.
				if fw, ok := err.(*telegram.FloodWaitError); ok {
					c.tgManager.HandleFloodWait(acc, fw.Seconds)
					handedOff = true
				} else {
					log.Printf("[tg_collector] discover @%s: read messages failed: %v", username, err)
				}
			}
		}

		if !handedOff {
			c.tgManager.Release(acc, 0)
		}

		select {
		case <-ctx.Done():
			return result
		case <-time.After(5 * time.Second):
		}
	}

	return result
}

// scrapeChannel reads the pinned messages and up to msgLimit historical
// messages of one channel and feeds them to processMessages.
//
// The channel status is set to "scraped" only when the history was read to
// its end or to msgLimit; a scrape cut short by an error, a flood wait, Stop
// or ctx cancellation leaves the stored status untouched.
func (c *Collector) scrapeChannel(ctx context.Context, username string, msgLimit int, callback func(plugin.MerchantData)) {
	// Rotate proxy before each channel scrape.
	c.rotateProxy()

	acc, err := c.tgManager.Acquire(ctx)
	if err != nil {
		log.Printf("[tg_collector] scrape %s: no available account: %v, waiting 30s", username, err)
		select {
		case <-ctx.Done():
			return
		case <-time.After(30 * time.Second):
		}
		acc, err = c.tgManager.Acquire(ctx)
		if err != nil {
			log.Printf("[tg_collector] scrape %s: still no account, skipping", username)
			return
		}
	}

	if err := acc.Client.Connect(ctx); err != nil {
		log.Printf("[tg_collector] scrape %s: connect failed: %v", username, err)
		c.tgManager.Release(acc, 0)
		return
	}

	// Read pinned messages. A failure here is not fatal for the scrape.
	pinnedMsgs, err := acc.Client.GetPinnedMessages(ctx, username)
	if err != nil {
		log.Printf("[tg_collector] scrape %s: read pinned messages failed: %v", username, err)
	} else {
		c.processMessages(ctx, pinnedMsgs, username, callback)
	}

	// Read historical messages in batches of at most 100, paging by the ID of
	// the last message of the previous batch.
	aborted := false
	offsetID, fetched := 0, 0
	for fetched < msgLimit {
		if c.stopped.Load() || ctx.Err() != nil {
			aborted = true
			break
		}

		batchSize := 100
		if remaining := msgLimit - fetched; remaining < batchSize {
			batchSize = remaining
		}

		msgs, err := acc.Client.GetMessages(ctx, username, offsetID, batchSize)
		if err != nil {
			aborted = true
			if fw, ok := err.(*telegram.FloodWaitError); ok {
				// HandleFloodWait takes the account; it must not be released
				// again below.
				c.tgManager.HandleFloodWait(acc, fw.Seconds)
				acc = nil
			} else {
				log.Printf("[tg_collector] scrape %s: read messages failed: %v", username, err)
			}
			break
		}
		if len(msgs) == 0 {
			break
		}

		c.processMessages(ctx, msgs, username, callback)
		offsetID = msgs[len(msgs)-1].ID
		fetched += len(msgs)
	}

	if acc != nil {
		c.tgManager.Release(acc, 0)
	}

	if aborted {
		log.Printf("[tg_collector] scrape %s: incomplete after %d messages, status left unchanged", username, fetched)
		return
	}

	// Update channel status. The result of the update is not inspected here.
	c.store.DB.Model(&model.Channel{}).Where("username = ?", username).
		Update("status", "scraped")
}

// processMessages turns a batch of messages from channelUsername into
// MerchantData values, logging each one and passing it to callback.
// Service messages are skipped.
func (c *Collector) processMessages(ctx context.Context, msgs []telegram.Message, channelUsername string, callback func(plugin.MerchantData)) {
	for _, msg := range msgs {
		if msg.IsService {
			continue
		}

		// ── Path 1: sender-based (group chats) ──
		// If the message has a sender username other than the channel itself,
		// record that person directly. This is the correct way to collect
		// users who post in a chat group.
		if msg.SenderUsername != "" && !strings.EqualFold(msg.SenderUsername, channelUsername) {
			md := plugin.MerchantData{
				TgUsername:    msg.SenderUsername,
				TgLink:        "https://t.me/" + msg.SenderUsername,
				SourceType:    "tg_group",
				SourceName:    channelUsername,
				SourceURL:     "https://t.me/" + channelUsername,
				OriginalText:  msg.Text,
				GroupUsername: channelUsername,
			}
			c.logger.LogMerchantFound(md, "tg_sender", 0, "tg://"+channelUsername)
			callback(md)
			continue
		}

		// ── Path 2: text-based extraction (broadcast channels) ──
		// Only run when there's no distinct sender (broadcast channel posts)
		// and the text contains something that looks like a contact.
		if msg.Text == "" || !extractor.HasContact(msg.Text) {
			continue
		}

		info := extractor.Extract(msg.Text)
		merchantName := ""
		industry := ""

		// LLM fallback only when regex found no TG username.
		if info.TgUsername == "" && c.llmClient != nil {
			merchantInfo, err := c.llmClient.ParseMerchant(ctx, msg.Text)
			switch {
			case err != nil:
				log.Printf("[tg_collector] LLM parse failed for @%s: %v", channelUsername, err)
			case merchantInfo != nil:
				// Normalize the same way as every other username in this
				// file (trim whitespace, drop a leading "@").
				if name := cleanUsername(merchantInfo.TgUsername); name != "" {
					info.TgUsername = name
					// Regex results win; the LLM only fills the gaps.
					if merchantInfo.Website != "" && info.Website == "" {
						info.Website = merchantInfo.Website
					}
					if merchantInfo.Email != "" && info.Email == "" {
						info.Email = merchantInfo.Email
					}
					if merchantInfo.Phone != "" && info.Phone == "" {
						info.Phone = merchantInfo.Phone
					}
					merchantName = extractor.CleanMerchantName(merchantInfo.MerchantName)
					industry = merchantInfo.Industry
				}
			}
		}

		// Skip if still no username, or if it's the channel itself
		// (self-referencing link).
		if info.TgUsername == "" || strings.EqualFold(info.TgUsername, channelUsername) {
			continue
		}

		md := plugin.MerchantData{
			TgUsername:    info.TgUsername,
			TgLink:        "https://t.me/" + info.TgUsername,
			MerchantName:  merchantName,
			Website:       info.Website,
			Email:         info.Email,
			Phone:         info.Phone,
			SourceType:    "tg_channel",
			SourceName:    channelUsername,
			SourceURL:     "https://t.me/" + channelUsername,
			OriginalText:  msg.Text,
			IndustryTag:   industry,
			GroupUsername: channelUsername,
		}
		c.logger.LogMerchantFound(md, "tg_message_extract", 0, "tg://"+channelUsername)
		callback(md)
	}
}

// rotateProxy switches to the next proxy in the pool for the TG account manager.
func (c *Collector) rotateProxy() { if c.proxyPool == nil { return } next := c.proxyPool.Next() if next != "" { c.tgManager.SetProxy(next) log.Printf("[tg_collector] rotated to proxy: %s (active: %d/%d)", next, c.proxyPool.ActiveCount(), c.proxyPool.Size()) } else { log.Printf("[tg_collector] proxy pool exhausted, all proxies disabled") } } func cleanUsername(s string) string { return strings.TrimPrefix(strings.TrimSpace(s), "@") } var reUsernameFromLink = regexp.MustCompile(`t(?:elegram)?\.me/([a-zA-Z][a-zA-Z0-9_]{4,31})`) func extractUsernameFromLink(link string) string { m := reUsernameFromLink.FindStringSubmatch(link) if len(m) > 1 { return m[1] } return "" } func getIntCfg(cfg map[string]any, key string, def int) int { if v, ok := cfg[key].(int); ok { return v } if v, ok := cfg[key].(float64); ok { return int(v) } return def }