123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325 |
- const cron = require('node-cron');
- const logger = require('../utils/logger');
- const SpeedTester = require('./speedTester');
- const TelegramNotifier = require('./notifier');
- const SubscriptionManager = require('./subscriptionManager');
- const { Node, TestResult } = require('../models');
/**
 * Coordinates the service's recurring background work: retrying failed
 * notifications, purging old records, and (when triggered) running a speed
 * test across all active nodes and notifying about the outcome.
 */
class Scheduler {
  /**
   * Creates the collaborators and reads the tunables from the environment.
   *
   * Environment variables (each falls back to its default when missing,
   * non-numeric, or 0):
   *  - SPEED_TEST_INTERVAL: minutes between scheduled speed tests (default 15)
   *  - NOTIFICATION_FAILURE_THRESHOLD: consecutive failures that trigger a
   *    failure notification (default 3)
   *  - NOTIFICATION_RECOVERY_THRESHOLD: minimum prior failure count for a
   *    recovery notification to be sent (default 2)
   */
  constructor() {
    this.speedTester = new SpeedTester();
    this.notifier = new TelegramNotifier();
    this.subscriptionManager = new SubscriptionManager();
    this.isRunning = false;
    // Always pass the radix so values are parsed as decimal.
    this.testInterval = parseInt(process.env.SPEED_TEST_INTERVAL, 10) || 15; // minutes
    this.failureThreshold = parseInt(process.env.NOTIFICATION_FAILURE_THRESHOLD, 10) || 3;
    this.recoveryThreshold = parseInt(process.env.NOTIFICATION_RECOVERY_THRESHOLD, 10) || 2;
  }

  /**
   * Starts the scheduler: registers the hourly notification-retry job and the
   * daily cleanup job, then starts subscription auto-update.
   *
   * A repeated call while jobs are already registered is ignored; otherwise
   * the previous job handles would be overwritten and could never be stopped.
   */
  start() {
    if (this.notificationRetryJob || this.cleanupJob) {
      logger.warn('定时任务调度器已启动,忽略重复的启动请求');
      return;
    }

    logger.info('启动定时任务调度器 - 定时测速已关闭');

    // Scheduled speed testing is temporarily disabled.
    // const cronExpression = `*/${this.testInterval} * * * *`;
    // this.speedTestJob = cron.schedule(cronExpression, async () => {
    //   await this.runSpeedTest();
    // }, {
    //   scheduled: true,
    //   timezone: 'Asia/Shanghai'
    // });

    // Retry failed notifications at the top of every hour.
    this.notificationRetryJob = cron.schedule('0 * * * *', async () => {
      try {
        await this.notifier.retryFailedNotifications();
      } catch (error) {
        // Keep a failing retry from surfacing as an unhandled rejection.
        logger.error('重试失败通知时出错', { error: error.message });
      }
    }, {
      scheduled: true,
      timezone: 'Asia/Shanghai'
    });

    // Purge old data every day at 02:00 (cleanupOldData handles its own errors).
    this.cleanupJob = cron.schedule('0 2 * * *', async () => {
      await this.cleanupOldData();
    }, {
      scheduled: true,
      timezone: 'Asia/Shanghai'
    });

    // Start automatic subscription updates.
    this.subscriptionManager.startAutoUpdate();
    logger.info('定时任务调度器启动成功');
  }

  /**
   * Stops every registered job and subscription auto-update.
   *
   * Job handles are cleared afterwards so that getStatus() reports them as
   * stopped and start() can be called again.
   */
  stop() {
    if (this.speedTestJob) {
      this.speedTestJob.stop();
      this.speedTestJob = null;
    }
    if (this.notificationRetryJob) {
      this.notificationRetryJob.stop();
      this.notificationRetryJob = null;
    }
    if (this.cleanupJob) {
      this.cleanupJob.stop();
      this.cleanupJob = null;
    }
    this.subscriptionManager.stopAutoUpdate();
    logger.info('定时任务调度器已停止');
  }

