// Package tgcollector implements a plugin collector that discovers Telegram
// channels starting from seed usernames and extracts merchant contact data
// from the messages of the discovered channels.
package tgcollector

import (
	"context"
	"errors"
	"log"
	"regexp"
	"strings"
	"sync/atomic"
	"time"

	"spider/internal/extractor"
	"spider/internal/llm"
	"spider/internal/model"
	"spider/internal/plugin"
	"spider/internal/store"
	"spider/internal/telegram"
)

const (
	defaultMaxDepth     = 3   // default BFS depth for channel discovery
	defaultMaxChannels  = 500 // default cap on discovered channels
	defaultMessageLimit = 500 // default number of messages scraped per channel

	discoverSampleSize = 100 // messages inspected per channel while discovering
	scrapeBatchSize    = 100 // messages requested per GetMessages call while scraping

	// requestDelay is the pause inserted between two consecutive channels.
	requestDelay = 5 * time.Second
)

// Collector implements plugin.Collector for TG channel collection.
// It combines BFS channel discovery (from seeds) with message scraping.
// Contacts are extracted by regex first; the LLM is consulted only when the
// regex extraction yields no TG username.
type Collector struct {
	tgManager *telegram.AccountManager
	llmClient *llm.Client // can be nil, in which case no LLM fallback is attempted
	store     *store.Store
	stopped   atomic.Bool // set by Stop, cleared at the start of Run
}

// New creates a new TG collector.
func New(tgManager *telegram.AccountManager, llmClient *llm.Client, s *store.Store) *Collector {
	return &Collector{
		tgManager: tgManager,
		llmClient: llmClient,
		store:     s,
	}
}

// Name returns the plugin identifier of this collector.
func (c *Collector) Name() string { return "tg_collector" }

// Stop asks a running collection to finish. The flag is checked between
// channels and between message batches, so Stop does not interrupt a request
// or a delay that is already in progress. It always returns nil.
func (c *Collector) Stop() error {
	c.stopped.Store(true)
	return nil
}

// halted reports whether Stop was called or ctx has been cancelled.
func (c *Collector) halted(ctx context.Context) bool {
	return c.stopped.Load() || ctx.Err() != nil
}

// Run executes the TG collection pipeline:
//  1. BFS discover channels from seeds
//  2. Scrape messages from discovered channels
//  3. Extract merchants via regex (+ LLM fallback)
//
// cfg keys:
//   - "seeds": []string — seed channel names ([]any holding strings, as
//     produced by JSON decoding, is accepted as well)
//   - "max_depth": int — BFS max depth (default 3)
//   - "max_channels": int — max channels to discover (default 500)
//   - "message_limit": int — messages per channel (default 500)
//
// callback is invoked once per extracted merchant. Run returns nil in every
// case, including cancellation of ctx and missing configuration.
func (c *Collector) Run(ctx context.Context, cfg map[string]any, callback func(plugin.MerchantData)) error {
	c.stopped.Store(false)

	if c.tgManager == nil {
		log.Println("[tg_collector] no TG account manager, skipping")
		return nil
	}

	seeds := getStringSliceCfg(cfg, "seeds")
	if len(seeds) == 0 {
		log.Println("[tg_collector] no seeds provided")
		return nil
	}

	maxDepth := getIntCfg(cfg, "max_depth", defaultMaxDepth)
	maxChannels := getIntCfg(cfg, "max_channels", defaultMaxChannels)
	msgLimit := getIntCfg(cfg, "message_limit", defaultMessageLimit)

	// Phase 1: BFS channel discovery
	channels := c.discover(ctx, seeds, maxDepth, maxChannels)
	log.Printf("[tg_collector] discovered %d channels", len(channels))

	// Phase 2: Scrape each channel
	for i, ch := range channels {
		if c.halted(ctx) {
			break
		}

		log.Printf("[tg_collector] scraping %d/%d: @%s", i+1, len(channels), ch)
		c.scrapeChannel(ctx, ch, msgLimit, callback)

		// The delay only separates consecutive channels; nothing follows the
		// last one, so do not wait after it.
		if i == len(channels)-1 {
			break
		}
		if !sleepCtx(ctx, requestDelay) {
			return nil
		}
	}

	log.Println("[tg_collector] done")
	return nil
}

