| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238 |
- package processor
- import (
- "context"
- "fmt"
- "log"
- "spider/internal/model"
- "spider/internal/plugin"
- "spider/internal/store"
- "time"
- "gorm.io/datatypes"
- )
// ProgressFn reports processing progress. step names the pipeline stage
// (e.g. "tmechecker", "blacklist", "dedup", "tagger"), current/total give
// the position within that stage, and message is a human-readable note.
type ProgressFn func(step string, current, total int, message string)
// Processor runs the 4-step cleaning pipeline.
type Processor struct {
	store      *store.Store     // persistence for raw and cleaned merchants
	checker    *TMeChecker      // step 1: t.me dead-account pre-check
	onProgress ProgressFn       // optional progress callback; nil means no reporting
	logger     plugin.TaskLogger // per-record audit logger for each pipeline step
}
- // NewProcessor creates a new Processor.
- func NewProcessor(s *store.Store) *Processor {
- return &Processor{
- store: s,
- checker: NewTMeChecker(),
- logger: plugin.NopLogger(),
- }
- }
- // SetProgressFn sets the progress callback.
- func (p *Processor) SetProgressFn(fn ProgressFn) {
- p.onProgress = fn
- }
- // SetLogger sets the detail logger for cleaning pipeline audit.
- func (p *Processor) SetLogger(l plugin.TaskLogger) {
- p.logger = l
- }
- func (p *Processor) report(step string, current, total int, msg string) {
- if p.onProgress != nil {
- p.onProgress(step, current, total, msg)
- }
- }
// ProcessResult summarizes a processor run.
type ProcessResult struct {
	InputCount   int // raw records fed into the pipeline
	AliveCount   int // records surviving the t.me dead-account check
	PassedCount  int // records surviving the blacklist filter
	DedupedCount int // unique keeper records after dedup/merge
	OutputCount  int // cleaned records written out after tagging
	HotCount     int // keepers graded "Hot"
	WarmCount    int // keepers graded "Warm"
	ColdCount    int // keepers graded "Cold"
}
- // Process runs the 4-step pipeline on raw merchants with status="raw".
- // Processes in batches to avoid loading all records into memory.
- func (p *Processor) Process(ctx context.Context) (*ProcessResult, error) {
- const batchSize = 2000
- totalResult := &ProcessResult{}
- for {
- if ctx.Err() != nil {
- return totalResult, ctx.Err()
- }
- raws, err := p.store.ListRawByStatus("raw", batchSize)
- if err != nil {
- return totalResult, err
- }
- if len(raws) == 0 {
- break
- }
- result, err := p.processBatch(ctx, raws)
- if err != nil {
- return totalResult, err
- }
- // Merge results
- totalResult.InputCount += result.InputCount
- totalResult.AliveCount += result.AliveCount
- totalResult.PassedCount += result.PassedCount
- totalResult.DedupedCount += result.DedupedCount
- totalResult.OutputCount += result.OutputCount
- totalResult.HotCount += result.HotCount
- totalResult.WarmCount += result.WarmCount
- totalResult.ColdCount += result.ColdCount
- // If we got less than batchSize, no more records
- if len(raws) < batchSize {
- break
- }
- }
- log.Printf("[processor] done: input=%d, Hot=%d, Warm=%d, Cold=%d",
- totalResult.InputCount, totalResult.HotCount, totalResult.WarmCount, totalResult.ColdCount)
- return totalResult, nil
- }
- // processBatch runs the 4-step pipeline on a batch of raw merchants.
- func (p *Processor) processBatch(ctx context.Context, raws []model.MerchantRaw) (*ProcessResult, error) {
- result := &ProcessResult{InputCount: len(raws)}
- log.Printf("[processor] processing batch of %d raw merchants", len(raws))
- // Step 1: t.me dead account check
- p.report("tmechecker", 0, len(raws), "开始死号预检...")
- alive, dead := p.checker.Filter(ctx, raws)
- result.AliveCount = len(alive)
- log.Printf("[processor] step1 tmechecker: %d alive, %d dead", len(alive), len(dead))
- p.report("tmechecker", len(raws), len(raws), "死号预检完成")
- // Mark dead ones (batch)
- var deadIDs []uint
- for _, d := range dead {
- p.saveClean(d, "invalid", "", "Cold", nil, 1)
- deadIDs = append(deadIDs, d.ID)
- p.logger.LogCleanStep(d.TgUsername, "tmechecker", "dead", "t.me page not found or no profile")
- }
- for _, a := range alive {
- p.logger.LogCleanStep(a.TgUsername, "tmechecker", "alive", "")
- }
- p.store.BatchUpdateRawStatus(deadIDs, "done")
- if ctx.Err() != nil {
- return result, ctx.Err()
- }
- // Step 2: Blacklist filter
- p.report("blacklist", 0, len(alive), "黑名单过滤...")
- blResult := FilterBlacklist(alive)
- result.PassedCount = len(blResult.Passed)
- log.Printf("[processor] step2 blacklist: %d passed, %d blocked", len(blResult.Passed), len(blResult.Blocked))
- p.report("blacklist", len(alive), len(alive), "黑名单完成")
- var blockedIDs []uint
- for _, b := range blResult.Blocked {
- p.saveClean(b.Raw, b.Status, "", "Cold", nil, 1)
- blockedIDs = append(blockedIDs, b.Raw.ID)
- p.logger.LogCleanStep(b.Raw.TgUsername, "blacklist", b.Status, "blacklist rule matched")
- }
- p.store.BatchUpdateRawStatus(blockedIDs, "done")
- if ctx.Err() != nil {
- return result, ctx.Err()
- }
- // Step 3: Dedup + merge sources
- p.report("dedup", 0, len(blResult.Passed), "去重合并...")
- dedupResult := Deduplicate(blResult.Passed)
- result.DedupedCount = len(dedupResult.Keepers)
- log.Printf("[processor] step3 dedup: %d keepers, %d duplicates", len(dedupResult.Keepers), len(dedupResult.Duplicates))
- p.report("dedup", len(blResult.Passed), len(blResult.Passed), "去重完成")
- var dupIDs []uint
- for _, dup := range dedupResult.Duplicates {
- p.saveClean(dup, "duplicate", "", "Cold", nil, 1)
- dupIDs = append(dupIDs, dup.ID)
- p.logger.LogCleanStep(dup.TgUsername, "dedup", "duplicate", "merged into keeper")
- }
- for _, k := range dedupResult.Keepers {
- p.logger.LogCleanStep(k.Best.TgUsername, "dedup", "keeper", fmt.Sprintf("source_count=%d", k.SourceCount))
- }
- p.store.BatchUpdateRawStatus(dupIDs, "done")
- if ctx.Err() != nil {
- return result, ctx.Err()
- }
- // Step 4: Tag + grade (load rules from DB)
- p.report("tagger", 0, len(dedupResult.Keepers), "打标签分等级...")
- gradingCfg, _ := p.store.GetGradingConfig()
- tagged := TagAndGradeWithConfig(dedupResult.Keepers, gradingCfg)
- log.Printf("[processor] step4 tagger: %d tagged", len(tagged))
- var keeperIDs []uint
- for _, t := range tagged {
- switch t.Level {
- case "Hot":
- result.HotCount++
- case "Warm":
- result.WarmCount++
- case "Cold":
- result.ColdCount++
- }
- sources := MarshalSources(t.Merged.AllSources)
- p.saveClean(t.Merged.Best, "valid", t.IndustryTag, t.Level, sources, t.Merged.SourceCount)
- keeperIDs = append(keeperIDs, t.Merged.Best.ID)
- p.logger.LogCleanStep(t.Merged.Best.TgUsername, "tagger", t.Level, fmt.Sprintf("industry=%s", t.IndustryTag))
- }
- p.store.BatchUpdateRawStatus(keeperIDs, "done")
- result.OutputCount = len(tagged)
- p.report("tagger", len(tagged), len(tagged), "分级完成")
- return result, nil
- }
- func (p *Processor) saveClean(raw model.MerchantRaw, status, industryTag, level string, allSources []byte, sourceCount int) {
- now := time.Now()
- tgLink := raw.TgLink
- if tgLink == "" && raw.TgUsername != "" {
- tgLink = "https://t.me/" + raw.TgUsername
- }
- sourcesJSON := datatypes.JSON([]byte("[]"))
- if allSources != nil {
- sourcesJSON = datatypes.JSON(allSources)
- }
- clean := &model.MerchantClean{
- TgUsername: raw.TgUsername,
- TgLink: tgLink,
- MerchantName: raw.MerchantName,
- Website: raw.Website,
- Email: raw.Email,
- Phone: raw.Phone,
- IndustryTag: industryTag,
- Level: level,
- Status: status,
- IsAlive: status == "valid",
- LastCheckedAt: &now,
- SourceCount: sourceCount,
- AllSources: sourcesJSON,
- }
- p.store.SaveClean(clean)
- }
|