From ab29c33f5b522b68045b174cf06ddf33942468f1 Mon Sep 17 00:00:00 2001 From: Iron_Felix Date: Sun, 30 Nov 2025 04:02:28 +0300 Subject: [PATCH] feat: now back wait for RMQ answer --- modules/backend/handlers/common.go | 6 +- modules/backend/handlers/titles.go | 62 ++++++++++++------- modules/backend/main.go | 6 +- modules/backend/rmq/rabbit.go | 99 +++++++++++++++++++++++++++++- 4 files changed, 143 insertions(+), 30 deletions(-) diff --git a/modules/backend/handlers/common.go b/modules/backend/handlers/common.go index aece414..cad4f0f 100644 --- a/modules/backend/handlers/common.go +++ b/modules/backend/handlers/common.go @@ -19,13 +19,15 @@ func New(publisher *rmq.Publisher) *Handler { type Server struct { db *sqlc.Queries - publisher *rmq.Publisher // ← добавьте это поле + publisher *rmq.Publisher + RPCclient *rmq.RPCClient } -func NewServer(db *sqlc.Queries, publisher *rmq.Publisher) *Server { +func NewServer(db *sqlc.Queries, publisher *rmq.Publisher, rpcclient *rmq.RPCClient) *Server { return &Server{ db: db, publisher: publisher, + RPCclient: rpcclient, } } diff --git a/modules/backend/handlers/titles.go b/modules/backend/handlers/titles.go index 9f11016..300cc87 100644 --- a/modules/backend/handlers/titles.go +++ b/modules/backend/handlers/titles.go @@ -157,43 +157,61 @@ func (s Server) GetTitle(ctx context.Context, request oapi.GetTitleRequestObject func (s Server) GetTitles(ctx context.Context, request oapi.GetTitlesRequestObject) (oapi.GetTitlesResponseObject, error) { - if request.Params.ExtSearch != nil && *request.Params.ExtSearch { - // Публикуем событие — как и просили - event := rmq.RabbitRequest{ - Name: "Attack on titans", - // Status oapi.TitleStatus `json:"titlestatus,omitempty"` - // Rating float64 `json:"titleraring,omitempty"` - // Year int32 `json:"year,omitempty"` - // Season oapi.ReleaseSeason `json:"season,omitempty"` - Timestamp: time.Now(), - } - - // Контекст с таймаутом (не блокируем ответ) - publishCtx, cancel := context.WithTimeout(ctx, 30*time.Second) - defer cancel() - - if err := s.publisher.Publish(publishCtx, "events.user", event); err != nil { - log.Errorf("RMQ publish failed (non-critical): %v", err) - } else { - log.Infof("RMQ publish succeed %v", err) - } + opai_titles := make([]oapi.Title, 0) + mqreq := rmq.RabbitRequest{ + Timestamp: time.Now(), } - opai_titles := make([]oapi.Title, 0) - word := Word2Sqlc(request.Params.Word) + if word != nil { + mqreq.Name = *word + } season, err := ReleaseSeason2sqlc(request.Params.ReleaseSeason) if err != nil { log.Errorf("%v", err) return oapi.GetTitles400Response{}, err } + if season != nil { + mqreq.Season = *request.Params.ReleaseSeason + } title_statuses, err := TitleStatus2Sqlc(request.Params.Status) if err != nil { log.Errorf("%v", err) return oapi.GetTitles400Response{}, err } + if title_statuses != nil { + mqreq.Statuses = *request.Params.Status + } + + if request.Params.ExtSearch != nil && *request.Params.ExtSearch { + + // Структура для ответа (должна совпадать с тем, что шлёт микросервис) + var reply struct { + Status string `json:"status"` + Result string `json:"result"` + Preview string `json:"preview_url"` + } + + // Делаем RPC-вызов — и ЖДЁМ ответа + err := s.RPCclient.Call( + ctx, + "svc.media.process.requests", // ← очередь микросервиса + mqreq, + &reply, + ) + if err != nil { + log.Errorf("RabitMQ: %v", err) + // return oapi.GetTitles500Response{}, err + } + // // Возвращаем результат + // return oapi.ProcessMedia200JSONResponse{ + // Status: reply.Status, + // Result: reply.Result, + // Preview: reply.Preview, + // }, nil + } params := sqlc.SearchTitlesParams{ Word: word, diff --git a/modules/backend/main.go b/modules/backend/main.go index 13be887..9f992a5 100644 --- a/modules/backend/main.go +++ b/modules/backend/main.go @@ -60,8 +60,9 @@ func main() { defer rmqConn.Close() publisher := rmq.NewPublisher(rmqConn) + rpcClient := rmq.NewRPCClient(rmqConn, 30*time.Second) - server := handlers.NewServer(queries, publisher) + server := handlers.NewServer(queries, publisher, rpcClient) // r.LoadHTMLGlob("templates/*") r.Use(cors.New(cors.Config{ @@ -79,9 +80,6 @@ func main() { []oapi.StrictMiddlewareFunc{}, )) - // Внедряем publisher в сервер - server = handlers.NewServer(queries, publisher) - // Запуск log.Infof("Server starting on :8080") if err := r.Run(":8080"); err != nil && err != http.ErrServerClosed { diff --git a/modules/backend/rmq/rabbit.go b/modules/backend/rmq/rabbit.go index 85df89b..52c1979 100644 --- a/modules/backend/rmq/rabbit.go +++ b/modules/backend/rmq/rabbit.go @@ -13,8 +13,8 @@ import ( type RabbitRequest struct { Name string `json:"name"` - Status oapi.TitleStatus `json:"titlestatus,omitempty"` - Rating float64 `json:"titleraring,omitempty"` + Statuses []oapi.TitleStatus `json:"statuses,omitempty"` + Rating float64 `json:"rating,omitempty"` Year int32 `json:"year,omitempty"` Season oapi.ReleaseSeason `json:"season,omitempty"` Timestamp time.Time `json:"timestamp"` @@ -164,3 +164,98 @@ func WithTransient() PublishOption { func WithTimestamp(ts time.Time) PublishOption { return func(o *publishOptions) { o.timestamp = ts } } + +type RPCClient struct { + conn *amqp.Connection + timeout time.Duration +} + +func NewRPCClient(conn *amqp.Connection, timeout time.Duration) *RPCClient { + return &RPCClient{conn: conn, timeout: timeout} +} + +// Call отправляет запрос в очередь и ждёт ответа. +// replyPayload — указатель на структуру, в которую раскодировать ответ (например, &MediaResponse{}). +func (c *RPCClient) Call( + ctx context.Context, + requestQueue string, + request RabbitRequest, + replyPayload any, +) error { + // 1. Создаём временный канал (не из пула!) + ch, err := c.conn.Channel() + if err != nil { + return fmt.Errorf("channel: %w", err) + } + defer ch.Close() + + // 2. Создаём временную очередь для ответов + q, err := ch.QueueDeclare( + "", // auto name + false, // not durable + true, // exclusive + true, // auto-delete + false, + nil, + ) + if err != nil { + return fmt.Errorf("reply queue: %w", err) + } + + // 3. Подписываемся на ответы + msgs, err := ch.Consume( + q.Name, + "", + true, // auto-ack + true, // exclusive + false, + false, + nil, + ) + if err != nil { + return fmt.Errorf("consume: %w", err) + } + + // 4. Готовим correlation ID + corrID := time.Now().UnixNano() + + // 5. Сериализуем запрос + body, err := json.Marshal(request) + if err != nil { + return fmt.Errorf("marshal request: %w", err) + } + + // 6. Публикуем запрос + err = ch.Publish( + "", + requestQueue, + false, + false, + amqp.Publishing{ + ContentType: "application/json", + CorrelationId: fmt.Sprintf("%d", corrID), + ReplyTo: q.Name, + Timestamp: time.Now(), + Body: body, + }, + ) + if err != nil { + return fmt.Errorf("publish: %w", err) + } + + // 7. Ждём ответ с таймаутом + ctx, cancel := context.WithTimeout(ctx, c.timeout) + defer cancel() + + for { + select { + case msg := <-msgs: + if msg.CorrelationId == fmt.Sprintf("%d", corrID) { + return json.Unmarshal(msg.Body, replyPayload) + } + // игнорируем другие сообщения (маловероятно, но возможно) + case <-ctx.Done(): + return ctx.Err() // timeout or cancelled + } + } +}