// queueItem is one pending entry of the discovery BFS queue.
type queueItem struct {
	Username string // channel username, with or without a leading "@"
	Depth    int    // BFS distance from the seed that led here
	Source   string // "seed" or "snowball"
}

// discover walks channels breadth-first starting from seeds and returns the
// usernames for which GetChannelInfo succeeded, in discovery order.
//
// Each accepted channel is upserted into the store with status "pending".
// While an item's depth is below maxDepth, up to discoverSampleSize of its
// messages are inspected for forwards and t.me links, which are queued one
// level deeper. The walk ends when the queue is empty, maxTotal channels
// were collected, the collector was stopped, ctx was cancelled, or no TG
// account could be acquired.
func (c *Collector) discover(ctx context.Context, seeds []string, maxDepth, maxTotal int) []string {
	queue := make([]queueItem, 0, len(seeds))
	for _, s := range seeds {
		queue = append(queue, queueItem{Username: cleanUsername(s), Depth: 0, Source: "seed"})
	}

	visited := make(map[string]bool, len(seeds))
	var result []string

	for len(queue) > 0 && len(result) < maxTotal {
		if c.halted(ctx) {
			break
		}

		item := queue[0]
		queue = queue[1:]

		username := cleanUsername(item.Username)
		if username == "" || visited[username] {
			continue
		}
		visited[username] = true

		acc, err := c.tgManager.Acquire(ctx)
		if err != nil {
			log.Printf("[tg_collector] no available TG account: %v", err)
			break
		}

		if err := acc.Client.Connect(ctx); err != nil {
			log.Printf("[tg_collector] connect failed: %v", err)
			c.tgManager.Release(acc, 0)
			continue
		}

		if _, err := acc.Client.GetChannelInfo(ctx, username); err != nil {
			// errors.As also recognises a flood wait that was wrapped on the way up.
			var fw *telegram.FloodWaitError
			if errors.As(err, &fw) {
				c.tgManager.HandleFloodWait(acc, fw.Seconds)
			} else {
				c.tgManager.Release(acc, 0)
			}
			continue
		}

		// Save channel to DB
		c.store.UpsertChannel(&model.Channel{
			Username: username,
			Source:   item.Source,
			Status:   "pending",
		})
		result = append(result, username)

		// handedOff records that the account went to HandleFloodWait, which
		// is used in place of Release everywhere else in this file.
		handedOff := false

		// BFS: read messages to find more channels
		if item.Depth < maxDepth {
			msgs, err := acc.Client.GetMessages(ctx, username, 0, discoverSampleSize)
			if err != nil {
				var fw *telegram.FloodWaitError
				if errors.As(err, &fw) {
					c.tgManager.HandleFloodWait(acc, fw.Seconds)
					handedOff = true
				} else {
					log.Printf("[tg_collector] reading @%s for discovery failed: %v", username, err)
				}
			}
			// msgs is ranged unconditionally; on error it is expected to be empty.
			for _, msg := range msgs {
				if msg.ForwardFromChannel != "" {
					fwd := cleanUsername(msg.ForwardFromChannel)
					if fwd != "" && !visited[fwd] {
						queue = append(queue, queueItem{Username: fwd, Depth: item.Depth + 1, Source: "snowball"})
					}
				}
				for _, link := range msg.Links {
					name := extractUsernameFromLink(link)
					if name != "" && !visited[name] {
						queue = append(queue, queueItem{Username: name, Depth: item.Depth + 1, Source: "snowball"})
					}
				}
			}
		}

		if !handedOff {
			c.tgManager.Release(acc, 0)
		}

		// Skip the pause when the loop is about to terminate anyway.
		if len(queue) == 0 || len(result) >= maxTotal {
			break
		}
		if !sleepCtx(ctx, requestDelay) {
			return result
		}
	}

	return result
}

