collector.go 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334
  1. package githubcollector
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "io"
  7. "log"
  8. "net/http"
  9. "net/url"
  10. "regexp"
  11. "strings"
  12. "sync"
  13. "sync/atomic"
  14. "time"
  15. "spider/internal/extractor"
  16. "spider/internal/model"
  17. "spider/internal/plugin"
  18. proxypool "spider/internal/proxy"
  19. "spider/internal/store"
  20. )
// Collector implements plugin.Collector for GitHub README mining.
// Searches GitHub repos by keywords, extracts t.me links from READMEs.
type Collector struct {
	token string // GitHub token (optional); empty means unauthenticated API requests
	store *store.Store // persistence for discovered channels (UpsertChannel)
	http *http.Client // shared client; Transport is swapped in Run when a proxy is configured
	stopped atomic.Bool // set by Stop; Run polls it to exit early
	logger plugin.TaskLogger // structured task logger; NopLogger until SetLogger is called
	proxyPool *proxypool.Pool // optional rotating proxy pool (nil when unused)
	mu sync.Mutex // guards currentProxy
	currentProxy string // current proxy URL for health reporting
}
  33. // New creates a new GitHub collector.
  34. func New(token string, s *store.Store) *Collector {
  35. return &Collector{
  36. token: token,
  37. store: s,
  38. http: &http.Client{Timeout: 15 * time.Second},
  39. logger: plugin.NopLogger(),
  40. }
  41. }
  42. func (c *Collector) Name() string { return "github_collector" }
  43. func (c *Collector) SetLogger(l plugin.TaskLogger) { c.logger = l }
  44. func (c *Collector) Stop() error {
  45. c.stopped.Store(true)
  46. return nil
  47. }
