| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184 |
- package notification
- import (
- "bytes"
- "crypto/hmac"
- "crypto/sha256"
- "encoding/hex"
- "encoding/json"
- "fmt"
- "log"
- "net"
- "net/http"
- "net/url"
- "strings"
- "time"
- "spider/internal/model"
- "gorm.io/gorm"
- )
// Event describes a single occurrence in the system that notification
// channels may be told about. It is serialized to JSON as the webhook payload.
type Event struct {
	// Type identifies the kind of event, e.g. task_completed, task_failed,
	// new_hot_merchant or schedule_run.
	Type string `json:"type"`
	// Title is a short headline for the event.
	Title string `json:"title"`
	// Message is the human-readable body text.
	Message string `json:"message"`
	// Data is an optional extra payload; it is omitted from JSON when nil.
	Data interface{} `json:"data,omitempty"`
}
- // Manager dispatches events to configured notification channels.
- type Manager struct {
- db *gorm.DB
- }
- // NewManager creates a new notification manager.
- func NewManager(db *gorm.DB) *Manager {
- return &Manager{db: db}
- }
- // Send dispatches an event to all matching enabled notification configs.
- func (m *Manager) Send(event Event) {
- var configs []model.NotificationConfig
- m.db.Where("event_type = ? AND enabled = ?", event.Type, true).Find(&configs)
- for _, cfg := range configs {
- go m.dispatch(cfg, event)
- }
- }
- func (m *Manager) dispatch(cfg model.NotificationConfig, event Event) {
- defer func() {
- if r := recover(); r != nil {
- log.Printf("[notification] panic dispatching to %s: %v", cfg.Name, r)
- }
- }()
- var configMap map[string]string
- json.Unmarshal(cfg.Config, &configMap)
- switch cfg.Channel {
- case "webhook":
- m.sendWebhook(configMap["url"], event)
- case "tg_bot":
- m.sendTgBot(configMap["bot_token"], configMap["chat_id"], event)
- }
- }
// ValidateWebhookURL checks that rawURL is safe to use as a webhook target.
//
// It returns a non-nil error when the URL is empty or unparsable, uses a
// scheme other than http/https, has no host, names a literal IP address that
// is loopback, private, unspecified, link-local or multicast, or names a
// well-known internal hostname (localhost, *.localhost, *.local, *.internal).
//
// Hostnames are checked by name only; they are not resolved, so a public DNS
// name that points at an internal address is not detected here.
func ValidateWebhookURL(rawURL string) error {
	if rawURL == "" {
		return fmt.Errorf("URL is empty")
	}
	u, err := url.Parse(rawURL)
	if err != nil {
		return fmt.Errorf("invalid URL: %w", err)
	}
	if u.Scheme != "https" && u.Scheme != "http" {
		return fmt.Errorf("URL scheme must be http or https")
	}

	// Normalize: case-insensitive, and a trailing dot ("localhost.") is the
	// same name in fully-qualified form.
	host := strings.TrimSuffix(strings.ToLower(u.Hostname()), ".")
	if host == "" {
		return fmt.Errorf("URL has no host")
	}

	// net.ParseIP rejects zoned IPv6 literals such as "fe80::1%eth0", which
	// would otherwise slip past the IP checks, so drop the zone first.
	addr := host
	if i := strings.IndexByte(addr, '%'); i >= 0 {
		addr = addr[:i]
	}
	if ip := net.ParseIP(addr); ip != nil {
		if ip.IsLoopback() || ip.IsPrivate() || ip.IsUnspecified() ||
			ip.IsLinkLocalUnicast() || ip.IsMulticast() {
			return fmt.Errorf("webhook cannot target private/internal IPs")
		}
		return nil
	}

	// Block common internal hostnames.
	if host == "localhost" ||
		strings.HasSuffix(host, ".localhost") ||
		strings.HasSuffix(host, ".local") ||
		strings.HasSuffix(host, ".internal") {
		return fmt.Errorf("webhook cannot target internal hostnames")
	}
	return nil
}
- func (m *Manager) sendWebhook(webhookURL string, event Event) {
- if webhookURL == "" {
- return
- }
- if err := ValidateWebhookURL(webhookURL); err != nil {
- log.Printf("[notification] invalid webhook URL: %v", err)
- return
- }
- body, _ := json.Marshal(event)
- // Sign payload with HMAC-SHA256
- mac := hmac.New(sha256.New, []byte("spider-webhook-secret"))
- mac.Write(body)
- signature := hex.EncodeToString(mac.Sum(nil))
- req, _ := http.NewRequest("POST", webhookURL, bytes.NewReader(body))
- req.Header.Set("Content-Type", "application/json")
- req.Header.Set("X-Spider-Signature", signature)
- req.Header.Set("X-Spider-Event", event.Type)
- client := &http.Client{Timeout: 10 * time.Second}
- // Retry with exponential backoff (3 attempts)
- var lastErr error
- for attempt := 0; attempt < 3; attempt++ {
- if attempt > 0 {
- backoff := time.Duration(1<<uint(attempt-1)) * time.Second // 1s, 2s
- time.Sleep(backoff)
- req, _ = http.NewRequest("POST", webhookURL, bytes.NewReader(body))
- req.Header.Set("Content-Type", "application/json")
- req.Header.Set("X-Spider-Signature", signature)
- req.Header.Set("X-Spider-Event", event.Type)
- }
- resp, err := client.Do(req)
- if err != nil {
- lastErr = err
- log.Printf("[notification] webhook attempt %d failed: %v", attempt+1, err)
- continue
- }
- resp.Body.Close()
- if resp.StatusCode >= 200 && resp.StatusCode < 300 {
- return // success
- }
- lastErr = fmt.Errorf("HTTP %d", resp.StatusCode)
- log.Printf("[notification] webhook attempt %d returned %d", attempt+1, resp.StatusCode)
- }
- log.Printf("[notification] webhook failed after 3 attempts: %v", lastErr)
- }
- func (m *Manager) sendTgBot(botToken, chatID string, event Event) {
- if botToken == "" || chatID == "" {
- return
- }
- text := fmt.Sprintf("*%s*\n%s", event.Title, event.Message)
- payload := map[string]interface{}{
- "chat_id": chatID,
- "text": text,
- "parse_mode": "Markdown",
- }
- body, _ := json.Marshal(payload)
- url := fmt.Sprintf("https://api.telegram.org/bot%s/sendMessage", botToken)
- client := &http.Client{Timeout: 10 * time.Second}
- resp, err := client.Post(url, "application/json", bytes.NewReader(body))
- if err != nil {
- log.Printf("[notification] tg_bot error: %v", err)
- return
- }
- resp.Body.Close()
- }
- // SendTest sends a test notification to verify configuration.
- func (m *Manager) SendTest(cfg model.NotificationConfig) error {
- event := Event{
- Type: cfg.EventType,
- Title: "测试通知",
- Message: fmt.Sprintf("通知配置 [%s] 测试成功", cfg.Name),
- }
- var configMap map[string]string
- json.Unmarshal(cfg.Config, &configMap)
- switch cfg.Channel {
- case "webhook":
- m.sendWebhook(configMap["url"], event)
- case "tg_bot":
- m.sendTgBot(configMap["bot_token"], configMap["chat_id"], event)
- default:
- return fmt.Errorf("unsupported channel: %s", cfg.Channel)
- }
- return nil
- }
|