| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176 |
- package task
import (
	"encoding/json"
	"fmt"
	"strings"
	"sync/atomic"
	"time"
	"unicode/utf8"

	"gorm.io/gorm"

	"spider/internal/model"
	"spider/internal/plugin"
)
// DetailLogger records every operation within a task execution to the database.
// Thread-safe: can be called concurrently.
//
// Rows are queued on buf and written in batches by a background goroutine
// (flushLoop) started by NewDetailLogger; Close drains and stops it.
type DetailLogger struct {
	db     *gorm.DB              // destination for model.TaskDetail rows
	taskID uint                  // task every logged row is attributed to
	seq    atomic.Int64          // per-task monotonically increasing row sequence
	buf    chan model.TaskDetail // buffered queue consumed by flushLoop
	done   chan struct{}         // closed by flushLoop when it has exited
}
- // NewDetailLogger creates a new logger that batches writes to the database.
- func NewDetailLogger(db *gorm.DB, taskID uint) *DetailLogger {
- dl := &DetailLogger{
- db: db,
- taskID: taskID,
- buf: make(chan model.TaskDetail, 500),
- done: make(chan struct{}),
- }
- go dl.flushLoop()
- return dl
- }
- func (dl *DetailLogger) write(action, url, parentURL string, depth int, input, output, status, extra string, duration time.Duration) {
- seq := int(dl.seq.Add(1))
- detail := model.TaskDetail{
- TaskID: dl.taskID,
- Seq: seq,
- Action: action,
- URL: truncStr(url, 2000),
- ParentURL: truncStr(parentURL, 2000),
- Depth: depth,
- Input: truncStr(input, 50000),
- Output: truncStr(output, 50000),
- Status: status,
- Duration: int(duration.Milliseconds()),
- Extra: truncStr(extra, 5000),
- }
- select {
- case dl.buf <- detail:
- default:
- dl.db.Create(&detail)
- }
- }
- // LogSearchResult records each individual serper result.
- func (dl *DetailLogger) LogSearchResult(query string, position int, title, link, snippet string) {
- input := fmt.Sprintf("query: %s\nposition: %d", query, position)
- output := fmt.Sprintf("title: %s\nlink: %s\nsnippet: %s", title, link, snippet)
- dl.write("search_result", link, "", 0, input, output, "ok", "", 0)
- }
- // LogCrawlPage records a page fetch with content summary.
- func (dl *DetailLogger) LogCrawlPage(url, parentURL string, depth int, htmlSummary string, tgLinks []string, allLinksCount int, err error, dur time.Duration) {
- status := "ok"
- extra := ""
- if err != nil {
- status = "error"
- extra = err.Error()
- }
- output := fmt.Sprintf("tg_links_found: %d\nall_links_found: %d", len(tgLinks), allLinksCount)
- if len(tgLinks) > 0 {
- output += "\ntg_links:\n " + strings.Join(tgLinks, "\n ")
- }
- dl.write("crawl", url, parentURL, depth, htmlSummary, output, status, extra, dur)
- }
- // LogSnippetExtract records extraction from a snippet.
- func (dl *DetailLogger) LogSnippetExtract(sourceURL, rawText string, extracted []string) {
- status := "ok"
- if len(extracted) == 0 {
- status = "empty"
- }
- dl.write("snippet_extract", sourceURL, "", 0, rawText, toJSON(extracted), status, "", 0)
- }
- // LogPageExtract records extraction from a crawled page.
- func (dl *DetailLogger) LogPageExtract(pageURL, parentURL string, depth int, contentSample string, extracted []string) {
- status := "ok"
- if len(extracted) == 0 {
- status = "empty"
- }
- dl.write("page_extract", pageURL, parentURL, depth, contentSample, toJSON(extracted), status, "", 0)
- }
- // LogMerchantFound records a merchant being produced with full data.
- func (dl *DetailLogger) LogMerchantFound(data plugin.MerchantData, sourceAction string, depth int, parentURL string) {
- dl.write("merchant_found", data.SourceURL, parentURL, depth,
- fmt.Sprintf("tg: @%s\nname: %s\nwebsite: %s\nemail: %s\nphone: %s\nindustry: %s",
- data.TgUsername, data.MerchantName, data.Website, data.Email, data.Phone, data.IndustryTag),
- sourceAction,
- "ok",
- fmt.Sprintf("source_type: %s\nsource_name: %s", data.SourceType, data.SourceName),
- 0)
- }
- // LogCleanStep records a cleaning pipeline decision.
- func (dl *DetailLogger) LogCleanStep(tgUsername, step, decision, reason string) {
- dl.write("clean_"+step, "", "", 0,
- fmt.Sprintf("@%s", tgUsername),
- fmt.Sprintf("decision: %s\nreason: %s", decision, reason),
- "ok", "", 0)
- }
// LogSkip records a skipped operation.
// The human-readable reason lands in the output column with status "skip".
func (dl *DetailLogger) LogSkip(action, url, reason string) {
	dl.write(action, url, "", 0, "", reason, "skip", "", 0)
}
// LogError records an error.
// The message lands in the extra column with status "error".
func (dl *DetailLogger) LogError(action, url, errMsg string) {
	dl.write(action, url, "", 0, "", "", "error", errMsg, 0)
}
// Close flushes remaining logs and stops the background writer.
// Closing buf signals flushLoop to drain any queued rows and write the final
// batch; the receive on done blocks until that goroutine has exited.
// NOTE(review): any write issued concurrently with or after Close would send
// on a closed channel and panic — confirm callers stop logging first.
func (dl *DetailLogger) Close() {
	close(dl.buf)
	<-dl.done
}
- func (dl *DetailLogger) flushLoop() {
- defer close(dl.done)
- batch := make([]model.TaskDetail, 0, 50)
- ticker := time.NewTicker(2 * time.Second)
- defer ticker.Stop()
- flush := func() {
- if len(batch) == 0 {
- return
- }
- dl.db.CreateInBatches(batch, 50)
- batch = batch[:0]
- }
- for {
- select {
- case detail, ok := <-dl.buf:
- if !ok {
- flush()
- return
- }
- batch = append(batch, detail)
- if len(batch) >= 50 {
- flush()
- }
- case <-ticker.C:
- flush()
- }
- }
- }
// truncStr returns s limited to at most maxLen bytes.
//
// Unlike a plain s[:maxLen], it never splits a multi-byte UTF-8 rune: when the
// cut would land inside a rune, the cut point backs up to that rune's start,
// so valid UTF-8 input always yields valid UTF-8 output (important for the
// URL/snippet text these columns store). A negative maxLen is clamped to 0
// instead of panicking.
func truncStr(s string, maxLen int) string {
	if len(s) <= maxLen {
		return s
	}
	if maxLen < 0 {
		maxLen = 0
	}
	end := maxLen
	// s[end] is the first byte past the cut; while it is a UTF-8 continuation
	// byte the cut splits a rune, so back up to the rune's leading byte.
	for end > 0 && !utf8.RuneStart(s[end]) {
		end--
	}
	return s[:end]
}
// toJSON renders v as compact JSON for storage in a detail column.
// The marshal error is deliberately discarded (best-effort logging); on
// failure the encoded bytes are nil and the result is the empty string.
func toJSON(v any) string {
	encoded, _ := json.Marshal(v)
	return string(encoded)
}
|