pipeline.go 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173
  1. package processor
  2. import (
  3. "context"
  4. "log"
  5. "spider/internal/model"
  6. "spider/internal/store"
  7. "time"
  8. "gorm.io/datatypes"
  9. )
  // ProgressFn is the callback signature used to report pipeline progress.
  // It receives the step identifier, the current and total item counts for
  // that step, and a status message.
  type ProgressFn func(step string, current int, total int, message string)
  12. // Processor runs the 4-step cleaning pipeline.
  13. type Processor struct {
  14. store *store.Store
  15. checker *TMeChecker
  16. onProgress ProgressFn
  17. }
  18. // NewProcessor creates a new Processor.
  19. func NewProcessor(s *store.Store) *Processor {
  20. return &Processor{
  21. store: s,
  22. checker: NewTMeChecker(),
  23. }
  24. }
  25. // SetProgressFn sets the progress callback.
  26. func (p *Processor) SetProgressFn(fn ProgressFn) {
  27. p.onProgress = fn
  28. }
  29. func (p *Processor) report(step string, current, total int, msg string) {
  30. if p.onProgress != nil {
  31. p.onProgress(step, current, total, msg)
  32. }
  33. }
  // ProcessResult holds the per-stage counters produced by one Process run.
  type ProcessResult struct {
  	InputCount   int // raw merchants loaded for the run
  	AliveCount   int // merchants reported alive by the t.me check (step 1)
  	PassedCount  int // merchants that passed the blacklist filter (step 2)
  	DedupedCount int // keepers remaining after deduplication (step 3)
  	OutputCount  int // merchants returned by the tag-and-grade step (step 4)
  	HotCount     int // tagged merchants whose level is "Hot"
  	WarmCount    int // tagged merchants whose level is "Warm"
  	ColdCount    int // tagged merchants whose level is "Cold"
  }
  45. // Process runs the 4-step pipeline on raw merchants with status="raw".
  46. func (p *Processor) Process(ctx context.Context) (*ProcessResult, error) {
  47. raws, err := p.store.ListRawByStatus("raw", 0)
  48. if err != nil {
  49. return nil, err
  50. }
  51. result := &ProcessResult{InputCount: len(raws)}
  52. log.Printf("[processor] processing %d raw merchants", len(raws))
  53. if len(raws) == 0 {
  54. return result, nil
  55. }
  56. // Step 1: t.me dead account check
  57. p.report("tmechecker", 0, len(raws), "开始死号预检...")
  58. alive, dead := p.checker.Filter(ctx, raws)
  59. result.AliveCount = len(alive)
  60. log.Printf("[processor] step1 tmechecker: %d alive, %d dead", len(alive), len(dead))
  61. p.report("tmechecker", len(raws), len(raws), "死号预检完成")
  62. // Mark dead ones
  63. for _, d := range dead {
  64. p.saveClean(d, "invalid", "", "Cold", nil, 1)
  65. p.store.UpdateRawStatus(d.ID, "done")
  66. }
  67. if ctx.Err() != nil {
  68. return result, ctx.Err()
  69. }
  70. // Step 2: Blacklist filter
  71. p.report("blacklist", 0, len(alive), "黑名单过滤...")
  72. blResult := FilterBlacklist(alive)
  73. result.PassedCount = len(blResult.Passed)
  74. log.Printf("[processor] step2 blacklist: %d passed, %d blocked", len(blResult.Passed), len(blResult.Blocked))
  75. p.report("blacklist", len(alive), len(alive), "黑名单完成")
  76. for _, b := range blResult.Blocked {
  77. p.saveClean(b.Raw, b.Status, "", "Cold", nil, 1)
  78. p.store.UpdateRawStatus(b.Raw.ID, "done")
  79. }
  80. if ctx.Err() != nil {
  81. return result, ctx.Err()
  82. }
  83. // Step 3: Dedup + merge sources
  84. p.report("dedup", 0, len(blResult.Passed), "去重合并...")
  85. dedupResult := Deduplicate(blResult.Passed)
  86. result.DedupedCount = len(dedupResult.Keepers)
  87. log.Printf("[processor] step3 dedup: %d keepers, %d duplicates", len(dedupResult.Keepers), len(dedupResult.Duplicates))
  88. p.report("dedup", len(blResult.Passed), len(blResult.Passed), "去重完成")
  89. for _, dup := range dedupResult.Duplicates {
  90. p.saveClean(dup, "duplicate", "", "Cold", nil, 1)
  91. p.store.UpdateRawStatus(dup.ID, "done")
  92. }
  93. if ctx.Err() != nil {
  94. return result, ctx.Err()
  95. }
  96. // Step 4: Tag + grade
  97. p.report("tagger", 0, len(dedupResult.Keepers), "打标签分等级...")
  98. tagged := TagAndGrade(dedupResult.Keepers)
  99. log.Printf("[processor] step4 tagger: %d tagged", len(tagged))
  100. for _, t := range tagged {
  101. switch t.Level {
  102. case "Hot":
  103. result.HotCount++
  104. case "Warm":
  105. result.WarmCount++
  106. case "Cold":
  107. result.ColdCount++
  108. }
  109. sources := MarshalSources(t.Merged.AllSources)
  110. p.saveClean(t.Merged.Best, "valid", t.IndustryTag, t.Level, sources, t.Merged.SourceCount)
  111. p.store.UpdateRawStatus(t.Merged.Best.ID, "done")
  112. }
  113. result.OutputCount = len(tagged)
  114. p.report("tagger", len(tagged), len(tagged), "分级完成")
  115. log.Printf("[processor] done: Hot=%d, Warm=%d, Cold=%d", result.HotCount, result.WarmCount, result.ColdCount)
  116. return result, nil
  117. }
  118. func (p *Processor) saveClean(raw model.MerchantRaw, status, industryTag, level string, allSources []byte, sourceCount int) {
  119. now := time.Now()
  120. tgLink := raw.TgLink
  121. if tgLink == "" && raw.TgUsername != "" {
  122. tgLink = "https://t.me/" + raw.TgUsername
  123. }
  124. sourcesJSON := datatypes.JSON([]byte("[]"))
  125. if allSources != nil {
  126. sourcesJSON = datatypes.JSON(allSources)
  127. }
  128. clean := &model.MerchantClean{
  129. TgUsername: raw.TgUsername,
  130. TgLink: tgLink,
  131. MerchantName: raw.MerchantName,
  132. Website: raw.Website,
  133. Email: raw.Email,
  134. Phone: raw.Phone,
  135. IndustryTag: industryTag,
  136. Level: level,
  137. Status: status,
  138. IsAlive: status == "valid",
  139. LastCheckedAt: &now,
  140. SourceCount: sourceCount,
  141. AllSources: sourcesJSON,
  142. }
  143. p.store.SaveClean(clean)
  144. }