|
@@ -2,6 +2,7 @@ package webcollector
|
|
|
|
|
|
|
|
import (
|
|
import (
|
|
|
"context"
|
|
"context"
|
|
|
|
|
+ "encoding/json"
|
|
|
"log"
|
|
"log"
|
|
|
"net/url"
|
|
"net/url"
|
|
|
"regexp"
|
|
"regexp"
|
|
@@ -9,6 +10,8 @@ import (
|
|
|
"sync/atomic"
|
|
"sync/atomic"
|
|
|
"time"
|
|
"time"
|
|
|
|
|
|
|
|
|
|
+ "github.com/redis/go-redis/v9"
|
|
|
|
|
+
|
|
|
"spider/internal/crawler"
|
|
"spider/internal/crawler"
|
|
|
"spider/internal/extractor"
|
|
"spider/internal/extractor"
|
|
|
"spider/internal/plugin"
|
|
"spider/internal/plugin"
|
|
@@ -16,6 +19,8 @@ import (
|
|
|
"spider/internal/search"
|
|
"spider/internal/search"
|
|
|
)
|
|
)
|
|
|
|
|
|
|
|
|
|
// snapshotKey is the Redis key under which the collector persists its
// snapshot of search queries that have already been processed.
+const snapshotKey = "spider:webcollector:snapshot"
|
|
|
|
|
+
|
|
|
// Collector implements plugin.Collector for web-based merchant collection.
|
|
// Collector implements plugin.Collector for web-based merchant collection.
|
|
|
type Collector struct {
|
|
type Collector struct {
|
|
|
serper *search.SerperClient
|
|
serper *search.SerperClient
|
|
@@ -25,15 +30,17 @@ type Collector struct {
|
|
|
stopped atomic.Bool
|
|
stopped atomic.Bool
|
|
|
logger plugin.TaskLogger
|
|
logger plugin.TaskLogger
|
|
|
proxyPool *proxypool.Pool
|
|
proxyPool *proxypool.Pool
|
|
|
|
|
+ rdb *redis.Client
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-func New(serper *search.SerperClient) *Collector {
|
|
|
|
|
|
|
+func New(serper *search.SerperClient, rdb *redis.Client) *Collector {
|
|
|
return &Collector{
|
|
return &Collector{
|
|
|
- serper: serper,
|
|
|
|
|
- static: crawler.NewStaticCrawler(),
|
|
|
|
|
- dynamic: crawler.NewDynamicCrawler(),
|
|
|
|
|
|
|
+ serper: serper,
|
|
|
|
|
+ static: crawler.NewStaticCrawler(),
|
|
|
|
|
+ dynamic: crawler.NewDynamicCrawler(),
|
|
|
tmeValidator: crawler.NewTMeValidator(),
|
|
tmeValidator: crawler.NewTMeValidator(),
|
|
|
- logger: plugin.NopLogger(),
|
|
|
|
|
|
|
+ logger: plugin.NopLogger(),
|
|
|
|
|
+ rdb: rdb,
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
@@ -67,12 +74,50 @@ func (c *Collector) Run(ctx context.Context, cfg map[string]any, callback func(p
|
|
|
return nil
|
|
return nil
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+ // Stop conditions
|
|
|
|
|
+ maxMerchants, _ := cfg["max_merchants"].(int)
|
|
|
|
|
+ maxDurationMins, _ := cfg["max_duration_mins"].(int)
|
|
|
|
|
+ resumeSnapshot, _ := cfg["resume_snapshot"].(bool)
|
|
|
|
|
+
|
|
|
|
|
+ var deadline time.Time
|
|
|
|
|
+ if maxDurationMins > 0 {
|
|
|
|
|
+ deadline = time.Now().Add(time.Duration(maxDurationMins) * time.Minute)
|
|
|
|
|
+ log.Printf("[web_collector] will stop after %d minutes", maxDurationMins)
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // Load or init snapshot
|
|
|
|
|
+ snapshot := c.loadSnapshot(ctx)
|
|
|
|
|
+ if !resumeSnapshot {
|
|
|
|
|
+ snapshot = map[string]bool{}
|
|
|
|
|
+ log.Println("[web_collector] starting fresh (snapshot cleared)")
|
|
|
|
|
+ } else if len(snapshot) > 0 {
|
|
|
|
|
+ log.Printf("[web_collector] resuming from snapshot, %d queries already done", len(snapshot))
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ merchantCount := 0
|
|
|
|
|
+ wrappedCallback := func(md plugin.MerchantData) {
|
|
|
|
|
+ callback(md)
|
|
|
|
|
+ merchantCount++
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
queries := expandSearchQueries(keywords)
|
|
queries := expandSearchQueries(keywords)
|
|
|
|
|
|
|
|
for _, q := range queries {
|
|
for _, q := range queries {
|
|
|
if c.stopped.Load() || ctx.Err() != nil {
|
|
if c.stopped.Load() || ctx.Err() != nil {
|
|
|
break
|
|
break
|
|
|
}
|
|
}
|
|
|
|
|
+ if maxMerchants > 0 && merchantCount >= maxMerchants {
|
|
|
|
|
+ log.Printf("[web_collector] reached max_merchants limit (%d), stopping", maxMerchants)
|
|
|
|
|
+ break
|
|
|
|
|
+ }
|
|
|
|
|
+ if !deadline.IsZero() && time.Now().After(deadline) {
|
|
|
|
|
+ log.Printf("[web_collector] reached max_duration limit, stopping")
|
|
|
|
|
+ break
|
|
|
|
|
+ }
|
|
|
|
|
+ if snapshot[q] {
|
|
|
|
|
+ log.Printf("[web_collector] skipping (snapshot): %s", q)
|
|
|
|
|
+ continue
|
|
|
|
|
+ }
|
|
|
|
|
|
|
|
// Rotate proxy for each query if using pool
|
|
// Rotate proxy for each query if using pool
|
|
|
c.rotateProxy()
|
|
c.rotateProxy()
|
|
@@ -90,7 +135,7 @@ func (c *Collector) Run(ctx context.Context, cfg map[string]any, callback func(p
|
|
|
for i, r := range results {
|
|
for i, r := range results {
|
|
|
c.logger.LogSearchResult(q+" [organic]", i+1, r.Title, r.URL, r.Snippet)
|
|
c.logger.LogSearchResult(q+" [organic]", i+1, r.Title, r.URL, r.Snippet)
|
|
|
}
|
|
}
|
|
|
- c.processResults(ctx, results, q, callback)
|
|
|
|
|
|
|
+ c.processResults(ctx, results, q, wrappedCallback)
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
time.Sleep(1 * time.Second)
|
|
time.Sleep(1 * time.Second)
|
|
@@ -106,9 +151,13 @@ func (c *Collector) Run(ctx context.Context, cfg map[string]any, callback func(p
|
|
|
for i, r := range videoResults {
|
|
for i, r := range videoResults {
|
|
|
c.logger.LogSearchResult(q+" [video]", i+1, r.Title, r.URL, r.Snippet)
|
|
c.logger.LogSearchResult(q+" [video]", i+1, r.Title, r.URL, r.Snippet)
|
|
|
}
|
|
}
|
|
|
- c.processResults(ctx, videoResults, q, callback)
|
|
|
|
|
|
|
+ c.processResults(ctx, videoResults, q, wrappedCallback)
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+ // Mark query as done in snapshot
|
|
|
|
|
+ snapshot[q] = true
|
|
|
|
|
+ c.saveSnapshot(ctx, snapshot)
|
|
|
|
|
+
|
|
|
select {
|
|
select {
|
|
|
case <-ctx.Done():
|
|
case <-ctx.Done():
|
|
|
return nil
|
|
return nil
|
|
@@ -116,10 +165,50 @@ func (c *Collector) Run(ctx context.Context, cfg map[string]any, callback func(p
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- log.Println("[web_collector] done")
|
|
|
|
|
|
|
+ // If all queries done naturally, clear snapshot for next full run
|
|
|
|
|
+ allDone := true
|
|
|
|
|
+ for _, q := range queries {
|
|
|
|
|
+ if !snapshot[q] {
|
|
|
|
|
+ allDone = false
|
|
|
|
|
+ break
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ if allDone && c.rdb != nil {
|
|
|
|
|
+ c.rdb.Del(ctx, snapshotKey)
|
|
|
|
|
+ log.Println("[web_collector] all queries done, snapshot cleared")
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ log.Printf("[web_collector] done, collected %d merchants", merchantCount)
|
|
|
return nil
|
|
return nil
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+func (c *Collector) loadSnapshot(ctx context.Context) map[string]bool {
|
|
|
|
|
+ if c.rdb == nil {
|
|
|
|
|
+ return map[string]bool{}
|
|
|
|
|
+ }
|
|
|
|
|
+ data, err := c.rdb.Get(ctx, snapshotKey).Bytes()
|
|
|
|
|
+ if err != nil {
|
|
|
|
|
+ return map[string]bool{}
|
|
|
|
|
+ }
|
|
|
|
|
+ var snapshot map[string]bool
|
|
|
|
|
+ if err := json.Unmarshal(data, &snapshot); err != nil {
|
|
|
|
|
+ return map[string]bool{}
|
|
|
|
|
+ }
|
|
|
|
|
+ return snapshot
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+func (c *Collector) saveSnapshot(ctx context.Context, snapshot map[string]bool) {
|
|
|
|
|
+ if c.rdb == nil {
|
|
|
|
|
+ return
|
|
|
|
|
+ }
|
|
|
|
|
+ data, err := json.Marshal(snapshot)
|
|
|
|
|
+ if err != nil {
|
|
|
|
|
+ return
|
|
|
|
|
+ }
|
|
|
|
|
+ // Keep snapshot for 7 days
|
|
|
|
|
+ c.rdb.Set(ctx, snapshotKey, data, 7*24*time.Hour)
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
// processResults handles search results with full logging at every node.
|
|
// processResults handles search results with full logging at every node.
|
|
|
func (c *Collector) processResults(ctx context.Context, results []search.SearchResult, query string, callback func(plugin.MerchantData)) {
|
|
func (c *Collector) processResults(ctx context.Context, results []search.SearchResult, query string, callback func(plugin.MerchantData)) {
|
|
|
for _, r := range results {
|
|
for _, r := range results {
|