| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251 |
- package main
- import (
- "context"
- "fmt"
- "log"
- "net/http"
- "os"
- "os/signal"
- "syscall"
- "time"
- "spider/internal/config"
- "spider/internal/handler"
- "spider/internal/llm"
- "spider/internal/model"
- "spider/internal/notification"
- "spider/internal/plugin"
- "spider/internal/plugins/githubcollector"
- "spider/internal/plugins/tgcollector"
- "spider/internal/plugins/webcollector"
- "spider/internal/processor"
- "spider/internal/search"
- "spider/internal/store"
- "spider/internal/task"
- "spider/internal/telegram"
- "github.com/redis/go-redis/v9"
- "golang.org/x/crypto/bcrypt"
- "gorm.io/driver/mysql"
- "gorm.io/gorm"
- )
- func main() {
- // 1. Load config
- cfg, err := config.Load("configs/config.yaml")
- if err != nil {
- log.Fatalf("load config: %v", err)
- }
- // 2. Connect MySQL
- dsn := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?charset=utf8mb4&parseTime=True&loc=Local",
- cfg.MySQL.User, cfg.MySQL.Password, cfg.MySQL.Host, cfg.MySQL.Port, cfg.MySQL.Database)
- db, err := gorm.Open(mysql.Open(dsn), &gorm.Config{})
- if err != nil {
- log.Fatalf("connect mysql: %v", err)
- }
- sqlDB, err := db.DB()
- if err != nil {
- log.Fatalf("get sql.DB: %v", err)
- }
- sqlDB.SetMaxOpenConns(25)
- sqlDB.SetMaxIdleConns(10)
- sqlDB.SetConnMaxLifetime(5 * time.Minute)
- sqlDB.SetConnMaxIdleTime(3 * time.Minute)
- // 3. AutoMigrate
- err = db.AutoMigrate(
- &model.Keyword{},
- &model.Channel{},
- &model.MerchantRaw{},
- &model.MerchantClean{},
- &model.TaskLog{},
- &model.TaskDetail{},
- &model.Setting{},
- &model.GroupMember{},
- &model.User{},
- &model.TgAccount{},
- &model.ScheduleJob{},
- &model.MerchantNote{},
- &model.AuditLog{},
- &model.NotificationConfig{},
- &model.MerchantArchived{},
- &model.RolePermission{},
- &model.Proxy{},
- )
- if err != nil {
- log.Fatalf("automigrate: %v", err)
- }
- log.Println("MySQL tables migrated")
- // 4. Create default admin user if no users exist
- var userCount int64
- db.Model(&model.User{}).Count(&userCount)
- if userCount == 0 {
- hashed, _ := bcrypt.GenerateFromPassword([]byte("admin123"), bcrypt.DefaultCost)
- db.Create(&model.User{
- Username: "admin",
- Password: string(hashed),
- Nickname: "管理员",
- Role: "admin",
- Enabled: true,
- MustChangePassword: true,
- })
- log.Println("Default admin user created (admin / admin123)")
- }
- // 4b. Seed default role permissions if none exist
- var permCount int64
- db.Model(&model.RolePermission{}).Count(&permCount)
- if permCount == 0 {
- for role, perm := range model.DefaultPermissions() {
- db.Create(&model.RolePermission{
- Role: role,
- Menus: perm.Menus,
- Actions: perm.Actions,
- })
- }
- log.Println("Default role permissions created")
- }
- // 5. Connect Redis
- rdb := redis.NewClient(&redis.Options{
- Addr: fmt.Sprintf("%s:%d", cfg.Redis.Host, cfg.Redis.Port),
- Password: cfg.Redis.Password,
- DB: cfg.Redis.DB,
- })
- ctx := context.Background()
- if err := rdb.Ping(ctx).Err(); err != nil {
- log.Fatalf("redis ping: %v", err)
- }
- log.Println("Redis connected")
- // 6. Clean up stale "running" tasks
- db.Model(&model.TaskLog{}).
- Where("status = ?", "running").
- Updates(map[string]any{
- "status": "failed",
- "detail": "服务重启,任务中断",
- "finished_at": time.Now(),
- })
- // 7. Initialize store
- s := store.New(db)
- // 8. Initialize external clients
- var llmClient *llm.Client
- if cfg.LLM.APIKey != "" {
- llmClient = llm.New(cfg.LLM.BaseURL, cfg.LLM.APIKey, cfg.LLM.Model, 30*time.Second)
- }
- var serperClient *search.SerperClient
- if cfg.Serper.APIKey != "" {
- serperClient = search.NewSerperClient(cfg.Serper.APIKey, cfg.Serper.ResultsPerPage, cfg.Serper.MaxPages)
- }
- // 8b. Construct TG crypto helper (required, fails fast on missing/invalid key).
- tgCrypto, err := telegram.NewCrypto(cfg.Telegram.SecretKey)
- if err != nil {
- log.Fatalf("TG_SECRET_KEY invalid: %v — set a 32-byte base64 value in env", err)
- }
- sessionsDir := cfg.Telegram.SessionsDir
- if sessionsDir == "" {
- sessionsDir = "/app/sessions"
- }
- if err := os.MkdirAll(sessionsDir, 0o755); err != nil {
- log.Fatalf("create sessions dir %s: %v", sessionsDir, err)
- }
- // 9. Load TG accounts from DB (fall back to config for backward compatibility)
- var dbTgAccounts []model.TgAccount
- db.Where("enabled = ?", true).Find(&dbTgAccounts)
- tgAccounts := make([]telegram.Account, 0)
- if len(dbTgAccounts) > 0 {
- for _, a := range dbTgAccounts {
- tgAccounts = append(tgAccounts, telegram.Account{
- Phone: a.Phone,
- SessionFile: a.SessionFile,
- AppID: a.AppID,
- AppHash: a.AppHash,
- })
- }
- log.Printf("Loaded %d TG accounts from database", len(tgAccounts))
- } else {
- // Fallback: load from config.yaml
- for _, a := range cfg.Telegram.Accounts {
- tgAccounts = append(tgAccounts, telegram.Account{
- Phone: a.Phone,
- SessionFile: a.SessionFile,
- AppID: cfg.Telegram.AppID,
- AppHash: cfg.Telegram.AppHash,
- })
- }
- if len(tgAccounts) > 0 {
- log.Printf("Loaded %d TG accounts from config", len(tgAccounts))
- }
- }
- tgManager := telegram.NewAccountManager(tgAccounts, rdb)
- // 10. Register plugins
- registry := plugin.NewRegistry()
- registry.Register(webcollector.New(serperClient, rdb))
- registry.Register(tgcollector.New(tgManager, llmClient, s))
- registry.Register(githubcollector.New(cfg.GitHub.Token, s))
- // 11. Initialize processor, notifier & task manager
- proc := processor.NewProcessor(s)
- notifMgr := notification.NewManager(db)
- taskMgr := task.NewManager(db, rdb, registry, s, proc)
- taskMgr.SetNotifier(notifMgr)
- // 12. Setup scheduler
- scheduler := task.NewScheduler(db, taskMgr)
- scheduler.Start()
- defer scheduler.Stop()
- // 13. Setup HTTP server
- tgAccountHandler := handler.NewTgAccountHandler(s, tgManager, tgCrypto, sessionsDir)
- r := handler.SetupRouter(s, taskMgr, rdb, tgManager, tgAccountHandler)
- // Wire scheduler into the schedule handler
- if sh := handler.GetScheduleHandler(); sh != nil {
- sh.SetScheduler(scheduler)
- }
- addr := handler.ServerAddr(cfg.Server.Port)
- srv := &http.Server{
- Addr: addr,
- Handler: r,
- ReadTimeout: 30 * time.Second,
- WriteTimeout: 60 * time.Second,
- IdleTimeout: 120 * time.Second,
- }
- go func() {
- log.Printf("Server starting on %s", addr)
- if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
- log.Fatalf("listen: %v", err)
- }
- }()
- // Graceful shutdown
- quit := make(chan os.Signal, 1)
- signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
- <-quit
- log.Println("Shutting down server...")
- taskMgr.StopAll()
- shutdownCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
- defer cancel()
- if err := srv.Shutdown(shutdownCtx); err != nil {
- log.Fatalf("server forced to shutdown: %v", err)
- }
- rdb.Close()
- sqlDB.Close()
- log.Println("Server exited")
- }
|