package pipeline

import (
	"context"
	"errors"
	"fmt"
	"log"
	"regexp"
	"strings"
	"time"

	"gorm.io/gorm"
	"spider/internal/model"
	"spider/internal/telegram"
)

const (
	// discoverMaxDepth is the deepest BFS level whose messages are still
	// scanned for further channels (seeds are depth 0).
	discoverMaxDepth = 3

	// discoverChannelPause is the delay inserted after each successfully
	// fetched channel before the next one is processed.
	discoverChannelPause = 5 * time.Second
)

// tmeUsernameRe captures the first path segment of a t.me / telegram.me link
// when it looks like a username (a letter followed by 4-31 letters, digits or
// underscores). It is compiled once instead of on every call.
var tmeUsernameRe = regexp.MustCompile(`t(?:elegram)?\.me/([a-zA-Z][a-zA-Z0-9_]{4,31})`)

// DiscoverPhase is pipeline phase 1: snowball discovery of Telegram channels,
// starting from the active managed seeds and following forwards and t.me links.
type DiscoverPhase struct {
	db        *gorm.DB                 // source of seeds and destination for discovered channels
	tgManager *telegram.AccountManager // hands out Telegram accounts; Run is a no-op when nil
	settings  Settings                 // runtime configuration lookups
	reporter  ProgressReporter         // optional progress callback; not set by NewDiscoverPhase
}

// NewDiscoverPhase creates a new DiscoverPhase. The progress reporter is left
// unset.
func NewDiscoverPhase(db *gorm.DB, tgManager *telegram.AccountManager, settings Settings) *DiscoverPhase {
	return &DiscoverPhase{
		db:        db,
		tgManager: tgManager,
		settings:  settings,
	}
}

// Name returns the phase identifier, "discover".
func (p *DiscoverPhase) Name() string { return "discover" }

