pipeline.go 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238
  1. package processor
  2. import (
  3. "context"
  4. "fmt"
  5. "log"
  6. "spider/internal/model"
  7. "spider/internal/plugin"
  8. "spider/internal/store"
  9. "time"
  10. "gorm.io/datatypes"
  11. )
// ProgressFn reports processing progress for one pipeline step.
// step is the step identifier (e.g. "tmechecker", "blacklist", "dedup",
// "tagger"); current/total give the position within that step; message is a
// human-readable status string (the callers in this file pass Chinese text).
type ProgressFn func(step string, current, total int, message string)
// Processor runs the 4-step cleaning pipeline.
type Processor struct {
	store      *store.Store     // persistence layer for raw/clean merchant records
	checker    *TMeChecker      // step 1: t.me dead-account pre-check
	onProgress ProgressFn       // optional progress callback; nil means no reporting
	logger     plugin.TaskLogger // per-record audit logger for each cleaning step
}
  21. // NewProcessor creates a new Processor.
  22. func NewProcessor(s *store.Store) *Processor {
  23. return &Processor{
  24. store: s,
  25. checker: NewTMeChecker(),
  26. logger: plugin.NopLogger(),
  27. }
  28. }
// SetProgressFn sets the progress callback. Pass nil to disable reporting;
// report() checks for nil before every invocation.
func (p *Processor) SetProgressFn(fn ProgressFn) {
	p.onProgress = fn
}
// SetLogger sets the detail logger for cleaning pipeline audit.
// The default installed by NewProcessor is plugin.NopLogger().
func (p *Processor) SetLogger(l plugin.TaskLogger) {
	p.logger = l
}
  37. func (p *Processor) report(step string, current, total int, msg string) {
  38. if p.onProgress != nil {
  39. p.onProgress(step, current, total, msg)
  40. }
  41. }
// ProcessResult summarizes a processor run.
type ProcessResult struct {
	InputCount   int // raw records fed into the pipeline
	AliveCount   int // records that survived the t.me dead-account check (step 1)
	PassedCount  int // records that passed the blacklist filter (step 2)
	DedupedCount int // unique keepers after dedup/merge (step 3)
	OutputCount  int // records tagged and graded in step 4
	HotCount     int // step-4 records graded "Hot"
	WarmCount    int // step-4 records graded "Warm"
	ColdCount    int // step-4 records graded "Cold"
}
  53. // Process runs the 4-step pipeline on raw merchants with status="raw".
  54. // Processes in batches to avoid loading all records into memory.
  55. func (p *Processor) Process(ctx context.Context) (*ProcessResult, error) {
  56. const batchSize = 2000
  57. totalResult := &ProcessResult{}
  58. for {
  59. if ctx.Err() != nil {
  60. return totalResult, ctx.Err()
  61. }
  62. raws, err := p.store.ListRawByStatus("raw", batchSize)
  63. if err != nil {
  64. return totalResult, err
  65. }
  66. if len(raws) == 0 {
  67. break
  68. }
  69. result, err := p.processBatch(ctx, raws)
  70. if err != nil {
  71. return totalResult, err
  72. }
  73. // Merge results
  74. totalResult.InputCount += result.InputCount
  75. totalResult.AliveCount += result.AliveCount
  76. totalResult.PassedCount += result.PassedCount
  77. totalResult.DedupedCount += result.DedupedCount
  78. totalResult.OutputCount += result.OutputCount
  79. totalResult.HotCount += result.HotCount
  80. totalResult.WarmCount += result.WarmCount
  81. totalResult.ColdCount += result.ColdCount
  82. // If we got less than batchSize, no more records
  83. if len(raws) < batchSize {
  84. break
  85. }
  86. }
  87. log.Printf("[processor] done: input=%d, Hot=%d, Warm=%d, Cold=%d",
  88. totalResult.InputCount, totalResult.HotCount, totalResult.WarmCount, totalResult.ColdCount)
  89. return totalResult, nil
  90. }
