nyanimedb/modules/backend/rmq/rabbit.go
Iron_Felix 62e0633e69
All checks were successful
Build and Deploy Go App / build (push) Successful in 6m46s
Build and Deploy Go App / deploy (push) Successful in 53s
fix: rmq
2025-12-05 21:20:51 +03:00

129 lines
2.7 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package rmq
import (
"context"
"encoding/json"
"fmt"
"time"
oapi "nyanimedb/api"
amqp "github.com/rabbitmq/amqp091-go"
)
const RPCQueueName = "anime_import_rpc"
// RabbitRequest не меняем
type RabbitRequest struct {
Name string `json:"name"`
Statuses []oapi.TitleStatus `json:"statuses,omitempty"`
Rating float64 `json:"rating,omitempty"`
Year int32 `json:"year,omitempty"`
Season oapi.ReleaseSeason `json:"season,omitempty"`
Timestamp time.Time `json:"timestamp"`
}
type RPCClient struct {
conn *amqp.Connection
timeout time.Duration
}
func NewRPCClient(conn *amqp.Connection, timeout time.Duration) *RPCClient {
return &RPCClient{conn: conn, timeout: timeout}
}
func (c *RPCClient) Call(
ctx context.Context,
request RabbitRequest,
replyPayload any,
) error {
// 1. Канал для запроса и ответа
ch, err := c.conn.Channel()
if err != nil {
return fmt.Errorf("channel: %w", err)
}
defer ch.Close()
// 2. Декларируем фиксированную очередь RPC (идемпотентно)
_, err = ch.QueueDeclare(
RPCQueueName,
true, // durable
false, // auto-delete
false, // exclusive
false, // no-wait
nil,
)
if err != nil {
return fmt.Errorf("declare rpc queue: %w", err)
}
// 3. Создаём временную очередь ДЛЯ ОТВЕТА
replyQueue, err := ch.QueueDeclare(
"",
false,
true,
true,
false,
nil,
)
if err != nil {
return fmt.Errorf("declare reply queue: %w", err)
}
// 4. Подписываемся на очередь ответов
msgs, err := ch.Consume(
replyQueue.Name,
"",
true, // auto-ack
true, // exclusive
false,
false,
nil,
)
if err != nil {
return fmt.Errorf("consume reply: %w", err)
}
// correlation ID
corrID := fmt.Sprintf("%d", time.Now().UnixNano())
// 5. сериализация запроса
body, err := json.Marshal(request)
if err != nil {
return fmt.Errorf("marshal request: %w", err)
}
// 6. Публикация RPC-запроса
err = ch.Publish(
"",
RPCQueueName, // ← фиксированная очередь!
false,
false,
amqp.Publishing{
ContentType: "application/json",
CorrelationId: corrID,
ReplyTo: replyQueue.Name,
Timestamp: time.Now(),
Body: body,
},
)
if err != nil {
return fmt.Errorf("publish: %w", err)
}
// 7. Ждём ответ с таймаутом
timeoutCtx, cancel := context.WithTimeout(ctx, c.timeout)
defer cancel()
for {
select {
case msg := <-msgs:
if msg.CorrelationId == corrID {
return json.Unmarshal(msg.Body, replyPayload)
}
case <-timeoutCtx.Done():
return fmt.Errorf("rpc timeout: %w", timeoutCtx.Err())
}
}
}