package pipeline

import (
	"context"
	"log"
	"strings"
	"time"

	"github.com/redis/go-redis/v9"
	"gorm.io/gorm"

	"spider/internal/extractor"
	"spider/internal/llm"
	"spider/internal/model"
	"spider/internal/telegram"
)

// ScrapePhase Phase 4: Telegram message scraping. It walks channels in
// "pending" status, pulls pinned and historical messages (with resume via
// last_message_id), extracts merchant leads and writes them to merchants_raw.
type ScrapePhase struct {
	db        *gorm.DB
	tgManager *telegram.AccountManager
	llmClient *llm.Client
	settings  Settings
	redis     *redis.Client
	reporter  ProgressReporter
}

// NewScrapePhase creates a new ScrapePhase.
func NewScrapePhase(db *gorm.DB, tgManager *telegram.AccountManager, llmClient *llm.Client, settings Settings, rdb *redis.Client) *ScrapePhase {
	return &ScrapePhase{
		db:        db,
		tgManager: tgManager,
		llmClient: llmClient,
		settings:  settings,
		redis:     rdb,
	}
}

// Name returns the phase identifier used in logs and progress reporting.
func (p *ScrapePhase) Name() string { return "scrape" }

// Run scrapes all pending channels. For each channel it acquires a TG
// account, optionally filters by LLM relevance score, reads pinned plus
// historical messages in batches of up to 100, and records a per-channel
// resume point (last_message_id). Honors ctx cancellation between channels,
// between batches and between messages. Returns an error only when the
// initial channel query fails; per-channel failures are logged and the
// channel is marked "failed"/"skipped"/"scraped" accordingly.
func (p *ScrapePhase) Run(ctx context.Context, task *model.Task, opts *Options) error {
	log.Printf("[scrape] starting, task_id=%d", task.ID)
	if p.tgManager == nil {
		log.Printf("[scrape] tgManager is nil, skipping")
		return nil
	}

	msgLimit := p.settings.GetInt(ctx, "tg_scraper.message_limit_per_channel", 500)
	delayMsg := p.settings.GetFloat(ctx, "tg_scraper.delay_per_message", 1.0)
	delayChannel := p.settings.GetFloat(ctx, "tg_scraper.delay_per_channel", 5.0)
	if opts.TestRun != nil && opts.TestRun.MessageLimit > 0 {
		msgLimit = opts.TestRun.MessageLimit
	}

	// Fetch pending channels. Fix: the original ignored the query error,
	// making a DB failure indistinguishable from "0 pending channels".
	var channels []model.Channel
	q := p.db.Where("status = ?", "pending")
	if opts.TestRun != nil && opts.TestRun.ItemLimit > 0 {
		q = q.Limit(opts.TestRun.ItemLimit)
	}
	if err := q.Find(&channels).Error; err != nil {
		log.Printf("[scrape] query pending channels: %v", err)
		return err
	}
	total := len(channels)
	log.Printf("[scrape] found %d pending channels", total)

	for i, ch := range channels {
		if isContextDone(ctx) {
			break
		}
		if p.reporter != nil {
			p.reporter("scrape", i+1, total, "采集频道: @"+ch.Username)
		}

		acc, err := p.tgManager.Acquire(ctx)
		if err != nil {
			log.Printf("[scrape] no available account: %v", err)
			break
		}
		if err := acc.Client.Connect(ctx); err != nil {
			log.Printf("[scrape] connect failed: %v", err)
			p.tgManager.Release(acc, 0)
			p.db.Model(&ch).Update("status", "failed")
			continue
		}

		// LLM relevance gate: skip channels the model scores below 0.5.
		// An LLM error is treated as "relevant" (best-effort filter).
		if p.llmClient != nil {
			score, err := p.llmClient.EvalChannelRelevance(ctx, ch.Title, ch.About, ch.MemberCount)
			if err == nil && score < 0.5 {
				log.Printf("[scrape] skipping @%s, relevance score=%.2f", ch.Username, score)
				p.tgManager.Release(acc, 0)
				p.db.Model(&ch).Update("status", "skipped")
				continue
			}
		}

		// Pinned messages first; errors are best-effort ignored.
		pinnedMsgs, _ := acc.Client.GetPinnedMessages(ctx, ch.Username)
		p.processMessages(ctx, pinnedMsgs, &ch, delayMsg)

		// Historical messages with resume support: offsetID starts at the
		// channel's saved last_message_id and advances after each batch.
		offsetID := ch.LastMessageID
		fetched := 0
		for fetched < msgLimit {
			if isContextDone(ctx) {
				break
			}
			batchSize := 100
			if msgLimit-fetched < batchSize {
				batchSize = msgLimit - fetched
			}
			msgs, err := acc.Client.GetMessages(ctx, ch.Username, offsetID, batchSize)
			if err != nil {
				if fw, ok := err.(*telegram.FloodWaitError); ok {
					log.Printf("[scrape] FloodWait %ds on @%s", fw.Seconds, ch.Username)
					// Account is handed to the manager for cooldown;
					// nil it so we don't double-release below.
					p.tgManager.HandleFloodWait(acc, fw.Seconds)
					acc = nil
				} else {
					log.Printf("[scrape] GetMessages @%s: %v", ch.Username, err)
				}
				break
			}
			if len(msgs) == 0 {
				break
			}
			p.processMessages(ctx, msgs, &ch, delayMsg)
			// Persist the resume point after every batch.
			lastID := msgs[len(msgs)-1].ID
			p.db.Model(&ch).Update("last_message_id", lastID)
			offsetID = lastID
			fetched += len(msgs)
		}

		if acc != nil {
			p.tgManager.Release(acc, 0)
		}
		// NOTE(review): the channel is marked "scraped" even when the fetch
		// loop was aborted by FloodWait or an error; resume relies on
		// last_message_id. Confirm this matches the intended state machine.
		p.db.Model(&ch).Update("status", "scraped")

		// Per-channel throttle, cancellable.
		select {
		case <-ctx.Done():
			return nil
		case <-time.After(time.Duration(float64(time.Second) * delayChannel)):
		}
	}
	log.Printf("[scrape] done")
	return nil
}

