package processor import ( "context" "log" "spider/internal/model" "spider/internal/store" "time" "gorm.io/datatypes" ) // ProgressFn reports processing progress. type ProgressFn func(step string, current, total int, message string) // Processor runs the 4-step cleaning pipeline. type Processor struct { store *store.Store checker *TMeChecker onProgress ProgressFn } // NewProcessor creates a new Processor. func NewProcessor(s *store.Store) *Processor { return &Processor{ store: s, checker: NewTMeChecker(), } } // SetProgressFn sets the progress callback. func (p *Processor) SetProgressFn(fn ProgressFn) { p.onProgress = fn } func (p *Processor) report(step string, current, total int, msg string) { if p.onProgress != nil { p.onProgress(step, current, total, msg) } } // ProcessResult summarizes a processor run. type ProcessResult struct { InputCount int AliveCount int PassedCount int DedupedCount int OutputCount int HotCount int WarmCount int ColdCount int } // Process runs the 4-step pipeline on raw merchants with status="raw". func (p *Processor) Process(ctx context.Context) (*ProcessResult, error) { raws, err := p.store.ListRawByStatus("raw", 0) if err != nil { return nil, err } result := &ProcessResult{InputCount: len(raws)} log.Printf("[processor] processing %d raw merchants", len(raws)) if len(raws) == 0 { return result, nil } // Step 1: t.me dead account check p.report("tmechecker", 0, len(raws), "开始死号预检...") alive, dead := p.checker.Filter(ctx, raws) result.AliveCount = len(alive) log.Printf("[processor] step1 tmechecker: %d alive, %d dead", len(alive), len(dead)) p.report("tmechecker", len(raws), len(raws), "死号预检完成") // Mark dead ones for _, d := range dead { p.saveClean(d, "invalid", "", "Cold", nil, 1) p.store.UpdateRawStatus(d.ID, "done") } if ctx.Err() != nil { return result, ctx.Err() } // Step 2: Blacklist filter p.report("blacklist", 0, len(alive), "黑名单过滤...") blResult := FilterBlacklist(alive) result.PassedCount = len(blResult.Passed) log.Printf("[processor] step2 blacklist: %d passed, %d blocked", len(blResult.Passed), len(blResult.Blocked)) p.report("blacklist", len(alive), len(alive), "黑名单完成") for _, b := range blResult.Blocked { p.saveClean(b.Raw, b.Status, "", "Cold", nil, 1) p.store.UpdateRawStatus(b.Raw.ID, "done") } if ctx.Err() != nil { return result, ctx.Err() } // Step 3: Dedup + merge sources p.report("dedup", 0, len(blResult.Passed), "去重合并...") dedupResult := Deduplicate(blResult.Passed) result.DedupedCount = len(dedupResult.Keepers) log.Printf("[processor] step3 dedup: %d keepers, %d duplicates", len(dedupResult.Keepers), len(dedupResult.Duplicates)) p.report("dedup", len(blResult.Passed), len(blResult.Passed), "去重完成") for _, dup := range dedupResult.Duplicates { p.saveClean(dup, "duplicate", "", "Cold", nil, 1) p.store.UpdateRawStatus(dup.ID, "done") } if ctx.Err() != nil { return result, ctx.Err() } // Step 4: Tag + grade p.report("tagger", 0, len(dedupResult.Keepers), "打标签分等级...") tagged := TagAndGrade(dedupResult.Keepers) log.Printf("[processor] step4 tagger: %d tagged", len(tagged)) for _, t := range tagged { switch t.Level { case "Hot": result.HotCount++ case "Warm": result.WarmCount++ case "Cold": result.ColdCount++ } sources := MarshalSources(t.Merged.AllSources) p.saveClean(t.Merged.Best, "valid", t.IndustryTag, t.Level, sources, t.Merged.SourceCount) p.store.UpdateRawStatus(t.Merged.Best.ID, "done") } result.OutputCount = len(tagged) p.report("tagger", len(tagged), len(tagged), "分级完成") log.Printf("[processor] done: Hot=%d, Warm=%d, Cold=%d", result.HotCount, result.WarmCount, result.ColdCount) return result, nil } func (p *Processor) saveClean(raw model.MerchantRaw, status, industryTag, level string, allSources []byte, sourceCount int) { now := time.Now() tgLink := raw.TgLink if tgLink == "" && raw.TgUsername != "" { tgLink = "https://t.me/" + raw.TgUsername } sourcesJSON := datatypes.JSON([]byte("[]")) if allSources != nil { sourcesJSON = datatypes.JSON(allSources) } clean := &model.MerchantClean{ TgUsername: raw.TgUsername, TgLink: tgLink, MerchantName: raw.MerchantName, Website: raw.Website, Email: raw.Email, Phone: raw.Phone, IndustryTag: industryTag, Level: level, Status: status, IsAlive: status == "valid", LastCheckedAt: &now, SourceCount: sourceCount, AllSources: sourcesJSON, } p.store.SaveClean(clean) }