package core import ( "sync" "time" "clash-speed-test/internal/config" "clash-speed-test/internal/database" "clash-speed-test/internal/logger" ) type Scheduler struct { speedTester *SpeedTester config *config.Config ticker *time.Ticker stopChan chan struct{} running bool mu sync.RWMutex } func NewScheduler(speedTester *SpeedTester, cfg *config.Config) *Scheduler { return &Scheduler{ speedTester: speedTester, config: cfg, stopChan: make(chan struct{}), } } // 启动调度器 func (s *Scheduler) Start() { s.mu.Lock() defer s.mu.Unlock() if s.running { logger.Warn("调度器已在运行", nil) return } s.running = true s.ticker = time.NewTicker(s.config.Speed.Interval) logger.Info("启动定时任务调度器", map[string]interface{}{ "interval": s.config.Speed.Interval, }) // 如果配置了启动时立即测试 if s.config.Speed.TestOnStart { go s.runSpeedTest() } // 启动定时任务 go func() { for { select { case <-s.ticker.C: s.runSpeedTest() case <-s.stopChan: return } } }() } // 停止调度器 func (s *Scheduler) Stop() { s.mu.Lock() defer s.mu.Unlock() if !s.running { return } s.running = false if s.ticker != nil { s.ticker.Stop() } close(s.stopChan) logger.Info("定时任务调度器已停止", nil) } // 手动触发测速 func (s *Scheduler) TriggerSpeedTest() { s.mu.RLock() if !s.running { s.mu.RUnlock() logger.Warn("调度器未运行,无法触发测速", nil) return } s.mu.RUnlock() go s.runSpeedTest() } // 运行测速任务 func (s *Scheduler) runSpeedTest() { logger.Info("开始执行定时测速任务", nil) // 获取所有活跃节点 nodes, err := database.GetActiveNodes() if err != nil { logger.Error("获取活跃节点失败", map[string]interface{}{ "error": err.Error(), }) return } if len(nodes) == 0 { logger.Warn("没有找到活跃节点", nil) return } logger.Info("开始测试节点", map[string]interface{}{ "count": len(nodes), }) // 执行测速 results := s.speedTester.TestNodes(nodes) // 处理测试结果并更新节点状态 s.processTestResults(nodes, results) // 统计结果 successCount := 0 failCount := 0 for _, result := range results { if result.IsSuccess { successCount++ } else { failCount++ } } logger.Info("测速任务完成", map[string]interface{}{ "total": len(results), "success": successCount, "failed": failCount, }) } // 处理测试结果并更新节点状态 func (s *Scheduler) processTestResults(nodes []database.Node, results []*database.TestResult) { for _, node := range nodes { // 查找对应的测试结果 var testResult *database.TestResult for _, result := range results { if result.NodeID == node.ID { testResult = result break } } if testResult == nil { continue } previousStatus := node.Status previousFailureCount := node.FailureCount // 更新节点状态 if testResult.IsSuccess { // 检查延迟是否超时(超过2000ms) isTimeout := testResult.Latency != nil && *testResult.Latency > 2000 if isTimeout { // 延迟超时,标记为故障节点 newFailureCount := previousFailureCount + 1 updateData := map[string]interface{}{ "status": "offline", "last_test_time": testResult.TestTime, "last_test_result": false, "failure_count": newFailureCount, } if testResult.Latency != nil { updateData["average_latency"] = *testResult.Latency } if err := database.UpdateNode(node.ID, updateData); err != nil { logger.Error("更新节点状态失败", map[string]interface{}{ "node_id": node.ID, "error": err.Error(), }) } logger.Warn("节点延迟超时,标记为故障", map[string]interface{}{ "node_name": node.Name, "latency": *testResult.Latency, }) } else { // 测试成功且延迟正常 updateData := map[string]interface{}{ "status": "online", "last_test_time": testResult.TestTime, "last_test_result": true, "failure_count": 0, } if testResult.Latency != nil { updateData["average_latency"] = *testResult.Latency } if testResult.DownloadSpeed != nil { updateData["average_speed"] = *testResult.DownloadSpeed } if err := database.UpdateNode(node.ID, updateData); err != nil { logger.Error("更新节点状态失败", map[string]interface{}{ "node_id": node.ID, "error": err.Error(), }) } } } else { // 测试失败 newFailureCount := previousFailureCount + 1 updateData := map[string]interface{}{ "status": "offline", "last_test_time": testResult.TestTime, "last_test_result": false, "failure_count": newFailureCount, } if err := database.UpdateNode(node.ID, updateData); err != nil { logger.Error("更新节点状态失败", map[string]interface{}{ "node_id": node.ID, "error": err.Error(), }) } } } } // 获取调度器状态 func (s *Scheduler) IsRunning() bool { s.mu.RLock() defer s.mu.RUnlock() return s.running }