| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220 |
- package pipeline
import (
	"context"
	"errors"
	"fmt"
	"log"
	"strings"
	"time"

	"github.com/redis/go-redis/v9"
	"gorm.io/gorm"

	"spider/internal/extractor"
	"spider/internal/llm"
	"spider/internal/model"
	"spider/internal/telegram"
)
- // ScrapePhase Phase 4: TG 消息采集
- type ScrapePhase struct {
- db *gorm.DB
- tgManager *telegram.AccountManager
- llmClient *llm.Client
- settings Settings
- redis *redis.Client
- reporter ProgressReporter
- }
- // NewScrapePhase creates a new ScrapePhase.
- func NewScrapePhase(db *gorm.DB, tgManager *telegram.AccountManager, llmClient *llm.Client, settings Settings, rdb *redis.Client) *ScrapePhase {
- return &ScrapePhase{
- db: db,
- tgManager: tgManager,
- llmClient: llmClient,
- settings: settings,
- redis: rdb,
- }
- }
- func (p *ScrapePhase) Name() string { return "scrape" }
- func (p *ScrapePhase) Run(ctx context.Context, task *model.Task, opts *Options) error {
- log.Printf("[scrape] starting, task_id=%d", task.ID)
- if p.tgManager == nil {
- log.Printf("[scrape] tgManager is nil, skipping")
- return nil
- }
- msgLimit := p.settings.GetInt(ctx, "tg_scraper.message_limit_per_channel", 500)
- delayMsg := p.settings.GetFloat(ctx, "tg_scraper.delay_per_message", 1.0)
- delayChannel := p.settings.GetFloat(ctx, "tg_scraper.delay_per_channel", 5.0)
- if opts.TestRun != nil && opts.TestRun.MessageLimit > 0 {
- msgLimit = opts.TestRun.MessageLimit
- }
- // 取 pending 频道
- var channels []model.Channel
- q := p.db.Where("status = ?", "pending")
- if opts.TestRun != nil && opts.TestRun.ItemLimit > 0 {
- q = q.Limit(opts.TestRun.ItemLimit)
- }
- q.Find(&channels)
- total := len(channels)
- log.Printf("[scrape] found %d pending channels", total)
- for i, ch := range channels {
- if isContextDone(ctx) {
- break
- }
- if p.reporter != nil {
- p.reporter("scrape", i+1, total, "采集频道: @"+ch.Username)
- }
- acc, err := p.tgManager.Acquire(ctx)
- if err != nil {
- log.Printf("[scrape] no available account: %v", err)
- break
- }
- if err := acc.Client.Connect(ctx); err != nil {
- log.Printf("[scrape] connect failed: %v", err)
- p.tgManager.Release(acc, 0)
- p.db.Model(&ch).Update("status", "failed")
- continue
- }
- // LLM 相关性评估
- if p.llmClient != nil {
- score, err := p.llmClient.EvalChannelRelevance(ctx, ch.Title, ch.About, ch.MemberCount)
- if err == nil && score < 0.5 {
- log.Printf("[scrape] skipping @%s, relevance score=%.2f", ch.Username, score)
- p.tgManager.Release(acc, 0)
- p.db.Model(&ch).Update("status", "skipped")
- continue
- }
- }
- // 读置顶消息
- pinnedMsgs, _ := acc.Client.GetPinnedMessages(ctx, ch.Username)
- p.processMessages(ctx, pinnedMsgs, &ch, delayMsg)
- // 读历史消息(断点续传)
- offsetID := ch.LastMessageID
- fetched := 0
- for fetched < msgLimit {
- if isContextDone(ctx) {
- break
- }
- batchSize := 100
- if msgLimit-fetched < batchSize {
- batchSize = msgLimit - fetched
- }
- msgs, err := acc.Client.GetMessages(ctx, ch.Username, offsetID, batchSize)
- if err != nil {
- if fw, ok := err.(*telegram.FloodWaitError); ok {
- log.Printf("[scrape] FloodWait %ds on @%s", fw.Seconds, ch.Username)
- p.tgManager.HandleFloodWait(acc, fw.Seconds)
- acc = nil
- } else {
- log.Printf("[scrape] GetMessages @%s: %v", ch.Username, err)
- }
- break
- }
- if len(msgs) == 0 {
- break
- }
- p.processMessages(ctx, msgs, &ch, delayMsg)
- // 更新断点
- lastID := msgs[len(msgs)-1].ID
- p.db.Model(&ch).Update("last_message_id", lastID)
- offsetID = lastID
- fetched += len(msgs)
- }
- if acc != nil {
- p.tgManager.Release(acc, 0)
- }
- p.db.Model(&ch).Update("status", "scraped")
- select {
- case <-ctx.Done():
- return nil
- case <-time.After(time.Duration(float64(time.Second) * delayChannel)):
- }
- }
- log.Printf("[scrape] done")
- return nil
- }
- // processMessages 处理一批消息,提取商户写入 merchants_raw
- func (p *ScrapePhase) processMessages(ctx context.Context, msgs []telegram.Message, ch *model.Channel, delayMsg float64) {
- for _, msg := range msgs {
- if msg.IsService || msg.Text == "" {
- continue
- }
- if !extractor.ContainsChinese(msg.Text, 0) {
- continue
- }
- if !extractor.HasContact(msg.Text) {
- continue
- }
- // 快速去重(Redis SET NX key)
- if p.redis != nil {
- info := extractor.Extract(msg.Text)
- if info.TgUsername != "" {
- dedupKey := "spider:dedup:merchant:" + info.TgUsername
- set, _ := p.redis.SetNX(ctx, dedupKey, "1", 7*24*time.Hour).Result()
- if !set {
- continue // 已存在,跳过
- }
- }
- }
- // LLM 精准解析
- var merchantInfo *extractor.MerchantInfo
- if p.llmClient != nil {
- merchantInfo, _ = p.llmClient.ParseMerchant(ctx, msg.Text)
- }
- // Fallback 到正则
- if merchantInfo == nil || merchantInfo.TgUsername == "" {
- info := extractor.Extract(msg.Text)
- merchantInfo = &extractor.MerchantInfo{
- TgUsername: info.TgUsername,
- Website: info.Website,
- Email: info.Email,
- Phone: info.Phone,
- }
- }
- if merchantInfo.TgUsername == "" && merchantInfo.Website == "" {
- continue
- }
- raw := &model.MerchantRaw{
- MerchantName: extractor.CleanMerchantName(merchantInfo.MerchantName),
- TgUsername: strings.TrimPrefix(merchantInfo.TgUsername, "@"),
- Website: merchantInfo.Website,
- Email: merchantInfo.Email,
- Phone: merchantInfo.Phone,
- Industry: merchantInfo.Industry,
- SourceType: "tg_scrape",
- SourceID: ch.Username,
- OriginalMessage: msg.Text,
- Status: "raw",
- }
- p.db.Create(raw)
- time.Sleep(time.Duration(float64(time.Second) * delayMsg))
- }
- }
|