日期: 2026-04-09 目标: 用 Go + React 完全重写现有 Python TG Lead Scraper 系统
从 Telegram 频道、搜索引擎、GitHub、导航网站自动挖掘商户联系方式,经过清洗去重验证后按质量打分,输出可用商户清单。
技术栈:
单体服务 + 异步任务队列。API 和 Worker 在同一个 Go 进程内。
┌─────────────────────────────────────┐
│ Go 单体服务 │
│ ┌──────────┐ ┌──────────────────┐ │
│ │ HTTP API │ │ Task Worker │ │
│ │ (Gin) │ │ (7阶段Pipeline) │ │
│ └──────────┘ └──────────────────┘ │
│ ↕ ↕ │
│ ┌──────────┐ ┌──────────────────┐ │
│ │ MySQL │ │ Redis │ │
│ │ (持久化) │ │ (队列+缓存+锁) │ │
│ └──────────┘ └──────────────────┘ │
└─────────────────────────────────────┘
↑
React SPA (Nginx)
spider/
├── cmd/
│ └── server/
│ └── main.go # 入口,启动 API + Worker
├── internal/
│ ├── config/ # 配置加载 (YAML + DB managed_settings)
│ │ └── config.go
│ ├── model/ # MySQL 表结构 (GORM)
│ │ ├── seed.go
│ │ ├── keyword.go
│ │ ├── setting.go
│ │ ├── channel.go
│ │ ├── nav_site.go
│ │ ├── merchant_raw.go
│ │ ├── merchant_clean.go
│ │ ├── task.go
│ │ └── config_revision.go
│ ├── handler/ # HTTP handler (Gin)
│ │ ├── task.go
│ │ ├── merchant.go
│ │ ├── channel.go
│ │ ├── nav_site.go
│ │ ├── seed.go
│ │ ├── keyword.go
│ │ ├── config.go
│ │ └── dashboard.go
│ ├── service/ # 业务逻辑层
│ │ ├── task_service.go
│ │ ├── merchant_service.go
│ │ └── config_service.go
│ ├── pipeline/ # 7阶段 Pipeline 调度器
│ │ ├── pipeline.go # 调度器主逻辑
│ │ ├── phase1_discover.go # TG 频道裂变
│ │ ├── phase2_search.go # Serper 搜索
│ │ ├── phase3_github.go # GitHub README 挖掘
│ │ ├── phase4_scrape.go # TG 消息采集
│ │ ├── phase5_crawl.go # 网页爬取
│ │ ├── phase6_clean.go # 清洗三关
│ │ └── phase7_score.go # 评分
│ ├── telegram/ # gotd/td 封装
│ │ ├── client.go # TG 客户端封装
│ │ └── account_manager.go # 多账号轮换 + FloodWait
│ ├── search/ # 搜索引擎封装
│ │ └── serper.go # Serper API
│ ├── crawler/ # 网页爬取
│ │ ├── static.go # colly 静态爬取
│ │ └── dynamic.go # chromedp JS 渲染
│ ├── llm/ # LLM 统一接口
│ │ └── client.go # OpenAI 兼容接口封装
│ ├── extractor/ # 联系方式提取
│ │ ├── regex.go # 正则提取
│ │ └── llm_extractor.go # LLM 辅助提取
│ └── worker/ # asynq 任务 Worker
│ └── worker.go
├── web/ # React 前端
│ ├── src/
│ │ ├── pages/
│ │ │ ├── Dashboard.tsx # 总览仪表盘
│ │ │ ├── Tasks.tsx # 任务管理
│ │ │ ├── MerchantsRaw.tsx # 原始商户表
│ │ │ ├── MerchantsClean.tsx # 清洗商户表
│ │ │ ├── Channels.tsx # 频道管理
│ │ │ ├── NavSites.tsx # 导航网页
│ │ │ ├── Seeds.tsx # 种子管理
│ │ │ ├── Keywords.tsx # 关键词管理
│ │ │ └── Settings.tsx # 系统配置
│ │ ├── components/
│ │ │ ├── Layout.tsx
│ │ │ ├── TaskControl.tsx # 7阶段独立启动按钮
│ │ │ └── DataTable.tsx # 通用表格组件
│ │ ├── api/
│ │ │ └── index.ts # axios 封装
│ │ └── store/
│ │ └── index.ts # zustand 状态管理
│ ├── package.json
│ └── vite.config.ts
├── deploy/
│ ├── docker-compose.yml
│ ├── Dockerfile.api
│ ├── Dockerfile.web
│ └── nginx.conf
├── configs/
│ └── config.yaml # 默认配置
└── go.mod
CREATE DATABASE IF NOT EXISTS spider DEFAULT CHARACTER SET utf8mb4;
USE spider;
-- 种子管理
CREATE TABLE managed_seeds (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
channel_name VARCHAR(255) NOT NULL UNIQUE,
status ENUM('active','inactive') DEFAULT 'active',
note VARCHAR(500),
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);
-- 关键词管理
CREATE TABLE managed_keywords (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
keyword VARCHAR(255) NOT NULL UNIQUE,
category VARCHAR(100),
status ENUM('active','inactive') DEFAULT 'active',
created_at DATETIME DEFAULT CURRENT_TIMESTAMP
);
-- 运行参数
CREATE TABLE managed_settings (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
key_name VARCHAR(255) NOT NULL UNIQUE,
value TEXT NOT NULL,
value_type ENUM('int','float','bool','string','json') NOT NULL,
effect_level ENUM('runtime','new_task') DEFAULT 'runtime',
description VARCHAR(500),
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);
-- 发现的 TG 频道
CREATE TABLE channels (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
username VARCHAR(255) NOT NULL UNIQUE,
title VARCHAR(500),
member_count INT DEFAULT 0,
about TEXT,
source ENUM('seed','snowball','search','github') NOT NULL,
source_detail VARCHAR(500),
status ENUM('pending','scraped','failed','skipped') DEFAULT 'pending',
last_message_id INT DEFAULT 0,
relevance_score FLOAT,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
INDEX idx_status (status),
INDEX idx_source (source)
);
-- 候选导航网页
CREATE TABLE nav_sites (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
url VARCHAR(2048) NOT NULL,
domain VARCHAR(255),
source VARCHAR(100),
status ENUM('pending','scraped','filtered','failed') DEFAULT 'pending',
filter_reason VARCHAR(255),
merchant_count INT DEFAULT 0,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
INDEX idx_status (status),
UNIQUE INDEX idx_url (url(500))
);
-- 原始商户
CREATE TABLE merchants_raw (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
merchant_name VARCHAR(500),
tg_username VARCHAR(255),
website VARCHAR(2048),
email VARCHAR(255),
phone VARCHAR(100),
industry VARCHAR(100),
source_type ENUM('tg_scrape','web_crawl','github') NOT NULL,
source_id VARCHAR(500),
original_message TEXT,
status ENUM('raw','glm_parsed') DEFAULT 'raw',
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
INDEX idx_status (status),
INDEX idx_tg_username (tg_username)
);
-- 清洗后商户
CREATE TABLE merchants_clean (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
raw_id BIGINT,
merchant_name VARCHAR(500),
tg_username VARCHAR(255),
website VARCHAR(2048),
email VARCHAR(255),
phone VARCHAR(100),
industry VARCHAR(100),
status ENUM('valid','invalid','bot','duplicate','group') NOT NULL,
tg_first_name VARCHAR(255),
tg_last_name VARCHAR(255),
is_premium TINYINT(1) DEFAULT 0,
last_online DATETIME,
active_level ENUM('active','moderate','inactive'),
member_count INT DEFAULT 0,
quality_score FLOAT DEFAULT 0,
source_count INT DEFAULT 1,
source_links JSON,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
UNIQUE INDEX idx_tg_username (tg_username),
INDEX idx_status (status),
INDEX idx_quality (quality_score),
INDEX idx_industry (industry)
);
-- 任务记录
CREATE TABLE tasks (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
task_type ENUM('full','discover','search','github','scrape','crawl','clean','score') NOT NULL,
status ENUM('pending','running','completed','failed','stopped') DEFAULT 'pending',
params JSON,
progress JSON,
result JSON,
error_msg TEXT,
started_at DATETIME,
finished_at DATETIME,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
INDEX idx_status (status)
);
-- 配置变更审计
CREATE TABLE config_revisions (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
setting_key VARCHAR(255) NOT NULL,
old_value TEXT,
new_value TEXT,
changed_by VARCHAR(100) DEFAULT 'admin',
created_at DATETIME DEFAULT CURRENT_TIMESTAMP
);
| Key 模式 | 类型 | 用途 | TTL |
|---|---|---|---|
spider:task:queue:* |
asynq 内部 | 任务队列 | asynq 管理 |
spider:task:progress:{id} |
Hash | 实时进度 | 24h |
spider:task:lock:{type} |
String | 同类型任务互斥锁 | 自动释放 |
spider:tg:floodwait:{account} |
String | 账号冷却截止时间 | 按实际冷却 |
spider:cache:channel:{username} |
Hash | 频道信息缓存 | 24h |
spider:cache:settings |
Hash | managed_settings 热缓存 | 5min |
spider:dedup:merchant:{username} |
String | 采集时快速去重 | 7d |
所有 key 加 spider: 前缀避免和其他服务冲突。
前缀: /api/v1
POST /tasks/start
Body: {
"task_type": "full|discover|search|github|scrape|crawl|clean|score",
"target": "可选, 频道名或关键词",
"test_run": { "item_limit": 10, "message_limit": 50 }, // 可选
"skip_phases": ["scrape"] // 可选
}
Response: { "task_id": 1, "status": "pending" }
POST /tasks/:id/stop
Body: { "force": false }
GET /tasks
Query: ?status=running&page=1&page_size=20
Response: { "items": [...], "total": 100 }
GET /tasks/:id
Response: { "id":1, "task_type":"full", "status":"running", "progress": {...} }
GET /tasks/:id/logs
WebSocket 实时推送日志
GET /merchants/raw
Query: ?status=raw&source_type=tg_scrape&page=1&page_size=20
默认排序: created_at DESC
GET /merchants/clean
Query: ?status=valid&industry=机场&min_score=60&sort=quality_score&order=desc&page=1&page_size=20
GET /merchants/:id
Response: 商户详情 (raw 或 clean 自动判断)
GET /merchants/stats
Response: { "raw_total":1946, "clean_total":125, "valid":62, "by_source":{...}, "by_industry":{...} }
GET /channels
Query: ?status=pending&source=snowball&page=1&page_size=20
GET /channels/stats
Response: { "total":1907, "by_status":{...}, "by_source":{...} }
GET /nav-sites
Query: ?status=pending&page=1&page_size=20
GET /seeds
POST /seeds Body: { "channel_name": "@bbs3000", "note": "综合频道" }
PUT /seeds/:id Body: { "status": "inactive", "note": "..." }
DELETE /seeds/:id
GET /keywords Query: ?category=机场&status=active
POST /keywords Body: { "keywords": ["机场推荐","发卡网"], "category": "机场" } // 支持批量
PUT /keywords/:id Body: { "keyword": "...", "category": "..." }
DELETE /keywords/:id
GET /config/settings
PUT /config/settings/:key Body: { "value": "500" } // 自动写审计日志
GET /dashboard
Response: {
"channels_total": 1907,
"merchants_raw_total": 1946,
"merchants_clean_total": 125,
"merchants_valid": 62,
"nav_sites_total": 646,
"recent_tasks": [...最近5个任务],
"running_task": null | {...}
}
输入: managed_seeds (status=active) 输出: channels 表
FloodWait 处理: 由 AccountManager 统一管理
输入: managed_keywords (status=active) 输出: channels + nav_sites
t.me/xxx → 写 channels 表, source='search'输入: 预设 query 列表 (从 managed_keywords 生成) 输出: channels 表
注意: 加 GitHub token 到配置,提升到 30 req/min
输入: channels 表 (status=pending) 输出: merchants_raw
输入: nav_sites 表 (status=pending) 输出: merchants_raw
输入: merchants_raw (status=raw) 输出: merchants_clean
第一关: 黑名单过滤 (本地, 毫秒级)
第二关: 去重 (本地, 毫秒级)
第三关: TG 真实性验证 (需 TG API)
输入: merchants_clean (status=valid) 输出: 更新 merchants_clean.quality_score
6 维度加权打分 (总权重 1.0):
| 维度 | 权重 | 规则 |
|---|---|---|
| member_count | 0.25 | <100→10, <1k→30, <1w→50, <10w→80, ≥10w→100 |
| premium | 0.15 | is_premium=true→100, false→0 |
| activity | 0.25 | active→100, moderate→50, inactive→20 |
| multi_source | 0.20 | source_count≥4→100, 3→70, 2→40, 1→10 |
| has_website | 0.10 | website非空→100, 空→0 |
| has_email | 0.05 | email非空→100, 空→0 |
quality_score = 各维度得分 × 权重 之和 (0-100)
// AccountManager 管理多个 TG 账号的连接和限速
type AccountManager struct {
accounts []TGAccount
mu sync.Mutex
redis *redis.Client
}
type TGAccount struct {
Phone string
SessionFile string
Client *telegram.Client // gotd/td client
CoolUntil time.Time
}
// Acquire 获取一个当前可用的账号
// 跳过正在冷却的账号,全部冷却则返回错误
func (m *AccountManager) Acquire(ctx context.Context) (*TGAccount, error)
// Release 归还账号,如果触发了 FloodWait 则标记冷却
// 冷却截止时间同步到 Redis,重启不丢失
func (m *AccountManager) Release(acc *TGAccount, floodWait time.Duration)
// FloodWait 策略:
// ≤60s → 当前账号等待后重试
// >60s → Release + 切换账号
// >300s → 整轮 break,标记所有账号最小冷却 300s
// LLMClient 封装 OpenAI 兼容接口
type LLMClient struct {
client *openai.Client
model string // 可配置: gpt-4o / claude-3-sonnet / glm-4 等
baseURL string // 可配置: 指向不同提供商
timeout time.Duration
}
// 用途 1: 频道相关性评估
func (c *LLMClient) EvalChannelRelevance(name, about string, memberCount int) (float64, error)
// 用途 2: 消息商户解析
func (c *LLMClient) ParseMerchant(message string) (*MerchantInfo, error)
// 用途 3: 行业分类
func (c *LLMClient) ClassifyIndustry(name, about string) (string, error)
// 用途 4: 导航站判断
func (c *LLMClient) IsNavSite(url string) (bool, float64, error)
配置示例 (config.yaml):
llm:
provider: "openai" # openai / claude / glm
base_url: "https://api.openai.com/v1"
api_key: "sk-xxx"
model: "gpt-4o-mini"
timeout: 30s
// 正则提取 (优先, 零成本)
func ExtractByRegex(text string) *ContactInfo
// 正则模式:
// TG 用户名: @[a-zA-Z][a-zA-Z0-9_]{4,31}
// TG 链接: t\.me/[a-zA-Z0-9_]{5,32}
// 变体: t点me, t . me, tg: 等
// 邮箱: 标准 email 正则
// 电话: +国际区号格式
// 网址: https?://...
// 微信变体: 加V, 加v, vx, wx, 微信 后跟联系方式
// LLM 提取 (正则没提取到但文本可能含非标准联系方式时)
func ExtractByLLM(client *LLMClient, text string) *ContactInfo
请求优先级:
1. colly (静态 HTTP, 最快)
2. chromedp (无头浏览器, 处理 JS 渲染)
预过滤规则引擎:
- 黑名单域名: t.me, twitter.com, google.com, facebook.com ... (80+)
- 黑名单扩展名: .apk, .zip, .pdf, .exe ... (40+)
- 黑名单路径: /api/, /login/, ?ref= ...
- 正向信号: URL 含 nav/directory/catalog/list → 直接通过
- 不确定 → LLM 二次过滤 (置信度 ≥0.6 放行)
t.me 死号预检:
- 抓 t.me/{username} 网页
- 没有头像元素 → 判定死号, 直接丢弃
- 有头像 → 活号, 继续入库
使用 React + Ant Design + zustand + vite
| 页面 | 路径 | 功能 |
|---|---|---|
| 仪表盘 | / | 各表计数、运行中任务、最近任务 |
| 任务管理 | /tasks | 7阶段独立启动按钮、full pipeline、任务列表、实时进度、停止 |
| 原始商户 | /merchants/raw | 表格 (分页/过滤/按created_at desc) |
| 清洗商户 | /merchants/clean | 表格 (分页/过滤/排序by score) |
| 频道列表 | /channels | 表格 (分页/状态过滤/来源过滤) |
| 导航网页 | /nav-sites | 表格 (分页/状态过滤) |
| 种子管理 | /seeds | CRUD 表格 |
| 关键词管理 | /keywords | CRUD 表格 (支持批量添加) |
| 系统配置 | /settings | 三个 tab: 种子/关键词/流水线参数 |
| 系统日志 | /logs | 实时运行日志 (WebSocket) |
┌────────────────────────────────────┐
│ Logo 商户查找系统 Admin │ ← 顶栏
├──────┬─────────────────────────────┤
│ │ │
│ 仪表盘│ 内容区域 │
│ 任务 │ │
│ 商户 │ │
│ 频道 │ │
│ 网页 │ │
│ 配置 │ │
│ 日志 │ │
│ │ │
├──────┴─────────────────────────────┤
└────────────────────────────────────┘
← 侧边栏导航
# configs/config.yaml
server:
port: 8080
mysql:
host: "mysql" # Docker 容器名
port: 3306
user: "root"
password: "root123"
database: "spider"
redis:
host: "redis" # Docker 容器名
port: 6379
password: ""
db: 3
telegram:
accounts:
- phone: "+1234567890"
session_file: "sessions/account_01.session"
- phone: "+0987654321"
session_file: "sessions/account_02.session"
app_id: 12345
app_hash: "abcdef1234567890"
llm:
provider: "openai"
base_url: "https://api.openai.com/v1"
api_key: "sk-xxx"
model: "gpt-4o-mini"
timeout: 30s
serper:
api_key: "xxx"
results_per_page: 10
max_pages: 3
github:
token: "" # 可选, 有则 30 req/min
# deploy/docker-compose.yml
version: "3.8"
services:
api:
build:
context: ..
dockerfile: deploy/Dockerfile.api
ports:
- "8080:8080"
volumes:
- ../configs:/app/configs
- ../sessions:/app/sessions
depends_on: []
restart: unless-stopped
networks:
- default
- external_db # 连接已有 MySQL/Redis 的网络
web:
build:
context: ..
dockerfile: deploy/Dockerfile.web
ports:
- "80:80"
depends_on:
- api
restart: unless-stopped
networks:
external_db:
external: true # 已有 MySQL/Redis 所在的 Docker network
name: <现有网络名> # 需要确认
| key | value | type | effect_level | description |
|---|---|---|---|---|
| pipeline.skip_phases | [] | json | new_task | 默认跳过的阶段 |
| pipeline.checkpoint_interval | 30 | int | runtime | 进度上报间隔(秒) |
| tg_scraper.message_limit_per_channel | 500 | int | runtime | 每频道最大消息数 |
| tg_scraper.delay_per_message | 1.0 | float | runtime | 消息间延迟(秒) |
| tg_scraper.delay_per_channel | 5.0 | float | runtime | 频道间延迟(秒) |
| tg_scraper.delay_per_verify | 3.0 | float | runtime | 验证间延迟(秒) |
| clean.timeout_seconds | 3600 | int | runtime | 清洗阶段超时 |
| search.timeout_seconds | 3600 | int | runtime | 搜索阶段超时 |
| snowball.max_channels_per_layer | 200 | int | runtime | 每层最大频道数 |
| snowball.max_channels_total | 500 | int | runtime | 总最大频道数 |
| tme_validator.enabled | true | bool | runtime | 启用t.me死号预检 |
| tme_validator.rate_per_min | 60 | int | runtime | 预检限速 |
| tme_validator.concurrency | 10 | int | runtime | 预检并发数 |