package notification import ( "bytes" "crypto/hmac" "crypto/sha256" "encoding/hex" "encoding/json" "fmt" "log" "net" "net/http" "net/url" "strings" "time" "spider/internal/model" "gorm.io/gorm" ) // Event represents something that happened in the system. type Event struct { Type string `json:"type"` // task_completed, task_failed, new_hot_merchant, schedule_run Title string `json:"title"` Message string `json:"message"` Data interface{} `json:"data,omitempty"` } // Manager dispatches events to configured notification channels. type Manager struct { db *gorm.DB } // NewManager creates a new notification manager. func NewManager(db *gorm.DB) *Manager { return &Manager{db: db} } // Send dispatches an event to all matching enabled notification configs. func (m *Manager) Send(event Event) { var configs []model.NotificationConfig m.db.Where("event_type = ? AND enabled = ?", event.Type, true).Find(&configs) for _, cfg := range configs { go m.dispatch(cfg, event) } } func (m *Manager) dispatch(cfg model.NotificationConfig, event Event) { defer func() { if r := recover(); r != nil { log.Printf("[notification] panic dispatching to %s: %v", cfg.Name, r) } }() var configMap map[string]string json.Unmarshal(cfg.Config, &configMap) switch cfg.Channel { case "webhook": m.sendWebhook(configMap["url"], event) case "tg_bot": m.sendTgBot(configMap["bot_token"], configMap["chat_id"], event) } } // ValidateWebhookURL checks that a webhook URL is safe to call. func ValidateWebhookURL(rawURL string) error { if rawURL == "" { return fmt.Errorf("URL is empty") } u, err := url.Parse(rawURL) if err != nil { return fmt.Errorf("invalid URL: %w", err) } if u.Scheme != "https" && u.Scheme != "http" { return fmt.Errorf("URL scheme must be http or https") } // Block private/internal IPs host := u.Hostname() if ip := net.ParseIP(host); ip != nil { if ip.IsLoopback() || ip.IsPrivate() || ip.IsLinkLocalUnicast() { return fmt.Errorf("webhook cannot target private/internal IPs") } } // Block common internal hostnames lower := strings.ToLower(host) if lower == "localhost" || strings.HasSuffix(lower, ".local") || strings.HasSuffix(lower, ".internal") { return fmt.Errorf("webhook cannot target internal hostnames") } return nil } func (m *Manager) sendWebhook(webhookURL string, event Event) { if webhookURL == "" { return } if err := ValidateWebhookURL(webhookURL); err != nil { log.Printf("[notification] invalid webhook URL: %v", err) return } body, _ := json.Marshal(event) // Sign payload with HMAC-SHA256 mac := hmac.New(sha256.New, []byte("spider-webhook-secret")) mac.Write(body) signature := hex.EncodeToString(mac.Sum(nil)) req, _ := http.NewRequest("POST", webhookURL, bytes.NewReader(body)) req.Header.Set("Content-Type", "application/json") req.Header.Set("X-Spider-Signature", signature) req.Header.Set("X-Spider-Event", event.Type) client := &http.Client{Timeout: 10 * time.Second} // Retry with exponential backoff (3 attempts) var lastErr error for attempt := 0; attempt < 3; attempt++ { if attempt > 0 { backoff := time.Duration(1<= 200 && resp.StatusCode < 300 { return // success } lastErr = fmt.Errorf("HTTP %d", resp.StatusCode) log.Printf("[notification] webhook attempt %d returned %d", attempt+1, resp.StatusCode) } log.Printf("[notification] webhook failed after 3 attempts: %v", lastErr) } func (m *Manager) sendTgBot(botToken, chatID string, event Event) { if botToken == "" || chatID == "" { return } text := fmt.Sprintf("*%s*\n%s", event.Title, event.Message) payload := map[string]interface{}{ "chat_id": chatID, "text": text, "parse_mode": "Markdown", } body, _ := json.Marshal(payload) url := fmt.Sprintf("https://api.telegram.org/bot%s/sendMessage", botToken) client := &http.Client{Timeout: 10 * time.Second} resp, err := client.Post(url, "application/json", bytes.NewReader(body)) if err != nil { log.Printf("[notification] tg_bot error: %v", err) return } resp.Body.Close() } // SendTest sends a test notification to verify configuration. func (m *Manager) SendTest(cfg model.NotificationConfig) error { event := Event{ Type: cfg.EventType, Title: "测试通知", Message: fmt.Sprintf("通知配置 [%s] 测试成功", cfg.Name), } var configMap map[string]string json.Unmarshal(cfg.Config, &configMap) switch cfg.Channel { case "webhook": m.sendWebhook(configMap["url"], event) case "tg_bot": m.sendTgBot(configMap["bot_token"], configMap["chat_id"], event) default: return fmt.Errorf("unsupported channel: %s", cfg.Channel) } return nil }