  /**
   * Runs one speed-test pass over all active nodes, updates their state and
   * sends a summary.
   *
   * Skips the pass when another one is still in progress. Errors are logged
   * and reported through a system notification instead of being rethrown.
   *
   * @returns {Promise<void>}
   */
  async runSpeedTest() {
    if (this.isRunning) {
      logger.warn('上一次测试还在运行中,跳过本次测试');
      return;
    }

    this.isRunning = true;
    const startTime = Date.now();

    try {
      // Load every enabled node, ordered by group and then name.
      const nodes = await Node.findAll({
        where: { isActive: true },
        order: [['group', 'ASC'], ['name', 'ASC']]
      });

      if (nodes.length === 0) {
        logger.warn('没有找到启用的节点');
        return;
      }

      logger.info(`找到 ${nodes.length} 个节点,开始测试`);

      // Test the nodes in bulk, persist the outcome, then report.
      const testResults = await this.speedTester.testNodes(nodes);
      await this.processTestResults(nodes, testResults);
      await this.sendTestSummary(nodes, testResults);

      const duration = Date.now() - startTime;
      logger.info(`定时测速完成 - ${nodes.length}个节点,${testResults.filter(r => r.isSuccess).length}个成功,耗时${duration}ms`);
    } catch (error) {
      logger.error('定时速度测试失败', { error: error.message });

      // Report the failure; a broken notifier must not turn into a rejection
      // of this method, whose contract is to swallow and report errors.
      try {
        await this.notifier.sendSystemNotification(
          '⚠️ 系统错误通知',
          `定时速度测试过程中发生错误:\n\n${error.message}`,
          { error: error.message, timestamp: new Date().toISOString() }
        );
      } catch (notifyError) {
        logger.error('发送系统错误通知失败', { error: notifyError.message });
      }
    } finally {
      this.isRunning = false;
    }
  }

  /**
   * Persists each node's new state from its test result and sends failure or
   * recovery notifications when the thresholds are crossed.
   *
   * Nodes without a matching result (by `nodeId`) are left untouched.
   *
   * @param {Array<object>} nodes - Node records exposing `id`, `status`,
   *   `failureCount` and an async `update(values)` method.
   * @param {Array<object>} testResults - Results exposing `nodeId`,
   *   `isSuccess`, `testTime`, `latency` and `downloadSpeed`.
   * @returns {Promise<void>}
   */
  async processTestResults(nodes, testResults) {
    const resultMap = new Map(testResults.map(result => [result.nodeId, result]));

    for (const node of nodes) {
      const testResult = resultMap.get(node.id);
      if (!testResult) continue;

      const previousStatus = node.status;
      // Treat a missing counter as zero so the arithmetic below never yields NaN.
      const previousFailureCount = node.failureCount || 0;

      if (testResult.isSuccess) {
        // Node came back from an offline state.
        // NOTE(review): the recovery threshold is compared against the prior
        // failure count, so it can differ from failureThreshold — confirm intent.
        if (previousStatus === 'offline' && previousFailureCount >= this.recoveryThreshold) {
          await this.notifier.sendRecoveryNotification(node, testResult);
        }

        await node.update({
          status: 'online',
          lastTestTime: testResult.testTime,
          lastTestResult: true,
          failureCount: 0,
          averageLatency: testResult.latency,
          averageSpeed: testResult.downloadSpeed
        });
      } else {
        const newFailureCount = previousFailureCount + 1;

        await node.update({
          status: 'offline',
          lastTestTime: testResult.testTime,
          lastTestResult: false,
          failureCount: newFailureCount
        });

        // Notify exactly once per outage: the counter equals the threshold on
        // a single pass and is reset to 0 by the next success. The previous
        // status is deliberately not checked, because the first failure already
        // flips the node to 'offline', which would suppress every alert when
        // the threshold is greater than 1.
        if (newFailureCount === this.failureThreshold) {
          await this.notifier.sendFailureNotification(node, testResult);
        }
      }
    }
  }

