phase1_discover.go 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183
  1. package pipeline
  2. import (
  3. "context"
  4. "log"
  5. "regexp"
  6. "strings"
  7. "time"
  8. "spider/internal/model"
  9. "spider/internal/telegram"
  10. "gorm.io/gorm"
  11. )
  12. // DiscoverPhase Phase 1: TG 频道裂变发现
  13. type DiscoverPhase struct {
  14. db *gorm.DB
  15. tgManager *telegram.AccountManager
  16. settings Settings
  17. reporter ProgressReporter
  18. }
  19. // NewDiscoverPhase creates a new DiscoverPhase.
  20. func NewDiscoverPhase(db *gorm.DB, tgManager *telegram.AccountManager, settings Settings) *DiscoverPhase {
  21. return &DiscoverPhase{
  22. db: db,
  23. tgManager: tgManager,
  24. settings: settings,
  25. }
  26. }
  27. func (p *DiscoverPhase) Name() string { return "discover" }
  28. func (p *DiscoverPhase) Run(ctx context.Context, task *model.Task, opts *Options) error {
  29. log.Printf("[discover] starting, task_id=%d", task.ID)
  30. if p.tgManager == nil {
  31. log.Printf("[discover] tgManager is nil, skipping")
  32. return nil
  33. }
  34. // 1. 读配置
  35. maxDepth := 3
  36. maxPerLayer := p.settings.GetInt(ctx, "snowball.max_channels_per_layer", 200)
  37. maxTotal := p.settings.GetInt(ctx, "snowball.max_channels_total", 500)
  38. // 2. 从 managed_seeds 拿所有 active 种子
  39. var seeds []model.ManagedSeed
  40. p.db.Where("status = ?", "active").Find(&seeds)
  41. log.Printf("[discover] found %d active seeds", len(seeds))
  42. // 3. BFS 队列
  43. type QueueItem struct {
  44. Username string
  45. Depth int
  46. Source string // "seed" 或 "snowball"
  47. }
  48. queue := make([]QueueItem, 0, len(seeds))
  49. for _, s := range seeds {
  50. queue = append(queue, QueueItem{Username: s.ChannelName, Depth: 0, Source: "seed"})
  51. }
  52. visited := map[string]bool{}
  53. totalFound := 0
  54. // 4. BFS 处理
  55. for len(queue) > 0 && totalFound < maxTotal {
  56. if isContextDone(ctx) {
  57. break
  58. }
  59. item := queue[0]
  60. queue = queue[1:]
  61. username := cleanUsername(item.Username)
  62. if username == "" || visited[username] {
  63. continue
  64. }
  65. visited[username] = true
  66. // 获取 TG 账号
  67. acc, err := p.tgManager.Acquire(ctx)
  68. if err != nil {
  69. log.Printf("[discover] no available TG account: %v", err)
  70. break
  71. }
  72. // 连接并获取频道信息
  73. if err := acc.Client.Connect(ctx); err != nil {
  74. log.Printf("[discover] connect failed for account: %v", err)
  75. p.tgManager.Release(acc, 0)
  76. continue
  77. }
  78. channelInfo, err := acc.Client.GetChannelInfo(ctx, username)
  79. if err != nil {
  80. if fw, ok := err.(*telegram.FloodWaitError); ok {
  81. log.Printf("[discover] FloodWait %ds on @%s", fw.Seconds, username)
  82. p.tgManager.HandleFloodWait(acc, fw.Seconds)
  83. } else {
  84. log.Printf("[discover] GetChannelInfo error @%s: %v", username, err)
  85. p.tgManager.Release(acc, 0)
  86. }
  87. continue
  88. }
  89. // 写入 channels 表(忽略 unique 冲突)
  90. ch := &model.Channel{
  91. Username: username,
  92. Title: channelInfo.Title,
  93. MemberCount: channelInfo.MemberCount,
  94. About: channelInfo.About,
  95. Source: item.Source,
  96. Status: "pending",
  97. }
  98. p.db.Where(model.Channel{Username: username}).FirstOrCreate(ch)
  99. totalFound++
  100. if p.reporter != nil {
  101. p.reporter("discover", totalFound, maxTotal, "发现频道: @"+username)
  102. }
  103. // 如果还没到最大深度,读消息提取更多频道
  104. if item.Depth < maxDepth {
  105. msgs, err := acc.Client.GetMessages(ctx, username, 0, 100)
  106. if err == nil {
  107. layerCount := 0
  108. for _, msg := range msgs {
  109. if layerCount >= maxPerLayer {
  110. break
  111. }
  112. // 提取 forward 来源频道
  113. if msg.ForwardFromChannel != "" {
  114. fwdName := cleanUsername(msg.ForwardFromChannel)
  115. if fwdName != "" && !visited[fwdName] {
  116. queue = append(queue, QueueItem{fwdName, item.Depth + 1, "snowball"})
  117. layerCount++
  118. }
  119. }
  120. // 提取消息中的 t.me 链接
  121. for _, link := range msg.Links {
  122. name := extractUsernameFromLink(link)
  123. if name != "" && !visited[name] {
  124. queue = append(queue, QueueItem{name, item.Depth + 1, "snowball"})
  125. layerCount++
  126. }
  127. }
  128. }
  129. } else {
  130. log.Printf("[discover] GetMessages @%s: %v", username, err)
  131. }
  132. }
  133. p.tgManager.Release(acc, 0)
  134. // 频道间 sleep
  135. select {
  136. case <-ctx.Done():
  137. return nil
  138. case <-time.After(5 * time.Second):
  139. }
  140. }
  141. log.Printf("[discover] done, found %d channels", totalFound)
  142. return nil
  143. }
  // cleanUsername normalizes a channel reference. It trims surrounding
