package task

import (
	"encoding/json"
	"fmt"
	"strings"
	"sync/atomic"
	"time"
	"unicode/utf8"

	"gorm.io/gorm"

	"spider/internal/model"
	"spider/internal/plugin"
)

// DetailLogger records every operation within a task execution to the database.
// Thread-safe: can be called concurrently.
type DetailLogger struct {
	db     *gorm.DB
	taskID uint
	seq    atomic.Int64
	buf    chan model.TaskDetail
	done   chan struct{}
}

// NewDetailLogger creates a new logger that batches writes to the database.
func NewDetailLogger(db *gorm.DB, taskID uint) *DetailLogger {
	dl := &DetailLogger{
		db:     db,
		taskID: taskID,
		buf:    make(chan model.TaskDetail, 500),
		done:   make(chan struct{}),
	}
	go dl.flushLoop()
	return dl
}

// write assigns the next sequence number, truncates oversized fields, and
// hands the row off to the background writer.
func (dl *DetailLogger) write(action, url, parentURL string, depth int, input, output, status, extra string, duration time.Duration) {
	seq := int(dl.seq.Add(1))
	detail := model.TaskDetail{
		TaskID:    dl.taskID,
		Seq:       seq,
		Action:    action,
		URL:       truncStr(url, 2000),
		ParentURL: truncStr(parentURL, 2000),
		Depth:     depth,
		Input:     truncStr(input, 50000),
		Output:    truncStr(output, 50000),
		Status:    status,
		Duration:  int(duration.Milliseconds()),
		Extra:     truncStr(extra, 5000),
	}
	select {
	case dl.buf <- detail:
	default:
		// Buffer is full: fall back to a synchronous write rather than
		// dropping the record.
		dl.db.Create(&detail)
	}
}

// LogSearchResult records each individual serper result.
func (dl *DetailLogger) LogSearchResult(query string, position int, title, link, snippet string) {
	input := fmt.Sprintf("query: %s\nposition: %d", query, position)
	output := fmt.Sprintf("title: %s\nlink: %s\nsnippet: %s", title, link, snippet)
	dl.write("search_result", link, "", 0, input, output, "ok", "", 0)
}

// LogCrawlPage records a page fetch with a content summary.
func (dl *DetailLogger) LogCrawlPage(url, parentURL string, depth int, htmlSummary string, tgLinks []string, allLinksCount int, err error, dur time.Duration) {
	status := "ok"
	extra := ""
	if err != nil {
		status = "error"
		extra = err.Error()
	}
	output := fmt.Sprintf("tg_links_found: %d\nall_links_found: %d", len(tgLinks), allLinksCount)
	if len(tgLinks) > 0 {
		output += "\ntg_links:\n " + strings.Join(tgLinks, "\n ")
	}
	dl.write("crawl", url, parentURL, depth, htmlSummary, output, status, extra, dur)
}

// LogSnippetExtract records extraction from a snippet.
func (dl *DetailLogger) LogSnippetExtract(sourceURL, rawText string, extracted []string) {
	status := "ok"
	if len(extracted) == 0 {
		status = "empty"
	}
	dl.write("snippet_extract", sourceURL, "", 0, rawText, toJSON(extracted), status, "", 0)
}

// LogPageExtract records extraction from a crawled page.
func (dl *DetailLogger) LogPageExtract(pageURL, parentURL string, depth int, contentSample string, extracted []string) {
	status := "ok"
	if len(extracted) == 0 {
		status = "empty"
	}
	dl.write("page_extract", pageURL, parentURL, depth, contentSample, toJSON(extracted), status, "", 0)
}

// LogMerchantFound records a merchant being produced with full data.
func (dl *DetailLogger) LogMerchantFound(data plugin.MerchantData, sourceAction string, depth int, parentURL string) {
	dl.write("merchant_found", data.SourceURL, parentURL, depth,
		fmt.Sprintf("tg: @%s\nname: %s\nwebsite: %s\nemail: %s\nphone: %s\nindustry: %s",
			data.TgUsername, data.MerchantName, data.Website, data.Email, data.Phone, data.IndustryTag),
		sourceAction, "ok",
		fmt.Sprintf("source_type: %s\nsource_name: %s", data.SourceType, data.SourceName), 0)
}
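// A minimal sketch of how the merchant and cleaning hooks are meant to chain
// together in a pipeline. seenBefore and the "dedupe" step name are
// hypothetical, for illustration only:
//
//	dl.LogMerchantFound(data, "page_extract", depth, parentURL)
//	if seenBefore(data.TgUsername) {
//		dl.LogCleanStep(data.TgUsername, "dedupe", "drop", "duplicate username")
//	} else {
//		dl.LogCleanStep(data.TgUsername, "dedupe", "keep", "first occurrence")
//	}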
// LogCleanStep records a cleaning pipeline decision.
func (dl *DetailLogger) LogCleanStep(tgUsername, step, decision, reason string) {
	dl.write("clean_"+step, "", "", 0,
		fmt.Sprintf("@%s", tgUsername),
		fmt.Sprintf("decision: %s\nreason: %s", decision, reason),
		"ok", "", 0)
}

// LogSkip records a skipped operation.
func (dl *DetailLogger) LogSkip(action, url, reason string) {
	dl.write(action, url, "", 0, "", reason, "skip", "", 0)
}

// LogError records an error.
func (dl *DetailLogger) LogError(action, url, errMsg string) {
	dl.write(action, url, "", 0, "", "", "error", errMsg, 0)
}

// Close flushes remaining logs and stops the background writer.
// The logger must not be used after Close: write would send on the closed
// buffer and panic.
func (dl *DetailLogger) Close() {
	close(dl.buf)
	<-dl.done
}

// flushLoop drains the buffer in the background, writing rows in batches of
// 50 and flushing any partial batch every two seconds.
func (dl *DetailLogger) flushLoop() {
	defer close(dl.done)
	batch := make([]model.TaskDetail, 0, 50)
	ticker := time.NewTicker(2 * time.Second)
	defer ticker.Stop()
	flush := func() {
		if len(batch) == 0 {
			return
		}
		dl.db.CreateInBatches(batch, 50)
		batch = batch[:0]
	}
	for {
		select {
		case detail, ok := <-dl.buf:
			if !ok {
				// Buffer closed by Close: write what is left and exit.
				flush()
				return
			}
			batch = append(batch, detail)
			if len(batch) >= 50 {
				flush()
			}
		case <-ticker.C:
			flush()
		}
	}
}

// truncStr caps s at maxLen bytes, backing up to a rune boundary so the
// result is never invalid UTF-8 (which strict database charsets reject).
func truncStr(s string, maxLen int) string {
	if len(s) <= maxLen {
		return s
	}
	for maxLen > 0 && !utf8.RuneStart(s[maxLen]) {
		maxLen--
	}
	return s[:maxLen]
}

// toJSON renders v as compact JSON, ignoring marshal errors (log-only data).
func toJSON(v any) string {
	b, _ := json.Marshal(v)
	return string(b)
}
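// Lifecycle sketch, assuming the caller already holds a *gorm.DB and a task
// ID; fetchPage and the crawl variables below are hypothetical:
//
//	dl := NewDetailLogger(db, taskID)
//	defer dl.Close() // flushes buffered rows; no logging allowed afterwards
//
//	start := time.Now()
//	tgLinks, total, err := fetchPage(pageURL)
//	dl.LogCrawlPage(pageURL, parentURL, depth, summary, tgLinks, total, err, time.Since(start))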