package pipeline

import (
	"context"
	"fmt"
	"log"

	"spider/internal/model"

	"github.com/redis/go-redis/v9"
	"gorm.io/gorm"
)

// fullPhaseOrder defines the sequential execution order for a full pipeline run.
var fullPhaseOrder = []string{
	"discover",
	"search",
	"github",
	"scrape",
	"crawl",
	"clean",
	"score",
}

// Runner is the pipeline scheduler: it holds the registered phases and
// dispatches a task to one phase or to all of them in fullPhaseOrder.
type Runner struct {
	db       *gorm.DB
	redis    *redis.Client
	phases   map[string]Phase // registered phases, keyed by phase name
	reporter ProgressReporter // optional progress callback; may be nil
}

// NewRunner creates a new pipeline Runner with an empty phase registry.
func NewRunner(db *gorm.DB, rdb *redis.Client) *Runner {
	return &Runner{
		db:     db,
		redis:  rdb,
		phases: make(map[string]Phase),
	}
}

// RegisterPhase registers a phase implementation under p.Name().
// Registering a second phase with the same name replaces the first.
func (r *Runner) RegisterPhase(p Phase) {
	// Tolerate a Runner that was not built through NewRunner: writing to a
	// nil map would panic.
	if r.phases == nil {
		r.phases = make(map[string]Phase)
	}
	r.phases[p.Name()] = p
}

// SetProgressReporter sets the progress reporting callback.
func (r *Runner) SetProgressReporter(fn ProgressReporter) {
	r.reporter = fn
}

// report forwards a progress update to the reporter if one is set;
// without a reporter it is a no-op.
func (r *Runner) report(phase string, current, total int, message string) {
	if r.reporter != nil {
		r.reporter(phase, current, total, message)
	}
}

// Run executes the pipeline for task.
//
// task.TaskType is either "full" or the name of a single phase
// ("discover", "search", "github", "scrape", "crawl", "clean", "score").
//
// For "full", every phase in fullPhaseOrder that is not skipped via
// opts.SkipPhases runs in order. A failing phase is logged and reported as
// failed, and the run continues with the next phase; the overall result is
// nil unless the context is cancelled between phases.
//
// For a single-phase type, that phase runs directly and its error, if any,
// is returned.
//
// Cancellation errors wrap ctx.Err(), so callers can test them with
// errors.Is(err, context.Canceled) or context.DeadlineExceeded.
func (r *Runner) Run(ctx context.Context, task *model.Task, opts *Options) error {
	if task == nil {
		return fmt.Errorf("pipeline: nil task")
	}

	if task.TaskType != "full" {
		// Single-phase task: the task type doubles as the phase name.
		phaseName := task.TaskType
		if isContextDone(ctx) {
			return fmt.Errorf("pipeline cancelled before phase %s: %w", phaseName, ctx.Err())
		}
		return r.runReported(ctx, task, phaseName, opts)
	}

	for _, phaseName := range fullPhaseOrder {
		if isContextDone(ctx) {
			return fmt.Errorf("pipeline cancelled before phase %s: %w", phaseName, ctx.Err())
		}
		// A nil opts means "skip nothing" rather than a nil dereference.
		if opts != nil && ShouldSkip(phaseName, opts.SkipPhases) {
			log.Printf("[pipeline] skipping phase=%s (in SkipPhases)", phaseName)
			continue
		}
		if err := r.runReported(ctx, task, phaseName, opts); err != nil {
			// Best effort: one failing phase does not abort the full run.
			log.Printf("[pipeline] phase=%s error: %v (continuing)", phaseName, err)
		}
	}
	return nil
}

// runReported runs one phase and brackets it with progress reports: a start
// report, then either a completion report (100/100) or a failure report
// carrying the error text. The phase error is returned unchanged.
func (r *Runner) runReported(ctx context.Context, task *model.Task, phaseName string, opts *Options) error {
	r.report(phaseName, 0, 0, "开始 "+phaseName)
	if err := r.runSingle(ctx, task, phaseName, opts); err != nil {
		r.report(phaseName, 0, 0, phaseName+" 失败: "+err.Error())
		return err
	}
	r.report(phaseName, 100, 100, phaseName+" 完成")
	return nil
}

// runSingle executes a single registered phase by name.
// It returns an error if no phase is registered under phaseName.
func (r *Runner) runSingle(ctx context.Context, task *model.Task, phaseName string, opts *Options) error {
	p, ok := r.phases[phaseName]
	if !ok {
		return fmt.Errorf("phase %q not registered", phaseName)
	}
	return p.Run(ctx, task, opts)
}

// isContextDone reports whether ctx has already been cancelled or has
// expired, without blocking (used to check for a stop signal between phases).
func isContextDone(ctx context.Context) bool {
	select {
	case <-ctx.Done():
		return true
	default:
		return false
	}
}