phase6_clean.go 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322
  1. package pipeline
  2. import (
  3. "context"
  4. "fmt"
  5. "log"
  6. "math"
  7. "regexp"
  8. "strings"
  9. "time"
  10. "gorm.io/datatypes"
  11. "gorm.io/gorm"
  12. "spider/internal/extractor"
  13. "spider/internal/model"
  14. "spider/internal/telegram"
  15. )
  16. // CleanPhase Phase 6: 数据清洗
  17. type CleanPhase struct {
  18. db *gorm.DB
  19. tgManager *telegram.AccountManager
  20. settings Settings
  21. reporter ProgressReporter
  22. }
  23. // NewCleanPhase creates a new CleanPhase.
  24. func NewCleanPhase(db *gorm.DB, tgManager *telegram.AccountManager, settings Settings) *CleanPhase {
  25. return &CleanPhase{
  26. db: db,
  27. tgManager: tgManager,
  28. settings: settings,
  29. }
  30. }
  31. func (p *CleanPhase) Name() string { return "clean" }
  32. func (p *CleanPhase) SetReporter(r ProgressReporter) { p.reporter = r }
  // Run executes phase 6 (data cleaning).
//
// It loads every MerchantRaw whose status is "raw". When opts.TestRun.ItemLimit
// is positive, at most that many rows are loaded; opts may be nil. Each row is
// then passed through three gates and is saved via saveCleaned as soon as its
// outcome is known:
//
//  1. filterBlacklist: rows it flags are saved with the returned status.
//  2. deduplicate: losing rows are saved as "duplicate".
//  3. Telegram verification: rows without a TgUsername are saved as "valid".
//     The rest are looked up with verifyTG and saved as "group", "bot",
//     "valid" or "invalid". Every lookup, successful or not, is followed by a
//     pause of "tg_scraper.delay_per_verify" seconds (default 3.0).
//
// A lookup error is logged and that row is left unsaved. Cancelling ctx stops
// gate 3 early, and Run then returns nil. The only error returned is a failure
// to load the raw rows. task is not used by this phase.
//
// NOTE(review): nothing here changes MerchantRaw.status, so the same rows
// match status = "raw" on the next run. Confirm another step updates it.
func (p *CleanPhase) Run(ctx context.Context, task *model.Task, opts *Options) error {
	var raws []model.MerchantRaw
	q := p.db.Where("status = ?", "raw")
	if opts != nil && opts.TestRun != nil && opts.TestRun.ItemLimit > 0 {
		q = q.Limit(opts.TestRun.ItemLimit)
	}
	if err := q.Find(&raws).Error; err != nil {
		return fmt.Errorf("clean: loading raw merchants: %w", err)
	}
	log.Printf("[clean] processing %d raw merchants", len(raws))

	// Gate 1: blacklist filter.
	var pass1 []model.MerchantRaw
	for _, raw := range raws {
		if status := p.filterBlacklist(raw); status != "" {
			p.saveCleaned(raw, status, nil)
			continue
		}
		pass1 = append(pass1, raw)
	}
	if p.reporter != nil {
		p.reporter("clean", 1, 3, "第一关完成,剩余 "+itoa(len(pass1))+" 条")
	}

	// Gate 2: de-duplication.
	pass2 := p.deduplicate(pass1)
	if p.reporter != nil {
		p.reporter("clean", 2, 3, "第二关完成,去重后 "+itoa(len(pass2))+" 条")
	}

	// Gate 3: Telegram verification, throttled by its own per-lookup delay.
	delayVerify := 3.0
	if p.settings != nil {
		delayVerify = p.settings.GetFloat(ctx, "tg_scraper.delay_per_verify", 3.0)
	}
	delay := time.Duration(float64(time.Second) * delayVerify)

	for i, raw := range pass2 {
		if isContextDone(ctx) {
			break
		}
		if p.reporter != nil {
			p.reporter("clean", i+1, len(pass2), "验证: @"+raw.TgUsername)
		}
		if raw.TgUsername == "" {
			// No TG username, but the row passed gates 1-2 with other contact
			// details, so it is accepted without a Telegram lookup.
			p.saveCleaned(raw, "valid", nil)
			continue
		}

		userInfo, err := p.verifyTG(ctx, raw.TgUsername)
		if err != nil {
			// Leave the row unsaved, but still fall through to the delay:
			// a failed request (e.g. a flood wait) counts against the rate
			// limit too. Skipping the pause here would hammer the API.
			log.Printf("[clean] verify error for %s: %v", raw.TgUsername, err)
		} else {
			status := "invalid"
			if userInfo != nil {
				switch {
				case userInfo.IsChannel:
					status = "group"
				case userInfo.IsBot:
					status = "bot"
				case userInfo.Exists:
					status = "valid"
				}
			}
			p.saveCleaned(raw, status, userInfo)
		}

		select {
		case <-ctx.Done():
			return nil
		case <-time.After(delay):
		}
	}
	log.Printf("[clean] done")
	return nil
}
  // cleanInviteHashRe matches strings shaped like an invite-link hash:
// 16-24 characters from the URL-safe base64 alphabet. It is compiled once at
// package initialisation instead of on every filterBlacklist call.
var cleanInviteHashRe = regexp.MustCompile(`^[A-Za-z0-9_-]{16,24}$`)

// filterBlacklist is gate 1 (blacklist filter). It returns the status the row
// should be saved with, or "" when the row passes:
//
//   - "bot" for a known system/bot username (compared case-insensitively) or
//     any username longer than 3 characters ending in "bot";
//   - "invalid" for a username that looks like an invite-link hash (matches
//     cleanInviteHashRe and has per-character entropy above 3.5 bits);
//   - "invalid" for a non-empty OriginalMessage that extractor.ContainsChinese
//     reports as containing no Chinese text.
func (p *CleanPhase) filterBlacklist(raw model.MerchantRaw) string {
	username := strings.ToLower(raw.TgUsername)

	// System bot blacklist. The literals are already lower-case, so the
	// lower-cased username can be compared directly.
	switch username {
	case "telegram", "telegramhints", "gif", "pic", "bing", "vid",
		"bold", "vote", "like", "sticker", "music",
		"channel_bot", "botfather", "spambot":
		return "bot"
	}

	// Any other "xxxbot" username.
	if len(username) > 3 && strings.HasSuffix(username, "bot") {
		return "bot"
	}

	// Invite-link hash. The regex already restricts length to 16-24 ASCII
	// characters. High entropy (a well-mixed run of upper/lower case and
	// digits) distinguishes a hash from a readable name.
	if cleanInviteHashRe.MatchString(raw.TgUsername) && entropy(raw.TgUsername) > 3.5 {
		return "invalid"
	}

	// A non-empty original message with no Chinese text is rejected.
	if raw.OriginalMessage != "" && !extractor.ContainsChinese(raw.OriginalMessage, 0) {
		return "invalid"
	}
	return ""
}
  // entropy returns the Shannon entropy of s in bits per character, treating
// each rune (not each byte) as one symbol. It returns 0 for the empty string.
//
// The total is counted in runes so that the probabilities sum to 1 for
// non-ASCII input. Dividing rune counts by len(s), which counts bytes, would
// not.
func entropy(s string) float64 {
	freq := make(map[rune]int)
	total := 0
	for _, r := range s {
		freq[r]++
		total++
	}
	if total == 0 {
		return 0
	}
	n := float64(total)
	h := 0.0
	for _, count := range freq {
		p := float64(count) / n
		h -= p * math.Log2(p)
	}
	return h
}
  // deduplicate is gate 2 (de-duplication). It groups raws by their strongest
// identifier: the TG username (case-insensitive), else the website, else the
// email. A row with none of these is keyed by its own ID, so it is always
// kept. Each key kind is namespaced ("tg:", "web:", "email:", "id:") so that,
// for example, a website can never collide with a username or a row ID.
//
// Within each group the row with the highest richness score is kept. On a tie
// the earliest row wins. Every other row is saved immediately with status
// "duplicate". Keepers are returned in the order their group first appears in
// raws, so the order of later verification is deterministic.
func (p *CleanPhase) deduplicate(raws []model.MerchantRaw) []model.MerchantRaw {
	groups := make(map[string][]model.MerchantRaw, len(raws))
	var order []string
	for _, raw := range raws {
		var key string
		switch {
		case raw.TgUsername != "":
			// Telegram usernames are case-insensitive.
			key = "tg:" + strings.ToLower(raw.TgUsername)
		case raw.Website != "":
			key = "web:" + raw.Website
		case raw.Email != "":
			key = "email:" + raw.Email
		default:
			key = "id:" + itoa(int(raw.ID)) // cannot be de-duplicated; always kept
		}
		if _, seen := groups[key]; !seen {
			order = append(order, key)
		}
		groups[key] = append(groups[key], raw)
	}

	keepers := make([]model.MerchantRaw, 0, len(order))
	for _, key := range order {
		group := groups[key]
		best := group[0]
		bestScore := richness(best)
		for _, r := range group[1:] {
			if s := richness(r); s > bestScore {
				// The previous best is displaced: record it as a duplicate.
				p.saveCleaned(best, "duplicate", nil)
				best, bestScore = r, s
			} else {
				p.saveCleaned(r, "duplicate", nil)
			}
		}
		keepers = append(keepers, best)
	}
	return keepers
}
  195. // richness 信息丰富度评分
  196. func richness(r model.MerchantRaw) int {
  197. score := 0
  198. if r.TgUsername != "" {
  199. score++
  200. }
  201. if r.Website != "" {
  202. score++
  203. }
  204. if r.Email != "" {
  205. score++
  206. }
  207. if r.Phone != "" {
  208. score++
  209. }
  210. if r.MerchantName != "" {
  211. score++
  212. }
  213. return score
  214. }
  // verifyTG looks up username through an account acquired from p.tgManager.