// processBatch runs the 4-step pipeline on a batch of raw merchants:
//  1. t.me dead-account check   — dead accounts saved as "invalid"/Cold
//  2. blacklist filter          — blocked entries saved with their rule status
//  3. dedup + source merge      — duplicates folded into a keeper record
//  4. tag + grade               — keepers tagged and assigned Hot/Warm/Cold
// Every raw record that leaves the pipeline at any step is persisted via
// saveClean and its raw row marked "done". Returns partial counts plus
// ctx.Err() if the context is cancelled between steps.
func (p *Processor) processBatch(ctx context.Context, raws []model.MerchantRaw) (*ProcessResult, error) {
	result := &ProcessResult{InputCount: len(raws)}
	log.Printf("[processor] processing batch of %d raw merchants", len(raws))
	// Step 1: t.me dead account check
	p.report("tmechecker", 0, len(raws), "开始死号预检...")
	alive, dead := p.checker.Filter(ctx, raws)
	result.AliveCount = len(alive)
	log.Printf("[processor] step1 tmechecker: %d alive, %d dead", len(alive), len(dead))
	p.report("tmechecker", len(raws), len(raws), "死号预检完成")
	// Mark dead ones (batch): persist each as invalid/Cold, then flip the
	// raw rows to "done" in one batch update.
	var deadIDs []uint
	for _, d := range dead {
		p.saveClean(d, "invalid", "", "Cold", nil, 1)
		deadIDs = append(deadIDs, d.ID)
		p.logger.LogCleanStep(d.TgUsername, "tmechecker", "dead", "t.me page not found or no profile")
	}
	for _, a := range alive {
		p.logger.LogCleanStep(a.TgUsername, "tmechecker", "alive", "")
	}
	// NOTE(review): BatchUpdateRawStatus's result (if any) is discarded here
	// and below. If it can fail, unmarked rows stay "raw" and Process would
	// refetch them forever — confirm the store method's signature and handle.
	p.store.BatchUpdateRawStatus(deadIDs, "done")
	if ctx.Err() != nil {
		return result, ctx.Err()
	}
	// Step 2: Blacklist filter
	p.report("blacklist", 0, len(alive), "黑名单过滤...")
	blResult := FilterBlacklist(alive)
	result.PassedCount = len(blResult.Passed)
	log.Printf("[processor] step2 blacklist: %d passed, %d blocked", len(blResult.Passed), len(blResult.Blocked))
	p.report("blacklist", len(alive), len(alive), "黑名单完成")
	var blockedIDs []uint
	for _, b := range blResult.Blocked {
		// b.Status carries the blacklist verdict and becomes the clean status.
		p.saveClean(b.Raw, b.Status, "", "Cold", nil, 1)
		blockedIDs = append(blockedIDs, b.Raw.ID)
		p.logger.LogCleanStep(b.Raw.TgUsername, "blacklist", b.Status, "blacklist rule matched")
	}
	p.store.BatchUpdateRawStatus(blockedIDs, "done")
	if ctx.Err() != nil {
		return result, ctx.Err()
	}
	// Step 3: Dedup + merge sources
	p.report("dedup", 0, len(blResult.Passed), "去重合并...")
	dedupResult := Deduplicate(blResult.Passed)
	result.DedupedCount = len(dedupResult.Keepers)
	log.Printf("[processor] step3 dedup: %d keepers, %d duplicates", len(dedupResult.Keepers), len(dedupResult.Duplicates))
	p.report("dedup", len(blResult.Passed), len(blResult.Passed), "去重完成")
	var dupIDs []uint
	for _, dup := range dedupResult.Duplicates {
		p.saveClean(dup, "duplicate", "", "Cold", nil, 1)
		dupIDs = append(dupIDs, dup.ID)
		p.logger.LogCleanStep(dup.TgUsername, "dedup", "duplicate", "merged into keeper")
	}
	for _, k := range dedupResult.Keepers {
		p.logger.LogCleanStep(k.Best.TgUsername, "dedup", "keeper", fmt.Sprintf("source_count=%d", k.SourceCount))
	}
	p.store.BatchUpdateRawStatus(dupIDs, "done")
	if ctx.Err() != nil {
		return result, ctx.Err()
	}
	// Step 4: Tag + grade (load rules from DB)
	p.report("tagger", 0, len(dedupResult.Keepers), "打标签分等级...")
	// NOTE(review): GetGradingConfig's second return value is deliberately(?)
	// discarded — if it is an error, a load failure silently falls back to
	// whatever TagAndGradeWithConfig does with a zero-value config. Confirm
	// and at least log the failure.
	gradingCfg, _ := p.store.GetGradingConfig()
	tagged := TagAndGradeWithConfig(dedupResult.Keepers, gradingCfg)
	log.Printf("[processor] step4 tagger: %d tagged", len(tagged))
	var keeperIDs []uint
	for _, t := range tagged {
		switch t.Level {
		case "Hot":
			result.HotCount++
		case "Warm":
			result.WarmCount++
		case "Cold":
			result.ColdCount++
		}
		sources := MarshalSources(t.Merged.AllSources)
		p.saveClean(t.Merged.Best, "valid", t.IndustryTag, t.Level, sources, t.Merged.SourceCount)
		keeperIDs = append(keeperIDs, t.Merged.Best.ID)
		p.logger.LogCleanStep(t.Merged.Best.TgUsername, "tagger", t.Level, fmt.Sprintf("industry=%s", t.IndustryTag))
	}
	p.store.BatchUpdateRawStatus(keeperIDs, "done")
	result.OutputCount = len(tagged)
	p.report("tagger", len(tagged), len(tagged), "分级完成")
	return result, nil
}
  175. func (p *Processor) saveClean(raw model.MerchantRaw, status, industryTag, level string, allSources []byte, sourceCount int) {
  176. now := time.Now()
  177. tgLink := raw.TgLink
  178. if tgLink == "" && raw.TgUsername != "" {
  179. tgLink = "https://t.me/" + raw.TgUsername
  180. }
  181. sourcesJSON := datatypes.JSON([]byte("[]"))
  182. if allSources != nil {
  183. sourcesJSON = datatypes.JSON(allSources)
  184. }
  185. clean := &model.MerchantClean{
  186. TgUsername: raw.TgUsername,
  187. TgLink: tgLink,
  188. MerchantName: raw.MerchantName,
  189. Website: raw.Website,
  190. Email: raw.Email,
  191. Phone: raw.Phone,
  192. IndustryTag: industryTag,
  193. Level: level,
  194. Status: status,
  195. IsAlive: status == "valid",
  196. LastCheckedAt: &now,
  197. SourceCount: sourceCount,
  198. AllSources: sourcesJSON,
  199. }
  200. p.store.SaveClean(clean)
  201. }