notifier.go 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184
  1. package notification
  2. import (
  3. "bytes"
  4. "crypto/hmac"
  5. "crypto/sha256"
  6. "encoding/hex"
  7. "encoding/json"
  8. "fmt"
  9. "log"
  10. "net"
  11. "net/http"
  12. "net/url"
  13. "strings"
  14. "time"
  15. "spider/internal/model"
  16. "gorm.io/gorm"
  17. )
  18. // Event represents something that happened in the system.
  19. type Event struct {
  20. Type string `json:"type"` // task_completed, task_failed, new_hot_merchant, schedule_run
  21. Title string `json:"title"`
  22. Message string `json:"message"`
  23. Data interface{} `json:"data,omitempty"`
  24. }
  25. // Manager dispatches events to configured notification channels.
  26. type Manager struct {
  27. db *gorm.DB
  28. }
  29. // NewManager creates a new notification manager.
  30. func NewManager(db *gorm.DB) *Manager {
  31. return &Manager{db: db}
  32. }
  33. // Send dispatches an event to all matching enabled notification configs.
  34. func (m *Manager) Send(event Event) {
  35. var configs []model.NotificationConfig
  36. m.db.Where("event_type = ? AND enabled = ?", event.Type, true).Find(&configs)
  37. for _, cfg := range configs {
  38. go m.dispatch(cfg, event)
  39. }
  40. }
  41. func (m *Manager) dispatch(cfg model.NotificationConfig, event Event) {
  42. defer func() {
  43. if r := recover(); r != nil {
  44. log.Printf("[notification] panic dispatching to %s: %v", cfg.Name, r)
  45. }
  46. }()
  47. var configMap map[string]string
  48. json.Unmarshal(cfg.Config, &configMap)
  49. switch cfg.Channel {
  50. case "webhook":
  51. m.sendWebhook(configMap["url"], event)
  52. case "tg_bot":
  53. m.sendTgBot(configMap["bot_token"], configMap["chat_id"], event)
  54. }
  55. }
  56. // ValidateWebhookURL checks that a webhook URL is safe to call.
  57. func ValidateWebhookURL(rawURL string) error {
  58. if rawURL == "" {
  59. return fmt.Errorf("URL is empty")
  60. }
  61. u, err := url.Parse(rawURL)
  62. if err != nil {
  63. return fmt.Errorf("invalid URL: %w", err)
  64. }
  65. if u.Scheme != "https" && u.Scheme != "http" {
  66. return fmt.Errorf("URL scheme must be http or https")
  67. }
  68. // Block private/internal IPs
  69. host := u.Hostname()
  70. if ip := net.ParseIP(host); ip != nil {
  71. if ip.IsLoopback() || ip.IsPrivate() || ip.IsLinkLocalUnicast() {
  72. return fmt.Errorf("webhook cannot target private/internal IPs")
  73. }
  74. }
  75. // Block common internal hostnames
  76. lower := strings.ToLower(host)
  77. if lower == "localhost" || strings.HasSuffix(lower, ".local") || strings.HasSuffix(lower, ".internal") {
  78. return fmt.Errorf("webhook cannot target internal hostnames")
  79. }
  80. return nil
  81. }
  82. func (m *Manager) sendWebhook(webhookURL string, event Event) {
  83. if webhookURL == "" {
  84. return
  85. }
  86. if err := ValidateWebhookURL(webhookURL); err != nil {
  87. log.Printf("[notification] invalid webhook URL: %v", err)
  88. return
  89. }
  90. body, _ := json.Marshal(event)
  91. // Sign payload with HMAC-SHA256
  92. mac := hmac.New(sha256.New, []byte("spider-webhook-secret"))
  93. mac.Write(body)
  94. signature := hex.EncodeToString(mac.Sum(nil))
  95. req, _ := http.NewRequest("POST", webhookURL, bytes.NewReader(body))
  96. req.Header.Set("Content-Type", "application/json")
  97. req.Header.Set("X-Spider-Signature", signature)
  98. req.Header.Set("X-Spider-Event", event.Type)
  99. client := &http.Client{Timeout: 10 * time.Second}
  100. // Retry with exponential backoff (3 attempts)
  101. var lastErr error
  102. for attempt := 0; attempt < 3; attempt++ {
  103. if attempt > 0 {
  104. backoff := time.Duration(1<<uint(attempt-1)) * time.Second // 1s, 2s
  105. time.Sleep(backoff)
  106. req, _ = http.NewRequest("POST", webhookURL, bytes.NewReader(body))
  107. req.Header.Set("Content-Type", "application/json")
  108. req.Header.Set("X-Spider-Signature", signature)
  109. req.Header.Set("X-Spider-Event", event.Type)
  110. }
  111. resp, err := client.Do(req)
  112. if err != nil {
  113. lastErr = err
  114. log.Printf("[notification] webhook attempt %d failed: %v", attempt+1, err)
  115. continue
  116. }
  117. resp.Body.Close()
  118. if resp.StatusCode >= 200 && resp.StatusCode < 300 {
  119. return // success
  120. }
  121. lastErr = fmt.Errorf("HTTP %d", resp.StatusCode)
  122. log.Printf("[notification] webhook attempt %d returned %d", attempt+1, resp.StatusCode)
  123. }
  124. log.Printf("[notification] webhook failed after 3 attempts: %v", lastErr)
  125. }
  126. func (m *Manager) sendTgBot(botToken, chatID string, event Event) {
  127. if botToken == "" || chatID == "" {
  128. return
  129. }
  130. text := fmt.Sprintf("*%s*\n%s", event.Title, event.Message)
  131. payload := map[string]interface{}{
  132. "chat_id": chatID,
  133. "text": text,
  134. "parse_mode": "Markdown",
  135. }
  136. body, _ := json.Marshal(payload)
  137. url := fmt.Sprintf("https://api.telegram.org/bot%s/sendMessage", botToken)
  138. client := &http.Client{Timeout: 10 * time.Second}
  139. resp, err := client.Post(url, "application/json", bytes.NewReader(body))
  140. if err != nil {
  141. log.Printf("[notification] tg_bot error: %v", err)
  142. return
  143. }
  144. resp.Body.Close()
  145. }
  146. // SendTest sends a test notification to verify configuration.
  147. func (m *Manager) SendTest(cfg model.NotificationConfig) error {
  148. event := Event{
  149. Type: cfg.EventType,
  150. Title: "测试通知",
  151. Message: fmt.Sprintf("通知配置 [%s] 测试成功", cfg.Name),
  152. }
  153. var configMap map[string]string
  154. json.Unmarshal(cfg.Config, &configMap)
  155. switch cfg.Channel {
  156. case "webhook":
  157. m.sendWebhook(configMap["url"], event)
  158. case "tg_bot":
  159. m.sendTgBot(configMap["bot_token"], configMap["chat_id"], event)
  160. default:
  161. return fmt.Errorf("unsupported channel: %s", cfg.Channel)
  162. }
  163. return nil
  164. }