const cron = require('node-cron');
const logger = require('../utils/logger');
const SpeedTester = require('./speedTester');
const TelegramNotifier = require('./notifier');
const SubscriptionManager = require('./subscriptionManager');
const { Node, TestResult } = require('../models');

/**
 * Parses a strictly positive base-10 integer from a raw (environment) value.
 *
 * @param {string|undefined} rawValue - Raw value, e.g. from `process.env`.
 * @param {number} fallback - Returned when the value is missing, not a number, or <= 0.
 * @returns {number} The parsed integer or `fallback`.
 */
function readPositiveInt(rawValue, fallback) {
  const parsed = Number.parseInt(rawValue, 10);
  return Number.isInteger(parsed) && parsed > 0 ? parsed : fallback;
}

/**
 * Schedules a cron job whose task can never produce an unhandled rejection:
 * any error thrown by `task` is logged instead of escaping the cron callback.
 *
 * @param {string} expression - Cron expression.
 * @param {string} failureMessage - Log message used when the task throws.
 * @param {() => Promise<void>} task - Work to run on every tick.
 * @returns {object} The job object returned by `cron.schedule`.
 */
function scheduleGuarded(expression, failureMessage, task) {
  return cron.schedule(expression, async () => {
    try {
      await task();
    } catch (error) {
      logger.error(failureMessage, { error: error.message });
    }
  }, {
    scheduled: true,
    timezone: 'Asia/Shanghai'
  });
}

/**
 * Builds a lookup Map that keeps the FIRST item seen for each key, i.e. the
 * same element `Array.prototype.find` would return for that key.
 *
 * @param {Array<object>} items - Items to index.
 * @param {(item: object) => *} keyOf - Extracts the lookup key from an item.
 * @returns {Map<*, object>} Key -> first matching item.
 */
function firstByKey(items, keyOf) {
  const index = new Map();
  for (const item of items) {
    const key = keyOf(item);
    if (!index.has(key)) {
      index.set(key, item);
    }
  }
  return index;
}

/**
 * Owns the periodic background jobs (notification retry, old-data cleanup,
 * subscription auto-update) and the speed-test run that can be triggered
 * manually. The scheduled speed test itself is currently disabled.
 */
class Scheduler {
  constructor() {
    this.speedTester = new SpeedTester();
    this.notifier = new TelegramNotifier();
    this.subscriptionManager = new SubscriptionManager();
    this.isRunning = false;

    // Job handles; null means "not scheduled" (see getStatus()).
    this.speedTestJob = null;
    this.notificationRetryJob = null;
    this.cleanupJob = null;

    this.testInterval = readPositiveInt(process.env.SPEED_TEST_INTERVAL, 15); // minutes
    this.failureThreshold = readPositiveInt(process.env.NOTIFICATION_FAILURE_THRESHOLD, 3);
    this.recoveryThreshold = readPositiveInt(process.env.NOTIFICATION_RECOVERY_THRESHOLD, 2);
  }

  /**
   * Starts the scheduler: registers the hourly notification-retry job, the
   * daily 02:00 cleanup job (both in the Asia/Shanghai timezone) and starts
   * the subscription auto-update.
   */
  start() {
    logger.info('启动定时任务调度器 - 定时测速已关闭');

    // Scheduled speed testing is temporarily disabled; kept for re-enabling.
    // const cronExpression = `*/${this.testInterval} * * * *`;
    // this.speedTestJob = scheduleGuarded(cronExpression, '定时速度测试任务执行出错', async () => {
    //   await this.runSpeedTest();
    // });

    // Retry failed notifications at the top of every hour.
    this.notificationRetryJob = scheduleGuarded('0 * * * *', '重试失败通知任务执行出错', async () => {
      await this.notifier.retryFailedNotifications();
    });

    // Purge old data every day at 02:00.
    this.cleanupJob = scheduleGuarded('0 2 * * *', '清理旧数据任务执行出错', async () => {
      await this.cleanupOldData();
    });

    // Start automatic subscription updates.
    this.subscriptionManager.startAutoUpdate();

    logger.info('定时任务调度器启动成功');
  }

  /**
   * Stops every scheduled job and the subscription auto-update. Job handles
   * are cleared so that getStatus() reports them as stopped afterwards.
   */
  stop() {
    for (const jobName of ['speedTestJob', 'notificationRetryJob', 'cleanupJob']) {
      if (this[jobName]) {
        this[jobName].stop();
        this[jobName] = null;
      }
    }

    this.subscriptionManager.stopAutoUpdate();
    logger.info('定时任务调度器已停止');
  }

