| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300 |
- package handler
- import (
- "context"
- "encoding/csv"
- "fmt"
- "strconv"
- "time"
- "spider/internal/store"
- "spider/internal/telegram"
- "github.com/gin-gonic/gin"
- "github.com/redis/go-redis/v9"
- )
- // GroupHandler handles group-member relationship queries.
- type GroupHandler struct {
- store *store.Store
- tgManager *telegram.AccountManager
- rdb *redis.Client
- }
- // ListGroups handles GET /groups — returns all groups with member counts.
- func (h *GroupHandler) ListGroups(c *gin.Context) {
- page, pageSize, _ := parsePage(c)
- search := c.Query("search")
- groups, total, err := h.store.ListGroups(page, pageSize, search)
- if err != nil {
- Fail(c, 500, err.Error())
- return
- }
- PageOK(c, groups, total, page, pageSize)
- }
- // SearchMembers handles GET /members/search — search members across all groups.
- func (h *GroupHandler) SearchMembers(c *gin.Context) {
- query := c.Query("q")
- if query == "" {
- Fail(c, 400, "搜索关键词不能为空")
- return
- }
- page, pageSize, _ := parsePage(c)
- members, total, err := h.store.SearchMembers(query, page, pageSize)
- if err != nil {
- Fail(c, 500, err.Error())
- return
- }
- PageOK(c, members, total, page, pageSize)
- }
- // ListMembers handles GET /groups/:username/members — members of a group.
- func (h *GroupHandler) ListMembers(c *gin.Context) {
- username := c.Param("username")
- search := c.Query("search")
- page, pageSize, _ := parsePage(c)
- members, total, err := h.store.ListMembersByGroup(username, page, pageSize, search)
- if err != nil {
- Fail(c, 500, err.Error())
- return
- }
- PageOK(c, members, total, page, pageSize)
- }
- // ListMemberGroups handles GET /merchants/:username/groups — groups a member belongs to.
- func (h *GroupHandler) ListMemberGroups(c *gin.Context) {
- username := c.Param("username")
- groups, err := h.store.ListGroupsByMember(username)
- if err != nil {
- Fail(c, 500, err.Error())
- return
- }
- OK(c, groups)
- }
- // ExportMembers handles GET /groups/:username/members/export — streams all members as CSV.
- func (h *GroupHandler) ExportMembers(c *gin.Context) {
- username := c.Param("username")
- members, _, err := h.store.ListMembersByGroup(username, 1, 100000, "")
- if err != nil {
- Fail(c, 500, err.Error())
- return
- }
- c.Header("Content-Disposition", fmt.Sprintf(`attachment; filename="members_%s.csv"`, username))
- c.Header("Content-Type", "text/csv; charset=utf-8")
- c.Writer.Write([]byte("\xef\xbb\xbf")) // UTF-8 BOM for Excel
- w := csv.NewWriter(c.Writer)
- _ = w.Write([]string{"用户名", "来源类型", "发现时间"})
- for _, m := range members {
- discoveredAt := ""
- if !m.DiscoveredAt.IsZero() {
- discoveredAt = m.DiscoveredAt.Format("2006-01-02 15:04:05")
- }
- _ = w.Write([]string{m.MemberUsername, m.SourceType, discoveredAt})
- }
- w.Flush()
- }
- // PrepareGroup handles POST /groups/:username/prepare
- // Has every enabled TG account attempt to join the group (no scraping).
- // Useful before a clone run so accounts "age" into the group first, which
- // reduces TG's anti-scrape restrictions. Best to wait hours/days after this
- // before calling CloneMembers.
- func (h *GroupHandler) PrepareGroup(c *gin.Context) {
- username := c.Param("username")
- if username == "" {
- Fail(c, 400, "群组用户名不能为空")
- return
- }
- if h.tgManager == nil {
- Fail(c, 500, "TG 账号管理器未初始化")
- return
- }
- ctx, cancel := context.WithTimeout(c.Request.Context(), 5*time.Minute)
- defer cancel()
- results, err := telegram.PrepareGroup(ctx, h.tgManager, username)
- if err != nil {
- Fail(c, 500, fmt.Sprintf("预热失败: %v", err))
- return
- }
- joined := 0
- for _, r := range results {
- if r.Joined {
- joined++
- }
- }
- LogAudit(h.store, c, "prepare_group", "group", username, gin.H{
- "joined": joined,
- "total": len(results),
- })
- OK(c, gin.H{
- "group_username": username,
- "total_accounts": len(results),
- "joined": joined,
- "results": results,
- })
- }
- // CloneByMessages handles POST /groups/:username/clone-by-messages?max=2000
- // Scrapes recent message senders (different API path than GetParticipants,
- // typically yields far more usernames for active groups).
- func (h *GroupHandler) CloneByMessages(c *gin.Context) {
- username := c.Param("username")
- if username == "" {
- Fail(c, 400, "群组用户名不能为空")
- return
- }
- if h.tgManager == nil || h.rdb == nil {
- Fail(c, 500, "依赖未初始化")
- return
- }
- maxMessages := 2000
- if v := c.Query("max"); v != "" {
- if n, err := strconv.Atoi(v); err == nil && n > 0 && n <= 20000 {
- maxMessages = n
- }
- }
- ctx, cancel := context.WithTimeout(c.Request.Context(), 10*time.Minute)
- defer cancel()
- res, err := telegram.CloneGroupMembersFromMessages(ctx, h.tgManager, h.rdb, username, maxMessages)
- if err != nil && res == nil {
- Fail(c, 500, fmt.Sprintf("消息爬取失败: %v", err))
- return
- }
- if res == nil {
- Fail(c, 500, "爬取返回空结果")
- return
- }
- // Persist with-username senders to DB (shared path with CloneMembers).
- var usernames []string
- for _, p := range res.Participants {
- if p.Username != "" && !p.IsBot {
- usernames = append(usernames, p.Username)
- }
- }
- groupTitle := res.GroupTitle
- if groupTitle == "" {
- groupTitle = username
- }
- created := 0
- if len(usernames) > 0 {
- created = h.store.BatchSaveGroupMembers(username, groupTitle, "tg_clone", usernames)
- }
- LogAudit(h.store, c, "clone_by_messages", "group", username, gin.H{
- "max_messages": maxMessages,
- "senders_found": len(res.Participants),
- "with_username": len(usernames),
- "new_saved": created,
- "partial": res.Partial,
- "status": res.Status,
- })
- OK(c, gin.H{
- "group_username": username,
- "group_title": groupTitle,
- "total_participants": len(res.Participants),
- "with_username": len(usernames),
- "new_saved": created,
- "partial": res.Partial,
- "status": res.Status,
- "max_messages": maxMessages,
- "progress": gin.H{
- "collected": len(res.Participants),
- "total": res.Total,
- "queries_done": res.QueriesDone,
- "queries_total": res.QueriesTotal,
- },
- })
- }
- // CloneMembers handles POST /groups/:username/clone-members
- // Runs a multi-account, FloodWait-aware, resumable clone. Progress lives in
- // Redis so a call that hits "all accounts cooling" can return Partial=true
- // and be retried later to continue where it left off.
- // Query param ?reset=1 discards prior progress and restarts from scratch.
- func (h *GroupHandler) CloneMembers(c *gin.Context) {
- username := c.Param("username")
- if username == "" {
- Fail(c, 400, "群组用户名不能为空")
- return
- }
- if h.tgManager == nil {
- Fail(c, 500, "TG 账号管理器未初始化")
- return
- }
- if h.rdb == nil {
- Fail(c, 500, "Redis 客户端未初始化")
- return
- }
- reset := c.Query("reset") == "1"
- ctx, cancel := context.WithTimeout(c.Request.Context(), 10*time.Minute)
- defer cancel()
- res, err := telegram.CloneGroupMembers(ctx, h.tgManager, h.rdb, username, reset)
- if err != nil && res == nil {
- Fail(c, 500, fmt.Sprintf("克隆失败: %v", err))
- return
- }
- if res == nil {
- Fail(c, 500, "克隆返回空结果")
- return
- }
- // Filter + persist all currently-known participants with usernames.
- var usernames []string
- for _, p := range res.Participants {
- if p.Username != "" && !p.IsBot {
- usernames = append(usernames, p.Username)
- }
- }
- groupTitle := res.GroupTitle
- if groupTitle == "" {
- groupTitle = username
- }
- created := 0
- if len(usernames) > 0 {
- created = h.store.BatchSaveGroupMembers(username, groupTitle, "tg_clone", usernames)
- }
- LogAudit(h.store, c, "clone_members", "group", username, gin.H{
- "total_participants": len(res.Participants),
- "with_username": len(usernames),
- "new_saved": created,
- "partial": res.Partial,
- "status": res.Status,
- "queries_done": res.QueriesDone,
- "queries_total": res.QueriesTotal,
- "reset": reset,
- })
- OK(c, gin.H{
- "group_username": username,
- "group_title": groupTitle,
- "total_participants": len(res.Participants),
- "with_username": len(usernames),
- "new_saved": created,
- "partial": res.Partial,
- "status": res.Status,
- "progress": gin.H{
- "collected": len(res.Participants),
- "total": res.Total,
- "queries_done": res.QueriesDone,
- "queries_total": res.QueriesTotal,
- },
- })
- }
|