| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183 |
- package pipeline
import (
	"context"
	"errors"
	"fmt"
	"log"
	"regexp"
	"strings"
	"time"

	"spider/internal/model"
	"spider/internal/telegram"

	"gorm.io/gorm"
)
- // DiscoverPhase Phase 1: TG 频道裂变发现
- type DiscoverPhase struct {
- db *gorm.DB
- tgManager *telegram.AccountManager
- settings Settings
- reporter ProgressReporter
- }
- // NewDiscoverPhase creates a new DiscoverPhase.
- func NewDiscoverPhase(db *gorm.DB, tgManager *telegram.AccountManager, settings Settings) *DiscoverPhase {
- return &DiscoverPhase{
- db: db,
- tgManager: tgManager,
- settings: settings,
- }
- }
- func (p *DiscoverPhase) Name() string { return "discover" }
- func (p *DiscoverPhase) Run(ctx context.Context, task *model.Task, opts *Options) error {
- log.Printf("[discover] starting, task_id=%d", task.ID)
- if p.tgManager == nil {
- log.Printf("[discover] tgManager is nil, skipping")
- return nil
- }
- // 1. 读配置
- maxDepth := 3
- maxPerLayer := p.settings.GetInt(ctx, "snowball.max_channels_per_layer", 200)
- maxTotal := p.settings.GetInt(ctx, "snowball.max_channels_total", 500)
- // 2. 从 managed_seeds 拿所有 active 种子
- var seeds []model.ManagedSeed
- p.db.Where("status = ?", "active").Find(&seeds)
- log.Printf("[discover] found %d active seeds", len(seeds))
- // 3. BFS 队列
- type QueueItem struct {
- Username string
- Depth int
- Source string // "seed" 或 "snowball"
- }
- queue := make([]QueueItem, 0, len(seeds))
- for _, s := range seeds {
- queue = append(queue, QueueItem{Username: s.ChannelName, Depth: 0, Source: "seed"})
- }
- visited := map[string]bool{}
- totalFound := 0
- // 4. BFS 处理
- for len(queue) > 0 && totalFound < maxTotal {
- if isContextDone(ctx) {
- break
- }
- item := queue[0]
- queue = queue[1:]
- username := cleanUsername(item.Username)
- if username == "" || visited[username] {
- continue
- }
- visited[username] = true
- // 获取 TG 账号
- acc, err := p.tgManager.Acquire(ctx)
- if err != nil {
- log.Printf("[discover] no available TG account: %v", err)
- break
- }
- // 连接并获取频道信息
- if err := acc.Client.Connect(ctx); err != nil {
- log.Printf("[discover] connect failed for account: %v", err)
- p.tgManager.Release(acc, 0)
- continue
- }
- channelInfo, err := acc.Client.GetChannelInfo(ctx, username)
- if err != nil {
- if fw, ok := err.(*telegram.FloodWaitError); ok {
- log.Printf("[discover] FloodWait %ds on @%s", fw.Seconds, username)
- p.tgManager.HandleFloodWait(acc, fw.Seconds)
- } else {
- log.Printf("[discover] GetChannelInfo error @%s: %v", username, err)
- p.tgManager.Release(acc, 0)
- }
- continue
- }
- // 写入 channels 表(忽略 unique 冲突)
- ch := &model.Channel{
- Username: username,
- Title: channelInfo.Title,
- MemberCount: channelInfo.MemberCount,
- About: channelInfo.About,
- Source: item.Source,
- Status: "pending",
- }
- p.db.Where(model.Channel{Username: username}).FirstOrCreate(ch)
- totalFound++
- if p.reporter != nil {
- p.reporter("discover", totalFound, maxTotal, "发现频道: @"+username)
- }
- // 如果还没到最大深度,读消息提取更多频道
- if item.Depth < maxDepth {
- msgs, err := acc.Client.GetMessages(ctx, username, 0, 100)
- if err == nil {
- layerCount := 0
- for _, msg := range msgs {
- if layerCount >= maxPerLayer {
- break
- }
- // 提取 forward 来源频道
- if msg.ForwardFromChannel != "" {
- fwdName := cleanUsername(msg.ForwardFromChannel)
- if fwdName != "" && !visited[fwdName] {
- queue = append(queue, QueueItem{fwdName, item.Depth + 1, "snowball"})
- layerCount++
- }
- }
- // 提取消息中的 t.me 链接
- for _, link := range msg.Links {
- name := extractUsernameFromLink(link)
- if name != "" && !visited[name] {
- queue = append(queue, QueueItem{name, item.Depth + 1, "snowball"})
- layerCount++
- }
- }
- }
- } else {
- log.Printf("[discover] GetMessages @%s: %v", username, err)
- }
- }
- p.tgManager.Release(acc, 0)
- // 频道间 sleep
- select {
- case <-ctx.Done():
- return nil
- case <-time.After(5 * time.Second):
- }
- }
- log.Printf("[discover] done, found %d channels", totalFound)
- return nil
- }
// cleanUsername normalises a raw channel reference: surrounding whitespace
// is trimmed first, then a single leading "@" is removed if present.
func cleanUsername(s string) string {
	name := strings.TrimSpace(s)
	if strings.HasPrefix(name, "@") {
		return name[1:]
	}
	return name
}
// tmeUsernameRe matches a t.me / telegram.me link and captures the first
// path segment when it has the shape of a public username: a letter
// followed by 4 to 31 letters, digits or underscores. An optional "s/"
// (channel preview) segment before the name is skipped. The word boundaries
// keep look-alike hosts such as "chat.me" from matching and reject segments
// longer than 32 characters instead of truncating them.
var tmeUsernameRe = regexp.MustCompile(`\bt(?:elegram)?\.me/(?:s/)?([a-zA-Z][a-zA-Z0-9_]{4,31})\b`)

// tmeReservedPaths lists first path segments that Telegram deep links use
// for features other than a user or channel profile (invite links, sticker
// sets, proxies, ...). They look like usernames but are not.
var tmeReservedPaths = map[string]struct{}{
	"joinchat":     {},
	"addstickers":  {},
	"addemoji":     {},
	"addtheme":     {},
	"addlist":      {},
	"share":        {},
	"proxy":        {},
	"socks":        {},
	"setlanguage":  {},
	"login":        {},
	"confirmphone": {},
	"invoice":      {},
	"giftcode":     {},
	"boost":        {},
	"contact":      {},
}

// extractUsernameFromLink returns the username referenced by the first
// t.me/<name> or telegram.me/<name> link in link. It returns "" when no such
// link is present, when the path segment does not have the shape of a
// username, or when the segment is a reserved deep-link path such as
// "joinchat".
func extractUsernameFromLink(link string) string {
	m := tmeUsernameRe.FindStringSubmatch(link)
	if len(m) < 2 {
		return ""
	}
	name := m[1]
	// Reserved segments are compared case-insensitively.
	if _, reserved := tmeReservedPaths[strings.ToLower(name)]; reserved {
		return ""
	}
	return name
}
|