detail_logger.go

package task

import (
	"encoding/json"
	"fmt"
	"strings"
	"sync/atomic"
	"time"
	"unicode/utf8"

	"gorm.io/gorm"

	"spider/internal/model"
	"spider/internal/plugin"
)
// DetailLogger records every operation within a task execution to the database.
// Thread-safe: can be called concurrently.
type DetailLogger struct {
	db     *gorm.DB
	taskID uint
	seq    atomic.Int64
	buf    chan model.TaskDetail
	done   chan struct{}
}
// NewDetailLogger creates a new logger that batches writes to the database.
func NewDetailLogger(db *gorm.DB, taskID uint) *DetailLogger {
	dl := &DetailLogger{
		db:     db,
		taskID: taskID,
		buf:    make(chan model.TaskDetail, 500),
		done:   make(chan struct{}),
	}
	go dl.flushLoop()
	return dl
}
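
// A typical lifecycle, sketched under assumptions: the db handle, taskID,
// and the logged values come from the surrounding task runner, which is not
// part of this file.
//
//	dl := NewDetailLogger(db, taskID)
//	defer dl.Close() // flush buffered rows when the task finishes
//	dl.LogSearchResult(query, 1, title, link, snippet)
//	dl.LogCrawlPage(pageURL, parentURL, 1, summary, tgLinks, linkCount, err, elapsed)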
// write assigns the next sequence number, truncates oversized fields, and
// hands the row to the background batch writer.
func (dl *DetailLogger) write(action, url, parentURL string, depth int, input, output, status, extra string, duration time.Duration) {
	seq := int(dl.seq.Add(1))
	detail := model.TaskDetail{
		TaskID:    dl.taskID,
		Seq:       seq,
		Action:    action,
		URL:       truncStr(url, 2000),
		ParentURL: truncStr(parentURL, 2000),
		Depth:     depth,
		Input:     truncStr(input, 50000),
		Output:    truncStr(output, 50000),
		Status:    status,
		Duration:  int(duration.Milliseconds()),
		Extra:     truncStr(extra, 5000),
	}
	select {
	case dl.buf <- detail:
	default:
		// Buffer full: insert synchronously rather than block or drop the row.
		dl.db.Create(&detail)
	}
}
// LogSearchResult records a single Serper search result.
func (dl *DetailLogger) LogSearchResult(query string, position int, title, link, snippet string) {
	input := fmt.Sprintf("query: %s\nposition: %d", query, position)
	output := fmt.Sprintf("title: %s\nlink: %s\nsnippet: %s", title, link, snippet)
	dl.write("search_result", link, "", 0, input, output, "ok", "", 0)
}
// LogCrawlPage records a page fetch with a content summary.
func (dl *DetailLogger) LogCrawlPage(url, parentURL string, depth int, htmlSummary string, tgLinks []string, allLinksCount int, err error, dur time.Duration) {
	status := "ok"
	extra := ""
	if err != nil {
		status = "error"
		extra = err.Error()
	}
	output := fmt.Sprintf("tg_links_found: %d\nall_links_found: %d", len(tgLinks), allLinksCount)
	if len(tgLinks) > 0 {
		output += "\ntg_links:\n " + strings.Join(tgLinks, "\n ")
	}
	dl.write("crawl", url, parentURL, depth, htmlSummary, output, status, extra, dur)
}
// LogSnippetExtract records extraction from a snippet.
func (dl *DetailLogger) LogSnippetExtract(sourceURL, rawText string, extracted []string) {
	status := "ok"
	if len(extracted) == 0 {
		status = "empty"
	}
	dl.write("snippet_extract", sourceURL, "", 0, rawText, toJSON(extracted), status, "", 0)
}
// LogPageExtract records extraction from a crawled page.
func (dl *DetailLogger) LogPageExtract(pageURL, parentURL string, depth int, contentSample string, extracted []string) {
	status := "ok"
	if len(extracted) == 0 {
		status = "empty"
	}
	dl.write("page_extract", pageURL, parentURL, depth, contentSample, toJSON(extracted), status, "", 0)
}
// LogMerchantFound records a merchant being produced with full data.
func (dl *DetailLogger) LogMerchantFound(data plugin.MerchantData, sourceAction string, depth int, parentURL string) {
	dl.write("merchant_found", data.SourceURL, parentURL, depth,
		fmt.Sprintf("tg: @%s\nname: %s\nwebsite: %s\nemail: %s\nphone: %s\nindustry: %s",
			data.TgUsername, data.MerchantName, data.Website, data.Email, data.Phone, data.IndustryTag),
		sourceAction,
		"ok",
		fmt.Sprintf("source_type: %s\nsource_name: %s", data.SourceType, data.SourceName),
		0)
}
// LogCleanStep records a cleaning pipeline decision.
func (dl *DetailLogger) LogCleanStep(tgUsername, step, decision, reason string) {
	dl.write("clean_"+step, "", "", 0,
		fmt.Sprintf("@%s", tgUsername),
		fmt.Sprintf("decision: %s\nreason: %s", decision, reason),
		"ok", "", 0)
}
// LogSkip records a skipped operation.
func (dl *DetailLogger) LogSkip(action, url, reason string) {
	dl.write(action, url, "", 0, "", reason, "skip", "", 0)
}
// LogError records an error.
func (dl *DetailLogger) LogError(action, url, errMsg string) {
	dl.write(action, url, "", 0, "", "", "error", errMsg, 0)
}
// Close flushes remaining logs and stops the background writer.
func (dl *DetailLogger) Close() {
	close(dl.buf)
	<-dl.done
}
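
// Close must not race with the Log* methods: a write that loses the race
// sends on a closed channel and panics. A sketch of the assumed shutdown
// order (wg is hypothetical, standing in for whatever tracks the task's
// worker goroutines):
//
//	wg.Wait()  // every goroutine that calls Log* has returned
//	dl.Close() // now flush and stop the background writer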
// flushLoop drains buf in the background, inserting rows in batches of up
// to 50 and flushing at least once every two seconds.
func (dl *DetailLogger) flushLoop() {
	defer close(dl.done)
	batch := make([]model.TaskDetail, 0, 50)
	ticker := time.NewTicker(2 * time.Second)
	defer ticker.Stop()
	flush := func() {
		if len(batch) == 0 {
			return
		}
		dl.db.CreateInBatches(batch, 50)
		batch = batch[:0]
	}
	for {
		select {
		case detail, ok := <-dl.buf:
			if !ok {
				// buf was closed by Close: flush what is left and exit.
				flush()
				return
			}
			batch = append(batch, detail)
			if len(batch) >= 50 {
				flush()
			}
		case <-ticker.C:
			flush()
		}
	}
}
// truncStr caps s at maxLen bytes, backing up as needed so the cut never
// splits a multi-byte UTF-8 rune.
func truncStr(s string, maxLen int) string {
	if len(s) <= maxLen {
		return s
	}
	for maxLen > 0 && !utf8.RuneStart(s[maxLen]) {
		maxLen--
	}
	return s[:maxLen]
}
// toJSON marshals v for storage, returning an empty string if marshalling fails.
func toJSON(v any) string {
	b, _ := json.Marshal(v)
	return string(b)
}