// whitespace and then drops a single leading "@".
func cleanUsername(s string) string {
	trimmed := strings.TrimSpace(s)
	if strings.HasPrefix(trimmed, "@") {
		return trimmed[1:]
	}
	return trimmed
}
  // tmeUsernameLinkRe matches a public t.me / telegram.me link and captures
// the username. Details:
//   - The host is matched case-insensitively.
//   - An optional "s/" web-preview segment is skipped.
//   - The name must be 5–32 characters of [A-Za-z0-9_] starting with a letter.
//   - The name must not be followed by another name character, so an
//     over-long name is rejected rather than silently truncated.
//
// Compiled once at package scope instead of on every call.
var tmeUsernameLinkRe = regexp.MustCompile(`(?i:t(?:elegram)?\.me)/(?:s/)?([a-zA-Z][a-zA-Z0-9_]{4,31})(?:[^a-zA-Z0-9_]|$)`)

// tmeReservedLinkPaths are t.me path segments that are Telegram deep-link
// actions (invites, sticker sets, proxies, …) rather than usernames.
// Only entries of five or more characters matter, since shorter ones can
// never match the username pattern.
var tmeReservedLinkPaths = map[string]bool{
	"joinchat":     true,
	"addstickers":  true,
	"addemoji":     true,
	"addtheme":     true,
	"addlist":      true,
	"setlanguage":  true,
	"share":        true,
	"proxy":        true,
	"socks":        true,
	"login":        true,
	"contact":      true,
	"confirmphone": true,
	"invoice":      true,
	"boost":        true,
	"giftcode":     true,
}

// extractUsernameFromLink returns the channel/user name referenced by a
// t.me (or telegram.me) link.
//
// It returns "" in these cases:
//   - the link is not a t.me link;
//   - it is a private invite ("t.me/+hash");
//   - it points at a reserved deep-link path such as "joinchat" or "share";
//   - the candidate name is outside Telegram's 5–32 character range.
func extractUsernameFromLink(link string) string {
	m := tmeUsernameLinkRe.FindStringSubmatch(link)
	if len(m) < 2 {
		return ""
	}
	name := m[1]
	if tmeReservedLinkPaths[strings.ToLower(name)] {
		return ""
	}
	return name
}