Merge branch 'dev' into dev-karas
Need to get a fresh openapi description for auth
This commit is contained in:
commit
3bbd2c2818
30 changed files with 1247 additions and 370 deletions
|
|
@ -47,10 +47,28 @@ func CheckPassword(password, hash string) (bool, error) {
|
|||
return argon2id.ComparePasswordAndHash(password, hash)
|
||||
}
|
||||
|
||||
func (s Server) generateImpersonationToken(userID string, impersonated_by string) (accessToken string, err error) {
|
||||
accessClaims := jwt.MapClaims{
|
||||
"user_id": userID,
|
||||
"exp": time.Now().Add(15 * time.Minute).Unix(),
|
||||
"imp_id": impersonated_by,
|
||||
}
|
||||
|
||||
at := jwt.NewWithClaims(jwt.SigningMethodHS256, accessClaims)
|
||||
|
||||
accessToken, err = at.SignedString([]byte(s.JwtPrivateKey))
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
return accessToken, nil
|
||||
}
|
||||
|
||||
func (s Server) generateTokens(userID string) (accessToken string, refreshToken string, csrfToken string, err error) {
|
||||
accessClaims := jwt.MapClaims{
|
||||
"user_id": userID,
|
||||
"exp": time.Now().Add(15 * time.Minute).Unix(),
|
||||
//TODO: add created_at
|
||||
}
|
||||
at := jwt.NewWithClaims(jwt.SigningMethodHS256, accessClaims)
|
||||
accessToken, err = at.SignedString([]byte(s.JwtPrivateKey))
|
||||
|
|
@ -119,10 +137,7 @@ func (s Server) PostSignIn(ctx context.Context, req auth.PostSignInRequestObject
|
|||
// TODO: return 500
|
||||
}
|
||||
if !ok {
|
||||
err_msg := "invalid credentials"
|
||||
return auth.PostSignIn401JSONResponse{
|
||||
Error: &err_msg,
|
||||
}, nil
|
||||
return auth.PostSignIn401Response{}, nil
|
||||
}
|
||||
|
||||
accessToken, refreshToken, csrfToken, err := s.generateTokens(req.Body.Nickname)
|
||||
|
|
@ -144,47 +159,65 @@ func (s Server) PostSignIn(ctx context.Context, req auth.PostSignInRequestObject
|
|||
return result, nil
|
||||
}
|
||||
|
||||
// func (s Server) PostAuthVerifyToken(ctx context.Context, req auth.PostAuthVerifyTokenRequestObject) (auth.PostAuthVerifyTokenResponseObject, error) {
|
||||
// valid := false
|
||||
// var userID *string
|
||||
// var errStr *string
|
||||
func (s Server) GetImpersonationToken(ctx context.Context, req auth.GetImpersonationTokenRequestObject) (auth.GetImpersonationTokenResponseObject, error) {
|
||||
ginCtx, ok := ctx.Value(gin.ContextKey).(*gin.Context)
|
||||
if !ok {
|
||||
log.Print("failed to get gin context")
|
||||
// TODO: change to 500
|
||||
return auth.GetImpersonationToken200JSONResponse{}, fmt.Errorf("failed to get gin.Context from context.Context")
|
||||
}
|
||||
|
||||
// token, err := jwt.Parse(req.Body.Token, func(t *jwt.Token) (interface{}, error) {
|
||||
// if _, ok := t.Method.(*jwt.SigningMethodHMAC); !ok {
|
||||
// return nil, fmt.Errorf("unexpected signing method")
|
||||
// }
|
||||
// return accessSecret, nil
|
||||
// })
|
||||
token, err := ExtractBearerToken(ginCtx.Request.Header.Get("Authorization"))
|
||||
if err != nil {
|
||||
// TODO: return 500
|
||||
log.Errorf("failed to extract bearer token: %v", err)
|
||||
return auth.GetImpersonationToken401Response{}, err
|
||||
}
|
||||
log.Printf("got auth token: %s", token)
|
||||
|
||||
// if err != nil {
|
||||
// e := err.Error()
|
||||
// errStr = &e
|
||||
// return auth.PostAuthVerifyToken200JSONResponse{
|
||||
// Valid: &valid,
|
||||
// UserId: userID,
|
||||
// Error: errStr,
|
||||
// }, nil
|
||||
// }
|
||||
ext_service, err := s.db.GetExternalServiceByToken(context.Background(), &token)
|
||||
if err != nil {
|
||||
log.Errorf("failed to get external service by token: %v", err)
|
||||
return auth.GetImpersonationToken401Response{}, err
|
||||
// TODO: check err and retyrn 400/500
|
||||
}
|
||||
|
||||
// if claims, ok := token.Claims.(jwt.MapClaims); ok && token.Valid {
|
||||
// if uid, ok := claims["user_id"].(string); ok {
|
||||
// valid = true
|
||||
// userID = &uid
|
||||
// } else {
|
||||
// e := "user_id not found in token"
|
||||
// errStr = &e
|
||||
// }
|
||||
// } else {
|
||||
// e := "invalid token claims"
|
||||
// errStr = &e
|
||||
// }
|
||||
var user_id string = ""
|
||||
|
||||
// return auth.PostAuthVerifyToken200JSONResponse{
|
||||
// Valid: &valid,
|
||||
// UserId: userID,
|
||||
// Error: errStr,
|
||||
// }, nil
|
||||
// }
|
||||
if req.Body.ExternalId != nil {
|
||||
user, err := s.db.GetUserByExternalServiceId(context.Background(), sqlc.GetUserByExternalServiceIdParams{
|
||||
ExternalID: fmt.Sprintf("%d", *req.Body.ExternalId),
|
||||
ServiceID: ext_service.ID,
|
||||
})
|
||||
if err != nil {
|
||||
log.Errorf("failed to get user by external user id: %v", err)
|
||||
return auth.GetImpersonationToken401Response{}, err
|
||||
// TODO: check err and retyrn 400/500
|
||||
}
|
||||
|
||||
user_id = fmt.Sprintf("%d", user.ID)
|
||||
}
|
||||
|
||||
if req.Body.UserId != nil {
|
||||
// TODO: check user existence
|
||||
if user_id != "" && user_id != fmt.Sprintf("%d", *req.Body.UserId) {
|
||||
log.Error("user_id and external_d are incorrect")
|
||||
// TODO: 405
|
||||
return auth.GetImpersonationToken401Response{}, nil
|
||||
} else {
|
||||
user_id = fmt.Sprintf("%d", *req.Body.UserId)
|
||||
}
|
||||
}
|
||||
|
||||
accessToken, err := s.generateImpersonationToken(user_id, fmt.Sprintf("%d", ext_service.ID))
|
||||
if err != nil {
|
||||
log.Errorf("failed to generate impersonation token: %v", err)
|
||||
return auth.GetImpersonationToken401Response{}, err
|
||||
// TODO: check err and retyrn 400/500
|
||||
}
|
||||
|
||||
return auth.GetImpersonationToken200JSONResponse{AccessToken: accessToken}, nil
|
||||
}
|
||||
|
||||
// func (s Server) PostAuthRefreshToken(ctx context.Context, req auth.PostAuthRefreshTokenRequestObject) (auth.PostAuthRefreshTokenResponseObject, error) {
|
||||
// valid := false
|
||||
|
|
@ -236,3 +269,11 @@ func (s Server) PostSignIn(ctx context.Context, req auth.PostSignInRequestObject
|
|||
// Error: errStr,
|
||||
// }, nil
|
||||
// }
|
||||
|
||||
func ExtractBearerToken(header string) (string, error) {
|
||||
const prefix = "Bearer "
|
||||
if len(header) <= len(prefix) || header[:len(prefix)] != prefix {
|
||||
return "", fmt.Errorf("invalid bearer token format")
|
||||
}
|
||||
return header[len(prefix):], nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -9,3 +9,13 @@ INTO users (passhash, nickname)
|
|||
VALUES (sqlc.arg(passhash), sqlc.arg(nickname))
|
||||
RETURNING id;
|
||||
|
||||
-- name: GetExternalServiceByToken :one
|
||||
SELECT *
|
||||
FROM external_services
|
||||
WHERE auth_token = sqlc.arg('auth_token');
|
||||
|
||||
-- name: GetUserByExternalServiceId :one
|
||||
SELECT u.*
|
||||
FROM users u
|
||||
LEFT JOIN external_ids ei ON eu.user_id = u.id
|
||||
WHERE ei.external_id = sqlc.arg('external_id') AND ei.service_id = sqlc.arg('service_id');
|
||||
|
|
@ -9,24 +9,24 @@ import (
|
|||
"strconv"
|
||||
)
|
||||
|
||||
type Handler struct {
|
||||
publisher *rmq.Publisher
|
||||
}
|
||||
// type Handler struct {
|
||||
// publisher *rmq.Publisher
|
||||
// }
|
||||
|
||||
func New(publisher *rmq.Publisher) *Handler {
|
||||
return &Handler{publisher: publisher}
|
||||
}
|
||||
// func New(publisher *rmq.Publisher) *Handler {
|
||||
// return &Handler{publisher: publisher}
|
||||
// }
|
||||
|
||||
type Server struct {
|
||||
db *sqlc.Queries
|
||||
publisher *rmq.Publisher
|
||||
db *sqlc.Queries
|
||||
// publisher *rmq.Publisher
|
||||
RPCclient *rmq.RPCClient
|
||||
}
|
||||
|
||||
func NewServer(db *sqlc.Queries, publisher *rmq.Publisher, rpcclient *rmq.RPCClient) *Server {
|
||||
func NewServer(db *sqlc.Queries, rpcclient *rmq.RPCClient) *Server {
|
||||
return &Server{
|
||||
db: db,
|
||||
publisher: publisher,
|
||||
db: db,
|
||||
// publisher: publisher,
|
||||
RPCclient: rpcclient,
|
||||
}
|
||||
}
|
||||
|
|
@ -73,6 +73,14 @@ func (s Server) mapTitle(title sqlc.GetTitleByIDRow) (oapi.Title, error) {
|
|||
}
|
||||
oapi_title.TitleNames = title_names
|
||||
|
||||
if len(title.TitleDesc) > 0 {
|
||||
title_descs := make(map[string]string, 0)
|
||||
err = json.Unmarshal(title.TitleDesc, &title_descs)
|
||||
if err != nil {
|
||||
return oapi.Title{}, fmt.Errorf("unmarshal TitleDesc: %v", err)
|
||||
}
|
||||
oapi_title.TitleDesc = &title_descs
|
||||
}
|
||||
if len(title.EpisodesLen) > 0 {
|
||||
episodes_lens := make(map[string]float64, 0)
|
||||
err = json.Unmarshal(title.EpisodesLen, &episodes_lens)
|
||||
|
|
|
|||
141
modules/backend/handlers/images.go
Normal file
141
modules/backend/handlers/images.go
Normal file
|
|
@ -0,0 +1,141 @@
|
|||
package handlers
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"image"
|
||||
"image/jpeg"
|
||||
"image/png"
|
||||
"io"
|
||||
"net/http"
|
||||
oapi "nyanimedb/api"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
|
||||
"github.com/disintegration/imaging"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"golang.org/x/image/webp"
|
||||
)
|
||||
|
||||
// PostMediaUpload implements oapi.StrictServerInterface.
|
||||
func (s *Server) PostMediaUpload(ctx context.Context, request oapi.PostMediaUploadRequestObject) (oapi.PostMediaUploadResponseObject, error) {
|
||||
// Получаем multipart body
|
||||
mp := request.MultipartBody
|
||||
if mp == nil {
|
||||
log.Errorf("PostMedia without body")
|
||||
return oapi.PostMediaUpload400JSONResponse("Multipart body is required"), nil
|
||||
}
|
||||
|
||||
// Парсим первую часть (предполагаем, что файл в поле "file")
|
||||
part, err := mp.NextPart()
|
||||
if err != nil {
|
||||
log.Errorf("PostMedia without file")
|
||||
return oapi.PostMediaUpload400JSONResponse("File required"), nil
|
||||
}
|
||||
defer part.Close()
|
||||
|
||||
// Читаем ВЕСЬ файл в память (для небольших изображений — нормально)
|
||||
// Если файлы могут быть большими — используйте лимитированный буфер (см. ниже)
|
||||
data, err := io.ReadAll(part)
|
||||
if err != nil {
|
||||
log.Errorf("PostMedia cannot read file")
|
||||
return oapi.PostMediaUpload400JSONResponse("File required"), nil
|
||||
}
|
||||
|
||||
if len(data) == 0 {
|
||||
log.Errorf("PostMedia empty file")
|
||||
return oapi.PostMediaUpload400JSONResponse("Empty file"), nil
|
||||
}
|
||||
|
||||
// Проверка MIME по первым 512 байтам
|
||||
mimeType := http.DetectContentType(data)
|
||||
if mimeType != "image/jpeg" && mimeType != "image/png" && mimeType != "image/webp" {
|
||||
log.Errorf("PostMedia bad type")
|
||||
return oapi.PostMediaUpload400JSONResponse("Bad data type"), nil
|
||||
}
|
||||
|
||||
// Декодируем изображение из буфера
|
||||
var img image.Image
|
||||
switch mimeType {
|
||||
case "image/jpeg":
|
||||
{
|
||||
img, err = jpeg.Decode(bytes.NewReader(data))
|
||||
if err != nil {
|
||||
log.Errorf("PostMedia cannot decode file: %v", err)
|
||||
return oapi.PostMediaUpload500Response{}, nil
|
||||
}
|
||||
}
|
||||
case "image/png":
|
||||
{
|
||||
img, err = png.Decode(bytes.NewReader(data))
|
||||
if err != nil {
|
||||
log.Errorf("PostMedia cannot decode file: %v", err)
|
||||
return oapi.PostMediaUpload500Response{}, nil
|
||||
}
|
||||
}
|
||||
case "image/webp":
|
||||
{
|
||||
img, err = webp.Decode(bytes.NewReader(data))
|
||||
if err != nil {
|
||||
log.Errorf("PostMedia cannot decode file: %v", err)
|
||||
return oapi.PostMediaUpload500Response{}, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
var buf bytes.Buffer
|
||||
err = imaging.Encode(&buf, img, imaging.PNG)
|
||||
if err != nil {
|
||||
log.Errorf("PostMedia failed to re-encode JPEG: %v", err)
|
||||
return oapi.PostMediaUpload500Response{}, nil
|
||||
}
|
||||
|
||||
// TODO: to delete
|
||||
filename := part.FileName()
|
||||
if filename == "" {
|
||||
filename = "upload_" + generateRandomHex(8) + ".jpg"
|
||||
} else {
|
||||
filename = sanitizeFilename(filename)
|
||||
if !strings.HasSuffix(strings.ToLower(filename), ".png") {
|
||||
filename += ".png"
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: пойти на хуй ( вызвать файловую помойку)
|
||||
os.Mkdir("uploads", 0644)
|
||||
err = os.WriteFile(filepath.Join("./uploads", filename), buf.Bytes(), 0644)
|
||||
if err != nil {
|
||||
log.Errorf("PostMedia failed to write: %v", err)
|
||||
return oapi.PostMediaUpload500Response{}, nil
|
||||
}
|
||||
|
||||
return oapi.PostMediaUpload200JSONResponse{}, nil
|
||||
}
|
||||
|
||||
// Вспомогательные функции — как раньше
|
||||
func generateRandomHex(n int) string {
|
||||
b := make([]byte, n)
|
||||
for i := range b {
|
||||
b[i] = byte('a' + (i % 16))
|
||||
}
|
||||
return fmt.Sprintf("%x", b)
|
||||
}
|
||||
|
||||
func sanitizeFilename(name string) string {
|
||||
var clean strings.Builder
|
||||
for _, r := range name {
|
||||
if (r >= 'a' && r <= 'z') ||
|
||||
(r >= 'A' && r <= 'Z') ||
|
||||
(r >= '0' && r <= '9') ||
|
||||
r == '.' || r == '_' || r == '-' {
|
||||
clean.WriteRune(r)
|
||||
}
|
||||
}
|
||||
s := clean.String()
|
||||
if s == "" {
|
||||
return "file"
|
||||
}
|
||||
return s
|
||||
}
|
||||
|
|
@ -197,7 +197,6 @@ func (s Server) GetTitles(ctx context.Context, request oapi.GetTitlesRequestObje
|
|||
// Делаем RPC-вызов — и ЖДЁМ ответа
|
||||
err := s.RPCclient.Call(
|
||||
ctx,
|
||||
"svc.media.process.requests", // ← очередь микросервиса
|
||||
mqreq,
|
||||
&reply,
|
||||
)
|
||||
|
|
|
|||
|
|
@ -69,6 +69,16 @@ func sqlDate2oapi(p_date pgtype.Timestamptz) *time.Time {
|
|||
return nil
|
||||
}
|
||||
|
||||
func oapiDate2sql(t *time.Time) pgtype.Timestamptz {
|
||||
if t == nil {
|
||||
return pgtype.Timestamptz{Valid: false}
|
||||
}
|
||||
return pgtype.Timestamptz{
|
||||
Time: *t,
|
||||
Valid: true,
|
||||
}
|
||||
}
|
||||
|
||||
// func UserTitleStatus2Sqlc(s *[]oapi.UserTitleStatus) (*SqlcUserStatus, error) {
|
||||
// var sqlc_status SqlcUserStatus
|
||||
// if s == nil {
|
||||
|
|
@ -365,6 +375,7 @@ func (s Server) AddUserTitle(ctx context.Context, request oapi.AddUserTitleReque
|
|||
TitleID: request.Body.TitleId,
|
||||
Status: *status,
|
||||
Rate: request.Body.Rate,
|
||||
Ftime: oapiDate2sql(request.Body.Ftime),
|
||||
}
|
||||
|
||||
user_title, err := s.db.InsertUserTitle(ctx, params)
|
||||
|
|
@ -428,6 +439,7 @@ func (s Server) UpdateUserTitle(ctx context.Context, request oapi.UpdateUserTitl
|
|||
Rate: request.Body.Rate,
|
||||
UserID: request.UserId,
|
||||
TitleID: request.TitleId,
|
||||
Ftime: oapiDate2sql(request.Body.Ftime),
|
||||
}
|
||||
|
||||
user_title, err := s.db.UpdateUserTitle(ctx, params)
|
||||
|
|
@ -485,3 +497,39 @@ func (s Server) GetUserTitle(ctx context.Context, request oapi.GetUserTitleReque
|
|||
|
||||
return oapi.GetUserTitle200JSONResponse(oapi_usertitle), nil
|
||||
}
|
||||
|
||||
// GetUsers implements oapi.StrictServerInterface.
|
||||
func (s *Server) GetUsers(ctx context.Context, request oapi.GetUsersRequestObject) (oapi.GetUsersResponseObject, error) {
|
||||
params := sqlc.SearchUserParams{
|
||||
Word: request.Params.Word,
|
||||
Cursor: request.Params.CursorId,
|
||||
Limit: request.Params.Limit,
|
||||
}
|
||||
_users, err := s.db.SearchUser(ctx, params)
|
||||
if err != nil {
|
||||
log.Errorf("%v", err)
|
||||
return oapi.GetUsers500Response{}, nil
|
||||
}
|
||||
if len(_users) == 0 {
|
||||
return oapi.GetUsers204Response{}, nil
|
||||
}
|
||||
|
||||
var users []oapi.User
|
||||
var cursor int64
|
||||
for _, user := range _users {
|
||||
oapi_user := oapi.User{ // maybe its possible to make one sqlc type and use one map func iinstead of this shit
|
||||
// add image
|
||||
CreationDate: &user.CreationDate,
|
||||
DispName: user.DispName,
|
||||
Id: &user.ID,
|
||||
Mail: StringToEmail(user.Mail),
|
||||
Nickname: user.Nickname,
|
||||
UserDesc: user.UserDesc,
|
||||
}
|
||||
users = append(users, oapi_user)
|
||||
|
||||
cursor = user.ID
|
||||
}
|
||||
|
||||
return oapi.GetUsers200JSONResponse{Data: users, Cursor: cursor}, nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -59,10 +59,9 @@ func main() {
|
|||
}
|
||||
defer rmqConn.Close()
|
||||
|
||||
publisher := rmq.NewPublisher(rmqConn)
|
||||
rpcClient := rmq.NewRPCClient(rmqConn, 30*time.Second)
|
||||
|
||||
server := handlers.NewServer(queries, publisher, rpcClient)
|
||||
server := handlers.NewServer(queries, rpcClient)
|
||||
|
||||
r.Use(cors.New(cors.Config{
|
||||
AllowOrigins: []string{AppConfig.ServiceAddress},
|
||||
|
|
|
|||
|
|
@ -23,6 +23,37 @@ FROM users as t
|
|||
LEFT JOIN images as i ON (t.avatar_id = i.id)
|
||||
WHERE t.id = sqlc.arg('id')::bigint;
|
||||
|
||||
-- name: SearchUser :many
|
||||
SELECT
|
||||
u.id AS id,
|
||||
u.avatar_id AS avatar_id,
|
||||
u.mail AS mail,
|
||||
u.nickname AS nickname,
|
||||
u.disp_name AS disp_name,
|
||||
u.user_desc AS user_desc,
|
||||
u.creation_date AS creation_date,
|
||||
i.storage_type AS storage_type,
|
||||
i.image_path AS image_path
|
||||
FROM users AS u
|
||||
LEFT JOIN images AS i ON u.avatar_id = i.id
|
||||
WHERE
|
||||
(
|
||||
sqlc.narg('word')::text IS NULL
|
||||
OR (
|
||||
SELECT bool_and(
|
||||
u.nickname ILIKE ('%' || term || '%')
|
||||
OR u.disp_name ILIKE ('%' || term || '%')
|
||||
)
|
||||
FROM unnest(string_to_array(trim(sqlc.narg('word')::text), ' ')) AS term
|
||||
WHERE term <> ''
|
||||
)
|
||||
)
|
||||
AND (
|
||||
sqlc.narg('cursor')::int IS NULL
|
||||
OR u.id > sqlc.narg('cursor')::int
|
||||
)
|
||||
ORDER BY u.id ASC
|
||||
LIMIT COALESCE(sqlc.narg('limit')::int, 20);
|
||||
|
||||
-- name: GetStudioByID :one
|
||||
SELECT *
|
||||
|
|
@ -369,13 +400,14 @@ FROM reviews
|
|||
WHERE review_id = sqlc.arg('review_id')::bigint;
|
||||
|
||||
-- name: InsertUserTitle :one
|
||||
INSERT INTO usertitles (user_id, title_id, status, rate, review_id)
|
||||
INSERT INTO usertitles (user_id, title_id, status, rate, review_id, ctime)
|
||||
VALUES (
|
||||
sqlc.arg('user_id')::bigint,
|
||||
sqlc.arg('title_id')::bigint,
|
||||
sqlc.arg('status')::usertitle_status_t,
|
||||
sqlc.narg('rate')::int,
|
||||
sqlc.narg('review_id')::bigint
|
||||
sqlc.narg('review_id')::bigint,
|
||||
sqlc.narg('ftime')::timestamptz
|
||||
)
|
||||
RETURNING user_id, title_id, status, rate, review_id, ctime;
|
||||
|
||||
|
|
@ -384,7 +416,8 @@ RETURNING user_id, title_id, status, rate, review_id, ctime;
|
|||
UPDATE usertitles
|
||||
SET
|
||||
status = COALESCE(sqlc.narg('status')::usertitle_status_t, status),
|
||||
rate = COALESCE(sqlc.narg('rate')::int, rate)
|
||||
rate = COALESCE(sqlc.narg('rate')::int, rate),
|
||||
ctime = COALESCE(sqlc.narg('ftime')::timestamptz, ctime)
|
||||
WHERE
|
||||
user_id = sqlc.arg('user_id')
|
||||
AND title_id = sqlc.arg('title_id')
|
||||
|
|
|
|||
|
|
@ -4,13 +4,16 @@ import (
|
|||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
oapi "nyanimedb/api"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
oapi "nyanimedb/api"
|
||||
|
||||
amqp "github.com/rabbitmq/amqp091-go"
|
||||
)
|
||||
|
||||
const RPCQueueName = "anime_import_rpc"
|
||||
|
||||
// RabbitRequest не меняем
|
||||
type RabbitRequest struct {
|
||||
Name string `json:"name"`
|
||||
Statuses []oapi.TitleStatus `json:"statuses,omitempty"`
|
||||
|
|
@ -20,151 +23,6 @@ type RabbitRequest struct {
|
|||
Timestamp time.Time `json:"timestamp"`
|
||||
}
|
||||
|
||||
// Publisher — потокобезопасный публикатор с пулом каналов.
|
||||
type Publisher struct {
|
||||
conn *amqp.Connection
|
||||
pool *sync.Pool
|
||||
}
|
||||
|
||||
// NewPublisher создаёт новый Publisher.
|
||||
// conn должен быть уже установленным и healthy.
|
||||
// Рекомендуется передавать durable connection с reconnect-логикой.
|
||||
func NewPublisher(conn *amqp.Connection) *Publisher {
|
||||
return &Publisher{
|
||||
conn: conn,
|
||||
pool: &sync.Pool{
|
||||
New: func() any {
|
||||
ch, err := conn.Channel()
|
||||
if err != nil {
|
||||
// Паника уместна: невозможность открыть канал — критическая ошибка инициализации
|
||||
panic(fmt.Errorf("rmqpool: failed to create channel: %w", err))
|
||||
}
|
||||
return ch
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// Publish публикует сообщение в указанную очередь.
|
||||
// Очередь объявляется как durable (если не существует).
|
||||
// Поддерживает context для отмены/таймаута.
|
||||
func (p *Publisher) Publish(
|
||||
ctx context.Context,
|
||||
queueName string,
|
||||
payload RabbitRequest,
|
||||
opts ...PublishOption,
|
||||
) error {
|
||||
// Применяем опции
|
||||
options := &publishOptions{
|
||||
contentType: "application/json",
|
||||
deliveryMode: amqp.Persistent,
|
||||
timestamp: time.Now(),
|
||||
}
|
||||
for _, opt := range opts {
|
||||
opt(options)
|
||||
}
|
||||
|
||||
// Сериализуем payload
|
||||
body, err := json.Marshal(payload)
|
||||
if err != nil {
|
||||
return fmt.Errorf("rmqpool: failed to marshal payload: %w", err)
|
||||
}
|
||||
|
||||
// Берём канал из пула
|
||||
ch := p.getChannel()
|
||||
if ch == nil {
|
||||
return fmt.Errorf("rmqpool: channel is nil (connection may be closed)")
|
||||
}
|
||||
defer p.returnChannel(ch)
|
||||
|
||||
// Объявляем очередь (idempotent)
|
||||
q, err := ch.QueueDeclare(
|
||||
queueName,
|
||||
true, // durable
|
||||
false, // auto-delete
|
||||
false, // exclusive
|
||||
false, // no-wait
|
||||
nil, // args
|
||||
)
|
||||
if err != nil {
|
||||
return fmt.Errorf("rmqpool: failed to declare queue %q: %w", queueName, err)
|
||||
}
|
||||
|
||||
// Подготавливаем сообщение
|
||||
msg := amqp.Publishing{
|
||||
DeliveryMode: options.deliveryMode,
|
||||
ContentType: options.contentType,
|
||||
Timestamp: options.timestamp,
|
||||
Body: body,
|
||||
}
|
||||
|
||||
// Публикуем с учётом контекста
|
||||
done := make(chan error, 1)
|
||||
go func() {
|
||||
err := ch.Publish(
|
||||
"", // exchange (default)
|
||||
q.Name, // routing key
|
||||
false, // mandatory
|
||||
false, // immediate
|
||||
msg,
|
||||
)
|
||||
done <- err
|
||||
}()
|
||||
|
||||
select {
|
||||
case err := <-done:
|
||||
return err
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
}
|
||||
}
|
||||
|
||||
func (p *Publisher) getChannel() *amqp.Channel {
|
||||
raw := p.pool.Get()
|
||||
if raw == nil {
|
||||
ch, _ := p.conn.Channel()
|
||||
return ch
|
||||
}
|
||||
ch := raw.(*amqp.Channel)
|
||||
if ch.IsClosed() { // ← теперь есть!
|
||||
ch.Close() // освободить ресурсы
|
||||
ch, _ = p.conn.Channel()
|
||||
}
|
||||
return ch
|
||||
}
|
||||
|
||||
// returnChannel возвращает канал в пул, если он жив.
|
||||
func (p *Publisher) returnChannel(ch *amqp.Channel) {
|
||||
if ch != nil && !ch.IsClosed() {
|
||||
p.pool.Put(ch)
|
||||
}
|
||||
}
|
||||
|
||||
// PublishOption позволяет кастомизировать публикацию.
|
||||
type PublishOption func(*publishOptions)
|
||||
|
||||
type publishOptions struct {
|
||||
contentType string
|
||||
deliveryMode uint8
|
||||
timestamp time.Time
|
||||
}
|
||||
|
||||
// WithContentType устанавливает Content-Type (по умолчанию "application/json").
|
||||
func WithContentType(ct string) PublishOption {
|
||||
return func(o *publishOptions) { o.contentType = ct }
|
||||
}
|
||||
|
||||
// WithTransient делает сообщение transient (не сохраняется на диск).
|
||||
// По умолчанию — Persistent.
|
||||
func WithTransient() PublishOption {
|
||||
return func(o *publishOptions) { o.deliveryMode = amqp.Transient }
|
||||
}
|
||||
|
||||
// WithTimestamp устанавливает кастомную метку времени.
|
||||
func WithTimestamp(ts time.Time) PublishOption {
|
||||
return func(o *publishOptions) { o.timestamp = ts }
|
||||
}
|
||||
|
||||
type RPCClient struct {
|
||||
conn *amqp.Connection
|
||||
timeout time.Duration
|
||||
|
|
@ -174,37 +32,48 @@ func NewRPCClient(conn *amqp.Connection, timeout time.Duration) *RPCClient {
|
|||
return &RPCClient{conn: conn, timeout: timeout}
|
||||
}
|
||||
|
||||
// Call отправляет запрос в очередь и ждёт ответа.
|
||||
// replyPayload — указатель на структуру, в которую раскодировать ответ (например, &MediaResponse{}).
|
||||
func (c *RPCClient) Call(
|
||||
ctx context.Context,
|
||||
requestQueue string,
|
||||
request RabbitRequest,
|
||||
replyPayload any,
|
||||
) error {
|
||||
// 1. Создаём временный канал (не из пула!)
|
||||
|
||||
// 1. Канал для запроса и ответа
|
||||
ch, err := c.conn.Channel()
|
||||
if err != nil {
|
||||
return fmt.Errorf("channel: %w", err)
|
||||
}
|
||||
defer ch.Close()
|
||||
|
||||
// 2. Создаём временную очередь для ответов
|
||||
q, err := ch.QueueDeclare(
|
||||
"", // auto name
|
||||
false, // not durable
|
||||
true, // exclusive
|
||||
true, // auto-delete
|
||||
// 2. Декларируем фиксированную очередь RPC (идемпотентно)
|
||||
_, err = ch.QueueDeclare(
|
||||
RPCQueueName,
|
||||
true, // durable
|
||||
false, // auto-delete
|
||||
false, // exclusive
|
||||
false, // no-wait
|
||||
nil,
|
||||
)
|
||||
if err != nil {
|
||||
return fmt.Errorf("declare rpc queue: %w", err)
|
||||
}
|
||||
|
||||
// 3. Создаём временную очередь ДЛЯ ОТВЕТА
|
||||
replyQueue, err := ch.QueueDeclare(
|
||||
"",
|
||||
false,
|
||||
true,
|
||||
true,
|
||||
false,
|
||||
nil,
|
||||
)
|
||||
if err != nil {
|
||||
return fmt.Errorf("reply queue: %w", err)
|
||||
return fmt.Errorf("declare reply queue: %w", err)
|
||||
}
|
||||
|
||||
// 3. Подписываемся на ответы
|
||||
// 4. Подписываемся на очередь ответов
|
||||
msgs, err := ch.Consume(
|
||||
q.Name,
|
||||
replyQueue.Name,
|
||||
"",
|
||||
true, // auto-ack
|
||||
true, // exclusive
|
||||
|
|
@ -213,28 +82,28 @@ func (c *RPCClient) Call(
|
|||
nil,
|
||||
)
|
||||
if err != nil {
|
||||
return fmt.Errorf("consume: %w", err)
|
||||
return fmt.Errorf("consume reply: %w", err)
|
||||
}
|
||||
|
||||
// 4. Готовим correlation ID
|
||||
corrID := time.Now().UnixNano()
|
||||
// correlation ID
|
||||
corrID := fmt.Sprintf("%d", time.Now().UnixNano())
|
||||
|
||||
// 5. Сериализуем запрос
|
||||
// 5. сериализация запроса
|
||||
body, err := json.Marshal(request)
|
||||
if err != nil {
|
||||
return fmt.Errorf("marshal request: %w", err)
|
||||
}
|
||||
|
||||
// 6. Публикуем запрос
|
||||
// 6. Публикация RPC-запроса
|
||||
err = ch.Publish(
|
||||
"",
|
||||
requestQueue,
|
||||
RPCQueueName, // ← фиксированная очередь!
|
||||
false,
|
||||
false,
|
||||
amqp.Publishing{
|
||||
ContentType: "application/json",
|
||||
CorrelationId: fmt.Sprintf("%d", corrID),
|
||||
ReplyTo: q.Name,
|
||||
CorrelationId: corrID,
|
||||
ReplyTo: replyQueue.Name,
|
||||
Timestamp: time.Now(),
|
||||
Body: body,
|
||||
},
|
||||
|
|
@ -244,18 +113,17 @@ func (c *RPCClient) Call(
|
|||
}
|
||||
|
||||
// 7. Ждём ответ с таймаутом
|
||||
ctx, cancel := context.WithTimeout(ctx, c.timeout)
|
||||
timeoutCtx, cancel := context.WithTimeout(ctx, c.timeout)
|
||||
defer cancel()
|
||||
|
||||
for {
|
||||
select {
|
||||
case msg := <-msgs:
|
||||
if msg.CorrelationId == fmt.Sprintf("%d", corrID) {
|
||||
if msg.CorrelationId == corrID {
|
||||
return json.Unmarshal(msg.Body, replyPayload)
|
||||
}
|
||||
// игнорируем другие сообщения (маловероятно, но возможно)
|
||||
case <-ctx.Done():
|
||||
return ctx.Err() // timeout or cancelled
|
||||
case <-timeoutCtx.Done():
|
||||
return fmt.Errorf("rpc timeout: %w", timeoutCtx.Err())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -13,4 +13,4 @@ import type { ClientOptions as ClientOptions2 } from './types.gen';
|
|||
*/
|
||||
export type CreateClientConfig<T extends ClientOptions = ClientOptions2> = (override?: Config<ClientOptions & T>) => Config<Required<ClientOptions> & T>;
|
||||
|
||||
export const client = createClient(createConfig<ClientOptions2>({ baseUrl: 'http://10.1.0.65:8081/api/v1' }));
|
||||
export const client = createClient(createConfig<ClientOptions2>({ baseUrl: '/api/v1' }));
|
||||
|
|
|
|||
|
|
@ -2,7 +2,7 @@
|
|||
|
||||
import type { Client, Options as Options2, TDataShape } from './client';
|
||||
import { client } from './client.gen';
|
||||
import type { AddUserTitleData, AddUserTitleErrors, AddUserTitleResponses, DeleteUserTitleData, DeleteUserTitleErrors, DeleteUserTitleResponses, GetTitleData, GetTitleErrors, GetTitleResponses, GetTitlesData, GetTitlesErrors, GetTitlesResponses, GetUsersIdData, GetUsersIdErrors, GetUsersIdResponses, GetUserTitleData, GetUserTitleErrors, GetUserTitleResponses, GetUserTitlesData, GetUserTitlesErrors, GetUserTitlesResponses, UpdateUserData, UpdateUserErrors, UpdateUserResponses, UpdateUserTitleData, UpdateUserTitleErrors, UpdateUserTitleResponses } from './types.gen';
|
||||
import type { AddUserTitleData, AddUserTitleErrors, AddUserTitleResponses, DeleteUserTitleData, DeleteUserTitleErrors, DeleteUserTitleResponses, GetTitleData, GetTitleErrors, GetTitleResponses, GetTitlesData, GetTitlesErrors, GetTitlesResponses, GetUsersData, GetUsersErrors, GetUsersIdData, GetUsersIdErrors, GetUsersIdResponses, GetUsersResponses, GetUserTitleData, GetUserTitleErrors, GetUserTitleResponses, GetUserTitlesData, GetUserTitlesErrors, GetUserTitlesResponses, UpdateUserData, UpdateUserErrors, UpdateUserResponses, UpdateUserTitleData, UpdateUserTitleErrors, UpdateUserTitleResponses } from './types.gen';
|
||||
|
||||
export type Options<TData extends TDataShape = TDataShape, ThrowOnError extends boolean = boolean> = Options2<TData, ThrowOnError> & {
|
||||
/**
|
||||
|
|
@ -32,6 +32,11 @@ export const getTitles = <ThrowOnError extends boolean = false>(options?: Option
|
|||
*/
|
||||
export const getTitle = <ThrowOnError extends boolean = false>(options: Options<GetTitleData, ThrowOnError>) => (options.client ?? client).get<GetTitleResponses, GetTitleErrors, ThrowOnError>({ url: '/titles/{title_id}', ...options });
|
||||
|
||||
/**
|
||||
* Search user by nickname or dispname (both in one param), response is always sorted by id
|
||||
*/
|
||||
export const getUsers = <ThrowOnError extends boolean = false>(options?: Options<GetUsersData, ThrowOnError>) => (options?.client ?? client).get<GetUsersResponses, GetUsersErrors, ThrowOnError>({ url: '/users/', ...options });
|
||||
|
||||
/**
|
||||
* Get user info
|
||||
*/
|
||||
|
|
|
|||
|
|
@ -60,6 +60,12 @@ export type Title = {
|
|||
title_names: {
|
||||
[key: string]: Array<string>;
|
||||
};
|
||||
/**
|
||||
* Localized description. Key = language (ISO 639-1), value = description.
|
||||
*/
|
||||
title_desc?: {
|
||||
[key: string]: string;
|
||||
};
|
||||
studio?: Studio;
|
||||
tags: Tags;
|
||||
poster?: Image;
|
||||
|
|
@ -231,6 +237,50 @@ export type GetTitleResponses = {
|
|||
|
||||
export type GetTitleResponse = GetTitleResponses[keyof GetTitleResponses];
|
||||
|
||||
export type GetUsersData = {
|
||||
body?: never;
|
||||
path?: never;
|
||||
query?: {
|
||||
word?: string;
|
||||
limit?: number;
|
||||
/**
|
||||
* pass cursor naked
|
||||
*/
|
||||
cursor_id?: number;
|
||||
};
|
||||
url: '/users/';
|
||||
};
|
||||
|
||||
export type GetUsersErrors = {
|
||||
/**
|
||||
* Request params are not correct
|
||||
*/
|
||||
400: unknown;
|
||||
/**
|
||||
* Unknown server error
|
||||
*/
|
||||
500: unknown;
|
||||
};
|
||||
|
||||
export type GetUsersResponses = {
|
||||
/**
|
||||
* List of users with cursor
|
||||
*/
|
||||
200: {
|
||||
/**
|
||||
* List of users
|
||||
*/
|
||||
data: Array<User>;
|
||||
cursor: number;
|
||||
};
|
||||
/**
|
||||
* No users found
|
||||
*/
|
||||
204: void;
|
||||
};
|
||||
|
||||
export type GetUsersResponse = GetUsersResponses[keyof GetUsersResponses];
|
||||
|
||||
export type GetUsersIdData = {
|
||||
body?: never;
|
||||
path: {
|
||||
|
|
|
|||
|
|
@ -127,7 +127,16 @@ const handleLoadMore = async () => {
|
|||
</div>
|
||||
<TitlesFilterPanel filters={filters} setFilters={setFilters} />
|
||||
|
||||
{loading && <div className="mt-20 font-medium text-black">Loading...</div>}
|
||||
{loading && (
|
||||
<div className="mt-20 flex flex-col items-center justify-center space-y-4 font-medium text-black">
|
||||
<span>Loading...</span>
|
||||
<img
|
||||
src="https://images.steamusercontent.com/ugc/920301026407341369/69CBEF69DED504CD8CC7838D370061089F4D81BD/"
|
||||
alt="Loading animation"
|
||||
className="size-100 object-contain"
|
||||
/>
|
||||
</div>
|
||||
)}
|
||||
|
||||
{!loading && titles.length === 0 && (
|
||||
<div className="mt-20 font-medium text-black">No titles found.</div>
|
||||
|
|
|
|||
0
modules/frontend/src/pages/UsersPage/UsersPage.tsx
Normal file
0
modules/frontend/src/pages/UsersPage/UsersPage.tsx
Normal file
Loading…
Add table
Add a link
Reference in a new issue