| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225 |
- package task
- import (
- "fmt"
- "log"
- "strconv"
- "strings"
- "sync"
- "time"
- "gorm.io/gorm"
- "spider/internal/model"
- )
- // Scheduler checks enabled ScheduleJobs every minute and starts tasks when due.
- type Scheduler struct {
- db *gorm.DB
- manager *Manager
- mu sync.Mutex
- jobs []model.ScheduleJob
- stopCh chan struct{}
- done chan struct{}
- }
- // NewScheduler creates a new Scheduler.
- func NewScheduler(db *gorm.DB, mgr *Manager) *Scheduler {
- return &Scheduler{
- db: db,
- manager: mgr,
- stopCh: make(chan struct{}),
- done: make(chan struct{}),
- }
- }
- // Start loads jobs from DB and begins the ticker loop.
- func (s *Scheduler) Start() {
- s.loadJobs()
- go s.loop()
- log.Println("[scheduler] started")
- }
- // Stop signals the loop to exit and waits.
- func (s *Scheduler) Stop() {
- close(s.stopCh)
- <-s.done
- log.Println("[scheduler] stopped")
- }
- // Reload re-reads all enabled jobs from the database.
- func (s *Scheduler) Reload() {
- s.loadJobs()
- log.Println("[scheduler] reloaded jobs")
- }
- func (s *Scheduler) loadJobs() {
- var jobs []model.ScheduleJob
- if err := s.db.Where("enabled = ?", true).Find(&jobs).Error; err != nil {
- log.Printf("[scheduler] load jobs error: %v", err)
- return
- }
- // Calculate NextRunAt for any job that doesn't have one yet
- now := time.Now()
- for i := range jobs {
- if jobs[i].NextRunAt == nil {
- next, err := calcNextRun(jobs[i].CronExpr, now)
- if err != nil {
- log.Printf("[scheduler] bad cron for job %d (%s): %v", jobs[i].ID, jobs[i].CronExpr, err)
- continue
- }
- jobs[i].NextRunAt = &next
- s.db.Model(&jobs[i]).Update("next_run_at", next)
- }
- }
- s.mu.Lock()
- s.jobs = jobs
- s.mu.Unlock()
- }
- func (s *Scheduler) loop() {
- defer close(s.done)
- ticker := time.NewTicker(1 * time.Minute)
- defer ticker.Stop()
- for {
- select {
- case <-s.stopCh:
- return
- case now := <-ticker.C:
- s.tick(now)
- }
- }
- }
- func (s *Scheduler) tick(now time.Time) {
- defer func() {
- if r := recover(); r != nil {
- log.Printf("[scheduler] PANIC in tick: %v", r)
- }
- }()
- s.mu.Lock()
- jobs := make([]model.ScheduleJob, len(s.jobs))
- copy(jobs, s.jobs)
- s.mu.Unlock()
- for _, job := range jobs {
- if job.NextRunAt == nil {
- continue
- }
- if !now.Before(*job.NextRunAt) {
- s.runJob(job, now)
- }
- }
- }
- func (s *Scheduler) runJob(job model.ScheduleJob, now time.Time) {
- defer func() {
- if r := recover(); r != nil {
- log.Printf("[scheduler] PANIC running job %d: %v", job.ID, r)
- }
- }()
- log.Printf("[scheduler] triggering job %d (%s) plugin=%s", job.ID, job.Name, job.PluginName)
- req := StartRequest{
- PluginName: job.PluginName,
- }
- _, err := s.manager.StartTask(req)
- if err != nil {
- log.Printf("[scheduler] job %d start error: %v", job.ID, err)
- }
- s.manager.notify("schedule_run", "定时任务触发",
- fmt.Sprintf("定时任务 [%s] (插件: %s) 已触发执行", job.Name, job.PluginName))
- // Update last_run_at and next_run_at
- lastRun := now
- next, calcErr := calcNextRun(job.CronExpr, now)
- updates := map[string]any{"last_run_at": lastRun}
- if calcErr == nil {
- updates["next_run_at"] = next
- }
- s.db.Model(&model.ScheduleJob{}).Where("id = ?", job.ID).Updates(updates)
- // Update in-memory copy
- s.mu.Lock()
- for i := range s.jobs {
- if s.jobs[i].ID == job.ID {
- s.jobs[i].LastRunAt = &lastRun
- if calcErr == nil {
- s.jobs[i].NextRunAt = &next
- }
- break
- }
- }
- s.mu.Unlock()
- }
// calcNextRun computes the next run time strictly after `after` from a
// five-field cron expression (minute hour day-of-month month day-of-week).
//
// Supported forms:
//   - */N * * * *  every N minutes, counted from `after` and truncated to the
//     whole minute (N > 0)
//   - M H * * *    daily at H:M
//   - M H * * D    weekly on weekday D at H:M (0 or 7 = Sunday ... 6 = Saturday)
//
// M must be 0-59, H 0-23 and D 0-7; anything else is rejected with an error
// instead of being silently normalised to a different time or weekday.
//
// NOTE: the day-of-month and month fields are not examined at all; whatever
// they contain, the expression is treated as one of the forms above.
//
// The result is in after's location. On error the returned time is the zero
// value.
func calcNextRun(expr string, after time.Time) (time.Time, error) {
	parts := strings.Fields(expr)
	if len(parts) != 5 {
		return time.Time{}, fmt.Errorf("invalid cron: need 5 fields, got %d", len(parts))
	}
	minute, hour, dow := parts[0], parts[1], parts[4]

	// Form 1: */N * * * * — every N minutes.
	if strings.HasPrefix(minute, "*/") && hour == "*" && dow == "*" {
		nStr := strings.TrimPrefix(minute, "*/")
		n, err := strconv.Atoi(nStr)
		if err != nil || n <= 0 {
			return time.Time{}, fmt.Errorf("invalid interval: %s", nStr)
		}
		// Drop seconds so runs line up with whole minutes.
		return after.Add(time.Duration(n) * time.Minute).Truncate(time.Minute), nil
	}

	// The remaining forms need a concrete minute and hour. Range checks matter
	// because time.Date would otherwise roll e.g. hour 25 over into the next day.
	m, err := strconv.Atoi(minute)
	if err != nil || m < 0 || m > 59 {
		return time.Time{}, fmt.Errorf("invalid minute: %s", minute)
	}
	h, err := strconv.Atoi(hour)
	if err != nil || h < 0 || h > 23 {
		return time.Time{}, fmt.Errorf("invalid hour: %s", hour)
	}

	// Today at H:M in after's location; the starting point for both forms.
	candidate := time.Date(after.Year(), after.Month(), after.Day(), h, m, 0, 0, after.Location())

	// Form 2: M H * * * — daily.
	if dow == "*" {
		if !candidate.After(after) {
			candidate = candidate.AddDate(0, 0, 1)
		}
		return candidate, nil
	}

	// Form 3: M H * * D — weekly.
	d, err := strconv.Atoi(dow)
	if err != nil || d < 0 || d > 7 {
		return time.Time{}, fmt.Errorf("invalid day of week: %s", dow)
	}
	if d == 7 {
		d = int(time.Sunday) // crontab accepts both 0 and 7 for Sunday
	}

	// Advance to the target weekday (0-6 days ahead), then one more week if
	// that moment is not in the future.
	daysUntil := (d - int(candidate.Weekday()) + 7) % 7
	candidate = candidate.AddDate(0, 0, daysUntil)
	if !candidate.After(after) {
		candidate = candidate.AddDate(0, 0, 7)
	}
	return candidate, nil
}
|