// scrapeChannel reads the pinned messages and up to msgLimit historical
// messages of the channel username, feeding every batch to processMessages.
//
// History is requested in batches of at most scrapeBatchSize, using the ID of
// the last message of a batch as the offset for the next request. Scraping
// stops on the first GetMessages error, on an empty batch, when the collector
// is stopped, or when ctx is cancelled. Unless acquiring or connecting the
// account failed, the channel status is then set to "scraped".
func (c *Collector) scrapeChannel(ctx context.Context, username string, msgLimit int, callback func(plugin.MerchantData)) {
	acc, err := c.tgManager.Acquire(ctx)
	if err != nil {
		log.Printf("[tg_collector] no available TG account for @%s: %v", username, err)
		return
	}

	if err := acc.Client.Connect(ctx); err != nil {
		log.Printf("[tg_collector] connect failed for @%s: %v", username, err)
		c.tgManager.Release(acc, 0)
		return
	}

	// Read pinned messages. A failure here is not fatal: history is still scraped.
	pinnedMsgs, err := acc.Client.GetPinnedMessages(ctx, username)
	if err != nil {
		log.Printf("[tg_collector] reading pinned messages of @%s failed: %v", username, err)
	}
	c.processMessages(ctx, pinnedMsgs, username, callback)

	// handedOff records that the account went to HandleFloodWait and must not
	// be released a second time.
	handedOff := false

	// Read historical messages
	offsetID, fetched := 0, 0
	for fetched < msgLimit && !c.halted(ctx) {
		batchSize := scrapeBatchSize
		if remaining := msgLimit - fetched; remaining < batchSize {
			batchSize = remaining
		}

		msgs, err := acc.Client.GetMessages(ctx, username, offsetID, batchSize)
		if err != nil {
			var fw *telegram.FloodWaitError
			if errors.As(err, &fw) {
				c.tgManager.HandleFloodWait(acc, fw.Seconds)
				handedOff = true
			} else {
				log.Printf("[tg_collector] reading messages of @%s failed: %v", username, err)
			}
			break
		}
		if len(msgs) == 0 {
			break
		}

		c.processMessages(ctx, msgs, username, callback)
		offsetID = msgs[len(msgs)-1].ID
		fetched += len(msgs)
	}

	if !handedOff {
		c.tgManager.Release(acc, 0)
	}

	// Update channel status
	c.store.DB.Model(&model.Channel{}).Where("username = ?", username).
		Update("status", "scraped")
}

// processMessages extracts merchant contacts from msgs and reports each one
// through callback.
//
// Service messages, empty messages, and messages rejected by
// extractor.ContainsChinese or extractor.HasContact are skipped. The LLM is
// asked only when the regex extraction produced no TG username and an LLM
// client is configured; its website, email and phone only fill fields the
// regex left empty. Messages that still have no TG username are dropped.
func (c *Collector) processMessages(ctx context.Context, msgs []telegram.Message, channelUsername string, callback func(plugin.MerchantData)) {
	for _, msg := range msgs {
		if msg.IsService || msg.Text == "" {
			continue
		}
		if !extractor.ContainsChinese(msg.Text, 0) {
			continue
		}
		if !extractor.HasContact(msg.Text) {
			continue
		}

		// Regex first
		info := extractor.Extract(msg.Text)
		merchantName := ""
		industry := ""

		// LLM fallback only when regex found no TG username
		if info.TgUsername == "" && c.llmClient != nil {
			merchantInfo, err := c.llmClient.ParseMerchant(ctx, msg.Text)
			if err == nil && merchantInfo != nil && merchantInfo.TgUsername != "" {
				// cleanUsername also trims surrounding whitespace, which would
				// otherwise end up inside the generated t.me link.
				info.TgUsername = cleanUsername(merchantInfo.TgUsername)
				if merchantInfo.Website != "" && info.Website == "" {
					info.Website = merchantInfo.Website
				}
				if merchantInfo.Email != "" && info.Email == "" {
					info.Email = merchantInfo.Email
				}
				if merchantInfo.Phone != "" && info.Phone == "" {
					info.Phone = merchantInfo.Phone
				}
				merchantName = extractor.CleanMerchantName(merchantInfo.MerchantName)
				industry = merchantInfo.Industry
			}
		}

		if info.TgUsername == "" {
			continue
		}

		callback(plugin.MerchantData{
			TgUsername:   info.TgUsername,
			TgLink:       "https://t.me/" + info.TgUsername,
			MerchantName: merchantName,
			Website:      info.Website,
			Email:        info.Email,
			Phone:        info.Phone,
			SourceType:   "tg_channel",
			SourceName:   channelUsername,
			SourceURL:    "https://t.me/" + channelUsername,
			OriginalText: msg.Text,
			IndustryTag:  industry,
		})
	}
}