//
// It returns (nil, nil) when no account manager is configured.
// NOTE(review): Run then saves the merchant as "invalid". Confirm that is
// intended when Telegram verification is disabled.
//
// Otherwise it acquires an account, connects its client and calls VerifyUser.
// On success, or on any error other than a flood wait, the account is
// returned with Release. On a *telegram.FloodWaitError the account is passed
// to HandleFloodWait instead (Release is not called on that path), and an
// error is always returned. The error is HandleFloodWait's own error when it
// reports one, otherwise the original flood-wait error. The caller must never
// mistake a flood wait for "user does not exist".
func (p *CleanPhase) verifyTG(ctx context.Context, username string) (*telegram.UserInfo, error) {
	if p.tgManager == nil {
		return nil, nil
	}
	acc, err := p.tgManager.Acquire(ctx)
	if err != nil {
		return nil, err
	}
	if err := acc.Client.Connect(ctx); err != nil {
		p.tgManager.Release(acc, 0)
		return nil, err
	}
	userInfo, err := acc.Client.VerifyUser(ctx, username)
	if err != nil {
		// NOTE(review): a direct type assertion does not see a FloodWaitError
		// wrapped by the client. Switch to errors.As if the client wraps errors.
		if fw, ok := err.(*telegram.FloodWaitError); ok {
			if handleErr := p.tgManager.HandleFloodWait(acc, fw.Seconds); handleErr != nil {
				return nil, handleErr
			}
			// Previously (nil, nil) was returned here, and Run marked the
			// merchant "invalid" merely because of rate limiting.
			return nil, err
		}
		p.tgManager.Release(acc, 0)
		return nil, err
	}
	p.tgManager.Release(acc, 0)
	return userInfo, nil
}
  240. // saveCleaned 将原始商户写入 merchants_clean
  241. func (p *CleanPhase) saveCleaned(raw model.MerchantRaw, status string, userInfo *telegram.UserInfo) {
  242. clean := model.MerchantClean{
  243. RawID: &raw.ID,
  244. MerchantName: raw.MerchantName,
  245. TgUsername: raw.TgUsername,
  246. Website: raw.Website,
  247. Email: raw.Email,
  248. Phone: raw.Phone,
  249. Industry: raw.Industry,
  250. Status: status,
  251. SourceCount: 1,
  252. SourceLinks: datatypes.JSON([]byte(`[]`)),
  253. }
  254. if userInfo != nil && userInfo.Exists {
  255. clean.TgFirstName = userInfo.FirstName
  256. clean.TgLastName = userInfo.LastName
  257. clean.IsPremium = userInfo.IsPremium
  258. clean.LastOnline = userInfo.LastOnline
  259. // 活跃度
  260. if userInfo.LastOnline != nil {
  261. days := time.Since(*userInfo.LastOnline).Hours() / 24
  262. if days < 3 {
  263. clean.ActiveLevel = "active"
  264. } else if days < 30 {
  265. clean.ActiveLevel = "moderate"
  266. } else {
  267. clean.ActiveLevel = "inactive"
  268. }
  269. }
  270. }
  271. // 冲突时按 tg_username unique 更新
  272. if clean.TgUsername != "" {
  273. p.db.Where(model.MerchantClean{TgUsername: clean.TgUsername}).FirstOrCreate(&clean)
  274. } else {
  275. p.db.Create(&clean)
  276. }
  277. }
  // itoa renders n in base 10, with a leading '-' for negative values.
func itoa(n int) string {
	return fmt.Sprint(n)
}