scheduler.go 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225
  1. package task
  2. import (
  3. "fmt"
  4. "log"
  5. "strconv"
  6. "strings"
  7. "sync"
  8. "time"
  9. "gorm.io/gorm"
  10. "spider/internal/model"
  11. )
  12. // Scheduler checks enabled ScheduleJobs every minute and starts tasks when due.
  13. type Scheduler struct {
  14. db *gorm.DB
  15. manager *Manager
  16. mu sync.Mutex
  17. jobs []model.ScheduleJob
  18. stopCh chan struct{}
  19. done chan struct{}
  20. }
  21. // NewScheduler creates a new Scheduler.
  22. func NewScheduler(db *gorm.DB, mgr *Manager) *Scheduler {
  23. return &Scheduler{
  24. db: db,
  25. manager: mgr,
  26. stopCh: make(chan struct{}),
  27. done: make(chan struct{}),
  28. }
  29. }
  30. // Start loads jobs from DB and begins the ticker loop.
  31. func (s *Scheduler) Start() {
  32. s.loadJobs()
  33. go s.loop()
  34. log.Println("[scheduler] started")
  35. }
  36. // Stop signals the loop to exit and waits.
  37. func (s *Scheduler) Stop() {
  38. close(s.stopCh)
  39. <-s.done
  40. log.Println("[scheduler] stopped")
  41. }
  42. // Reload re-reads all enabled jobs from the database.
  43. func (s *Scheduler) Reload() {
  44. s.loadJobs()
  45. log.Println("[scheduler] reloaded jobs")
  46. }
  47. func (s *Scheduler) loadJobs() {
  48. var jobs []model.ScheduleJob
  49. if err := s.db.Where("enabled = ?", true).Find(&jobs).Error; err != nil {
  50. log.Printf("[scheduler] load jobs error: %v", err)
  51. return
  52. }
  53. // Calculate NextRunAt for any job that doesn't have one yet
  54. now := time.Now()
  55. for i := range jobs {
  56. if jobs[i].NextRunAt == nil {
  57. next, err := calcNextRun(jobs[i].CronExpr, now)
  58. if err != nil {
  59. log.Printf("[scheduler] bad cron for job %d (%s): %v", jobs[i].ID, jobs[i].CronExpr, err)
  60. continue
  61. }
  62. jobs[i].NextRunAt = &next
  63. s.db.Model(&jobs[i]).Update("next_run_at", next)
  64. }
  65. }
  66. s.mu.Lock()
  67. s.jobs = jobs
  68. s.mu.Unlock()
  69. }
  70. func (s *Scheduler) loop() {
  71. defer close(s.done)
  72. ticker := time.NewTicker(1 * time.Minute)
  73. defer ticker.Stop()
  74. for {
  75. select {
  76. case <-s.stopCh:
  77. return
  78. case now := <-ticker.C:
  79. s.tick(now)
  80. }
  81. }
  82. }
  83. func (s *Scheduler) tick(now time.Time) {
  84. defer func() {
  85. if r := recover(); r != nil {
  86. log.Printf("[scheduler] PANIC in tick: %v", r)
  87. }
  88. }()
  89. s.mu.Lock()
  90. jobs := make([]model.ScheduleJob, len(s.jobs))
  91. copy(jobs, s.jobs)
  92. s.mu.Unlock()
  93. for _, job := range jobs {
  94. if job.NextRunAt == nil {
  95. continue
  96. }
  97. if !now.Before(*job.NextRunAt) {
  98. s.runJob(job, now)
  99. }
  100. }
  101. }
  102. func (s *Scheduler) runJob(job model.ScheduleJob, now time.Time) {
  103. defer func() {
  104. if r := recover(); r != nil {
  105. log.Printf("[scheduler] PANIC running job %d: %v", job.ID, r)
  106. }
  107. }()
  108. log.Printf("[scheduler] triggering job %d (%s) plugin=%s", job.ID, job.Name, job.PluginName)
  109. req := StartRequest{
  110. PluginName: job.PluginName,
  111. }
  112. _, err := s.manager.StartTask(req)
  113. if err != nil {
  114. log.Printf("[scheduler] job %d start error: %v", job.ID, err)
  115. }
  116. s.manager.notify("schedule_run", "定时任务触发",
  117. fmt.Sprintf("定时任务 [%s] (插件: %s) 已触发执行", job.Name, job.PluginName))
  118. // Update last_run_at and next_run_at
  119. lastRun := now
  120. next, calcErr := calcNextRun(job.CronExpr, now)
  121. updates := map[string]any{"last_run_at": lastRun}
  122. if calcErr == nil {
  123. updates["next_run_at"] = next
  124. }
  125. s.db.Model(&model.ScheduleJob{}).Where("id = ?", job.ID).Updates(updates)
  126. // Update in-memory copy
  127. s.mu.Lock()
  128. for i := range s.jobs {
  129. if s.jobs[i].ID == job.ID {
  130. s.jobs[i].LastRunAt = &lastRun
  131. if calcErr == nil {
  132. s.jobs[i].NextRunAt = &next
  133. }
  134. break
  135. }
  136. }
  137. s.mu.Unlock()
  138. }
  139. // calcNextRun computes the next run time from a cron expression.
  140. // Supported formats:
  141. // - */N * * * * — every N minutes
  142. // - 0 H * * * — daily at hour H
  143. // - 0 H * * D — weekly on day D at hour H (0=Sunday)
  144. func calcNextRun(expr string, after time.Time) (time.Time, error) {
  145. parts := strings.Fields(expr)
  146. if len(parts) != 5 {
  147. return time.Time{}, fmt.Errorf("invalid cron: need 5 fields, got %d", len(parts))
  148. }
  149. minute, hour, _, _, dow := parts[0], parts[1], parts[2], parts[3], parts[4]
  150. // Case 1: */N * * * * — every N minutes
  151. if strings.HasPrefix(minute, "*/") && hour == "*" && dow == "*" {
  152. nStr := strings.TrimPrefix(minute, "*/")
  153. n, err := strconv.Atoi(nStr)
  154. if err != nil || n <= 0 {
  155. return time.Time{}, fmt.Errorf("invalid interval: %s", nStr)
  156. }
  157. next := after.Add(time.Duration(n) * time.Minute)
  158. // Truncate seconds
  159. next = next.Truncate(time.Minute)
  160. return next, nil
  161. }
  162. // Minute must be a number for the remaining patterns
  163. m, err := strconv.Atoi(minute)
  164. if err != nil {
  165. return time.Time{}, fmt.Errorf("invalid minute: %s", minute)
  166. }
  167. h, err := strconv.Atoi(hour)
  168. if err != nil {
  169. return time.Time{}, fmt.Errorf("invalid hour: %s", hour)
  170. }
  171. // Case 2: 0 H * * * — daily at hour H
  172. if dow == "*" {
  173. candidate := time.Date(after.Year(), after.Month(), after.Day(), h, m, 0, 0, after.Location())
  174. if !candidate.After(after) {
  175. candidate = candidate.AddDate(0, 0, 1)
  176. }
  177. return candidate, nil
  178. }
  179. // Case 3: 0 H * * D — weekly on day D at hour H
  180. d, err := strconv.Atoi(dow)
  181. if err != nil {
  182. return time.Time{}, fmt.Errorf("invalid day of week: %s", dow)
  183. }
  184. targetDay := time.Weekday(d)
  185. candidate := time.Date(after.Year(), after.Month(), after.Day(), h, m, 0, 0, after.Location())
  186. // Move to target weekday
  187. daysUntil := int(targetDay) - int(candidate.Weekday())
  188. if daysUntil < 0 {
  189. daysUntil += 7
  190. }
  191. candidate = candidate.AddDate(0, 0, daysUntil)
  192. if !candidate.After(after) {
  193. candidate = candidate.AddDate(0, 0, 7)
  194. }
  195. return candidate, nil
  196. }