main.go 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251
  1. package main
  2. import (
  3. "context"
  4. "fmt"
  5. "log"
  6. "net/http"
  7. "os"
  8. "os/signal"
  9. "syscall"
  10. "time"
  11. "spider/internal/config"
  12. "spider/internal/handler"
  13. "spider/internal/llm"
  14. "spider/internal/model"
  15. "spider/internal/notification"
  16. "spider/internal/plugin"
  17. "spider/internal/plugins/githubcollector"
  18. "spider/internal/plugins/tgcollector"
  19. "spider/internal/plugins/webcollector"
  20. "spider/internal/processor"
  21. "spider/internal/search"
  22. "spider/internal/store"
  23. "spider/internal/task"
  24. "spider/internal/telegram"
  25. "github.com/redis/go-redis/v9"
  26. "golang.org/x/crypto/bcrypt"
  27. "gorm.io/driver/mysql"
  28. "gorm.io/gorm"
  29. )
  30. func main() {
  31. // 1. Load config
  32. cfg, err := config.Load("configs/config.yaml")
  33. if err != nil {
  34. log.Fatalf("load config: %v", err)
  35. }
  36. // 2. Connect MySQL
  37. dsn := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?charset=utf8mb4&parseTime=True&loc=Local",
  38. cfg.MySQL.User, cfg.MySQL.Password, cfg.MySQL.Host, cfg.MySQL.Port, cfg.MySQL.Database)
  39. db, err := gorm.Open(mysql.Open(dsn), &gorm.Config{})
  40. if err != nil {
  41. log.Fatalf("connect mysql: %v", err)
  42. }
  43. sqlDB, err := db.DB()
  44. if err != nil {
  45. log.Fatalf("get sql.DB: %v", err)
  46. }
  47. sqlDB.SetMaxOpenConns(25)
  48. sqlDB.SetMaxIdleConns(10)
  49. sqlDB.SetConnMaxLifetime(5 * time.Minute)
  50. sqlDB.SetConnMaxIdleTime(3 * time.Minute)
  51. // 3. AutoMigrate
  52. err = db.AutoMigrate(
  53. &model.Keyword{},
  54. &model.Channel{},
  55. &model.MerchantRaw{},
  56. &model.MerchantClean{},
  57. &model.TaskLog{},
  58. &model.TaskDetail{},
  59. &model.Setting{},
  60. &model.GroupMember{},
  61. &model.User{},
  62. &model.TgAccount{},
  63. &model.ScheduleJob{},
  64. &model.MerchantNote{},
  65. &model.AuditLog{},
  66. &model.NotificationConfig{},
  67. &model.MerchantArchived{},
  68. &model.RolePermission{},
  69. &model.Proxy{},
  70. )
  71. if err != nil {
  72. log.Fatalf("automigrate: %v", err)
  73. }
  74. log.Println("MySQL tables migrated")
  75. // 4. Create default admin user if no users exist
  76. var userCount int64
  77. db.Model(&model.User{}).Count(&userCount)
  78. if userCount == 0 {
  79. hashed, _ := bcrypt.GenerateFromPassword([]byte("admin123"), bcrypt.DefaultCost)
  80. db.Create(&model.User{
  81. Username: "admin",
  82. Password: string(hashed),
  83. Nickname: "管理员",
  84. Role: "admin",
  85. Enabled: true,
  86. MustChangePassword: true,
  87. })
  88. log.Println("Default admin user created (admin / admin123)")
  89. }
  90. // 4b. Seed default role permissions if none exist
  91. var permCount int64
  92. db.Model(&model.RolePermission{}).Count(&permCount)
  93. if permCount == 0 {
  94. for role, perm := range model.DefaultPermissions() {
  95. db.Create(&model.RolePermission{
  96. Role: role,
  97. Menus: perm.Menus,
  98. Actions: perm.Actions,
  99. })
  100. }
  101. log.Println("Default role permissions created")
  102. }
  103. // 5. Connect Redis
  104. rdb := redis.NewClient(&redis.Options{
  105. Addr: fmt.Sprintf("%s:%d", cfg.Redis.Host, cfg.Redis.Port),
  106. Password: cfg.Redis.Password,
  107. DB: cfg.Redis.DB,
  108. })
  109. ctx := context.Background()
  110. if err := rdb.Ping(ctx).Err(); err != nil {
  111. log.Fatalf("redis ping: %v", err)
  112. }
  113. log.Println("Redis connected")
  114. // 6. Clean up stale "running" tasks
  115. db.Model(&model.TaskLog{}).
  116. Where("status = ?", "running").
  117. Updates(map[string]any{
  118. "status": "failed",
  119. "detail": "服务重启,任务中断",
  120. "finished_at": time.Now(),
  121. })
  122. // 7. Initialize store
  123. s := store.New(db)
  124. // 8. Initialize external clients
  125. var llmClient *llm.Client
  126. if cfg.LLM.APIKey != "" {
  127. llmClient = llm.New(cfg.LLM.BaseURL, cfg.LLM.APIKey, cfg.LLM.Model, 30*time.Second)
  128. }
  129. var serperClient *search.SerperClient
  130. if cfg.Serper.APIKey != "" {
  131. serperClient = search.NewSerperClient(cfg.Serper.APIKey, cfg.Serper.ResultsPerPage, cfg.Serper.MaxPages)
  132. }
  133. // 8b. Construct TG crypto helper (required, fails fast on missing/invalid key).
  134. tgCrypto, err := telegram.NewCrypto(cfg.Telegram.SecretKey)
  135. if err != nil {
  136. log.Fatalf("TG_SECRET_KEY invalid: %v — set a 32-byte base64 value in env", err)
  137. }
  138. sessionsDir := cfg.Telegram.SessionsDir
  139. if sessionsDir == "" {
  140. sessionsDir = "/app/sessions"
  141. }
  142. if err := os.MkdirAll(sessionsDir, 0o755); err != nil {
  143. log.Fatalf("create sessions dir %s: %v", sessionsDir, err)
  144. }
  145. // 9. Load TG accounts from DB (fall back to config for backward compatibility)
  146. var dbTgAccounts []model.TgAccount
  147. db.Where("enabled = ?", true).Find(&dbTgAccounts)
  148. tgAccounts := make([]telegram.Account, 0)
  149. if len(dbTgAccounts) > 0 {
  150. for _, a := range dbTgAccounts {
  151. tgAccounts = append(tgAccounts, telegram.Account{
  152. Phone: a.Phone,
  153. SessionFile: a.SessionFile,
  154. AppID: a.AppID,
  155. AppHash: a.AppHash,
  156. })
  157. }
  158. log.Printf("Loaded %d TG accounts from database", len(tgAccounts))
  159. } else {
  160. // Fallback: load from config.yaml
  161. for _, a := range cfg.Telegram.Accounts {
  162. tgAccounts = append(tgAccounts, telegram.Account{
  163. Phone: a.Phone,
  164. SessionFile: a.SessionFile,
  165. AppID: cfg.Telegram.AppID,
  166. AppHash: cfg.Telegram.AppHash,
  167. })
  168. }
  169. if len(tgAccounts) > 0 {
  170. log.Printf("Loaded %d TG accounts from config", len(tgAccounts))
  171. }
  172. }
  173. tgManager := telegram.NewAccountManager(tgAccounts, rdb)
  174. // 10. Register plugins
  175. registry := plugin.NewRegistry()
  176. registry.Register(webcollector.New(serperClient, rdb))
  177. registry.Register(tgcollector.New(tgManager, llmClient, s))
  178. registry.Register(githubcollector.New(cfg.GitHub.Token, s))
  179. // 11. Initialize processor, notifier & task manager
  180. proc := processor.NewProcessor(s)
  181. notifMgr := notification.NewManager(db)
  182. taskMgr := task.NewManager(db, rdb, registry, s, proc)
  183. taskMgr.SetNotifier(notifMgr)
  184. // 12. Setup scheduler
  185. scheduler := task.NewScheduler(db, taskMgr)
  186. scheduler.Start()
  187. defer scheduler.Stop()
  188. // 13. Setup HTTP server
  189. tgAccountHandler := handler.NewTgAccountHandler(s, tgManager, tgCrypto, sessionsDir)
  190. r := handler.SetupRouter(s, taskMgr, rdb, tgManager, tgAccountHandler)
  191. // Wire scheduler into the schedule handler
  192. if sh := handler.GetScheduleHandler(); sh != nil {
  193. sh.SetScheduler(scheduler)
  194. }
  195. addr := handler.ServerAddr(cfg.Server.Port)
  196. srv := &http.Server{
  197. Addr: addr,
  198. Handler: r,
  199. ReadTimeout: 30 * time.Second,
  200. WriteTimeout: 60 * time.Second,
  201. IdleTimeout: 120 * time.Second,
  202. }
  203. go func() {
  204. log.Printf("Server starting on %s", addr)
  205. if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
  206. log.Fatalf("listen: %v", err)
  207. }
  208. }()
  209. // Graceful shutdown
  210. quit := make(chan os.Signal, 1)
  211. signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
  212. <-quit
  213. log.Println("Shutting down server...")
  214. taskMgr.StopAll()
  215. shutdownCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
  216. defer cancel()
  217. if err := srv.Shutdown(shutdownCtx); err != nil {
  218. log.Fatalf("server forced to shutdown: %v", err)
  219. }
  220. rdb.Close()
  221. sqlDB.Close()
  222. log.Println("Server exited")
  223. }