Ver Fonte

修复节点重复问题

Taio_O há 3 meses atrás
pai
commit
feb3ccc4a6

+ 0 - 28
check_subscriptions.js

@@ -1,28 +0,0 @@
-const { Subscription } = require('./src/models');
-
-async function checkSubscriptions() {
-  try {
-    const subscriptions = await Subscription.findAll();
-    console.log('所有订阅配置:');
-    console.log('='.repeat(80));
-    
-    if (subscriptions.length === 0) {
-      console.log('数据库中没有订阅配置');
-      return;
-    }
-    
-    subscriptions.forEach(sub => {
-      console.log(`ID: ${sub.id}`);
-      console.log(`名称: ${sub.name}`);
-      console.log(`URL: ${sub.url}`);
-      console.log(`活跃状态: ${sub.isActive}`);
-      console.log(`创建时间: ${sub.createdAt}`);
-      console.log(`更新时间: ${sub.updatedAt}`);
-      console.log('-'.repeat(40));
-    });
-  } catch (error) {
-    console.error('检查订阅配置失败:', error.message);
-  }
-}
-
-checkSubscriptions(); 

+ 131 - 43
src/core/multiSubscriptionManager.js

@@ -3,11 +3,13 @@ const yaml = require('yaml');
 const logger = require('../utils/logger');
 const ClashParser = require('./clashParser');
 const { Node, Subscription } = require('../models');
