// Package githubcollector mines Telegram channel links from GitHub READMEs.
package githubcollector

import (
	"context"
	"encoding/json"
	"fmt"
	"io"
	"log"
	"net/http"
	"net/url"
	"regexp"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"spider/internal/extractor"
	"spider/internal/model"
	"spider/internal/plugin"
	proxypool "spider/internal/proxy"
	"spider/internal/store"
)

// Collector implements plugin.Collector for GitHub README mining.
// It searches GitHub repositories by keyword and extracts t.me links
// from their READMEs.
type Collector struct {
	token     string // GitHub token (optional; raises API rate limits)
	store     *store.Store
	http      *http.Client
	stopped   atomic.Bool
	logger    plugin.TaskLogger
	proxyPool *proxypool.Pool

	mu           sync.Mutex
	currentProxy string // current proxy URL for health reporting; guarded by mu
}

// New creates a new GitHub collector.
func New(token string, s *store.Store) *Collector {
	return &Collector{
		token:  token,
		store:  s,
		http:   &http.Client{Timeout: 15 * time.Second},
		logger: plugin.NopLogger(),
	}
}

// Name returns the plugin identifier.
func (c *Collector) Name() string { return "github_collector" }

// SetLogger installs the task logger used for progress reporting.
func (c *Collector) SetLogger(l plugin.TaskLogger) { c.logger = l }

// Stop signals a running Run loop to terminate; Run checks the flag
// between queries and between repos.
func (c *Collector) Stop() error {
	c.stopped.Store(true)
	return nil
}

