| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492 |
- package handler
- import (
- "context"
- "fmt"
- "io"
- "mime/multipart"
- "os"
- "path"
- "path/filepath"
- "strconv"
- "strings"
- "time"
- "spider/internal/model"
- "spider/internal/store"
- "spider/internal/telegram"
- "spider/internal/telegram/sessionimport"
- "github.com/gin-gonic/gin"
- "github.com/google/uuid"
- )
- // TgAccountHandler handles Telegram account management.
- type TgAccountHandler struct {
- store *store.Store
- tgManager *telegram.AccountManager
- crypto *telegram.Crypto
- sessionsDir string
- tmpDir string // <sessionsDir>/.tmp — staging area for multipart uploads
- }
- // NewTgAccountHandler builds the handler with dependencies injected from main.
- func NewTgAccountHandler(s *store.Store, tgMgr *telegram.AccountManager, crypto *telegram.Crypto, sessionsDir string) *TgAccountHandler {
- tmp := filepath.Join(sessionsDir, ".tmp")
- _ = os.MkdirAll(tmp, 0o755)
- h := &TgAccountHandler{
- store: s,
- tgManager: tgMgr,
- crypto: crypto,
- sessionsDir: sessionsDir,
- tmpDir: tmp,
- }
- go h.tmpCleanupLoop()
- return h
- }
- // tmpCleanupLoop deletes tmp subdirs older than 30 minutes every 10 minutes.
- func (h *TgAccountHandler) tmpCleanupLoop() {
- t := time.NewTicker(10 * time.Minute)
- defer t.Stop()
- for range t.C {
- entries, err := os.ReadDir(h.tmpDir)
- if err != nil {
- continue
- }
- cutoff := time.Now().Add(-30 * time.Minute)
- for _, e := range entries {
- if !e.IsDir() {
- continue
- }
- info, err := e.Info()
- if err != nil || info.ModTime().After(cutoff) {
- continue
- }
- _ = os.RemoveAll(filepath.Join(h.tmpDir, e.Name()))
- }
- }
- }
- // List handles GET /tg-accounts
- func (h *TgAccountHandler) List(c *gin.Context) {
- var accounts []model.TgAccount
- h.store.DB.Order("id ASC").Find(&accounts)
- // Enrich with live status from account manager
- statuses := h.tgManager.GetStatuses()
- for i := range accounts {
- if s, ok := statuses[accounts[i].Phone]; ok {
- accounts[i].Status = s
- }
- }
- OK(c, accounts)
- }
- // Create handles POST /tg-accounts
- func (h *TgAccountHandler) Create(c *gin.Context) {
- var req struct {
- Phone string `json:"phone" binding:"required"`
- SessionFile string `json:"session_file" binding:"required"`
- AppID int `json:"app_id" binding:"required"`
- AppHash string `json:"app_hash" binding:"required"`
- Remark string `json:"remark"`
- }
- if err := c.ShouldBindJSON(&req); err != nil {
- Fail(c, 400, err.Error())
- return
- }
- // Check duplicate
- var count int64
- h.store.DB.Model(&model.TgAccount{}).Where("phone = ?", req.Phone).Count(&count)
- if count > 0 {
- Fail(c, 409, "该手机号已存在")
- return
- }
- acc := model.TgAccount{
- Phone: req.Phone,
- SessionFile: req.SessionFile,
- AppID: req.AppID,
- AppHash: req.AppHash,
- Remark: req.Remark,
- Enabled: true,
- Status: "idle",
- }
- if err := h.store.DB.Create(&acc).Error; err != nil {
- Fail(c, 500, err.Error())
- return
- }
- // Reload account manager
- h.reloadAccounts()
- LogAudit(h.store, c, "create", "tg_account", fmt.Sprintf("%d", acc.ID), gin.H{"phone": acc.Phone})
- OK(c, acc)
- }
- // Update handles PUT /tg-accounts/:id
- func (h *TgAccountHandler) Update(c *gin.Context) {
- id, err := strconv.ParseUint(c.Param("id"), 10, 64)
- if err != nil {
- Fail(c, 400, "invalid id")
- return
- }
- var acc model.TgAccount
- if err := h.store.DB.First(&acc, id).Error; err != nil {
- Fail(c, 404, "账号不存在")
- return
- }
- var req struct {
- SessionFile *string `json:"session_file"`
- AppID *int `json:"app_id"`
- AppHash *string `json:"app_hash"`
- Remark *string `json:"remark"`
- Enabled *bool `json:"enabled"`
- }
- if err := c.ShouldBindJSON(&req); err != nil {
- Fail(c, 400, err.Error())
- return
- }
- updates := map[string]any{}
- if req.SessionFile != nil {
- updates["session_file"] = *req.SessionFile
- }
- if req.AppID != nil {
- updates["app_id"] = *req.AppID
- }
- if req.AppHash != nil {
- updates["app_hash"] = *req.AppHash
- }
- if req.Remark != nil {
- updates["remark"] = *req.Remark
- }
- if req.Enabled != nil {
- updates["enabled"] = *req.Enabled
- }
- h.store.DB.Model(&acc).Updates(updates)
- h.store.DB.First(&acc, id)
- h.reloadAccounts()
- LogAudit(h.store, c, "update", "tg_account", fmt.Sprintf("%d", id), updates)
- OK(c, acc)
- }
- // Delete handles DELETE /tg-accounts/:id
- func (h *TgAccountHandler) Delete(c *gin.Context) {
- id, err := strconv.ParseUint(c.Param("id"), 10, 64)
- if err != nil {
- Fail(c, 400, "invalid id")
- return
- }
- if err := h.store.DB.Delete(&model.TgAccount{}, id).Error; err != nil {
- Fail(c, 500, err.Error())
- return
- }
- h.reloadAccounts()
- LogAudit(h.store, c, "delete", "tg_account", fmt.Sprintf("%d", id), nil)
- OK(c, gin.H{"message": "已删除"})
- }
- // Test handles POST /tg-accounts/:id/test — tests if the account can connect.
- func (h *TgAccountHandler) Test(c *gin.Context) {
- id, err := strconv.ParseUint(c.Param("id"), 10, 64)
- if err != nil {
- Fail(c, 400, "invalid id")
- return
- }
- var acc model.TgAccount
- if err := h.store.DB.First(&acc, id).Error; err != nil {
- Fail(c, 404, "账号不存在")
- return
- }
- client := telegram.New(telegram.Account{
- Phone: acc.Phone,
- SessionFile: acc.SessionFile,
- AppID: acc.AppID,
- AppHash: acc.AppHash,
- })
- ctx, cancel := context.WithTimeout(c.Request.Context(), 30*time.Second)
- defer cancel()
- if err := client.Connect(ctx); err != nil {
- if fwe, ok := err.(*telegram.FloodWaitError); ok {
- OK(c, gin.H{"status": "cooling", "message": fmt.Sprintf("FloodWait: 需等待 %d 秒", fwe.Seconds)})
- return
- }
- OK(c, gin.H{"status": "fail", "error": err.Error()})
- return
- }
- client.Disconnect()
- OK(c, gin.H{"status": "ok", "message": "连接成功"})
- }
- // Import handles POST /tg-accounts/import — admin uploads a 协议号 folder.
- func (h *TgAccountHandler) Import(c *gin.Context) {
- form, err := c.MultipartForm()
- if err != nil {
- Fail(c, 400, "invalid multipart form: "+err.Error())
- return
- }
- files := form.File["files"]
- if len(files) == 0 {
- Fail(c, 400, "no files uploaded (expected field name: files)")
- return
- }
- // Stage uploaded files to tmp/<uuid>/.
- stageID := uuid.NewString()
- stageDir := filepath.Join(h.tmpDir, stageID)
- if err := os.MkdirAll(stageDir, 0o755); err != nil {
- Fail(c, 500, "create stage dir: "+err.Error())
- return
- }
- // Top-level folder name in webkitRelativePath (e.g. "13252753163"). Used to
- // strip the prefix so RelPath inside the Result is relative to the folder.
- topPrefix := detectTopPrefix(files)
- staged := make([]sessionimport.StagedFile, 0, len(files))
- for _, fh := range files {
- // Gin stores the relative path in fh.Filename (browsers send it as the
- // form part filename when webkitdirectory is used).
- rel := normalizeRel(fh.Filename, topPrefix)
- if rel == "" {
- continue
- }
- abs := filepath.Join(stageDir, rel)
- if err := os.MkdirAll(filepath.Dir(abs), 0o755); err != nil {
- Fail(c, 500, "mkdir stage: "+err.Error())
- return
- }
- if err := saveUpload(fh, abs); err != nil {
- Fail(c, 500, "save upload: "+err.Error())
- return
- }
- staged = append(staged, sessionimport.StagedFile{RelPath: rel, AbsPath: abs})
- }
- ctx, cancel := context.WithTimeout(c.Request.Context(), 60*time.Second)
- defer cancel()
- res, err := sessionimport.Import(ctx, h.sessionsDir, staged)
- if err != nil {
- detail := gin.H{"error": err.Error()}
- if res != nil {
- detail["pyrogram_err"] = res.PyrogramErr
- detail["tdata_err"] = res.TdataErr
- detail["tmp_dir"] = stageDir
- }
- c.JSON(422, gin.H{
- "code": 422,
- "message": err.Error(),
- "data": detail,
- })
- return
- }
- cleanupOnFail := func() {
- if res != nil {
- if res.SessionFile != "" {
- _ = os.Remove(res.SessionFile)
- }
- if res.OriginDir != "" {
- _ = os.RemoveAll(res.OriginDir)
- }
- }
- }
- // Duplicate check.
- var count int64
- h.store.DB.Model(&model.TgAccount{}).Where("phone = ?", res.Meta.Phone).Count(&count)
- if count > 0 {
- cleanupOnFail()
- Fail(c, 409, "该手机号已存在")
- return
- }
- // Encrypt 2FA if present.
- var twoFAEnc []byte
- if res.TwoFAPlaintext != "" && h.crypto != nil {
- twoFAEnc, err = h.crypto.Encrypt(res.TwoFAPlaintext)
- if err != nil {
- cleanupOnFail()
- Fail(c, 500, "encrypt 2fa: "+err.Error())
- return
- }
- }
- acc := model.TgAccount{
- Phone: res.Meta.Phone,
- SessionFile: res.SessionFile,
- AppID: res.Meta.APIID,
- AppHash: res.Meta.APIHash,
- Enabled: true,
- Status: "idle",
- TwoFAEnc: twoFAEnc,
- Device: res.Meta.Device,
- AppVersion: res.Meta.AppVersion,
- SDK: res.Meta.SDK,
- LangPack: res.Meta.LangPack,
- LangCode: res.Meta.LangCode,
- SystemLangCode: res.Meta.SystemLangCode,
- FirstName: res.Meta.FirstName,
- LastName: res.Meta.LastName,
- TgUsername: res.Meta.Username,
- OriginDir: res.OriginDir,
- ImportStatus: "ok",
- Source: "protocol",
- }
- if err := h.store.DB.Create(&acc).Error; err != nil {
- if strings.Contains(strings.ToLower(err.Error()), "duplicate") {
- cleanupOnFail()
- Fail(c, 409, "该手机号已存在")
- return
- }
- cleanupOnFail()
- Fail(c, 500, "db create: "+err.Error())
- return
- }
- // Rebuild account manager so new account is Acquireable.
- h.reloadAccounts()
- // Auto-test — detach from the HTTP request context so a client disconnect
- // cannot falsely mark a freshly imported account as dead.
- bgCtx, bgCancel := context.WithTimeout(context.Background(), 45*time.Second)
- defer bgCancel()
- testResult := h.runTest(bgCtx, &acc)
- if testResult["status"] == "fail" {
- h.store.DB.Model(&acc).Updates(map[string]any{
- "enabled": false,
- "import_status": "dead",
- })
- }
- LogAudit(h.store, c, "import", "tg_account", fmt.Sprintf("%d", acc.ID), gin.H{
- "phone": acc.Phone,
- "source": res.UsedSource,
- })
- // Reload original so JSON response reflects updates.
- h.store.DB.First(&acc, acc.ID)
- // Cleanup stage dir on success.
- _ = os.RemoveAll(stageDir)
- OK(c, gin.H{"account": acc, "test_result": testResult})
- }
- // runTest calls Client.Connect + waits for ready — no GetMe, kept minimal.
- func (h *TgAccountHandler) runTest(ctx context.Context, acc *model.TgAccount) gin.H {
- client := telegram.New(telegram.Account{
- Phone: acc.Phone,
- SessionFile: acc.SessionFile,
- AppID: acc.AppID,
- AppHash: acc.AppHash,
- Device: acc.Device,
- AppVersion: acc.AppVersion,
- SystemVersion: acc.SDK,
- LangPack: acc.LangPack,
- LangCode: acc.LangCode,
- SystemLangCode: acc.SystemLangCode,
- })
- tctx, cancel := context.WithTimeout(ctx, 30*time.Second)
- defer cancel()
- if err := client.Connect(tctx); err != nil {
- if fwe, ok := err.(*telegram.FloodWaitError); ok {
- return gin.H{"status": "cooling", "message": fmt.Sprintf("FloodWait: 需等待 %d 秒", fwe.Seconds)}
- }
- return gin.H{"status": "fail", "error": err.Error()}
- }
- client.Disconnect()
- return gin.H{"status": "ok", "message": "连接成功"}
- }
// detectTopPrefix returns the first path component of the first uploaded
// file's name, i.e. everything before its first '/' or '\'. It returns "" when
// there are no files, when the name has no separator, or when the name starts
// with a separator. Only the first file is inspected.
func detectTopPrefix(files []*multipart.FileHeader) string {
	if len(files) == 0 {
		return ""
	}
	name := files[0].Filename
	for i := 0; i < len(name); i++ {
		if name[i] == '/' || name[i] == '\\' {
			// A separator at index 0 yields the empty prefix.
			return name[:i]
		}
	}
	return ""
}
// normalizeRel turns an uploaded file name into a slash-separated path
// relative to the staging root, or returns "" when the name must be skipped.
//
// Backslashes are treated as separators, a leading "<topPrefix>/" is dropped
// when topPrefix is non-empty, and "." / ".." components are resolved. Names
// that resolve to the root itself, that escape it via "..", or that are
// absolute are rejected.
func normalizeRel(filename, topPrefix string) string {
	rel := strings.ReplaceAll(filename, "\\", "/")

	// e.g. "13252753163/tdata/key_datas" -> "tdata/key_datas". The guard
	// matters: with an empty prefix a leading "/" must not be trimmed away.
	if topPrefix != "" {
		rel = strings.TrimPrefix(rel, topPrefix+"/")
	}

	// path.Clean is forward-slash aware regardless of the host OS.
	rel = path.Clean(rel)
	switch {
	case rel == ".", rel == "..":
		return ""
	case strings.HasPrefix(rel, "../"):
		return ""
	case strings.HasPrefix(rel, "/"), filepath.IsAbs(rel):
		return ""
	}
	return rel
}
// saveUpload copies the uploaded file behind fh to the path dst, creating or
// truncating dst.
//
// It returns the first error from opening the upload, creating dst, copying,
// or closing dst. The close error is reported because a write that only fails
// when the file is flushed would otherwise be recorded as a successful save.
func saveUpload(fh *multipart.FileHeader, dst string) error {
	src, err := fh.Open()
	if err != nil {
		return err
	}
	defer src.Close()

	out, err := os.Create(dst)
	if err != nil {
		return err
	}
	if _, err := io.Copy(out, src); err != nil {
		// The copy error is the one worth reporting; close only to release
		// the descriptor.
		_ = out.Close()
		return err
	}
	return out.Close()
}
- // reloadAccounts loads enabled TG accounts from DB and reinitializes the account manager.
- func (h *TgAccountHandler) reloadAccounts() {
- var dbAccounts []model.TgAccount
- h.store.DB.Where("enabled = ?", true).Find(&dbAccounts)
- accounts := make([]telegram.Account, 0, len(dbAccounts))
- for _, a := range dbAccounts {
- accounts = append(accounts, telegram.Account{
- Phone: a.Phone,
- SessionFile: a.SessionFile,
- AppID: a.AppID,
- AppHash: a.AppHash,
- Device: a.Device,
- AppVersion: a.AppVersion,
- SystemVersion: a.SDK,
- LangPack: a.LangPack,
- LangCode: a.LangCode,
- SystemLangCode: a.SystemLangCode,
- })
- }
- h.tgManager.Init(accounts)
- }
|