// Run performs a breadth-first crawl over Telegram channels.
//
// It loads every managed seed whose status is "active", then repeatedly takes
// a username from the queue, fetches its channel info with an acquired
// Telegram account, stores it in the channels table with status "pending",
// and, while the item's depth is below discoverMaxDepth, scans the channel's
// messages for forwarded-from channels and t.me links to enqueue one level
// deeper. The crawl ends when the queue is empty, when
// "snowball.max_channels_total" channels have been processed, when no account
// can be acquired, or when ctx is done.
//
// Run returns an error only when the seed query fails; per-channel failures
// are logged and skipped, and cancellation returns nil. opts is not used.
func (p *DiscoverPhase) Run(ctx context.Context, task *model.Task, opts *Options) error {
	log.Printf("[discover] starting, task_id=%d", task.ID)
	if p.tgManager == nil {
		log.Printf("[discover] tgManager is nil, skipping")
		return nil
	}

	// 1. Read configuration.
	maxPerLayer := p.settings.GetInt(ctx, "snowball.max_channels_per_layer", 200)
	maxTotal := p.settings.GetInt(ctx, "snowball.max_channels_total", 500)

	// 2. Load all active seeds. A failed query must not be mistaken for
	// "no seeds configured", so it is surfaced to the caller.
	var seeds []model.ManagedSeed
	if err := p.db.Where("status = ?", "active").Find(&seeds).Error; err != nil {
		return fmt.Errorf("discover: loading active seeds: %w", err)
	}
	log.Printf("[discover] found %d active seeds", len(seeds))

	// 3. BFS queue.
	type queueItem struct {
		Username string
		Depth    int
		Source   string // "seed" or "snowball"
	}
	queue := make([]queueItem, 0, len(seeds))

	// seen holds every username that has ever been queued. Marking at enqueue
	// time keeps duplicates out of the queue, so they neither grow it nor
	// consume the per-channel candidate cap.
	seen := make(map[string]bool, len(seeds))
	enqueue := func(name string, depth int, source string) bool {
		if name == "" || seen[name] {
			return false
		}
		seen[name] = true
		queue = append(queue, queueItem{Username: name, Depth: depth, Source: source})
		return true
	}
	for _, s := range seeds {
		enqueue(cleanUsername(s.ChannelName), 0, "seed")
	}

	totalFound := 0

	// 4. BFS processing.
	for len(queue) > 0 && totalFound < maxTotal {
		if isContextDone(ctx) {
			break
		}

		item := queue[0]
		queue = queue[1:]
		username := item.Username

		// Acquire a Telegram account; without one the crawl cannot continue.
		acc, err := p.tgManager.Acquire(ctx)
		if err != nil {
			log.Printf("[discover] no available TG account: %v", err)
			break
		}

		// Connect and fetch the channel info.
		if err := acc.Client.Connect(ctx); err != nil {
			log.Printf("[discover] connect failed for account: %v", err)
			p.tgManager.Release(acc, 0)
			continue
		}

		channelInfo, err := acc.Client.GetChannelInfo(ctx, username)
		if err != nil {
			// errors.As also recognises a FloodWaitError that was wrapped on
			// its way up, which a plain type assertion would miss.
			var fw *telegram.FloodWaitError
			if errors.As(err, &fw) {
				log.Printf("[discover] FloodWait %ds on @%s", fw.Seconds, username)
				p.tgManager.HandleFloodWait(acc, fw.Seconds)
			} else {
				log.Printf("[discover] GetChannelInfo error @%s: %v", username, err)
				p.tgManager.Release(acc, 0)
			}
			continue
		}

		// Insert into the channels table unless a row with this username
		// already exists.
		ch := &model.Channel{
			Username:    username,
			Title:       channelInfo.Title,
			MemberCount: channelInfo.MemberCount,
			About:       channelInfo.About,
			Source:      item.Source,
			Status:      "pending",
		}
		if err := p.db.Where(model.Channel{Username: username}).FirstOrCreate(ch).Error; err != nil {
			// Best effort: a failed write is reported but does not stop the
			// crawl. The channel still counts toward maxTotal so the limit
			// keeps bounding the number of Telegram requests.
			log.Printf("[discover] saving channel @%s: %v", username, err)
		}
		totalFound++
		if p.reporter != nil {
			p.reporter("discover", totalFound, maxTotal, "发现频道: @"+username)
		}

		// handedOff records that the account was passed to HandleFloodWait
		// and therefore must not also be released below.
		handedOff := false

		// Below the maximum depth, read messages to extract more channels.
		if item.Depth < discoverMaxDepth {
			// NOTE(review): the trailing arguments look like (offset, limit) —
			// confirm against the telegram client.
			msgs, err := acc.Client.GetMessages(ctx, username, 0, 100)
			if err != nil {
				var fw *telegram.FloodWaitError
				if errors.As(err, &fw) {
					// Treated the same way as a FloodWait from GetChannelInfo
					// instead of releasing the account as if it were healthy.
					log.Printf("[discover] FloodWait %ds on @%s", fw.Seconds, username)
					p.tgManager.HandleFloodWait(acc, fw.Seconds)
					handedOff = true
				} else {
					log.Printf("[discover] GetMessages @%s: %v", username, err)
				}
			} else {
				// The cap is checked once per message, so the links of a
				// single message may push the count slightly past it.
				added := 0
				for _, msg := range msgs {
					if added >= maxPerLayer {
						break
					}
					// Channel the message was forwarded from.
					if enqueue(cleanUsername(msg.ForwardFromChannel), item.Depth+1, "snowball") {
						added++
					}
					// t.me links contained in the message.
					for _, link := range msg.Links {
						if enqueue(extractUsernameFromLink(link), item.Depth+1, "snowball") {
							added++
						}
					}
				}
			}
		}

		if !handedOff {
			p.tgManager.Release(acc, 0)
		}

		// Pause between channels, waking up early on cancellation.
		select {
		case <-ctx.Done():
			return nil
		case <-time.After(discoverChannelPause):
		}
	}

	log.Printf("[discover] done, found %d channels", totalFound)
	return nil
}

// cleanUsername trims surrounding whitespace and then one leading "@".
func cleanUsername(s string) string {
	return strings.TrimPrefix(strings.TrimSpace(s), "@")
}

// extractUsernameFromLink returns the username from a t.me/xxx or
// telegram.me/xxx link, or "" when the link has no username-shaped first path
// segment or that segment is a reserved t.me path (invite, sticker, share,
// proxy links and the like) rather than a channel.
func extractUsernameFromLink(link string) string {
	m := tmeUsernameRe.FindStringSubmatch(link)
	if len(m) < 2 {
		return ""
	}
	// These first segments introduce other kinds of Telegram deep links; looking
	// them up as channels would only waste a request.
	switch strings.ToLower(m[1]) {
	case "joinchat", "addstickers", "addemoji", "addtheme", "addlist",
		"share", "proxy", "socks", "login", "setlanguage", "confirmphone":
		return ""
	}
	return m[1]
}