  /**
   * Builds aggregate statistics for one test pass, sends them through the
   * notifier and logs them.
   *
   * @param {Array<object>} nodes - Tested nodes (`id`, `name`, `group`).
   * @param {Array<object>} testResults - Results (`nodeId`, `isSuccess`,
   *   `latency`, `downloadSpeed`).
   * @returns {Promise<void>}
   */
  async sendTestSummary(nodes, testResults) {
    // Index by key, keeping the first match (same semantics as Array#find),
    // so the lookups below are O(1) instead of a scan per node.
    const indexFirst = (items, keyOf) => {
      const index = new Map();
      for (const item of items) {
        const key = keyOf(item);
        if (!index.has(key)) {
          index.set(key, item);
        }
      }
      return index;
    };
    const resultByNodeId = indexFirst(testResults, result => result.nodeId);
    const nodeById = indexFirst(nodes, node => node.id);

    const totalNodes = nodes.length;
    const onlineNodes = testResults.filter(r => r.isSuccess).length;
    const offlineNodes = totalNodes - onlineNodes;
    const successRate = totalNodes > 0 ? Math.round((onlineNodes / totalNodes) * 100) : 0;

    // Average latency (rounded) over successful results that report one.
    const latencyResults = testResults.filter(r => r.isSuccess && r.latency);
    const averageLatency = latencyResults.length > 0
      ? Math.round(latencyResults.reduce((sum, r) => sum + r.latency, 0) / latencyResults.length)
      : null;

    // Average download speed, rounded to two decimals.
    const speedResults = testResults.filter(r => r.isSuccess && r.downloadSpeed);
    const averageSpeed = speedResults.length > 0
      ? Math.round((speedResults.reduce((sum, r) => sum + r.downloadSpeed, 0) / speedResults.length) * 100) / 100
      : null;

    // Nodes whose result is a failure; at most 10 are reported.
    const failedNodes = nodes.filter(node => {
      const result = resultByNodeId.get(node.id);
      return result && !result.isSuccess;
    }).slice(0, 10);

    // The five successful results with the lowest latency.
    const bestNodes = latencyResults
      .slice()
      .sort((a, b) => a.latency - b.latency)
      .slice(0, 5)
      .map(result => {
        const node = nodeById.get(result.nodeId);
        return {
          name: node ? node.name : 'Unknown',
          latency: result.latency,
          group: node ? node.group : 'Unknown'
        };
      });

    // Per-group totals; a node without a result counts as offline.
    const groupStats = {};
    for (const node of nodes) {
      const group = node.group || '默认';
      if (!groupStats[group]) {
        groupStats[group] = { total: 0, online: 0, offline: 0 };
      }
      groupStats[group].total++;

      const result = resultByNodeId.get(node.id);
      if (result && result.isSuccess) {
        groupStats[group].online++;
      } else {
        groupStats[group].offline++;
      }
    }

    const summary = {
      totalNodes,
      onlineNodes,
      offlineNodes,
      successRate,
      averageLatency,
      averageSpeed,
      failedNodes,
      bestNodes,
      groupStats,
      testTime: new Date().toLocaleString('zh-CN')
    };

    // A detailed summary report is sent after every test pass.
    await this.notifier.sendSummaryNotification(summary);
    logger.info('测试摘要生成完成', summary);
  }

  /**
   * Deletes test results older than 30 days and already-sent notification
   * records older than 7 days. Failures are logged, never rethrown.
   *
   * @returns {Promise<void>}
   */
  async cleanupOldData() {
    try {
      logger.info('开始清理旧数据');

      // Resolved once here rather than inline at each use.
      const { Op } = require('sequelize');
      const { Notification } = require('../models');

      // Remove test results older than 30 days.
      const thirtyDaysAgo = new Date();
      thirtyDaysAgo.setDate(thirtyDaysAgo.getDate() - 30);
      const deletedTestResults = await TestResult.destroy({
        where: {
          testTime: {
            [Op.lt]: thirtyDaysAgo
          }
        }
      });

      // Remove sent notification records older than 7 days.
      const sevenDaysAgo = new Date();
      sevenDaysAgo.setDate(sevenDaysAgo.getDate() - 7);
      const deletedNotifications = await Notification.destroy({
        where: {
          createdAt: {
            [Op.lt]: sevenDaysAgo
          },
          isSent: true
        }
      });

      logger.info('旧数据清理完成', {
        deletedTestResults,
        deletedNotifications
      });
    } catch (error) {
      logger.error('清理旧数据失败', { error: error.message });
    }
  }

  /**
   * Manually triggers a speed-test pass.
   *
   * @returns {Promise<void>}
   * @throws {Error} When a test pass is already in progress.
   */
  async triggerManualTest() {
    if (this.isRunning) {
      throw new Error('测试正在进行中,请稍后再试');
    }
    logger.info('手动触发速度测试');
    await this.runSpeedTest();
  }

  /**
   * Returns a snapshot of the scheduler's configuration and job state.
   *
   * A job is reported as 'running' while its handle is set, i.e. between
   * start() and stop().
   *
   * @returns {{isRunning: boolean, testInterval: number,
   *   failureThreshold: number, recoveryThreshold: number,
   *   jobs: {speedTest: string, notificationRetry: string, cleanup: string}}}
   */
  getStatus() {
    return {
      isRunning: this.isRunning,
      testInterval: this.testInterval,
      failureThreshold: this.failureThreshold,
      recoveryThreshold: this.recoveryThreshold,
      jobs: {
        speedTest: this.speedTestJob ? 'running' : 'stopped',
        notificationRetry: this.notificationRetryJob ? 'running' : 'stopped',
        cleanup: this.cleanupJob ? 'running' : 'stopped'
      }
    };
  }
}
- module.exports = Scheduler;
|