  /**
   * Runs one full speed test over all active nodes: tests them, persists the
   * per-node outcome, and sends a summary. Overlapping runs are skipped.
   * Errors are logged and reported through a system notification; they are
   * not rethrown.
   *
   * @returns {Promise<void>}
   */
  async runSpeedTest() {
    if (this.isRunning) {
      logger.warn('上一次测试还在运行中,跳过本次测试');
      return;
    }

    this.isRunning = true;
    const startTime = Date.now();

    try {
      // Fetch every enabled node.
      const nodes = await Node.findAll({
        where: { isActive: true },
        order: [['group', 'ASC'], ['name', 'ASC']]
      });

      if (nodes.length === 0) {
        logger.warn('没有找到启用的节点');
        return;
      }

      logger.info(`找到 ${nodes.length} 个节点,开始测试`);

      // Test the nodes in bulk.
      const testResults = await this.speedTester.testNodes(nodes);

      // Persist node state and send failure/recovery notifications.
      await this.processTestResults(nodes, testResults);

      // Build and send the summary report.
      await this.sendTestSummary(nodes, testResults);

      const duration = Date.now() - startTime;
      logger.info(`定时测速完成 - ${nodes.length}个节点,${testResults.filter(r => r.isSuccess).length}个成功,耗时${duration}ms`);
    } catch (error) {
      logger.error('定时速度测试失败', { error: error.message });

      // Report the failure; a broken notifier must not turn into a second,
      // unhandled error on top of the one already being reported.
      try {
        await this.notifier.sendSystemNotification(
          '⚠️ 系统错误通知',
          `定时速度测试过程中发生错误:\n\n${error.message}`,
          { error: error.message, timestamp: new Date().toISOString() }
        );
      } catch (notifyError) {
        logger.error('发送系统错误通知失败', { error: notifyError.message });
      }
    } finally {
      this.isRunning = false;
    }
  }

  /**
   * Applies test results to their nodes and emits state-change notifications.
   *
   * - Success: node becomes `online`, failure count resets; a recovery
   *   notification is sent first if the node was `offline` with at least
   *   `recoveryThreshold` accumulated failures.
   * - Failure: node becomes `offline`, failure count is incremented; a failure
   *   notification is sent when the count reaches `failureThreshold`.
   *
   * Nodes without a matching result are left untouched.
   *
   * @param {Array<object>} nodes - Node model instances (`id`, `status`, `failureCount`, `update()`).
   * @param {Array<object>} testResults - Results carrying `nodeId`, `isSuccess`, `testTime`, `latency`, `downloadSpeed`.
   * @returns {Promise<void>}
   */
  async processTestResults(nodes, testResults) {
    const resultMap = new Map(testResults.map(result => [result.nodeId, result]));

    for (const node of nodes) {
      const testResult = resultMap.get(node.id);
      if (!testResult) continue;

      // Treat a missing/invalid stored count as zero so arithmetic never yields NaN.
      const previousFailureCount = Number(node.failureCount) || 0;

      if (testResult.isSuccess) {
        // Node came back from offline: notify if it had failed often enough.
        if (node.status === 'offline' && previousFailureCount >= this.recoveryThreshold) {
          await this.notifier.sendRecoveryNotification(node, testResult);
        }

        await node.update({
          status: 'online',
          lastTestTime: testResult.testTime,
          lastTestResult: true,
          failureCount: 0,
          averageLatency: testResult.latency,
          averageSpeed: testResult.downloadSpeed
        });
      } else {
        const newFailureCount = previousFailureCount + 1;

        await node.update({
          status: 'offline',
          lastTestTime: testResult.testTime,
          lastTestResult: false,
          failureCount: newFailureCount
        });

        // The count grows by one per failed run and resets on success, so the
        // equality holds exactly once per outage. The previous status must not
        // be part of this check: the node is already marked offline after its
        // first failure, long before a threshold > 1 is reached.
        if (newFailureCount === this.failureThreshold) {
          await this.notifier.sendFailureNotification(node, testResult);
        }
      }
    }
  }