// Run searches GitHub for repos matching keywords, extracts t.me links from READMEs.
//
// cfg keys:
// - "keywords": []string — search keywords
// - "repos_limit": int — max repos to process (default 50)
// - "proxy_pool": *proxypool.Pool — optional rotating proxy pool
// - "proxy_url": string — optional single proxy URL (consulted only when no pool is given)
//
// Each discovered username is upserted into the store and forwarded to
// callback. Run returns nil on normal completion, cancellation, or Stop.
func (c *Collector) Run(ctx context.Context, cfg map[string]any, callback func(plugin.MerchantData)) error {
	c.stopped.Store(false)
	// Apply proxy pool or single proxy
	if pool, ok := cfg["proxy_pool"].(*proxypool.Pool); ok && pool != nil {
		c.proxyPool = pool
		log.Printf("[github_collector] using proxy pool with %d proxies", pool.Size())
		// Use a single Transport with dynamic proxy function so rotateProxy
		// can switch proxies without rebuilding the client.
		c.http.Transport = &http.Transport{
			Proxy: func(req *http.Request) (*url.URL, error) {
				c.mu.Lock()
				p := c.currentProxy
				c.mu.Unlock()
				if p == "" {
					return nil, nil // nil proxy URL means "connect directly"
				}
				return url.Parse(p)
			},
			MaxIdleConnsPerHost: 2,
			IdleConnTimeout:     30 * time.Second,
		}
		c.rotateProxy()
	} else if proxyURL, ok := cfg["proxy_url"].(string); ok && proxyURL != "" {
		pURL, err := url.Parse(proxyURL)
		if err == nil {
			// A malformed proxy_url is silently ignored (direct connection).
			c.http.Transport = &http.Transport{Proxy: http.ProxyURL(pURL)}
			log.Printf("[github_collector] using proxy: %s", proxyURL)
		}
	}
	keywords, _ := cfg["keywords"].([]string)
	if len(keywords) == 0 {
		log.Println("[github_collector] no keywords provided")
		return nil
	}
	reposLimit := 50
	if v, ok := cfg["repos_limit"].(int); ok && v > 0 {
		reposLimit = v
	}
	// Two query variants per keyword to surface repos that mention Telegram.
	queries := make([]string, 0, len(keywords)*2)
	for _, kw := range keywords {
		queries = append(queries, fmt.Sprintf("%s telegram", kw))
		queries = append(queries, fmt.Sprintf("%s t.me", kw))
	}
	// Spread the overall repo budget across queries (at least 1 each).
	reposPerQuery := 1
	if len(queries) > 0 {
		reposPerQuery = reposLimit/len(queries) + 1
	}
	found := 0
	for _, query := range queries {
		if c.stopped.Load() || ctx.Err() != nil {
			break
		}
		log.Printf("[github_collector] searching: %s", query)
		repos, err := c.searchRepos(ctx, query, reposPerQuery)
		if err != nil {
			log.Printf("[github_collector] search error: %v", err)
			c.logger.LogError("search", "", err.Error())
			continue
		}
		for i, repo := range repos {
			c.logger.LogSearchResult(query, i+1, repo, fmt.Sprintf("https://github.com/%s", repo), "")
		}
		for _, repo := range repos {
			if c.stopped.Load() || ctx.Err() != nil {
				break
			}
			c.rotateProxy()
			t1 := time.Now()
			readme, err := c.fetchReadme(ctx, repo)
			if err != nil {
				c.logger.LogCrawlPage("github://"+repo+"/README.md", "", 0, "", nil, 0, err, time.Since(t1))
				continue
			}
			// Relaxed: only skip READMEs that have no t.me links AND no Chinese
			// NOTE(review): byte-slicing can split a multi-byte rune at the cut
			// point — assumed harmless for ContainsChinese; confirm.
			preview := readme
			if len(preview) > 5000 {
				preview = preview[:5000]
			}
			links := extractTMeLinks(readme)
			// Truncated copy used only for the crawl log entry.
			readmeSample := readme
			if len(readmeSample) > 2000 {
				readmeSample = readmeSample[:2000]
			}
			c.logger.LogCrawlPage("github://"+repo+"/README.md", "", 0, readmeSample, links, 0, nil, time.Since(t1))
			if len(links) == 0 && !extractor.ContainsChinese(preview, 0) {
				c.logger.LogSkip("crawl", "github://"+repo, "no_tme_no_chinese")
				continue
			}
			// Process t.me links
			for _, link := range links {
				username := extractTGUsername(link)
				if username == "" {
					continue
				}
				// Save channel to DB
				c.store.UpsertChannel(&model.Channel{
					Username: username,
					Source:   "github",
					Status:   "pending",
				})
				md := plugin.MerchantData{
					TgUsername: username,
					TgLink:     "https://t.me/" + username,
					SourceType: "github",
					SourceName: repo,
					SourceURL:  fmt.Sprintf("https://github.com/%s", repo),
				}
				c.logger.LogMerchantFound(md, "github_readme", 0, "")
				callback(md)
				found++
			}
			// Delay between repos
			select {
			case <-ctx.Done():
				return nil
			case <-time.After(2 * time.Second):
			}
		}
		// Delay between queries
		select {
		case <-ctx.Done():
			return nil
		case <-time.After(5 * time.Second):
		}
	}
	log.Printf("[github_collector] done: %d channels found", found)
	return nil
}
  180. // rotateProxy switches to the next proxy in the pool.
  181. // Returns the proxy URL being used for health reporting.
  182. func (c *Collector) rotateProxy() string {
  183. if c.proxyPool == nil {
  184. return ""
  185. }
  186. next := c.proxyPool.Next()
  187. c.mu.Lock()
  188. c.currentProxy = next
  189. c.mu.Unlock()
  190. if next != "" {
  191. log.Printf("[github_collector] rotated to proxy: %s", next)
  192. }
  193. return next
  194. }
  195. func (c *Collector) searchRepos(ctx context.Context, query string, limit int) ([]string, error) {
  196. perPage := limit
  197. if perPage > 30 {
  198. perPage = 30
  199. }
  200. apiURL := fmt.Sprintf("https://api.github.com/search/repositories?q=%s&sort=stars&per_page=%d",
  201. url.QueryEscape(query), perPage)
  202. req, err := http.NewRequestWithContext(ctx, "GET", apiURL, nil)
  203. if err != nil {
  204. return nil, err
  205. }
  206. req.Header.Set("Accept", "application/vnd.github.v3+json")
  207. if c.token != "" {
  208. req.Header.Set("Authorization", "token "+c.token)
  209. }
  210. resp, err := c.http.Do(req)
  211. if err != nil {
  212. c.reportResult(err)
  213. return nil, err
  214. }
  215. defer resp.Body.Close()
  216. c.reportResult(nil)
  217. var result struct {
  218. Items []struct {
  219. FullName string `json:"full_name"`
  220. } `json:"items"`
  221. }
  222. if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
  223. return nil, err
  224. }
  225. var repos []string
  226. for _, item := range result.Items {
  227. repos = append(repos, item.FullName)
  228. }
  229. return repos, nil
  230. }
  231. // reportResult reports proxy health to the pool.
  232. func (c *Collector) reportResult(err error) {
  233. if c.proxyPool == nil {
  234. return
  235. }
  236. c.mu.Lock()
  237. proxy := c.currentProxy
  238. c.mu.Unlock()
  239. if proxy == "" {
  240. return
  241. }
  242. if err != nil {
  243. c.proxyPool.ReportFailure(proxy)
  244. } else {
  245. c.proxyPool.ReportSuccess(proxy)
  246. }
  247. }
  248. func (c *Collector) fetchReadme(ctx context.Context, fullName string) (string, error) {
  249. rawURL := fmt.Sprintf("https://raw.githubusercontent.com/%s/main/README.md", fullName)
  250. req, err := http.NewRequestWithContext(ctx, "GET", rawURL, nil)
  251. if err != nil {
  252. return "", err
  253. }
  254. if c.token != "" {
  255. req.Header.Set("Authorization", "token "+c.token)
  256. }
  257. resp, err := c.http.Do(req)
  258. if err != nil {
  259. return "", err
  260. }
  261. defer resp.Body.Close()
  262. if resp.StatusCode == 404 {
  263. masterURL := strings.Replace(rawURL, "/main/", "/master/", 1)
  264. req2, err := http.NewRequestWithContext(ctx, "GET", masterURL, nil)
  265. if err != nil {
  266. return "", err
  267. }
  268. if c.token != "" {
  269. req2.Header.Set("Authorization", "token "+c.token)
  270. }
  271. resp2, err := c.http.Do(req2)
  272. if err != nil {
  273. return "", err
  274. }
  275. defer resp2.Body.Close()
  276. data, _ := io.ReadAll(resp2.Body)
  277. return string(data), nil
  278. }
  279. data, _ := io.ReadAll(resp.Body)
  280. return string(data), nil
  281. }
  282. var reTMeLink = regexp.MustCompile(`https?://t(?:elegram)?\.me/[a-zA-Z][a-zA-Z0-9_]{4,31}`)
  283. var reTGUsername = regexp.MustCompile(`t(?:elegram)?\.me/([a-zA-Z][a-zA-Z0-9_]{4,31})`)
  284. func extractTMeLinks(text string) []string {
  285. return reTMeLink.FindAllString(text, -1)
  286. }
  287. func extractTGUsername(link string) string {
  288. m := reTGUsername.FindStringSubmatch(link)
  289. if len(m) > 1 {
  290. return m[1]
  291. }
  292. return ""
  293. }