// sleepCtx waits for d. It returns true when the full delay elapsed and false
// when ctx was cancelled first.
func sleepCtx(ctx context.Context, d time.Duration) bool {
	timer := time.NewTimer(d)
	defer timer.Stop()

	select {
	case <-ctx.Done():
		return false
	case <-timer.C:
		return true
	}
}

// cleanUsername trims surrounding whitespace and one leading "@" from s.
func cleanUsername(s string) string {
	return strings.TrimPrefix(strings.TrimSpace(s), "@")
}

// reUsernameFromLink captures the first path segment of a t.me / telegram.me
// link when it is 5 to 32 characters long, starts with a letter and consists
// of letters, digits and underscores.
var reUsernameFromLink = regexp.MustCompile(`t(?:elegram)?\.me/([a-zA-Z][a-zA-Z0-9_]{4,31})`)

// reservedLinkPaths lists first path segments of t.me links that introduce a
// deep link (invite, sticker set, share dialog, proxy, ...) rather than a
// username. Keys are lower-case.
var reservedLinkPaths = map[string]struct{}{
	"joinchat":     {},
	"addstickers":  {},
	"addemoji":     {},
	"addtheme":     {},
	"addlist":      {},
	"share":        {},
	"proxy":        {},
	"socks":        {},
	"login":        {},
	"setlanguage":  {},
	"confirmphone": {},
	"invoice":      {},
	"giftcode":     {},
	"boost":        {},
	"contact":      {},
}

// extractUsernameFromLink returns the username addressed by a t.me or
// telegram.me link, or "" when link contains no such username or its first
// path segment is a reserved deep-link prefix such as "joinchat".
func extractUsernameFromLink(link string) string {
	m := reUsernameFromLink.FindStringSubmatch(link)
	if len(m) < 2 {
		return ""
	}
	if _, reserved := reservedLinkPaths[strings.ToLower(m[1])]; reserved {
		return ""
	}
	return m[1]
}

// getIntCfg returns cfg[key] as an int. int and int64 values are accepted, as
// is float64 (the type JSON decoding produces for numbers), which is
// truncated. def is returned when the key is missing or has any other type.
func getIntCfg(cfg map[string]any, key string, def int) int {
	switch v := cfg[key].(type) {
	case int:
		return v
	case int64:
		return int(v)
	case float64:
		return int(v)
	default:
		return def
	}
}

// getStringSliceCfg returns cfg[key] as a string slice. Both []string and
// []any (the type JSON decoding produces for arrays) are accepted; non-string
// elements of a []any are skipped. It returns nil when the key is missing or
// has any other type.
func getStringSliceCfg(cfg map[string]any, key string) []string {
	switch v := cfg[key].(type) {
	case []string:
		return v
	case []any:
		out := make([]string, 0, len(v))
		for _, elem := range v {
			if s, ok := elem.(string); ok {
				out = append(out, s)
			}
		}
		return out
	default:
		return nil
	}
}