tmechecker.go 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117
  1. package processor
  2. import (
  3. "context"
  4. "log"
  5. "spider/internal/crawler"
  6. "spider/internal/model"
  7. "sync"
  8. )
  9. // TMeChecker filters dead TG accounts via HTTP t.me page scraping.
  10. // Uses a concurrent worker pool for performance.
  11. type TMeChecker struct {
  12. validator *crawler.TMeValidator
  13. concurrency int
  14. }
  15. // NewTMeChecker creates a new TMeChecker with default concurrency.
  16. func NewTMeChecker() *TMeChecker {
  17. return &TMeChecker{
  18. validator: crawler.NewTMeValidator(),
  19. concurrency: 10,
  20. }
  21. }
  22. // checkedMerchant pairs a raw merchant with its alive status.
  23. type checkedMerchant struct {
  24. raw model.MerchantRaw
  25. alive bool
  26. }
  27. // Filter checks each raw merchant's tg_username against t.me concurrently.
  28. // Returns alive merchants; dead ones are returned separately.
  29. func (c *TMeChecker) Filter(ctx context.Context, raws []model.MerchantRaw) (alive []model.MerchantRaw, dead []model.MerchantRaw) {
  30. if len(raws) == 0 {
  31. return nil, nil
  32. }
  33. // For small batches, run sequentially
  34. if len(raws) <= 3 {
  35. return c.filterSequential(ctx, raws)
  36. }
  37. workers := c.concurrency
  38. if workers > len(raws) {
  39. workers = len(raws)
  40. }
  41. jobs := make(chan model.MerchantRaw, len(raws))
  42. results := make(chan checkedMerchant, len(raws))
  43. // Start workers
  44. var wg sync.WaitGroup
  45. for i := 0; i < workers; i++ {
  46. wg.Add(1)
  47. go func() {
  48. defer wg.Done()
  49. for raw := range jobs {
  50. if ctx.Err() != nil {
  51. results <- checkedMerchant{raw: raw, alive: true} // assume alive on cancel
  52. continue
  53. }
  54. if raw.TgUsername == "" {
  55. results <- checkedMerchant{raw: raw, alive: true}
  56. continue
  57. }
  58. isAlive := c.validator.IsAlive(ctx, raw.TgUsername)
  59. if !isAlive {
  60. log.Printf("[tmechecker] dead: @%s", raw.TgUsername)
  61. }
  62. results <- checkedMerchant{raw: raw, alive: isAlive}
  63. }
  64. }()
  65. }
  66. // Send jobs
  67. for _, raw := range raws {
  68. jobs <- raw
  69. }
  70. close(jobs)
  71. // Collect results in background
  72. go func() {
  73. wg.Wait()
  74. close(results)
  75. }()
  76. for r := range results {
  77. if r.alive {
  78. alive = append(alive, r.raw)
  79. } else {
  80. dead = append(dead, r.raw)
  81. }
  82. }
  83. return
  84. }
  85. // filterSequential is the simple path for small batches.
  86. func (c *TMeChecker) filterSequential(ctx context.Context, raws []model.MerchantRaw) (alive []model.MerchantRaw, dead []model.MerchantRaw) {
  87. for _, raw := range raws {
  88. if ctx.Err() != nil {
  89. break
  90. }
  91. if raw.TgUsername == "" {
  92. alive = append(alive, raw)
  93. continue
  94. }
  95. if c.validator.IsAlive(ctx, raw.TgUsername) {
  96. alive = append(alive, raw)
  97. } else {
  98. log.Printf("[tmechecker] dead: @%s", raw.TgUsername)
  99. dead = append(dead, raw)
  100. }
  101. }
  102. return
  103. }