package handler import ( "context" "encoding/csv" "fmt" "strconv" "time" "spider/internal/store" "spider/internal/telegram" "github.com/gin-gonic/gin" "github.com/redis/go-redis/v9" ) // GroupHandler handles group-member relationship queries. type GroupHandler struct { store *store.Store tgManager *telegram.AccountManager rdb *redis.Client } // ListGroups handles GET /groups — returns all groups with member counts. func (h *GroupHandler) ListGroups(c *gin.Context) { page, pageSize, _ := parsePage(c) search := c.Query("search") groups, total, err := h.store.ListGroups(page, pageSize, search) if err != nil { Fail(c, 500, err.Error()) return } PageOK(c, groups, total, page, pageSize) } // SearchMembers handles GET /members/search — search members across all groups. func (h *GroupHandler) SearchMembers(c *gin.Context) { query := c.Query("q") if query == "" { Fail(c, 400, "搜索关键词不能为空") return } page, pageSize, _ := parsePage(c) members, total, err := h.store.SearchMembers(query, page, pageSize) if err != nil { Fail(c, 500, err.Error()) return } PageOK(c, members, total, page, pageSize) } // ListMembers handles GET /groups/:username/members — members of a group. func (h *GroupHandler) ListMembers(c *gin.Context) { username := c.Param("username") search := c.Query("search") page, pageSize, _ := parsePage(c) members, total, err := h.store.ListMembersByGroup(username, page, pageSize, search) if err != nil { Fail(c, 500, err.Error()) return } PageOK(c, members, total, page, pageSize) } // ListMemberGroups handles GET /merchants/:username/groups — groups a member belongs to. func (h *GroupHandler) ListMemberGroups(c *gin.Context) { username := c.Param("username") groups, err := h.store.ListGroupsByMember(username) if err != nil { Fail(c, 500, err.Error()) return } OK(c, groups) } // ExportMembers handles GET /groups/:username/members/export — streams all members as CSV. func (h *GroupHandler) ExportMembers(c *gin.Context) { username := c.Param("username") members, _, err := h.store.ListMembersByGroup(username, 1, 100000, "") if err != nil { Fail(c, 500, err.Error()) return } c.Header("Content-Disposition", fmt.Sprintf(`attachment; filename="members_%s.csv"`, username)) c.Header("Content-Type", "text/csv; charset=utf-8") c.Writer.Write([]byte("\xef\xbb\xbf")) // UTF-8 BOM for Excel w := csv.NewWriter(c.Writer) _ = w.Write([]string{"用户名", "来源类型", "发现时间"}) for _, m := range members { discoveredAt := "" if !m.DiscoveredAt.IsZero() { discoveredAt = m.DiscoveredAt.Format("2006-01-02 15:04:05") } _ = w.Write([]string{m.MemberUsername, m.SourceType, discoveredAt}) } w.Flush() } // PrepareGroup handles POST /groups/:username/prepare // Has every enabled TG account attempt to join the group (no scraping). // Useful before a clone run so accounts "age" into the group first, which // reduces TG's anti-scrape restrictions. Best to wait hours/days after this // before calling CloneMembers. func (h *GroupHandler) PrepareGroup(c *gin.Context) { username := c.Param("username") if username == "" { Fail(c, 400, "群组用户名不能为空") return } if h.tgManager == nil { Fail(c, 500, "TG 账号管理器未初始化") return } ctx, cancel := context.WithTimeout(c.Request.Context(), 5*time.Minute) defer cancel() results, err := telegram.PrepareGroup(ctx, h.tgManager, username) if err != nil { Fail(c, 500, fmt.Sprintf("预热失败: %v", err)) return } joined := 0 for _, r := range results { if r.Joined { joined++ } } LogAudit(h.store, c, "prepare_group", "group", username, gin.H{ "joined": joined, "total": len(results), }) OK(c, gin.H{ "group_username": username, "total_accounts": len(results), "joined": joined, "results": results, }) } // CloneByMessages handles POST /groups/:username/clone-by-messages?max=2000 // Scrapes recent message senders (different API path than GetParticipants, // typically yields far more usernames for active groups). func (h *GroupHandler) CloneByMessages(c *gin.Context) { username := c.Param("username") if username == "" { Fail(c, 400, "群组用户名不能为空") return } if h.tgManager == nil || h.rdb == nil { Fail(c, 500, "依赖未初始化") return } maxMessages := 2000 if v := c.Query("max"); v != "" { if n, err := strconv.Atoi(v); err == nil && n > 0 && n <= 20000 { maxMessages = n } } ctx, cancel := context.WithTimeout(c.Request.Context(), 10*time.Minute) defer cancel() res, err := telegram.CloneGroupMembersFromMessages(ctx, h.tgManager, h.rdb, username, maxMessages) if err != nil && res == nil { Fail(c, 500, fmt.Sprintf("消息爬取失败: %v", err)) return } if res == nil { Fail(c, 500, "爬取返回空结果") return } // Persist with-username senders to DB (shared path with CloneMembers). var usernames []string for _, p := range res.Participants { if p.Username != "" && !p.IsBot { usernames = append(usernames, p.Username) } } groupTitle := res.GroupTitle if groupTitle == "" { groupTitle = username } created := 0 if len(usernames) > 0 { created = h.store.BatchSaveGroupMembers(username, groupTitle, "tg_clone", usernames) } LogAudit(h.store, c, "clone_by_messages", "group", username, gin.H{ "max_messages": maxMessages, "senders_found": len(res.Participants), "with_username": len(usernames), "new_saved": created, "partial": res.Partial, "status": res.Status, }) OK(c, gin.H{ "group_username": username, "group_title": groupTitle, "total_participants": len(res.Participants), "with_username": len(usernames), "new_saved": created, "partial": res.Partial, "status": res.Status, "max_messages": maxMessages, "progress": gin.H{ "collected": len(res.Participants), "total": res.Total, "queries_done": res.QueriesDone, "queries_total": res.QueriesTotal, }, }) } // CloneMembers handles POST /groups/:username/clone-members // Runs a multi-account, FloodWait-aware, resumable clone. Progress lives in // Redis so a call that hits "all accounts cooling" can return Partial=true // and be retried later to continue where it left off. // Query param ?reset=1 discards prior progress and restarts from scratch. func (h *GroupHandler) CloneMembers(c *gin.Context) { username := c.Param("username") if username == "" { Fail(c, 400, "群组用户名不能为空") return } if h.tgManager == nil { Fail(c, 500, "TG 账号管理器未初始化") return } if h.rdb == nil { Fail(c, 500, "Redis 客户端未初始化") return } reset := c.Query("reset") == "1" ctx, cancel := context.WithTimeout(c.Request.Context(), 10*time.Minute) defer cancel() res, err := telegram.CloneGroupMembers(ctx, h.tgManager, h.rdb, username, reset) if err != nil && res == nil { Fail(c, 500, fmt.Sprintf("克隆失败: %v", err)) return } if res == nil { Fail(c, 500, "克隆返回空结果") return } // Filter + persist all currently-known participants with usernames. var usernames []string for _, p := range res.Participants { if p.Username != "" && !p.IsBot { usernames = append(usernames, p.Username) } } groupTitle := res.GroupTitle if groupTitle == "" { groupTitle = username } created := 0 if len(usernames) > 0 { created = h.store.BatchSaveGroupMembers(username, groupTitle, "tg_clone", usernames) } LogAudit(h.store, c, "clone_members", "group", username, gin.H{ "total_participants": len(res.Participants), "with_username": len(usernames), "new_saved": created, "partial": res.Partial, "status": res.Status, "queries_done": res.QueriesDone, "queries_total": res.QueriesTotal, "reset": reset, }) OK(c, gin.H{ "group_username": username, "group_title": groupTitle, "total_participants": len(res.Participants), "with_username": len(usernames), "new_saved": created, "partial": res.Partial, "status": res.Status, "progress": gin.H{ "collected": len(res.Participants), "total": res.Total, "queries_done": res.QueriesDone, "queries_total": res.QueriesTotal, }, }) }