129 lines
2.7 KiB
Go
129 lines
2.7 KiB
Go
package rmq
|
||
|
||
import (
|
||
"context"
|
||
"encoding/json"
|
||
"fmt"
|
||
"time"
|
||
|
||
oapi "nyanimedb/api"
|
||
|
||
amqp "github.com/rabbitmq/amqp091-go"
|
||
)
|
||
|
||
const RPCQueueName = "anime_import_rpc"
|
||
|
||
// RabbitRequest не меняем
|
||
type RabbitRequest struct {
|
||
Name string `json:"name"`
|
||
Statuses []oapi.TitleStatus `json:"statuses,omitempty"`
|
||
Rating float64 `json:"rating,omitempty"`
|
||
Year int32 `json:"year,omitempty"`
|
||
Season oapi.ReleaseSeason `json:"season,omitempty"`
|
||
Timestamp time.Time `json:"timestamp"`
|
||
}
|
||
|
||
type RPCClient struct {
|
||
conn *amqp.Connection
|
||
timeout time.Duration
|
||
}
|
||
|
||
func NewRPCClient(conn *amqp.Connection, timeout time.Duration) *RPCClient {
|
||
return &RPCClient{conn: conn, timeout: timeout}
|
||
}
|
||
|
||
func (c *RPCClient) Call(
|
||
ctx context.Context,
|
||
request RabbitRequest,
|
||
replyPayload any,
|
||
) error {
|
||
|
||
// 1. Канал для запроса и ответа
|
||
ch, err := c.conn.Channel()
|
||
if err != nil {
|
||
return fmt.Errorf("channel: %w", err)
|
||
}
|
||
defer ch.Close()
|
||
|
||
// 2. Декларируем фиксированную очередь RPC (идемпотентно)
|
||
_, err = ch.QueueDeclare(
|
||
RPCQueueName,
|
||
true, // durable
|
||
false, // auto-delete
|
||
false, // exclusive
|
||
false, // no-wait
|
||
nil,
|
||
)
|
||
if err != nil {
|
||
return fmt.Errorf("declare rpc queue: %w", err)
|
||
}
|
||
|
||
// 3. Создаём временную очередь ДЛЯ ОТВЕТА
|
||
replyQueue, err := ch.QueueDeclare(
|
||
"",
|
||
false,
|
||
true,
|
||
true,
|
||
false,
|
||
nil,
|
||
)
|
||
if err != nil {
|
||
return fmt.Errorf("declare reply queue: %w", err)
|
||
}
|
||
|
||
// 4. Подписываемся на очередь ответов
|
||
msgs, err := ch.Consume(
|
||
replyQueue.Name,
|
||
"",
|
||
true, // auto-ack
|
||
true, // exclusive
|
||
false,
|
||
false,
|
||
nil,
|
||
)
|
||
if err != nil {
|
||
return fmt.Errorf("consume reply: %w", err)
|
||
}
|
||
|
||
// correlation ID
|
||
corrID := fmt.Sprintf("%d", time.Now().UnixNano())
|
||
|
||
// 5. сериализация запроса
|
||
body, err := json.Marshal(request)
|
||
if err != nil {
|
||
return fmt.Errorf("marshal request: %w", err)
|
||
}
|
||
|
||
// 6. Публикация RPC-запроса
|
||
err = ch.Publish(
|
||
"",
|
||
RPCQueueName, // ← фиксированная очередь!
|
||
false,
|
||
false,
|
||
amqp.Publishing{
|
||
ContentType: "application/json",
|
||
CorrelationId: corrID,
|
||
ReplyTo: replyQueue.Name,
|
||
Timestamp: time.Now(),
|
||
Body: body,
|
||
},
|
||
)
|
||
if err != nil {
|
||
return fmt.Errorf("publish: %w", err)
|
||
}
|
||
|
||
// 7. Ждём ответ с таймаутом
|
||
timeoutCtx, cancel := context.WithTimeout(ctx, c.timeout)
|
||
defer cancel()
|
||
|
||
for {
|
||
select {
|
||
case msg := <-msgs:
|
||
if msg.CorrelationId == corrID {
|
||
return json.Unmarshal(msg.Body, replyPayload)
|
||
}
|
||
case <-timeoutCtx.Done():
|
||
return fmt.Errorf("rpc timeout: %w", timeoutCtx.Err())
|
||
}
|
||
}
|
||
}
|