+const sequelize = require('../config/database');
 
 class MultiSubscriptionManager {
   constructor() {
     this.updateInterval = parseInt(process.env.SUBSCRIPTION_UPDATE_INTERVAL) || 3600000; // 默认1小时
     this.updateTimer = null;
+    this.isUpdating = false; // 添加更新锁,防止并发更新
   }
 
   /**
@@ -138,9 +140,12 @@ class MultiSubscriptionManager {
   }
 
   /**
-   * 更新单个订阅的节点列表
+   * 更新单个订阅的节点列表 - 使用事务保护
    */
   async updateSubscriptionNodes(subscription) {
+    // 使用事务来确保数据一致性
+    const transaction = await sequelize.transaction();
+    
     try {
       logger.info('开始更新订阅节点', { 
         subscriptionId: subscription.id,
@@ -150,28 +155,47 @@ class MultiSubscriptionManager {
       // 获取订阅配置
       const config = await this.fetchSubscription(subscription);
       
-      // 解析节点
+      // 解析节点 - 在解析阶段就去重
       const parser = new ClashParser();
       const newNodes = [];
+      const parsedKeys = new Set(); // 用于在解析阶段去重
       
       for (const proxy of config.proxies) {
         const node = parser.parseProxy(proxy);
         if (node) {
+          const key = `${node.name}-${node.server}-${node.port}`;
+          
+          // 检查是否已经解析过相同的节点
+          if (parsedKeys.has(key)) {
+            logger.debug(`跳过重复解析的节点: ${node.name} (${node.server}:${node.port})`, {
+              subscriptionId: subscription.id
+            });
+            continue;
+          }
+          
+          parsedKeys.add(key);
           newNodes.push(node);
         }
       }
+      
+      logger.info(`解析完成,去重后节点数量: ${newNodes.length}`, {
+        subscriptionId: subscription.id,
+        originalCount: config.proxies.length
+      });
 
       if (newNodes.length === 0) {
         logger.warn('订阅中没有找到有效的节点', { subscriptionId: subscription.id });
+        await transaction.rollback();
         return { updated: 0, added: 0, removed: 0 };
       }
 
-      // 获取现有节点
+      // 在事务中获取现有节点
       const existingNodes = await Node.findAll({
         where: { 
           subscriptionId: subscription.id,
           isActive: true 
-        }
+        },
+        transaction
       });
 
       const existingNodeMap = new Map();
@@ -184,58 +208,79 @@ class MultiSubscriptionManager {
       let updated = 0;
       let removed = 0;
 
-      // 批量处理节点更新
-      const updatePromises = [];
-      const createPromises = [];
-      const deactivatePromises = [];
-
-      // 处理新节点
+      // 处理新节点 - 使用数据库级别的去重
+      const processedKeys = new Set();
+      
       for (const nodeData of newNodes) {
         const key = `${nodeData.name}-${nodeData.server}-${nodeData.port}`;
+        
+        // 检查是否已经处理过相同的节点
+        if (processedKeys.has(key)) {
+          logger.warn(`跳过重复节点: ${nodeData.name} (${nodeData.server}:${nodeData.port})`, {
+            subscriptionId: subscription.id
+          });
+          continue;
+        }
+        
+        processedKeys.add(key);
         const existingNode = existingNodeMap.get(key);
 
         if (existingNode) {
-          // 批量更新现有节点
-          updatePromises.push(
-            existingNode.update({
-              ...nodeData,
-              subscriptionId: subscription.id,
-              updatedAt: new Date()
-            })
-          );
+          // 更新现有节点
+          await existingNode.update({
+            ...nodeData,
+            subscriptionId: subscription.id,
+            updatedAt: new Date()
+          }, { transaction });
           updated++;
           existingNodeMap.delete(key);
         } else {
-          // 批量创建新节点
-          createPromises.push(
-            Node.create({
-              ...nodeData,
-              subscriptionId: subscription.id,
-              isActive: true,
-              status: 'offline'
-            })
-          );
+          // 检查数据库中是否已存在相同的节点(防止并发创建)
+          const existingDuplicate = await Node.findOne({
+            where: {
+              name: nodeData.name,
+              server: nodeData.server,
+              port: nodeData.port,
+              subscriptionId: subscription.id
+            },
+            transaction
+          });
+          
+          if (existingDuplicate) {
+            logger.warn(`数据库中已存在相同节点,跳过创建: ${nodeData.name}`, {
+              subscriptionId: subscription.id
+            });
+            continue;
+          }
+          
+          // 创建新节点
+          await Node.create({
+            ...nodeData,
+            subscriptionId: subscription.id,
+            isActive: true,
+            status: 'offline'
+          }, { transaction });
           added++;
         }
       }
 
-      // 批量标记不再存在的节点为非活跃
+      // 标记不再存在的节点为非活跃
       for (const [key, node] of existingNodeMap) {
-        deactivatePromises.push(node.update({ isActive: false }));
+        await node.update({ isActive: false }, { transaction });
         removed++;
       }
 
-      // 并行执行所有数据库操作
-      logger.info(`正在批量更新数据库,共${updatePromises.length + createPromises.length + deactivatePromises.length}个操作...`);
-      await Promise.all([...updatePromises, ...createPromises, ...deactivatePromises]);
-
       // 更新订阅的节点数量
+      const actualNodeCount = processedKeys.size;
       await subscription.update({
-        nodeCount: newNodes.length,
+        nodeCount: actualNodeCount,
         lastUpdateTime: new Date()
-      });
+      }, { transaction });
 
-      logger.info(`订阅节点更新完成 - 新增${added}个,更新${updated}个,移除${removed}个`, {
+      // 提交事务
+      await transaction.commit();
+
+      logger.info(`订阅节点更新完成 - 新增${added}个,更新${updated}个,移除${removed}个,实际节点数${actualNodeCount}`, {
         subscriptionId: subscription.id,
         subscriptionName: subscription.name
       });
@@ -243,14 +288,16 @@ class MultiSubscriptionManager {
       // 如果有新增或更新的节点,触发测速
       if (added > 0 || updated > 0) {
         logger.info('检测到节点更新,准备触发测速...');
-        // 延迟3秒后触发测速,确保数据库操作完成
+        // 延迟5秒后触发测速,确保数据库操作完成
         setTimeout(() => {
           this.triggerSpeedTest();
-        }, 3000);
+        }, 5000);
       }
 
-      return { updated: newNodes.length, added, updated, removed };
+      return { updated, added, removed, actualNodeCount };
     } catch (error) {
+      // 回滚事务
+      await transaction.rollback();
       logger.error('更新订阅节点失败', { 
         error: error.message,
         subscriptionId: subscription.id 
@@ -260,22 +307,48 @@ class MultiSubscriptionManager {
   }
 
   /**
-   * 更新所有订阅的节点
+   * 更新所有订阅的节点 - 添加锁机制防止并发
    */
   async updateAllSubscriptions() {
+    // 检查是否正在更新,如果是则跳过
+    if (this.isUpdating) {
+      logger.warn('订阅更新正在进行中,跳过本次更新');
+      return [];
+    }
+    
+    this.isUpdating = true;
+    
     try {
       const subscriptions = await this.getActiveSubscriptions();
       logger.info(`开始更新所有订阅,共${subscriptions.length}个活跃订阅`);
 
       const results = [];
+      
+      // 串行执行订阅更新,避免并发问题
       for (const subscription of subscriptions) {
         try {
+          logger.info(`开始更新订阅: ${subscription.name}`, {
+            subscriptionId: subscription.id
+          });
+          
           const result = await this.updateSubscriptionNodes(subscription);
           results.push({
             subscriptionId: subscription.id,
             subscriptionName: subscription.name,
             ...result
           });
+          
+          logger.info(`订阅更新完成: ${subscription.name}`, {
+            subscriptionId: subscription.id,
+            added: result.added,
+            updated: result.updated,
+            removed: result.removed,
+            actualNodeCount: result.actualNodeCount
+          });
+          
+          // 每个订阅更新后稍作延迟,避免过于频繁的数据库操作
+          await new Promise(resolve => setTimeout(resolve, 2000));
+          
         } catch (error) {
           logger.error('更新订阅失败', {
             subscriptionId: subscription.id,
@@ -297,16 +370,19 @@ class MultiSubscriptionManager {
 
       if (hasUpdates) {
         logger.info('检测到节点更新,准备触发测速...');
-        // 延迟3秒后触发测速,确保数据库操作完成
+        // 延迟5秒后触发测速,确保数据库操作完成
         setTimeout(() => {
           this.triggerSpeedTest();
-        }, 3000);
+        }, 5000);
       }
 
       return results;
     } catch (error) {
       logger.error('更新所有订阅失败', { error: error.message });
       throw error;
+    } finally {
+      // 释放锁
+      this.isUpdating = false;
     }
   }
 
@@ -393,6 +469,7 @@ class MultiSubscriptionManager {
   convertShadowsocksToClash(shadowsocksContent) {
     const lines = shadowsocksContent.split('\n').filter(line => line.trim());
     const proxies = [];
+    const proxyKeys = new Set(); // 用于去重
     
     logger.info(`开始解析Shadowsocks内容,共${lines.length}行`);
     
@@ -400,6 +477,7 @@ class MultiSubscriptionManager {
     let vmessCount = 0;
     let trojanCount = 0;
     let errorCount = 0;
+    let duplicateCount = 0;
     
     for (const line of lines) {
       if (line.startsWith('ss://')) {
@@ -407,6 +485,16 @@ class MultiSubscriptionManager {
         try {
           const proxy = this.parseShadowsocksUrl(line);
           if (proxy) {
+            const key = `${proxy.name}-${proxy.server}-${proxy.port}`;
+            
+            // 检查是否重复
+            if (proxyKeys.has(key)) {
+              duplicateCount++;
+              logger.debug(`跳过重复的Shadowsocks节点: ${proxy.name} (${proxy.server}:${proxy.port})`);
+              continue;
+            }
+            
+            proxyKeys.add(key);
             proxies.push(proxy);
           } else {
             errorCount++;
@@ -421,7 +509,7 @@ class MultiSubscriptionManager {
       }
     }
     
-    logger.info(`解析完成 - SS: ${ssCount}个, VMess: ${vmessCount}个, Trojan: ${trojanCount}个, 成功解析: ${proxies.length}个, 失败: ${errorCount}个`);
+    logger.info(`解析完成 - SS: ${ssCount}个, VMess: ${vmessCount}个, Trojan: ${trojanCount}个, 成功解析: ${proxies.length}个, 重复跳过: ${duplicateCount}个, 失败: ${errorCount}个`);
     
     if (proxies.length === 0) {
       logger.warn('没有找到任何有效的代理节点');

+ 0 - 2
src/core/speedTester.js

@@ -501,8 +501,6 @@ class SpeedTester {
     };
 
     try {
-      logger.info(`开始测试节点连通性: ${node.name} (${node.type})`);
-
       // 验证节点配置的有效性
       const validationResult = this.validateNode(node);
       if (!validationResult.isValid) {

+ 0 - 94
test_auto_speedtest.js

@@ -1,94 +0,0 @@
-const { Subscription, Node } = require('./src/models');
-const MultiSubscriptionManager = require('./src/core/multiSubscriptionManager');
-
-async function testAutoSpeedTest() {
-  try {
-    console.log('=== 自动测速功能测试 ===');
-    
-    // 创建测试订阅
-    const testSubscription = await Subscription.create({
-      name: '自动测速测试订阅',
-      url: 'http://so.xfxssr.me/api/v1/client/subscribe?token=7854d59f38ac51700730b9e782c5160c',
-      description: '用于测试自动测速功能的订阅',
-      speedTestConfig: {
-        testCount: 3,
-        timeout: 10000,
-        testUrls: ['https://www.google.com'],
-        speedTestEnabled: true
-      },
-      notifyConfig: {
-        enabled: true,
-        notifyOnSpeedTest: true,
-        notifyOnNodeUpdate: true,
-        webhookUrl: '',
-        emailConfig: null
-      }
-    });
-    
-    console.log('✅ 创建测试订阅成功');
-
-    // 创建多订阅管理器
-    const manager = new MultiSubscriptionManager();
-    
-    // 设置测速触发器(模拟)
-    let speedTestTriggered = false;
-    manager.setSpeedTestTrigger(() => {
-      console.log('🚀 测速触发器被调用!');
-      speedTestTriggered = true;
-    });
-
-    // 模拟一些测试节点
-    console.log('\n2. 创建测试节点...');
-    const testNodes = [
-      { name: '自动测速节点1', type: 'ss', server: 'auto1.example.com', port: 443, method: 'aes-256-gcm', password: 'password1', subscriptionId: testSubscription.id },
-      { name: '自动测速节点2', type: 'ss', server: 'auto2.example.com', port: 443, method: 'aes-256-gcm', password: 'password2', subscriptionId: testSubscription.id },
-      { name: '自动测速节点3', type: 'ss', server: 'auto3.example.com', port: 443, method: 'aes-256-gcm', password: 'password3', subscriptionId: testSubscription.id }
-    ];
-
-    for (const nodeData of testNodes) {
-      await Node.create({
-        ...nodeData,
-        isActive: true,
-        status: 'offline'
-      });
-    }
-
-    console.log(`✅ 创建了 ${testNodes.length} 个测试节点`);
-
-    // 更新订阅的节点数量
-    await testSubscription.update({
-      nodeCount: testNodes.length,
-      lastUpdateTime: new Date()
-    });
-
-    // 直接测试触发器功能
-    console.log('\n3. 直接测试测速触发器...');
-    manager.triggerSpeedTest();
-    
-    // 等待触发器执行
-    await new Promise(resolve => setTimeout(resolve, 1000));
-
-    // 检查触发器是否被调用
-    console.log('\n4. 检查触发器状态...');
-
-    if (speedTestTriggered) {
-      console.log('✅ 自动测速功能正常!');
-    } else {
-      console.log('❌ 自动测速功能未触发');
-    }
-
-    // 清理测试数据
-    console.log('\n5. 清理测试数据...');
-    await testSubscription.destroy();
-    console.log('✅ 测试数据清理完成');
-
-    console.log('\n=== 自动测速功能测试完成 ===');
-    
-  } catch (error) {
-    console.error('❌ 测试失败:', error.message);
-    console.error(error.stack);
-  }
-}
-
-// 运行测试
-testAutoSpeedTest(); 

+ 0 - 97
test_performance.js

@@ -1,97 +0,0 @@
-const { Subscription, Node } = require('./src/models');
-const MultiSubscriptionManager = require('./src/core/multiSubscriptionManager');
-
-async function testPerformance() {
-  try {
-    console.log('=== 性能测试 ===');
-    
-    // 创建测试订阅
-    const testSubscription = await Subscription.create({
-      name: '性能测试订阅',
-      url: 'http://so.xfxssr.me/api/v1/client/subscribe?token=7854d59f38ac51700730b9e782c5160c',
-      description: '用于测试性能的订阅',
-      speedTestConfig: {
-        testCount: 3,
-        timeout: 10000,
-        testUrls: ['https://www.google.com'],
-        speedTestEnabled: true
-      },
-      notifyConfig: {
-        enabled: true,
-        notifyOnSpeedTest: true,
-        notifyOnNodeUpdate: true,
-        webhookUrl: '',
-        emailConfig: null
-      }
-    });
-    
-    console.log('✅ 创建测试订阅成功');
-
-    // 创建大量测试节点
-    const nodeCount = 50;
-    console.log(`\n创建 ${nodeCount} 个测试节点...`);
-    
-    const testNodes = [];
-    for (let i = 1; i <= nodeCount; i++) {
-      testNodes.push({
-        name: `性能测试节点${i}`,
-        type: 'ss',
-        server: `server${i}.example.com`,
-        port: 443 + (i % 100),
-        method: 'aes-256-gcm',
-        password: `password${i}`,
-        subscriptionId: testSubscription.id,
-        isActive: true,
-        status: 'offline'
-      });
-    }
-
-    // 批量创建节点
-    const startTime = Date.now();
-    await Node.bulkCreate(testNodes);
-    const createTime = Date.now() - startTime;
-    
-    console.log(`✅ 批量创建 ${nodeCount} 个节点完成,耗时: ${createTime}ms`);
-
-    // 更新订阅节点数量
-    await testSubscription.update({
-      nodeCount: nodeCount,
-      lastUpdateTime: new Date()
-    });
-
-    // 测试多订阅管理器性能
-    console.log('\n测试多订阅管理器性能...');
-    const manager = new MultiSubscriptionManager();
-    
-    const managerStartTime = Date.now();
-    const activeSubscriptions = await manager.getActiveSubscriptions();
-    const managerTime = Date.now() - managerStartTime;
-    
-    console.log(`✅ 获取活跃订阅完成,耗时: ${managerTime}ms`);
-    console.log(`  活跃订阅数量: ${activeSubscriptions.length}`);
-
-    // 测试状态获取性能
-    const statusStartTime = Date.now();
-    const status = await manager.getStatus();
-    const statusTime = Date.now() - statusStartTime;
-    
-    console.log(`✅ 获取状态完成,耗时: ${statusTime}ms`);
-
-    // 清理测试数据
-    console.log('\n清理测试数据...');
-    await testSubscription.destroy();
-    console.log('✅ 测试数据清理完成');
-
-    console.log('\n=== 性能测试完成 ===');
-    console.log(`总结:`);
-    console.log(`  - 批量创建 ${nodeCount} 个节点: ${createTime}ms`);
-    console.log(`  - 获取活跃订阅: ${managerTime}ms`);
-    console.log(`  - 获取状态: ${statusTime}ms`);
-    
-  } catch (error) {
-    console.error('❌ 性能测试失败:', error.message);
-  }
-}
-
-// 运行测试
-testPerformance();