| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117 |
- package processor
- import (
- "context"
- "log"
- "spider/internal/crawler"
- "spider/internal/model"
- "sync"
- )
- // TMeChecker filters dead TG accounts via HTTP t.me page scraping.
- // Uses a concurrent worker pool for performance.
- type TMeChecker struct {
- validator *crawler.TMeValidator
- concurrency int
- }
- // NewTMeChecker creates a new TMeChecker with default concurrency.
- func NewTMeChecker() *TMeChecker {
- return &TMeChecker{
- validator: crawler.NewTMeValidator(),
- concurrency: 10,
- }
- }
- // checkedMerchant pairs a raw merchant with its alive status.
- type checkedMerchant struct {
- raw model.MerchantRaw
- alive bool
- }
- // Filter checks each raw merchant's tg_username against t.me concurrently.
- // Returns alive merchants; dead ones are returned separately.
- func (c *TMeChecker) Filter(ctx context.Context, raws []model.MerchantRaw) (alive []model.MerchantRaw, dead []model.MerchantRaw) {
- if len(raws) == 0 {
- return nil, nil
- }
- // For small batches, run sequentially
- if len(raws) <= 3 {
- return c.filterSequential(ctx, raws)
- }
- workers := c.concurrency
- if workers > len(raws) {
- workers = len(raws)
- }
- jobs := make(chan model.MerchantRaw, len(raws))
- results := make(chan checkedMerchant, len(raws))
- // Start workers
- var wg sync.WaitGroup
- for i := 0; i < workers; i++ {
- wg.Add(1)
- go func() {
- defer wg.Done()
- for raw := range jobs {
- if ctx.Err() != nil {
- results <- checkedMerchant{raw: raw, alive: true} // assume alive on cancel
- continue
- }
- if raw.TgUsername == "" {
- results <- checkedMerchant{raw: raw, alive: true}
- continue
- }
- isAlive := c.validator.IsAlive(ctx, raw.TgUsername)
- if !isAlive {
- log.Printf("[tmechecker] dead: @%s", raw.TgUsername)
- }
- results <- checkedMerchant{raw: raw, alive: isAlive}
- }
- }()
- }
- // Send jobs
- for _, raw := range raws {
- jobs <- raw
- }
- close(jobs)
- // Collect results in background
- go func() {
- wg.Wait()
- close(results)
- }()
- for r := range results {
- if r.alive {
- alive = append(alive, r.raw)
- } else {
- dead = append(dead, r.raw)
- }
- }
- return
- }
- // filterSequential is the simple path for small batches.
- func (c *TMeChecker) filterSequential(ctx context.Context, raws []model.MerchantRaw) (alive []model.MerchantRaw, dead []model.MerchantRaw) {
- for _, raw := range raws {
- if ctx.Err() != nil {
- break
- }
- if raw.TgUsername == "" {
- alive = append(alive, raw)
- continue
- }
- if c.validator.IsAlive(ctx, raw.TgUsername) {
- alive = append(alive, raw)
- } else {
- log.Printf("[tmechecker] dead: @%s", raw.TgUsername)
- dead = append(dead, raw)
- }
- }
- return
- }
|