package pipeline import ( "context" "fmt" "log" "math" "regexp" "strings" "time" "gorm.io/datatypes" "gorm.io/gorm" "spider/internal/extractor" "spider/internal/model" "spider/internal/telegram" ) // CleanPhase Phase 6: 数据清洗 type CleanPhase struct { db *gorm.DB tgManager *telegram.AccountManager settings Settings reporter ProgressReporter } // NewCleanPhase creates a new CleanPhase. func NewCleanPhase(db *gorm.DB, tgManager *telegram.AccountManager, settings Settings) *CleanPhase { return &CleanPhase{ db: db, tgManager: tgManager, settings: settings, } } func (p *CleanPhase) Name() string { return "clean" } func (p *CleanPhase) SetReporter(r ProgressReporter) { p.reporter = r } func (p *CleanPhase) Run(ctx context.Context, task *model.Task, opts *Options) error { // 取所有 status=raw 的商户 var raws []model.MerchantRaw q := p.db.Where("status = ?", "raw") if opts.TestRun != nil && opts.TestRun.ItemLimit > 0 { q = q.Limit(opts.TestRun.ItemLimit) } q.Find(&raws) total := len(raws) log.Printf("[clean] processing %d raw merchants", total) // 第一关:黑名单过滤 var pass1 []model.MerchantRaw for _, raw := range raws { status := p.filterBlacklist(raw) if status != "" { p.saveCleaned(raw, status, nil) } else { pass1 = append(pass1, raw) } } if p.reporter != nil { p.reporter("clean", 1, 3, "第一关完成,剩余 "+itoa(len(pass1))+" 条") } // 第二关:去重 pass2 := p.deduplicate(pass1) if p.reporter != nil { p.reporter("clean", 2, 3, "第二关完成,去重后 "+itoa(len(pass2))+" 条") } // 第三关:TG 真实性验证(有独立 rate limiter) delayVerify := 3.0 if p.settings != nil { delayVerify = p.settings.GetFloat(ctx, "tg_scraper.delay_per_verify", 3.0) } for i, raw := range pass2 { if isContextDone(ctx) { break } if p.reporter != nil { p.reporter("clean", i+1, len(pass2), "验证: @"+raw.TgUsername) } if raw.TgUsername == "" { // 没有 TG 用户名但有其他联系方式,标记为 valid p.saveCleaned(raw, "valid", nil) continue } userInfo, err := p.verifyTG(ctx, raw.TgUsername) if err != nil { log.Printf("[clean] verify error for %s: %v", raw.TgUsername, err) continue } status := "invalid" if userInfo != nil { if userInfo.IsChannel { status = "group" } else if userInfo.IsBot { status = "bot" } else if userInfo.Exists { status = "valid" } } p.saveCleaned(raw, status, userInfo) // 独立 rate limiter select { case <-ctx.Done(): return nil case <-time.After(time.Duration(float64(time.Second) * delayVerify)): } } log.Printf("[clean] done") return nil } // filterBlacklist 第一关:黑名单过滤 // 返回应被标记的状态,"" 表示通过 func (p *CleanPhase) filterBlacklist(raw model.MerchantRaw) string { // 系统 bot 黑名单 botNames := []string{ "telegram", "telegramhints", "gif", "pic", "bing", "vid", "bold", "vote", "like", "sticker", "music", "channel_bot", "BotFather", "SpamBot", } username := strings.ToLower(raw.TgUsername) for _, b := range botNames { if username == strings.ToLower(b) { return "bot" } } // xxxbot 后缀 if strings.HasSuffix(username, "bot") && len(username) > 3 { return "bot" } // 邀请链接哈希(16-24位 base64) if len(raw.TgUsername) >= 16 && len(raw.TgUsername) <= 24 { reBase64 := regexp.MustCompile(`^[A-Za-z0-9_-]{16,24}$`) if reBase64.MatchString(raw.TgUsername) { // 计算熵:如果大写+小写+数字混合度高,认为是哈希 if entropy(raw.TgUsername) > 3.5 { return "invalid" } } } // original_message 非空且不含中文 if raw.OriginalMessage != "" && !extractor.ContainsChinese(raw.OriginalMessage, 0) { return "invalid" } return "" } // entropy 计算字符串的信息熵 func entropy(s string) float64 { freq := map[rune]int{} for _, r := range s { freq[r]++ } n := float64(len(s)) h := 0.0 for _, count := range freq { p := float64(count) / n h -= p * math.Log2(p) } return h } // deduplicate 第二关:去重 // 同 tg_username 保留信息最丰富的一条,其余标 duplicate func (p *CleanPhase) deduplicate(raws []model.MerchantRaw) []model.MerchantRaw { // 按 tg_username 分组 groups := map[string][]model.MerchantRaw{} for _, raw := range raws { key := raw.TgUsername if key == "" { key = raw.Website } if key == "" { key = raw.Email } if key == "" { key = itoa(int(raw.ID)) // 无法去重的保留 } groups[key] = append(groups[key], raw) } var keepers []model.MerchantRaw for _, group := range groups { if len(group) == 1 { keepers = append(keepers, group[0]) continue } // 按丰富度打分,保留最高分 best := group[0] bestScore := richness(best) for _, r := range group[1:] { s := richness(r) if s > bestScore { // 将被替换的标为 duplicate p.saveCleaned(best, "duplicate", nil) bestScore = s best = r } else { p.saveCleaned(r, "duplicate", nil) } } keepers = append(keepers, best) } return keepers } // richness 信息丰富度评分 func richness(r model.MerchantRaw) int { score := 0 if r.TgUsername != "" { score++ } if r.Website != "" { score++ } if r.Email != "" { score++ } if r.Phone != "" { score++ } if r.MerchantName != "" { score++ } return score } // verifyTG 调用 TG API 验证用户名 func (p *CleanPhase) verifyTG(ctx context.Context, username string) (*telegram.UserInfo, error) { if p.tgManager == nil { return nil, nil } acc, err := p.tgManager.Acquire(ctx) if err != nil { return nil, err } if err := acc.Client.Connect(ctx); err != nil { p.tgManager.Release(acc, 0) return nil, err } userInfo, err := acc.Client.VerifyUser(ctx, username) if err != nil { if fw, ok := err.(*telegram.FloodWaitError); ok { handleErr := p.tgManager.HandleFloodWait(acc, fw.Seconds) return nil, handleErr } p.tgManager.Release(acc, 0) return nil, err } p.tgManager.Release(acc, 0) return userInfo, nil } // saveCleaned 将原始商户写入 merchants_clean func (p *CleanPhase) saveCleaned(raw model.MerchantRaw, status string, userInfo *telegram.UserInfo) { clean := model.MerchantClean{ RawID: &raw.ID, MerchantName: raw.MerchantName, TgUsername: raw.TgUsername, Website: raw.Website, Email: raw.Email, Phone: raw.Phone, Industry: raw.Industry, Status: status, SourceCount: 1, SourceLinks: datatypes.JSON([]byte(`[]`)), } if userInfo != nil && userInfo.Exists { clean.TgFirstName = userInfo.FirstName clean.TgLastName = userInfo.LastName clean.IsPremium = userInfo.IsPremium clean.LastOnline = userInfo.LastOnline // 活跃度 if userInfo.LastOnline != nil { days := time.Since(*userInfo.LastOnline).Hours() / 24 if days < 3 { clean.ActiveLevel = "active" } else if days < 30 { clean.ActiveLevel = "moderate" } else { clean.ActiveLevel = "inactive" } } } // 冲突时按 tg_username unique 更新 if clean.TgUsername != "" { p.db.Where(model.MerchantClean{TgUsername: clean.TgUsername}).FirstOrCreate(&clean) } else { p.db.Create(&clean) } } // itoa converts int to string. func itoa(n int) string { return fmt.Sprintf("%d", n) }