group.go 8.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300
  1. package handler
  2. import (
  3. "context"
  4. "encoding/csv"
  5. "fmt"
  6. "strconv"
  7. "time"
  8. "spider/internal/store"
  9. "spider/internal/telegram"
  10. "github.com/gin-gonic/gin"
  11. "github.com/redis/go-redis/v9"
  12. )
// GroupHandler handles group-member relationship queries, the CSV member
// export, and the Telegram-backed prepare/clone endpoints.
type GroupHandler struct {
	// store is the persistence layer the handlers use for group/member
	// queries, saving cloned members, and audit logging.
	store *store.Store
	// tgManager is the Telegram account manager handed to the prepare and
	// clone operations; those handlers respond with 500 when it is nil.
	tgManager *telegram.AccountManager
	// rdb is the Redis client handed to the clone operations; those handlers
	// respond with 500 when it is nil.
	rdb *redis.Client
}
  19. // ListGroups handles GET /groups — returns all groups with member counts.
  20. func (h *GroupHandler) ListGroups(c *gin.Context) {
  21. page, pageSize, _ := parsePage(c)
  22. search := c.Query("search")
  23. groups, total, err := h.store.ListGroups(page, pageSize, search)
  24. if err != nil {
  25. Fail(c, 500, err.Error())
  26. return
  27. }
  28. PageOK(c, groups, total, page, pageSize)
  29. }
  30. // SearchMembers handles GET /members/search — search members across all groups.
  31. func (h *GroupHandler) SearchMembers(c *gin.Context) {
  32. query := c.Query("q")
  33. if query == "" {
  34. Fail(c, 400, "搜索关键词不能为空")
  35. return
  36. }
  37. page, pageSize, _ := parsePage(c)
  38. members, total, err := h.store.SearchMembers(query, page, pageSize)
  39. if err != nil {
  40. Fail(c, 500, err.Error())
  41. return
  42. }
  43. PageOK(c, members, total, page, pageSize)
  44. }
  45. // ListMembers handles GET /groups/:username/members — members of a group.
  46. func (h *GroupHandler) ListMembers(c *gin.Context) {
  47. username := c.Param("username")
  48. search := c.Query("search")
  49. page, pageSize, _ := parsePage(c)
  50. members, total, err := h.store.ListMembersByGroup(username, page, pageSize, search)
  51. if err != nil {
  52. Fail(c, 500, err.Error())
  53. return
  54. }
  55. PageOK(c, members, total, page, pageSize)
  56. }
  57. // ListMemberGroups handles GET /merchants/:username/groups — groups a member belongs to.
  58. func (h *GroupHandler) ListMemberGroups(c *gin.Context) {
  59. username := c.Param("username")
  60. groups, err := h.store.ListGroupsByMember(username)
  61. if err != nil {
  62. Fail(c, 500, err.Error())
  63. return
  64. }
  65. OK(c, groups)
  66. }
  67. // ExportMembers handles GET /groups/:username/members/export — streams all members as CSV.
  68. func (h *GroupHandler) ExportMembers(c *gin.Context) {
  69. username := c.Param("username")
  70. members, _, err := h.store.ListMembersByGroup(username, 1, 100000, "")
  71. if err != nil {
  72. Fail(c, 500, err.Error())
  73. return
  74. }
  75. c.Header("Content-Disposition", fmt.Sprintf(`attachment; filename="members_%s.csv"`, username))
  76. c.Header("Content-Type", "text/csv; charset=utf-8")
  77. c.Writer.Write([]byte("\xef\xbb\xbf")) // UTF-8 BOM for Excel
  78. w := csv.NewWriter(c.Writer)
  79. _ = w.Write([]string{"用户名", "来源类型", "发现时间"})
  80. for _, m := range members {
  81. discoveredAt := ""
  82. if !m.DiscoveredAt.IsZero() {
  83. discoveredAt = m.DiscoveredAt.Format("2006-01-02 15:04:05")
  84. }
  85. _ = w.Write([]string{m.MemberUsername, m.SourceType, discoveredAt})
  86. }
  87. w.Flush()
  88. }
  89. // PrepareGroup handles POST /groups/:username/prepare
  90. // Has every enabled TG account attempt to join the group (no scraping).
  91. // Useful before a clone run so accounts "age" into the group first, which
  92. // reduces TG's anti-scrape restrictions. Best to wait hours/days after this
  93. // before calling CloneMembers.
  94. func (h *GroupHandler) PrepareGroup(c *gin.Context) {
  95. username := c.Param("username")
  96. if username == "" {
  97. Fail(c, 400, "群组用户名不能为空")
  98. return
  99. }
  100. if h.tgManager == nil {
  101. Fail(c, 500, "TG 账号管理器未初始化")
  102. return
  103. }
  104. ctx, cancel := context.WithTimeout(c.Request.Context(), 5*time.Minute)
  105. defer cancel()
  106. results, err := telegram.PrepareGroup(ctx, h.tgManager, username)
  107. if err != nil {
  108. Fail(c, 500, fmt.Sprintf("预热失败: %v", err))
  109. return
  110. }
  111. joined := 0
  112. for _, r := range results {
  113. if r.Joined {
  114. joined++
  115. }
  116. }
  117. LogAudit(h.store, c, "prepare_group", "group", username, gin.H{
  118. "joined": joined,
  119. "total": len(results),
  120. })
  121. OK(c, gin.H{
  122. "group_username": username,
  123. "total_accounts": len(results),
  124. "joined": joined,
  125. "results": results,
  126. })
  127. }
  128. // CloneByMessages handles POST /groups/:username/clone-by-messages?max=2000
  129. // Scrapes recent message senders (different API path than GetParticipants,
  130. // typically yields far more usernames for active groups).
  131. func (h *GroupHandler) CloneByMessages(c *gin.Context) {
  132. username := c.Param("username")
  133. if username == "" {
  134. Fail(c, 400, "群组用户名不能为空")
  135. return
  136. }
  137. if h.tgManager == nil || h.rdb == nil {
  138. Fail(c, 500, "依赖未初始化")
  139. return
  140. }
  141. maxMessages := 2000
  142. if v := c.Query("max"); v != "" {
  143. if n, err := strconv.Atoi(v); err == nil && n > 0 && n <= 20000 {
  144. maxMessages = n
  145. }
  146. }
  147. ctx, cancel := context.WithTimeout(c.Request.Context(), 10*time.Minute)
  148. defer cancel()
  149. res, err := telegram.CloneGroupMembersFromMessages(ctx, h.tgManager, h.rdb, username, maxMessages)
  150. if err != nil && res == nil {
  151. Fail(c, 500, fmt.Sprintf("消息爬取失败: %v", err))
  152. return
  153. }
  154. if res == nil {
  155. Fail(c, 500, "爬取返回空结果")
  156. return
  157. }
  158. // Persist with-username senders to DB (shared path with CloneMembers).
  159. var usernames []string
  160. for _, p := range res.Participants {
  161. if p.Username != "" && !p.IsBot {
  162. usernames = append(usernames, p.Username)
  163. }
  164. }
  165. groupTitle := res.GroupTitle
  166. if groupTitle == "" {
  167. groupTitle = username
  168. }
  169. created := 0
  170. if len(usernames) > 0 {
  171. created = h.store.BatchSaveGroupMembers(username, groupTitle, "tg_clone", usernames)
  172. }
  173. LogAudit(h.store, c, "clone_by_messages", "group", username, gin.H{
  174. "max_messages": maxMessages,
  175. "senders_found": len(res.Participants),
  176. "with_username": len(usernames),
  177. "new_saved": created,
  178. "partial": res.Partial,
  179. "status": res.Status,
  180. })
  181. OK(c, gin.H{
  182. "group_username": username,
  183. "group_title": groupTitle,
  184. "total_participants": len(res.Participants),
  185. "with_username": len(usernames),
  186. "new_saved": created,
  187. "partial": res.Partial,
  188. "status": res.Status,
  189. "max_messages": maxMessages,
  190. "progress": gin.H{
  191. "collected": len(res.Participants),
  192. "total": res.Total,
  193. "queries_done": res.QueriesDone,
  194. "queries_total": res.QueriesTotal,
  195. },
  196. })
  197. }
  198. // CloneMembers handles POST /groups/:username/clone-members
  199. // Runs a multi-account, FloodWait-aware, resumable clone. Progress lives in
  200. // Redis so a call that hits "all accounts cooling" can return Partial=true
  201. // and be retried later to continue where it left off.
  202. // Query param ?reset=1 discards prior progress and restarts from scratch.
  203. func (h *GroupHandler) CloneMembers(c *gin.Context) {
  204. username := c.Param("username")
  205. if username == "" {
  206. Fail(c, 400, "群组用户名不能为空")
  207. return
  208. }
  209. if h.tgManager == nil {
  210. Fail(c, 500, "TG 账号管理器未初始化")
  211. return
  212. }
  213. if h.rdb == nil {
  214. Fail(c, 500, "Redis 客户端未初始化")
  215. return
  216. }
  217. reset := c.Query("reset") == "1"
  218. ctx, cancel := context.WithTimeout(c.Request.Context(), 10*time.Minute)
  219. defer cancel()
  220. res, err := telegram.CloneGroupMembers(ctx, h.tgManager, h.rdb, username, reset)
  221. if err != nil && res == nil {
  222. Fail(c, 500, fmt.Sprintf("克隆失败: %v", err))
  223. return
  224. }
  225. if res == nil {
  226. Fail(c, 500, "克隆返回空结果")
  227. return
  228. }
  229. // Filter + persist all currently-known participants with usernames.
  230. var usernames []string
  231. for _, p := range res.Participants {
  232. if p.Username != "" && !p.IsBot {
  233. usernames = append(usernames, p.Username)
  234. }
  235. }
  236. groupTitle := res.GroupTitle
  237. if groupTitle == "" {
  238. groupTitle = username
  239. }
  240. created := 0
  241. if len(usernames) > 0 {
  242. created = h.store.BatchSaveGroupMembers(username, groupTitle, "tg_clone", usernames)
  243. }
  244. LogAudit(h.store, c, "clone_members", "group", username, gin.H{
  245. "total_participants": len(res.Participants),
  246. "with_username": len(usernames),
  247. "new_saved": created,
  248. "partial": res.Partial,
  249. "status": res.Status,
  250. "queries_done": res.QueriesDone,
  251. "queries_total": res.QueriesTotal,
  252. "reset": reset,
  253. })
  254. OK(c, gin.H{
  255. "group_username": username,
  256. "group_title": groupTitle,
  257. "total_participants": len(res.Participants),
  258. "with_username": len(usernames),
  259. "new_saved": created,
  260. "partial": res.Partial,
  261. "status": res.Status,
  262. "progress": gin.H{
  263. "collected": len(res.Participants),
  264. "total": res.Total,
  265. "queries_done": res.QueriesDone,
  266. "queries_total": res.QueriesTotal,
  267. },
  268. })
  269. }