package task import ( "fmt" "log" "strconv" "strings" "sync" "time" "gorm.io/gorm" "spider/internal/model" ) // Scheduler checks enabled ScheduleJobs every minute and starts tasks when due. type Scheduler struct { db *gorm.DB manager *Manager mu sync.Mutex jobs []model.ScheduleJob stopCh chan struct{} done chan struct{} } // NewScheduler creates a new Scheduler. func NewScheduler(db *gorm.DB, mgr *Manager) *Scheduler { return &Scheduler{ db: db, manager: mgr, stopCh: make(chan struct{}), done: make(chan struct{}), } } // Start loads jobs from DB and begins the ticker loop. func (s *Scheduler) Start() { s.loadJobs() go s.loop() log.Println("[scheduler] started") } // Stop signals the loop to exit and waits. func (s *Scheduler) Stop() { close(s.stopCh) <-s.done log.Println("[scheduler] stopped") } // Reload re-reads all enabled jobs from the database. func (s *Scheduler) Reload() { s.loadJobs() log.Println("[scheduler] reloaded jobs") } func (s *Scheduler) loadJobs() { var jobs []model.ScheduleJob if err := s.db.Where("enabled = ?", true).Find(&jobs).Error; err != nil { log.Printf("[scheduler] load jobs error: %v", err) return } // Calculate NextRunAt for any job that doesn't have one yet now := time.Now() for i := range jobs { if jobs[i].NextRunAt == nil { next, err := calcNextRun(jobs[i].CronExpr, now) if err != nil { log.Printf("[scheduler] bad cron for job %d (%s): %v", jobs[i].ID, jobs[i].CronExpr, err) continue } jobs[i].NextRunAt = &next s.db.Model(&jobs[i]).Update("next_run_at", next) } } s.mu.Lock() s.jobs = jobs s.mu.Unlock() } func (s *Scheduler) loop() { defer close(s.done) ticker := time.NewTicker(1 * time.Minute) defer ticker.Stop() for { select { case <-s.stopCh: return case now := <-ticker.C: s.tick(now) } } } func (s *Scheduler) tick(now time.Time) { defer func() { if r := recover(); r != nil { log.Printf("[scheduler] PANIC in tick: %v", r) } }() s.mu.Lock() jobs := make([]model.ScheduleJob, len(s.jobs)) copy(jobs, s.jobs) s.mu.Unlock() for _, job := 
range jobs { if job.NextRunAt == nil { continue } if !now.Before(*job.NextRunAt) { s.runJob(job, now) } } } func (s *Scheduler) runJob(job model.ScheduleJob, now time.Time) { defer func() { if r := recover(); r != nil { log.Printf("[scheduler] PANIC running job %d: %v", job.ID, r) } }() log.Printf("[scheduler] triggering job %d (%s) plugin=%s", job.ID, job.Name, job.PluginName) req := StartRequest{ PluginName: job.PluginName, } _, err := s.manager.StartTask(req) if err != nil { log.Printf("[scheduler] job %d start error: %v", job.ID, err) } s.manager.notify("schedule_run", "定时任务触发", fmt.Sprintf("定时任务 [%s] (插件: %s) 已触发执行", job.Name, job.PluginName)) // Update last_run_at and next_run_at lastRun := now next, calcErr := calcNextRun(job.CronExpr, now) updates := map[string]any{"last_run_at": lastRun} if calcErr == nil { updates["next_run_at"] = next } s.db.Model(&model.ScheduleJob{}).Where("id = ?", job.ID).Updates(updates) // Update in-memory copy s.mu.Lock() for i := range s.jobs { if s.jobs[i].ID == job.ID { s.jobs[i].LastRunAt = &lastRun if calcErr == nil { s.jobs[i].NextRunAt = &next } break } } s.mu.Unlock() } // calcNextRun computes the next run time from a cron expression. 
// Supported formats:
//   - */N * * * * — every N minutes, counted from after (not clock-aligned)
//   - M H * * *   — daily at H:M
//   - M H * * D   — weekly on day D at H:M (0 or 7 = Sunday)
//
// The result is in after's location and is always later than after; for the
// interval form it is truncated to the minute. An error is returned when expr
// does not have exactly five fields, when N is not a positive number that
// fits in a time.Duration, or when the minute (0-59), hour (0-23), or
// day-of-week (0-7) field is not a number within its range.
//
// NOTE(review): the day-of-month and month fields are not inspected, so an
// expression such as "0 3 1 * *" is scheduled daily, not monthly.
func calcNextRun(expr string, after time.Time) (time.Time, error) {
	// Largest N for which N minutes still fits in a time.Duration.
	const maxIntervalMinutes = int64((1<<63 - 1) / time.Minute)

	parts := strings.Fields(expr)
	if len(parts) != 5 {
		return time.Time{}, fmt.Errorf("invalid cron: need 5 fields, got %d", len(parts))
	}
	minute, hour, dow := parts[0], parts[1], parts[4]

	// Case 1: */N * * * * — every N minutes.
	if strings.HasPrefix(minute, "*/") && hour == "*" && dow == "*" {
		nStr := strings.TrimPrefix(minute, "*/")
		n, err := strconv.Atoi(nStr)
		if err != nil || n <= 0 || int64(n) > maxIntervalMinutes {
			return time.Time{}, fmt.Errorf("invalid interval: %s", nStr)
		}
		// Drop seconds so runs land on a minute boundary.
		return after.Add(time.Duration(n) * time.Minute).Truncate(time.Minute), nil
	}

	// Minute and hour must be in-range numbers for the remaining patterns.
	// Without the range check time.Date would silently normalize values such
	// as hour 25, and negative values could yield a time that is not after
	// the reference time.
	m, err := parseCronField("minute", minute, 0, 59)
	if err != nil {
		return time.Time{}, err
	}
	h, err := parseCronField("hour", hour, 0, 23)
	if err != nil {
		return time.Time{}, err
	}

	candidate := time.Date(after.Year(), after.Month(), after.Day(), h, m, 0, 0, after.Location())

	// Case 2: M H * * * — daily at H:M.
	if dow == "*" {
		if !candidate.After(after) {
			candidate = candidate.AddDate(0, 0, 1)
		}
		return candidate, nil
	}

	// Case 3: M H * * D — weekly on day D at H:M.
	d, err := parseCronField("day of week", dow, 0, 7)
	if err != nil {
		return time.Time{}, err
	}
	targetDay := time.Weekday(d % 7) // cron allows both 0 and 7 for Sunday

	// Move forward to the target weekday (possibly today).
	daysUntil := int(targetDay) - int(candidate.Weekday())
	if daysUntil < 0 {
		daysUntil += 7
	}
	candidate = candidate.AddDate(0, 0, daysUntil)
	if !candidate.After(after) {
		candidate = candidate.AddDate(0, 0, 7)
	}
	return candidate, nil
}

// parseCronField parses s as an integer in [lo, hi]; name labels the field in
// the returned error.
func parseCronField(name, s string, lo, hi int) (int, error) {
	v, err := strconv.Atoi(s)
	if err != nil || v < lo || v > hi {
		return 0, fmt.Errorf("invalid %s: %s", name, s)
	}
	return v, nil
}