package core import ( "context" "crypto/tls" "fmt" "io" "net" "net/http" "net/url" "regexp" "strconv" "strings" "sync" "time" "clash-speed-test/internal/config" "clash-speed-test/internal/database" applogger "clash-speed-test/internal/logger" ) type SpeedTester struct { config *config.Config testURLs []string timeout time.Duration concurrency int httpClient *http.Client } type SpeedTestResult struct { Latency int `json:"latency"` DownloadSpeed float64 `json:"download_speed"` // Mbps UploadSpeed float64 `json:"upload_speed"` // Mbps IPAddress string `json:"ip_address"` Location string `json:"location"` Success bool `json:"success"` ErrorMessage string `json:"error_message"` } func NewSpeedTester(cfg *config.Config) *SpeedTester { // 创建HTTP客户端 httpClient := &http.Client{ Timeout: cfg.Speed.Timeout, Transport: &http.Transport{ TLSClientConfig: &tls.Config{ InsecureSkipVerify: false, }, DisableKeepAlives: true, MaxIdleConns: 100, IdleConnTimeout: 30 * time.Second, }, } return &SpeedTester{ config: cfg, testURLs: cfg.Speed.TestURLs, timeout: cfg.Speed.Timeout, concurrency: cfg.Speed.Concurrency, httpClient: httpClient, } } // 测试单个节点 func (st *SpeedTester) TestNode(node database.Node) (*database.TestResult, error) { startTime := time.Now() result := &database.TestResult{ NodeID: node.ID, TestTime: time.Now(), IsSuccess: false, } applogger.Info("开始测试节点", map[string]interface{}{ "name": node.Name, "type": node.Type, "server": node.Server, "port": node.Port, }) // 验证节点配置的有效性 if err := st.validateNode(node); err != nil { result.ErrorMessage = fmt.Sprintf("节点配置无效: %v", err) applogger.Warn("节点配置无效", map[string]interface{}{ "node": node.Name, "error": err.Error(), }) return result, nil } // 测试连接性和速度 speedResult, err := st.testNodeSpeed(node) if err != nil { result.ErrorMessage = fmt.Sprintf("测速失败: %v", err) applogger.Warn("节点测试失败", map[string]interface{}{ "node": node.Name, "error": err.Error(), }) } else if speedResult.Success { result.IsSuccess = true result.Latency = &speedResult.Latency result.IPAddress = speedResult.IPAddress result.Location = speedResult.Location result.TestURL = st.testURLs[0] // 如果有速度数据,保存到扩展字段 if speedResult.DownloadSpeed > 0 { downloadSpeed := int(speedResult.DownloadSpeed) result.DownloadSpeed = &downloadSpeed } if speedResult.UploadSpeed > 0 { uploadSpeed := int(speedResult.UploadSpeed) result.UploadSpeed = &uploadSpeed } applogger.Info("节点测试成功", map[string]interface{}{ "node": node.Name, "latency": speedResult.Latency, "download_speed": speedResult.DownloadSpeed, "upload_speed": speedResult.UploadSpeed, "ip": speedResult.IPAddress, "location": speedResult.Location, }) } else { result.ErrorMessage = speedResult.ErrorMessage applogger.Warn("节点测试失败", map[string]interface{}{ "node": node.Name, "error": speedResult.ErrorMessage, }) } // 计算测试时长 duration := int(time.Since(startTime).Milliseconds()) result.TestDuration = &duration // 保存测试结果 if err := database.SaveTestResult(result); err != nil { applogger.Error("保存测试结果失败", map[string]interface{}{ "node": node.Name, "error": err.Error(), }) } return result, nil } // 验证节点配置的有效性 func (st *SpeedTester) validateNode(node database.Node) error { // 检查必需的字段 if node.Name == "" { return fmt.Errorf("节点名称不能为空") } if node.Type == "" { return fmt.Errorf("节点类型不能为空") } if node.Server == "" { return fmt.Errorf("服务器地址不能为空") } if node.Port <= 0 || node.Port > 65535 { return fmt.Errorf("端口号无效: %d", node.Port) } // 验证server字段是否为有效的域名或IP地址 server := strings.TrimSpace(node.Server) // 只检查明显无效的情况,而不是过于严格的验证 // 检查是否包含中文字符(通常表示这是描述而不是有效地址) for _, r := range server { if r >= 0x4e00 && r <= 0x9fff { return fmt.Errorf("服务器地址包含中文字符,无效: %s", server) } } // 检查是否是URL格式(不应该作为server地址) if strings.HasPrefix(server, "http://") || strings.HasPrefix(server, "https://") { return fmt.Errorf("服务器地址不能是URL格式: %s", server) } // 检查是否包含明显的无效字符(如空格、特殊符号等) invalidChars := regexp.MustCompile(`[<>:"\\|?*]`) if invalidChars.MatchString(server) { return fmt.Errorf("服务器地址包含无效字符: %s", server) } // 检查是否为空或只包含空白字符 if server == "" { return fmt.Errorf("服务器地址不能为空") } // 对于SS/SSR节点,检查密码 if (node.Type == "ss" || node.Type == "ssr") && node.Password == "" { return fmt.Errorf("SS/SSR节点必须设置密码") } // 对于Vmess节点,检查UUID if node.Type == "vmess" && node.UUID == "" { return fmt.Errorf("Vmess节点必须设置UUID") } // 对于Trojan节点,检查密码 if node.Type == "trojan" && node.Password == "" { return fmt.Errorf("Trojan节点必须设置密码") } return nil } // 批量测试节点 func (st *SpeedTester) TestNodes(nodes []database.Node) []*database.TestResult { var results []*database.TestResult var mu sync.Mutex var wg sync.WaitGroup // 创建信号量控制并发数 semaphore := make(chan struct{}, st.concurrency) for _, node := range nodes { wg.Add(1) go func(n database.Node) { defer wg.Done() // 获取信号量 semaphore <- struct{}{} defer func() { <-semaphore }() result, err := st.TestNode(n) if err != nil { applogger.Error("节点测试异常", map[string]interface{}{ "node": n.Name, "error": err.Error(), }) return } mu.Lock() results = append(results, result) mu.Unlock() }(node) } wg.Wait() return results } // 测试节点速度 func (st *SpeedTester) testNodeSpeed(node database.Node) (*SpeedTestResult, error) { result := &SpeedTestResult{ Success: false, } // 根据代理类型选择测试策略 switch node.Type { case "http", "https": return st.testHTTPProxy(node) case "socks5": return st.testSOCKS5Proxy(node) case "ss", "ssr", "vmess", "trojan": // 对于高级代理,尝试多种测试方法 return st.testAdvancedProxy(node) default: return result, fmt.Errorf("暂不支持代理类型: %s", node.Type) } } // 测试HTTP代理 func (st *SpeedTester) testHTTPProxy(node database.Node) (*SpeedTestResult, error) { result := &SpeedTestResult{} // 构建代理URL proxyURL := fmt.Sprintf("http://%s:%d", node.Server, node.Port) if node.Username != "" && node.Password != "" { proxyURL = fmt.Sprintf("http://%s:%s@%s:%d", node.Username, node.Password, node.Server, node.Port) } // 创建代理客户端 proxyURLParsed, err := url.Parse(proxyURL) if err != nil { return result, fmt.Errorf("解析代理URL失败: %w", err) } proxyClient := &http.Client{ Timeout: st.timeout, Transport: &http.Transport{ Proxy: http.ProxyURL(proxyURLParsed), TLSClientConfig: &tls.Config{ InsecureSkipVerify: false, }, DisableKeepAlives: true, }, } // 测试延迟 latency, ipAddress, location, err := st.testLatency(proxyClient) if err != nil { return result, fmt.Errorf("延迟测试失败: %w", err) } result.Latency = latency result.IPAddress = ipAddress result.Location = location // 测试下载速度 downloadSpeed, err := st.testDownloadSpeed(proxyClient) if err != nil { applogger.Warn("下载速度测试失败", map[string]interface{}{ "node": node.Name, "error": err.Error(), }) } else { result.DownloadSpeed = downloadSpeed } // 测试上传速度 uploadSpeed, err := st.testUploadSpeed(proxyClient) if err != nil { applogger.Warn("上传速度测试失败", map[string]interface{}{ "node": node.Name, "error": err.Error(), }) } else { result.UploadSpeed = uploadSpeed } result.Success = true return result, nil } // 测试SOCKS5代理 func (st *SpeedTester) testSOCKS5Proxy(node database.Node) (*SpeedTestResult, error) { result := &SpeedTestResult{} // 构建SOCKS5代理URL proxyURL := fmt.Sprintf("socks5://%s:%d", node.Server, node.Port) if node.Username != "" && node.Password != "" { proxyURL = fmt.Sprintf("socks5://%s:%s@%s:%d", node.Username, node.Password, node.Server, node.Port) } // 创建代理客户端 proxyURLParsed, err := url.Parse(proxyURL) if err != nil { return result, fmt.Errorf("解析SOCKS5代理URL失败: %w", err) } proxyClient := &http.Client{ Timeout: st.timeout, Transport: &http.Transport{ Proxy: http.ProxyURL(proxyURLParsed), TLSClientConfig: &tls.Config{ InsecureSkipVerify: false, }, DisableKeepAlives: true, }, } // 测试延迟 latency, ipAddress, location, err := st.testLatency(proxyClient) if err != nil { return result, fmt.Errorf("延迟测试失败: %w", err) } result.Latency = latency result.IPAddress = ipAddress result.Location = location // 测试下载速度 downloadSpeed, err := st.testDownloadSpeed(proxyClient) if err != nil { applogger.Warn("下载速度测试失败", map[string]interface{}{ "node": node.Name, "error": err.Error(), }) } else { result.DownloadSpeed = downloadSpeed } // 测试上传速度 uploadSpeed, err := st.testUploadSpeed(proxyClient) if err != nil { applogger.Warn("上传速度测试失败", map[string]interface{}{ "node": node.Name, "error": err.Error(), }) } else { result.UploadSpeed = uploadSpeed } result.Success = true return result, nil } // 测试高级代理(Shadowsocks、Vmess等) func (st *SpeedTester) testAdvancedProxy(node database.Node) (*SpeedTestResult, error) { result := &SpeedTestResult{} // 对于高级代理,我们需要通过本地Clash代理来测试 // 但首先需要验证本地代理是否可用,以及是否真的配置了这个节点 // 对于高级代理,我们尝试通过本地代理端口进行测试 // 如果本地代理不可用,会在测试过程中失败,这是正常的 // 尝试常见的本地代理端口 localProxyPorts := []int{7890, 7891, 1080, 8080, 8118} for _, port := range localProxyPorts { proxyURL := fmt.Sprintf("http://127.0.0.1:%d", port) proxyURLParsed, err := url.Parse(proxyURL) if err != nil { continue } proxyClient := &http.Client{ Timeout: st.timeout, Transport: &http.Transport{ Proxy: http.ProxyURL(proxyURLParsed), TLSClientConfig: &tls.Config{ InsecureSkipVerify: false, }, DisableKeepAlives: true, }, } // 测试延迟 latency, ipAddress, location, err := st.testLatency(proxyClient) if err == nil { result.Latency = latency result.IPAddress = ipAddress result.Location = location result.Success = true // 测试下载速度 if downloadSpeed, err := st.testDownloadSpeed(proxyClient); err == nil { result.DownloadSpeed = downloadSpeed } // 测试上传速度 if uploadSpeed, err := st.testUploadSpeed(proxyClient); err == nil { result.UploadSpeed = uploadSpeed } applogger.Info("通过本地代理测试成功", map[string]interface{}{ "node": node.Name, "port": port, }) return result, nil } } return result, fmt.Errorf("无法通过本地代理测试节点: %s", node.Name) } // 测试延迟 func (st *SpeedTester) testLatency(client *http.Client) (int, string, string, error) { startTime := time.Now() // 尝试多个测试URL for _, testURL := range st.testURLs { req, err := http.NewRequest("HEAD", testURL, nil) if err != nil { continue } req.Header.Set("User-Agent", "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36") resp, err := client.Do(req) if err != nil { continue } defer resp.Body.Close() latency := int(time.Since(startTime).Milliseconds()) ipAddress := st.extractIPAddress(resp) location := st.extractLocation(resp) return latency, ipAddress, location, nil } return 0, "", "", fmt.Errorf("所有测试URL都无法访问") } // 测试下载速度 func (st *SpeedTester) testDownloadSpeed(client *http.Client) (float64, error) { // 使用1MB的测试文件 testURL := "https://httpbin.org/bytes/1048576" startTime := time.Now() req, err := http.NewRequest("GET", testURL, nil) if err != nil { return 0, fmt.Errorf("创建请求失败: %w", err) } resp, err := client.Do(req) if err != nil { return 0, fmt.Errorf("下载测试失败: %w", err) } defer resp.Body.Close() // 读取响应体 body, err := io.ReadAll(resp.Body) if err != nil { return 0, fmt.Errorf("读取响应失败: %w", err) } duration := time.Since(startTime).Seconds() fileSize := len(body) // 计算下载速度 (Mbps) speedBps := float64(fileSize) / duration speedMbps := (speedBps * 8) / (1024 * 1024) return speedMbps, nil } // 测试上传速度 func (st *SpeedTester) testUploadSpeed(client *http.Client) (float64, error) { // 使用1MB的测试数据 testData := strings.Repeat("A", 1024*1024) startTime := time.Now() req, err := http.NewRequest("POST", "https://httpbin.org/post", strings.NewReader(testData)) if err != nil { return 0, fmt.Errorf("创建请求失败: %w", err) } req.Header.Set("Content-Type", "application/octet-stream") resp, err := client.Do(req) if err != nil { return 0, fmt.Errorf("上传测试失败: %w", err) } defer resp.Body.Close() duration := time.Since(startTime).Seconds() fileSize := len(testData) // 计算上传速度 (Mbps) speedBps := float64(fileSize) / duration speedMbps := (speedBps * 8) / (1024 * 1024) return speedMbps, nil } // 提取IP地址 func (st *SpeedTester) extractIPAddress(resp *http.Response) string { if ip := resp.Header.Get("X-Forwarded-For"); ip != "" { return strings.Split(ip, ",")[0] } if ip := resp.Header.Get("X-Real-IP"); ip != "" { return ip } if ip := resp.Header.Get("CF-Connecting-IP"); ip != "" { return ip } return "unknown" } // 提取位置信息 func (st *SpeedTester) extractLocation(resp *http.Response) string { if resp.Header.Get("CF-Ray") != "" { return "Cloudflare" } if country := resp.Header.Get("CF-IPCountry"); country != "" { return country } return "unknown" }