| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173 |
- package processor
- import (
- "context"
- "log"
- "spider/internal/model"
- "spider/internal/store"
- "time"
- "gorm.io/datatypes"
- )
// ProgressFn is the callback signature used to observe a pipeline run.
//
// step names the stage being reported, current and total give the position
// within that stage, and message is a short human-readable status line.
type ProgressFn func(step string, current int, total int, message string)
- // Processor runs the 4-step cleaning pipeline.
- type Processor struct {
- store *store.Store
- checker *TMeChecker
- onProgress ProgressFn
- }
- // NewProcessor creates a new Processor.
- func NewProcessor(s *store.Store) *Processor {
- return &Processor{
- store: s,
- checker: NewTMeChecker(),
- }
- }
- // SetProgressFn sets the progress callback.
- func (p *Processor) SetProgressFn(fn ProgressFn) {
- p.onProgress = fn
- }
- func (p *Processor) report(step string, current, total int, msg string) {
- if p.onProgress != nil {
- p.onProgress(step, current, total, msg)
- }
- }
// ProcessResult holds the per-stage counters produced by one Process run.
type ProcessResult struct {
	// InputCount is the number of raw merchants loaded for the run.
	InputCount int
	// AliveCount, PassedCount and DedupedCount are the survivors of the
	// dead-account check, the blacklist filter and de-duplication respectively.
	AliveCount, PassedCount, DedupedCount int
	// OutputCount is the number of merchants that were tagged and graded.
	OutputCount int
	// HotCount, WarmCount and ColdCount break the tagged merchants down by
	// level; records rejected in earlier stages are not included.
	HotCount, WarmCount, ColdCount int
}
- // Process runs the 4-step pipeline on raw merchants with status="raw".
- func (p *Processor) Process(ctx context.Context) (*ProcessResult, error) {
- raws, err := p.store.ListRawByStatus("raw", 0)
- if err != nil {
- return nil, err
- }
- result := &ProcessResult{InputCount: len(raws)}
- log.Printf("[processor] processing %d raw merchants", len(raws))
- if len(raws) == 0 {
- return result, nil
- }
- // Step 1: t.me dead account check
- p.report("tmechecker", 0, len(raws), "开始死号预检...")
- alive, dead := p.checker.Filter(ctx, raws)
- result.AliveCount = len(alive)
- log.Printf("[processor] step1 tmechecker: %d alive, %d dead", len(alive), len(dead))
- p.report("tmechecker", len(raws), len(raws), "死号预检完成")
- // Mark dead ones
- for _, d := range dead {
- p.saveClean(d, "invalid", "", "Cold", nil, 1)
- p.store.UpdateRawStatus(d.ID, "done")
- }
- if ctx.Err() != nil {
- return result, ctx.Err()
- }
- // Step 2: Blacklist filter
- p.report("blacklist", 0, len(alive), "黑名单过滤...")
- blResult := FilterBlacklist(alive)
- result.PassedCount = len(blResult.Passed)
- log.Printf("[processor] step2 blacklist: %d passed, %d blocked", len(blResult.Passed), len(blResult.Blocked))
- p.report("blacklist", len(alive), len(alive), "黑名单完成")
- for _, b := range blResult.Blocked {
- p.saveClean(b.Raw, b.Status, "", "Cold", nil, 1)
- p.store.UpdateRawStatus(b.Raw.ID, "done")
- }
- if ctx.Err() != nil {
- return result, ctx.Err()
- }
- // Step 3: Dedup + merge sources
- p.report("dedup", 0, len(blResult.Passed), "去重合并...")
- dedupResult := Deduplicate(blResult.Passed)
- result.DedupedCount = len(dedupResult.Keepers)
- log.Printf("[processor] step3 dedup: %d keepers, %d duplicates", len(dedupResult.Keepers), len(dedupResult.Duplicates))
- p.report("dedup", len(blResult.Passed), len(blResult.Passed), "去重完成")
- for _, dup := range dedupResult.Duplicates {
- p.saveClean(dup, "duplicate", "", "Cold", nil, 1)
- p.store.UpdateRawStatus(dup.ID, "done")
- }
- if ctx.Err() != nil {
- return result, ctx.Err()
- }
- // Step 4: Tag + grade
- p.report("tagger", 0, len(dedupResult.Keepers), "打标签分等级...")
- tagged := TagAndGrade(dedupResult.Keepers)
- log.Printf("[processor] step4 tagger: %d tagged", len(tagged))
- for _, t := range tagged {
- switch t.Level {
- case "Hot":
- result.HotCount++
- case "Warm":
- result.WarmCount++
- case "Cold":
- result.ColdCount++
- }
- sources := MarshalSources(t.Merged.AllSources)
- p.saveClean(t.Merged.Best, "valid", t.IndustryTag, t.Level, sources, t.Merged.SourceCount)
- p.store.UpdateRawStatus(t.Merged.Best.ID, "done")
- }
- result.OutputCount = len(tagged)
- p.report("tagger", len(tagged), len(tagged), "分级完成")
- log.Printf("[processor] done: Hot=%d, Warm=%d, Cold=%d", result.HotCount, result.WarmCount, result.ColdCount)
- return result, nil
- }
- func (p *Processor) saveClean(raw model.MerchantRaw, status, industryTag, level string, allSources []byte, sourceCount int) {
- now := time.Now()
- tgLink := raw.TgLink
- if tgLink == "" && raw.TgUsername != "" {
- tgLink = "https://t.me/" + raw.TgUsername
- }
- sourcesJSON := datatypes.JSON([]byte("[]"))
- if allSources != nil {
- sourcesJSON = datatypes.JSON(allSources)
- }
- clean := &model.MerchantClean{
- TgUsername: raw.TgUsername,
- TgLink: tgLink,
- MerchantName: raw.MerchantName,
- Website: raw.Website,
- Email: raw.Email,
- Phone: raw.Phone,
- IndustryTag: industryTag,
- Level: level,
- Status: status,
- IsAlive: status == "valid",
- LastCheckedAt: &now,
- SourceCount: sourceCount,
- AllSources: sourcesJSON,
- }
- p.store.SaveClean(clean)
- }
|