  /**
   * Aggregates one test run into a summary (totals, success rate, average
   * latency/speed, up to 10 failed nodes, the 5 lowest-latency nodes and
   * per-group counts), sends it through the notifier and logs it.
   *
   * Nodes that have no result are counted as offline.
   *
   * @param {Array<object>} nodes - Tested nodes (`id`, `name`, `group`).
   * @param {Array<object>} testResults - Results carrying `nodeId`, `isSuccess`, `latency`, `downloadSpeed`.
   * @returns {Promise<void>}
   */
  async sendTestSummary(nodes, testResults) {
    // Index once instead of scanning the arrays inside every loop below.
    const resultByNodeId = firstByKey(testResults, result => result.nodeId);
    const nodeById = firstByKey(nodes, node => node.id);

    const totalNodes = nodes.length;
    const onlineNodes = testResults.filter(r => r.isSuccess).length;
    const offlineNodes = totalNodes - onlineNodes;
    const successRate = totalNodes > 0 ? Math.round((onlineNodes / totalNodes) * 100) : 0;

    // Average latency over successful results that report a latency.
    const successfulResults = testResults.filter(r => r.isSuccess && r.latency);
    const averageLatency = successfulResults.length > 0
      ? Math.round(successfulResults.reduce((sum, r) => sum + r.latency, 0) / successfulResults.length)
      : null;

    // Average download speed, rounded to two decimals.
    const speedResults = testResults.filter(r => r.isSuccess && r.downloadSpeed);
    const averageSpeed = speedResults.length > 0
      ? Math.round((speedResults.reduce((sum, r) => sum + r.downloadSpeed, 0) / speedResults.length) * 100) / 100
      : null;

    // Failed nodes, capped at 10 entries.
    const failedNodes = nodes
      .filter(node => {
        const result = resultByNodeId.get(node.id);
        return result && !result.isSuccess;
      })
      .slice(0, 10);

    // The five nodes with the lowest latency.
    const bestNodes = successfulResults
      .slice()
      .sort((a, b) => a.latency - b.latency)
      .slice(0, 5)
      .map(result => {
        const node = nodeById.get(result.nodeId);
        return {
          name: node ? node.name : 'Unknown',
          latency: result.latency,
          group: node ? node.group : 'Unknown'
        };
      });

    // Per-group totals.
    const groupStats = {};
    for (const node of nodes) {
      const group = node.group || '默认';
      if (!groupStats[group]) {
        groupStats[group] = { total: 0, online: 0, offline: 0 };
      }
      groupStats[group].total++;

      const result = resultByNodeId.get(node.id);
      if (result && result.isSuccess) {
        groupStats[group].online++;
      } else {
        groupStats[group].offline++;
      }
    }

    const summary = {
      totalNodes,
      onlineNodes,
      offlineNodes,
      successRate,
      averageLatency,
      averageSpeed,
      failedNodes,
      bestNodes,
      groupStats,
      testTime: new Date().toLocaleString('zh-CN')
    };

    // A detailed summary report is sent after every completed run.
    await this.notifier.sendSummaryNotification(summary);

    logger.info('测试摘要生成完成', summary);
  }

  /**
   * Deletes test results older than 30 days and already-sent notification
   * records older than 7 days. Failures are logged, never thrown.
   *
   * @returns {Promise<void>}
   */
  async cleanupOldData() {
    try {
      logger.info('开始清理旧数据');

      // Resolved lazily, exactly as before, so module load order is unchanged.
      const { Op } = require('sequelize');
      const { Notification } = require('../models');

      // Test results older than 30 days.
      const thirtyDaysAgo = new Date();
      thirtyDaysAgo.setDate(thirtyDaysAgo.getDate() - 30);

      const deletedTestResults = await TestResult.destroy({
        where: {
          testTime: { [Op.lt]: thirtyDaysAgo }
        }
      });

      // Sent notification records older than 7 days.
      const sevenDaysAgo = new Date();
      sevenDaysAgo.setDate(sevenDaysAgo.getDate() - 7);

      const deletedNotifications = await Notification.destroy({
        where: {
          createdAt: { [Op.lt]: sevenDaysAgo },
          isSent: true
        }
      });

      logger.info('旧数据清理完成', { deletedTestResults, deletedNotifications });
    } catch (error) {
      logger.error('清理旧数据失败', { error: error.message });
    }
  }

  /**
   * Manually triggers a speed test run.
   *
   * @returns {Promise<void>}
   * @throws {Error} When a test run is already in progress.
   */
  async triggerManualTest() {
    if (this.isRunning) {
      throw new Error('测试正在进行中,请稍后再试');
    }

    logger.info('手动触发速度测试');
    await this.runSpeedTest();
  }

  /**
   * Returns a snapshot of the scheduler configuration and job states.
   * A job is reported as `'running'` while its handle is set, i.e. between
   * start() and stop().
   *
   * @returns {{isRunning: boolean, testInterval: number, failureThreshold: number,
   *   recoveryThreshold: number, jobs: {speedTest: string, notificationRetry: string, cleanup: string}}}
   */
  getStatus() {
    return {
      isRunning: this.isRunning,
      testInterval: this.testInterval,
      failureThreshold: this.failureThreshold,
      recoveryThreshold: this.recoveryThreshold,
      jobs: {
        speedTest: this.speedTestJob ? 'running' : 'stopped',
        notificationRetry: this.notificationRetryJob ? 'running' : 'stopped',
        cleanup: this.cleanupJob ? 'running' : 'stopped'
      }
    };
  }
}

module.exports = Scheduler;