// Run searches GitHub for repos matching keywords, extracts t.me links from READMEs.
// // cfg keys: // - "keywords": []string — search keywords // - "repos_limit": int — max repos to process (default 50) func (c *Collector) Run(ctx context.Context, cfg map[string]any, callback func(plugin.MerchantData)) error { c.stopped.Store(false) // Apply proxy pool or single proxy if pool, ok := cfg["proxy_pool"].(*proxypool.Pool); ok && pool != nil { c.proxyPool = pool log.Printf("[github_collector] using proxy pool with %d proxies", pool.Size()) // Use a single Transport with dynamic proxy function c.http.Transport = &http.Transport{ Proxy: func(req *http.Request) (*url.URL, error) { c.mu.Lock() p := c.currentProxy c.mu.Unlock() if p == "" { return nil, nil } return url.Parse(p) }, MaxIdleConnsPerHost: 2, IdleConnTimeout: 30 * time.Second, } c.rotateProxy() } else if proxyURL, ok := cfg["proxy_url"].(string); ok && proxyURL != "" { pURL, err := url.Parse(proxyURL) if err == nil { c.http.Transport = &http.Transport{Proxy: http.ProxyURL(pURL)} log.Printf("[github_collector] using proxy: %s", proxyURL) } } keywords, _ := cfg["keywords"].([]string) if len(keywords) == 0 { log.Println("[github_collector] no keywords provided") return nil } reposLimit := 50 if v, ok := cfg["repos_limit"].(int); ok && v > 0 { reposLimit = v } queries := make([]string, 0, len(keywords)*2) for _, kw := range keywords { queries = append(queries, fmt.Sprintf("%s telegram", kw)) queries = append(queries, fmt.Sprintf("%s t.me", kw)) } reposPerQuery := 1 if len(queries) > 0 { reposPerQuery = reposLimit/len(queries) + 1 } found := 0 for _, query := range queries { if c.stopped.Load() || ctx.Err() != nil { break } log.Printf("[github_collector] searching: %s", query) repos, err := c.searchRepos(ctx, query, reposPerQuery) if err != nil { log.Printf("[github_collector] search error: %v", err) c.logger.LogError("search", "", err.Error()) continue } for i, repo := range repos { c.logger.LogSearchResult(query, i+1, repo, fmt.Sprintf("https://github.com/%s", repo), "") } for _, repo := range 
repos { if c.stopped.Load() || ctx.Err() != nil { break } c.rotateProxy() t1 := time.Now() readme, err := c.fetchReadme(ctx, repo) if err != nil { c.logger.LogCrawlPage("github://"+repo+"/README.md", "", 0, "", nil, 0, err, time.Since(t1)) continue } // Relaxed: only skip READMEs that have no t.me links AND no Chinese preview := readme if len(preview) > 5000 { preview = preview[:5000] } links := extractTMeLinks(readme) readmeSample := readme if len(readmeSample) > 2000 { readmeSample = readmeSample[:2000] } c.logger.LogCrawlPage("github://"+repo+"/README.md", "", 0, readmeSample, links, 0, nil, time.Since(t1)) if len(links) == 0 && !extractor.ContainsChinese(preview, 0) { c.logger.LogSkip("crawl", "github://"+repo, "no_tme_no_chinese") continue } // Process t.me links for _, link := range links { username := extractTGUsername(link) if username == "" { continue } // Save channel to DB c.store.UpsertChannel(&model.Channel{ Username: username, Source: "github", Status: "pending", }) md := plugin.MerchantData{ TgUsername: username, TgLink: "https://t.me/" + username, SourceType: "github", SourceName: repo, SourceURL: fmt.Sprintf("https://github.com/%s", repo), } c.logger.LogMerchantFound(md, "github_readme", 0, "") callback(md) found++ } // Delay between repos select { case <-ctx.Done(): return nil case <-time.After(2 * time.Second): } } // Delay between queries select { case <-ctx.Done(): return nil case <-time.After(5 * time.Second): } } log.Printf("[github_collector] done: %d channels found", found) return nil } // rotateProxy switches to the next proxy in the pool. // Returns the proxy URL being used for health reporting. 
func (c *Collector) rotateProxy() string { if c.proxyPool == nil { return "" } next := c.proxyPool.Next() c.mu.Lock() c.currentProxy = next c.mu.Unlock() if next != "" { log.Printf("[github_collector] rotated to proxy: %s", next) } return next } func (c *Collector) searchRepos(ctx context.Context, query string, limit int) ([]string, error) { perPage := limit if perPage > 30 { perPage = 30 } apiURL := fmt.Sprintf("https://api.github.com/search/repositories?q=%s&sort=stars&per_page=%d", url.QueryEscape(query), perPage) req, err := http.NewRequestWithContext(ctx, "GET", apiURL, nil) if err != nil { return nil, err } req.Header.Set("Accept", "application/vnd.github.v3+json") if c.token != "" { req.Header.Set("Authorization", "token "+c.token) } resp, err := c.http.Do(req) if err != nil { c.reportResult(err) return nil, err } defer resp.Body.Close() c.reportResult(nil) var result struct { Items []struct { FullName string `json:"full_name"` } `json:"items"` } if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { return nil, err } var repos []string for _, item := range result.Items { repos = append(repos, item.FullName) } return repos, nil } // reportResult reports proxy health to the pool. 
func (c *Collector) reportResult(err error) { if c.proxyPool == nil { return } c.mu.Lock() proxy := c.currentProxy c.mu.Unlock() if proxy == "" { return } if err != nil { c.proxyPool.ReportFailure(proxy) } else { c.proxyPool.ReportSuccess(proxy) } } func (c *Collector) fetchReadme(ctx context.Context, fullName string) (string, error) { rawURL := fmt.Sprintf("https://raw.githubusercontent.com/%s/main/README.md", fullName) req, err := http.NewRequestWithContext(ctx, "GET", rawURL, nil) if err != nil { return "", err } if c.token != "" { req.Header.Set("Authorization", "token "+c.token) } resp, err := c.http.Do(req) if err != nil { return "", err } defer resp.Body.Close() if resp.StatusCode == 404 { masterURL := strings.Replace(rawURL, "/main/", "/master/", 1) req2, err := http.NewRequestWithContext(ctx, "GET", masterURL, nil) if err != nil { return "", err } if c.token != "" { req2.Header.Set("Authorization", "token "+c.token) } resp2, err := c.http.Do(req2) if err != nil { return "", err } defer resp2.Body.Close() data, _ := io.ReadAll(resp2.Body) return string(data), nil } data, _ := io.ReadAll(resp.Body) return string(data), nil } var reTMeLink = regexp.MustCompile(`https?://t(?:elegram)?\.me/[a-zA-Z][a-zA-Z0-9_]{4,31}`) var reTGUsername = regexp.MustCompile(`t(?:elegram)?\.me/([a-zA-Z][a-zA-Z0-9_]{4,31})`) func extractTMeLinks(text string) []string { return reTMeLink.FindAllString(text, -1) } func extractTGUsername(link string) string { m := reTGUsername.FindStringSubmatch(link) if len(m) > 1 { return m[1] } return "" }