| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112 |
- package pipeline
- import (
- "context"
- "fmt"
- "log"
- "spider/internal/model"
- "github.com/redis/go-redis/v9"
- "gorm.io/gorm"
- )
// fullPhaseOrder lists the phase names in the order they are executed when a
// task's TaskType is "full".
var fullPhaseOrder = []string{"discover", "search", "github", "scrape", "crawl", "clean", "score"}
- // Runner Pipeline 调度器
- type Runner struct {
- db *gorm.DB
- redis *redis.Client
- phases map[string]Phase // 注册的 phase,key 是 phase 名称
- reporter ProgressReporter
- }
- // NewRunner creates a new pipeline Runner.
- func NewRunner(db *gorm.DB, rdb *redis.Client) *Runner {
- return &Runner{
- db: db,
- redis: rdb,
- phases: make(map[string]Phase),
- }
- }
- // RegisterPhase 注册一个 phase 实现
- func (r *Runner) RegisterPhase(p Phase) {
- r.phases[p.Name()] = p
- }
- // SetProgressReporter 设置进度上报函数
- func (r *Runner) SetProgressReporter(fn ProgressReporter) {
- r.reporter = fn
- }
- // report calls the reporter if one is set; otherwise logs to stderr.
- func (r *Runner) report(phase string, current, total int, message string) {
- if r.reporter != nil {
- r.reporter(phase, current, total, message)
- }
- }
- // Run 执行 pipeline
- // task.TaskType: "full" | "discover" | "search" | "github" | "scrape" | "crawl" | "clean" | "score"
- // full 类型按顺序执行所有未跳过的 phase
- // 单阶段类型直接执行对应 phase
- func (r *Runner) Run(ctx context.Context, task *model.Task, opts *Options) error {
- if task.TaskType == "full" {
- for _, phaseName := range fullPhaseOrder {
- if isContextDone(ctx) {
- return fmt.Errorf("pipeline cancelled before phase %s", phaseName)
- }
- if ShouldSkip(phaseName, opts.SkipPhases) {
- log.Printf("[pipeline] skipping phase=%s (in SkipPhases)", phaseName)
- continue
- }
- r.report(phaseName, 0, 0, "开始 "+phaseName)
- if err := r.runSingle(ctx, task, phaseName, opts); err != nil {
- log.Printf("[pipeline] phase=%s error: %v (continuing)", phaseName, err)
- }
- r.report(phaseName, 100, 100, phaseName+" 完成")
- }
- return nil
- }
- // Single-phase task
- phaseName := task.TaskType
- if isContextDone(ctx) {
- return fmt.Errorf("pipeline cancelled before phase %s", phaseName)
- }
- r.report(phaseName, 0, 0, "开始 "+phaseName)
- if err := r.runSingle(ctx, task, phaseName, opts); err != nil {
- r.report(phaseName, 0, 0, phaseName+" 失败: "+err.Error())
- return err
- }
- r.report(phaseName, 100, 100, phaseName+" 完成")
- return nil
- }
- // runSingle 执行单个 phase
- func (r *Runner) runSingle(ctx context.Context, task *model.Task, phaseName string, opts *Options) error {
- p, ok := r.phases[phaseName]
- if !ok {
- return fmt.Errorf("phase %q not registered", phaseName)
- }
- return p.Run(ctx, task, opts)
- }
// isContextDone reports whether ctx has already been cancelled or has
// expired. It never blocks, so phases can poll it to check for a stop signal.
func isContextDone(ctx context.Context) bool {
	// Err is non-nil exactly when the Done channel has been closed.
	return ctx.Err() != nil
}
|