Эх сурвалжийг харах

feat(handler): TG protocol-number import endpoint with device/2FA fields

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
dot 2 долоо хоног өмнө
parent
commit
7ea0fb57c7

+ 455 - 0
internal/handler/tg_account.go

@@ -0,0 +1,455 @@
+package handler
+
+import (
+	"context"
+	"fmt"
+	"io"
+	"mime/multipart"
+	"os"
+	"path/filepath"
+	"strconv"
+	"strings"
+	"time"
+
+	"spider/internal/model"
+	"spider/internal/store"
+	"spider/internal/telegram"
+	"spider/internal/telegram/sessionimport"
+
+	"github.com/gin-gonic/gin"
+	"github.com/google/uuid"
+)
+
+// TgAccountHandler handles Telegram account management.
+type TgAccountHandler struct {
+	store       *store.Store
+	tgManager   *telegram.AccountManager
+	crypto      *telegram.Crypto
+	sessionsDir string
+	tmpDir      string // <sessionsDir>/.tmp — staging area for multipart uploads
+}
+
+// NewTgAccountHandler builds the handler with dependencies injected from main.
+func NewTgAccountHandler(s *store.Store, tgMgr *telegram.AccountManager, crypto *telegram.Crypto, sessionsDir string) *TgAccountHandler {
+	tmp := filepath.Join(sessionsDir, ".tmp")
+	_ = os.MkdirAll(tmp, 0o755)
+	h := &TgAccountHandler{
+		store:       s,
+		tgManager:   tgMgr,
+		crypto:      crypto,
+		sessionsDir: sessionsDir,
+		tmpDir:      tmp,
+	}
+	go h.tmpCleanupLoop()
+	return h
+}
+
+// tmpCleanupLoop deletes tmp subdirs older than 30 minutes every 10 minutes.
+func (h *TgAccountHandler) tmpCleanupLoop() {
+	t := time.NewTicker(10 * time.Minute)
+	defer t.Stop()
+	for range t.C {
+		entries, err := os.ReadDir(h.tmpDir)
+		if err != nil {
+			continue
+		}
+		cutoff := time.Now().Add(-30 * time.Minute)
+		for _, e := range entries {
+			if !e.IsDir() {
+				continue
+			}
+			info, err := e.Info()
+			if err != nil || info.ModTime().After(cutoff) {
+				continue
+			}
+			_ = os.RemoveAll(filepath.Join(h.tmpDir, e.Name()))
+		}
+	}
+}
+
+// List handles GET /tg-accounts
+func (h *TgAccountHandler) List(c *gin.Context) {
+	var accounts []model.TgAccount
+	h.store.DB.Order("id ASC").Find(&accounts)
+
+	// Enrich with live status from account manager
+	statuses := h.tgManager.GetStatuses()
+	for i := range accounts {
+		if s, ok := statuses[accounts[i].Phone]; ok {
+			accounts[i].Status = s
+		}
+	}
+
+	OK(c, accounts)
+}
+
+// Create handles POST /tg-accounts
+func (h *TgAccountHandler) Create(c *gin.Context) {
+	var req struct {
+		Phone       string `json:"phone" binding:"required"`
+		SessionFile string `json:"session_file" binding:"required"`
+		AppID       int    `json:"app_id" binding:"required"`
+		AppHash     string `json:"app_hash" binding:"required"`
+		Remark      string `json:"remark"`
+	}
+	if err := c.ShouldBindJSON(&req); err != nil {
+		Fail(c, 400, err.Error())
+		return
+	}
+
+	// Check duplicate
+	var count int64
+	h.store.DB.Model(&model.TgAccount{}).Where("phone = ?", req.Phone).Count(&count)
+	if count > 0 {
+		Fail(c, 409, "该手机号已存在")
+		return
+	}
+
+	acc := model.TgAccount{
+		Phone:       req.Phone,
+		SessionFile: req.SessionFile,
+		AppID:       req.AppID,
+		AppHash:     req.AppHash,
+		Remark:      req.Remark,
+		Enabled:     true,
+		Status:      "idle",
+	}
+	if err := h.store.DB.Create(&acc).Error; err != nil {
+		Fail(c, 500, err.Error())
+		return
+	}
+
+	// Reload account manager
+	h.reloadAccounts()
+
+	LogAudit(h.store, c, "create", "tg_account", fmt.Sprintf("%d", acc.ID), gin.H{"phone": acc.Phone})
+	OK(c, acc)
+}
+
+// Update handles PUT /tg-accounts/:id
+func (h *TgAccountHandler) Update(c *gin.Context) {
+	id, err := strconv.ParseUint(c.Param("id"), 10, 64)
+	if err != nil {
+		Fail(c, 400, "invalid id")
+		return
+	}
+
+	var acc model.TgAccount
+	if err := h.store.DB.First(&acc, id).Error; err != nil {
+		Fail(c, 404, "账号不存在")
+		return
+	}
+
+	var req struct {
+		SessionFile *string `json:"session_file"`
+		AppID       *int    `json:"app_id"`
+		AppHash     *string `json:"app_hash"`
+		Remark      *string `json:"remark"`
+		Enabled     *bool   `json:"enabled"`
+	}
+	if err := c.ShouldBindJSON(&req); err != nil {
+		Fail(c, 400, err.Error())
+		return
+	}
+
+	updates := map[string]any{}
+	if req.SessionFile != nil {
+		updates["session_file"] = *req.SessionFile
+	}
+	if req.AppID != nil {
+		updates["app_id"] = *req.AppID
+	}
+	if req.AppHash != nil {
+		updates["app_hash"] = *req.AppHash
+	}
+	if req.Remark != nil {
+		updates["remark"] = *req.Remark
+	}
+	if req.Enabled != nil {
+		updates["enabled"] = *req.Enabled
+	}
+
+	h.store.DB.Model(&acc).Updates(updates)
+	h.store.DB.First(&acc, id)
+
+	h.reloadAccounts()
+
+	LogAudit(h.store, c, "update", "tg_account", fmt.Sprintf("%d", id), updates)
+	OK(c, acc)
+}
+
+// Delete handles DELETE /tg-accounts/:id
+func (h *TgAccountHandler) Delete(c *gin.Context) {
+	id, err := strconv.ParseUint(c.Param("id"), 10, 64)
+	if err != nil {
+		Fail(c, 400, "invalid id")
+		return
+	}
+
+	if err := h.store.DB.Delete(&model.TgAccount{}, id).Error; err != nil {
+		Fail(c, 500, err.Error())
+		return
+	}
+
+	h.reloadAccounts()
+
+	LogAudit(h.store, c, "delete", "tg_account", fmt.Sprintf("%d", id), nil)
+	OK(c, gin.H{"message": "已删除"})
+}
+
+// Test handles POST /tg-accounts/:id/test — tests if the account can connect.
+func (h *TgAccountHandler) Test(c *gin.Context) {
+	id, err := strconv.ParseUint(c.Param("id"), 10, 64)
+	if err != nil {
+		Fail(c, 400, "invalid id")
+		return
+	}
+
+	var acc model.TgAccount
+	if err := h.store.DB.First(&acc, id).Error; err != nil {
+		Fail(c, 404, "账号不存在")
+		return
+	}
+
+	client := telegram.New(telegram.Account{
+		Phone:       acc.Phone,
+		SessionFile: acc.SessionFile,
+		AppID:       acc.AppID,
+		AppHash:     acc.AppHash,
+	})
+
+	ctx, cancel := context.WithTimeout(c.Request.Context(), 30*time.Second)
+	defer cancel()
+
+	if err := client.Connect(ctx); err != nil {
+		if fwe, ok := err.(*telegram.FloodWaitError); ok {
+			OK(c, gin.H{"status": "cooling", "message": fmt.Sprintf("FloodWait: 需等待 %d 秒", fwe.Seconds)})
+			return
+		}
+		OK(c, gin.H{"status": "fail", "error": err.Error()})
+		return
+	}
+	client.Disconnect()
+
+	OK(c, gin.H{"status": "ok", "message": "连接成功"})
+}
+
+// Import handles POST /tg-accounts/import — admin uploads a 协议号 folder.
+func (h *TgAccountHandler) Import(c *gin.Context) {
+	form, err := c.MultipartForm()
+	if err != nil {
+		Fail(c, 400, "invalid multipart form: "+err.Error())
+		return
+	}
+
+	files := form.File["files"]
+	if len(files) == 0 {
+		Fail(c, 400, "no files uploaded (expected field name: files)")
+		return
+	}
+
+	// Stage uploaded files to tmp/<uuid>/.
+	stageID := uuid.NewString()
+	stageDir := filepath.Join(h.tmpDir, stageID)
+	if err := os.MkdirAll(stageDir, 0o755); err != nil {
+		Fail(c, 500, "create stage dir: "+err.Error())
+		return
+	}
+
+	// Top-level folder name in webkitRelativePath (e.g. "13252753163"). Used to
+	// strip the prefix so RelPath inside the Result is relative to the folder.
+	topPrefix := detectTopPrefix(files)
+
+	staged := make([]sessionimport.StagedFile, 0, len(files))
+	for _, fh := range files {
+		// Gin stores the relative path in fh.Filename (browsers send it as the
+		// form part filename when webkitdirectory is used).
+		rel := normalizeRel(fh.Filename, topPrefix)
+		if rel == "" {
+			continue
+		}
+		abs := filepath.Join(stageDir, rel)
+		if err := os.MkdirAll(filepath.Dir(abs), 0o755); err != nil {
+			Fail(c, 500, "mkdir stage: "+err.Error())
+			return
+		}
+		if err := saveUpload(fh, abs); err != nil {
+			Fail(c, 500, "save upload: "+err.Error())
+			return
+		}
+		staged = append(staged, sessionimport.StagedFile{RelPath: rel, AbsPath: abs})
+	}
+
+	ctx, cancel := context.WithTimeout(c.Request.Context(), 60*time.Second)
+	defer cancel()
+
+	res, err := sessionimport.Import(ctx, h.sessionsDir, staged)
+	if err != nil {
+		Fail(c, 422, err.Error())
+		return
+	}
+
+	// Duplicate check.
+	var count int64
+	h.store.DB.Model(&model.TgAccount{}).Where("phone = ?", res.Meta.Phone).Count(&count)
+	if count > 0 {
+		Fail(c, 409, "该手机号已存在")
+		return
+	}
+
+	// Encrypt 2FA if present.
+	var twoFAEnc []byte
+	if res.TwoFAPlaintext != "" && h.crypto != nil {
+		twoFAEnc, err = h.crypto.Encrypt(res.TwoFAPlaintext)
+		if err != nil {
+			Fail(c, 500, "encrypt 2fa: "+err.Error())
+			return
+		}
+	}
+
+	acc := model.TgAccount{
+		Phone:          res.Meta.Phone,
+		SessionFile:    res.SessionFile,
+		AppID:          res.Meta.APIID,
+		AppHash:        res.Meta.APIHash,
+		Enabled:        true,
+		Status:         "idle",
+		TwoFAEnc:       twoFAEnc,
+		Device:         res.Meta.Device,
+		AppVersion:     res.Meta.AppVersion,
+		SDK:            res.Meta.SDK,
+		LangPack:       res.Meta.LangPack,
+		LangCode:       res.Meta.LangCode,
+		SystemLangCode: res.Meta.SystemLangCode,
+		FirstName:      res.Meta.FirstName,
+		LastName:       res.Meta.LastName,
+		TgUsername:     res.Meta.Username,
+		OriginDir:      res.OriginDir,
+		ImportStatus:   "ok",
+		Source:         "protocol",
+	}
+	if err := h.store.DB.Create(&acc).Error; err != nil {
+		if strings.Contains(strings.ToLower(err.Error()), "duplicate") {
+			Fail(c, 409, "该手机号已存在")
+			return
+		}
+		Fail(c, 500, "db create: "+err.Error())
+		return
+	}
+
+	// Rebuild account manager so new account is Acquireable.
+	h.reloadAccounts()
+
+	// Auto-test.
+	testResult := h.runTest(c.Request.Context(), &acc)
+	if testResult["status"] == "fail" {
+		h.store.DB.Model(&acc).Updates(map[string]any{
+			"enabled":       false,
+			"import_status": "dead",
+		})
+	}
+
+	LogAudit(h.store, c, "import", "tg_account", fmt.Sprintf("%d", acc.ID), gin.H{
+		"phone":  acc.Phone,
+		"source": res.UsedSource,
+	})
+
+	// Reload original so JSON response reflects updates.
+	h.store.DB.First(&acc, acc.ID)
+
+	// Cleanup stage dir on success.
+	_ = os.RemoveAll(stageDir)
+
+	OK(c, gin.H{"account": acc, "test_result": testResult})
+}
+
+// runTest calls Client.Connect + waits for ready — no GetMe, kept minimal.
+func (h *TgAccountHandler) runTest(ctx context.Context, acc *model.TgAccount) gin.H {
+	client := telegram.New(telegram.Account{
+		Phone:          acc.Phone,
+		SessionFile:    acc.SessionFile,
+		AppID:          acc.AppID,
+		AppHash:        acc.AppHash,
+		Device:         acc.Device,
+		AppVersion:     acc.AppVersion,
+		SystemVersion:  acc.SDK,
+		LangPack:       acc.LangPack,
+		LangCode:       acc.LangCode,
+		SystemLangCode: acc.SystemLangCode,
+	})
+	tctx, cancel := context.WithTimeout(ctx, 30*time.Second)
+	defer cancel()
+	if err := client.Connect(tctx); err != nil {
+		if fwe, ok := err.(*telegram.FloodWaitError); ok {
+			return gin.H{"status": "cooling", "message": fmt.Sprintf("FloodWait: 需等待 %d 秒", fwe.Seconds)}
+		}
+		return gin.H{"status": "fail", "error": err.Error()}
+	}
+	client.Disconnect()
+	return gin.H{"status": "ok", "message": "连接成功"}
+}
+
+func detectTopPrefix(files []*multipart.FileHeader) string {
+	if len(files) == 0 {
+		return ""
+	}
+	first := files[0].Filename
+	idx := strings.IndexAny(first, "/\\")
+	if idx <= 0 {
+		return ""
+	}
+	return first[:idx]
+}
+
+func normalizeRel(filename, topPrefix string) string {
+	// Convert backslashes to forward slashes for consistent path handling.
+	p := strings.ReplaceAll(filename, "\\", "/")
+	// Strip the top-level directory name so e.g. "13252753163/tdata/key_datas"
+	// becomes "tdata/key_datas".
+	if topPrefix != "" && strings.HasPrefix(p, topPrefix+"/") {
+		p = strings.TrimPrefix(p, topPrefix+"/")
+	}
+	// Reject any path that tries to escape.
+	if strings.Contains(p, "../") || strings.HasPrefix(p, "/") {
+		return ""
+	}
+	return p
+}
+
+func saveUpload(fh *multipart.FileHeader, dst string) error {
+	src, err := fh.Open()
+	if err != nil {
+		return err
+	}
+	defer src.Close()
+	out, err := os.Create(dst)
+	if err != nil {
+		return err
+	}
+	defer out.Close()
+	_, err = io.Copy(out, src)
+	return err
+}
+
+// reloadAccounts loads enabled TG accounts from DB and reinitializes the account manager.
+func (h *TgAccountHandler) reloadAccounts() {
+	var dbAccounts []model.TgAccount
+	h.store.DB.Where("enabled = ?", true).Find(&dbAccounts)
+
+	accounts := make([]telegram.Account, 0, len(dbAccounts))
+	for _, a := range dbAccounts {
+		accounts = append(accounts, telegram.Account{
+			Phone:          a.Phone,
+			SessionFile:    a.SessionFile,
+			AppID:          a.AppID,
+			AppHash:        a.AppHash,
+			Device:         a.Device,
+			AppVersion:     a.AppVersion,
+			SystemVersion:  a.SDK,
+			LangPack:       a.LangPack,
+			LangCode:       a.LangCode,
+			SystemLangCode: a.SystemLangCode,
+		})
+	}
+	h.tgManager.Init(accounts)
+}