// processMessages filters a batch of messages and extracts merchant leads
// into merchants_raw. A message must be non-service, non-empty, contain
// Chinese text and a contact to be considered. Dedup is a Redis SET NX on
// the extracted TG username with a 7-day TTL. Parsing prefers the LLM and
// falls back to regex extraction. Sleeps delayMsg seconds between kept
// messages, honoring ctx cancellation.
func (p *ScrapePhase) processMessages(ctx context.Context, msgs []telegram.Message, ch *model.Channel, delayMsg float64) {
	for _, msg := range msgs {
		if msg.IsService || msg.Text == "" {
			continue
		}
		if !extractor.ContainsChinese(msg.Text, 0) {
			continue
		}
		if !extractor.HasContact(msg.Text) {
			continue
		}

		// Regex extraction is used both for dedup and as the LLM fallback;
		// fix: the original called extractor.Extract twice on the same text.
		rxInfo := extractor.Extract(msg.Text)

		// Fast dedup via Redis SET NX (7-day TTL); Redis errors are
		// best-effort ignored (treated as "not seen").
		if p.redis != nil && rxInfo.TgUsername != "" {
			dedupKey := "spider:dedup:merchant:" + rxInfo.TgUsername
			set, _ := p.redis.SetNX(ctx, dedupKey, "1", 7*24*time.Hour).Result()
			if !set {
				continue // already seen, skip
			}
		}

		// Precise parsing via LLM, when available.
		var merchantInfo *extractor.MerchantInfo
		if p.llmClient != nil {
			merchantInfo, _ = p.llmClient.ParseMerchant(ctx, msg.Text)
		}
		// Fallback to the regex extraction when the LLM failed or
		// produced no TG username.
		if merchantInfo == nil || merchantInfo.TgUsername == "" {
			merchantInfo = &extractor.MerchantInfo{
				TgUsername: rxInfo.TgUsername,
				Website:    rxInfo.Website,
				Email:      rxInfo.Email,
				Phone:      rxInfo.Phone,
			}
		}
		// A lead without at least a TG username or website is useless.
		if merchantInfo.TgUsername == "" && merchantInfo.Website == "" {
			continue
		}

		raw := &model.MerchantRaw{
			MerchantName:    extractor.CleanMerchantName(merchantInfo.MerchantName),
			TgUsername:      strings.TrimPrefix(merchantInfo.TgUsername, "@"),
			Website:         merchantInfo.Website,
			Email:           merchantInfo.Email,
			Phone:           merchantInfo.Phone,
			Industry:        merchantInfo.Industry,
			SourceType:      "tg_scrape",
			SourceID:        ch.Username,
			OriginalMessage: msg.Text,
			Status:          "raw",
		}
		// Fix: the insert error was silently discarded; log it but keep going.
		if err := p.db.Create(raw).Error; err != nil {
			log.Printf("[scrape] save merchant raw (@%s): %v", raw.TgUsername, err)
		}

		// Per-message throttle. Fix: the original time.Sleep ignored ctx,
		// so cancellation could stall for delayMsg × remaining messages.
		select {
		case <-ctx.Done():
			return
		case <-time.After(time.Duration(float64(time.Second) * delayMsg)):
		}
	}
}