nyanimedb/modules/backend/rmq/rabbit.go
Iron_Felix ab29c33f5b
All checks were successful
Build and Deploy Go App / build (push) Successful in 6m20s
Build and Deploy Go App / deploy (push) Successful in 1m1s
feat: now back wait for RMQ answer
2025-11-30 04:02:28 +03:00

261 lines
6.6 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package rmq
import (
"context"
"encoding/json"
"fmt"
oapi "nyanimedb/api"
"sync"
"time"
amqp "github.com/rabbitmq/amqp091-go"
)
type RabbitRequest struct {
Name string `json:"name"`
Statuses []oapi.TitleStatus `json:"statuses,omitempty"`
Rating float64 `json:"rating,omitempty"`
Year int32 `json:"year,omitempty"`
Season oapi.ReleaseSeason `json:"season,omitempty"`
Timestamp time.Time `json:"timestamp"`
}
// Publisher — потокобезопасный публикатор с пулом каналов.
type Publisher struct {
conn *amqp.Connection
pool *sync.Pool
}
// NewPublisher создаёт новый Publisher.
// conn должен быть уже установленным и healthy.
// Рекомендуется передавать durable connection с reconnect-логикой.
func NewPublisher(conn *amqp.Connection) *Publisher {
return &Publisher{
conn: conn,
pool: &sync.Pool{
New: func() any {
ch, err := conn.Channel()
if err != nil {
// Паника уместна: невозможность открыть канал — критическая ошибка инициализации
panic(fmt.Errorf("rmqpool: failed to create channel: %w", err))
}
return ch
},
},
}
}
// Publish публикует сообщение в указанную очередь.
// Очередь объявляется как durable (если не существует).
// Поддерживает context для отмены/таймаута.
func (p *Publisher) Publish(
ctx context.Context,
queueName string,
payload RabbitRequest,
opts ...PublishOption,
) error {
// Применяем опции
options := &publishOptions{
contentType: "application/json",
deliveryMode: amqp.Persistent,
timestamp: time.Now(),
}
for _, opt := range opts {
opt(options)
}
// Сериализуем payload
body, err := json.Marshal(payload)
if err != nil {
return fmt.Errorf("rmqpool: failed to marshal payload: %w", err)
}
// Берём канал из пула
ch := p.getChannel()
if ch == nil {
return fmt.Errorf("rmqpool: channel is nil (connection may be closed)")
}
defer p.returnChannel(ch)
// Объявляем очередь (idempotent)
q, err := ch.QueueDeclare(
queueName,
true, // durable
false, // auto-delete
false, // exclusive
false, // no-wait
nil, // args
)
if err != nil {
return fmt.Errorf("rmqpool: failed to declare queue %q: %w", queueName, err)
}
// Подготавливаем сообщение
msg := amqp.Publishing{
DeliveryMode: options.deliveryMode,
ContentType: options.contentType,
Timestamp: options.timestamp,
Body: body,
}
// Публикуем с учётом контекста
done := make(chan error, 1)
go func() {
err := ch.Publish(
"", // exchange (default)
q.Name, // routing key
false, // mandatory
false, // immediate
msg,
)
done <- err
}()
select {
case err := <-done:
return err
case <-ctx.Done():
return ctx.Err()
}
}
func (p *Publisher) getChannel() *amqp.Channel {
raw := p.pool.Get()
if raw == nil {
ch, _ := p.conn.Channel()
return ch
}
ch := raw.(*amqp.Channel)
if ch.IsClosed() { // ← теперь есть!
ch.Close() // освободить ресурсы
ch, _ = p.conn.Channel()
}
return ch
}
// returnChannel возвращает канал в пул, если он жив.
func (p *Publisher) returnChannel(ch *amqp.Channel) {
if ch != nil && !ch.IsClosed() {
p.pool.Put(ch)
}
}
// PublishOption позволяет кастомизировать публикацию.
type PublishOption func(*publishOptions)
type publishOptions struct {
contentType string
deliveryMode uint8
timestamp time.Time
}
// WithContentType устанавливает Content-Type (по умолчанию "application/json").
func WithContentType(ct string) PublishOption {
return func(o *publishOptions) { o.contentType = ct }
}
// WithTransient делает сообщение transient (не сохраняется на диск).
// По умолчанию — Persistent.
func WithTransient() PublishOption {
return func(o *publishOptions) { o.deliveryMode = amqp.Transient }
}
// WithTimestamp устанавливает кастомную метку времени.
func WithTimestamp(ts time.Time) PublishOption {
return func(o *publishOptions) { o.timestamp = ts }
}
type RPCClient struct {
conn *amqp.Connection
timeout time.Duration
}
func NewRPCClient(conn *amqp.Connection, timeout time.Duration) *RPCClient {
return &RPCClient{conn: conn, timeout: timeout}
}
// Call отправляет запрос в очередь и ждёт ответа.
// replyPayload — указатель на структуру, в которую раскодировать ответ (например, &MediaResponse{}).
func (c *RPCClient) Call(
ctx context.Context,
requestQueue string,
request RabbitRequest,
replyPayload any,
) error {
// 1. Создаём временный канал (не из пула!)
ch, err := c.conn.Channel()
if err != nil {
return fmt.Errorf("channel: %w", err)
}
defer ch.Close()
// 2. Создаём временную очередь для ответов
q, err := ch.QueueDeclare(
"", // auto name
false, // not durable
true, // exclusive
true, // auto-delete
false,
nil,
)
if err != nil {
return fmt.Errorf("reply queue: %w", err)
}
// 3. Подписываемся на ответы
msgs, err := ch.Consume(
q.Name,
"",
true, // auto-ack
true, // exclusive
false,
false,
nil,
)
if err != nil {
return fmt.Errorf("consume: %w", err)
}
// 4. Готовим correlation ID
corrID := time.Now().UnixNano()
// 5. Сериализуем запрос
body, err := json.Marshal(request)
if err != nil {
return fmt.Errorf("marshal request: %w", err)
}
// 6. Публикуем запрос
err = ch.Publish(
"",
requestQueue,
false,
false,
amqp.Publishing{
ContentType: "application/json",
CorrelationId: fmt.Sprintf("%d", corrID),
ReplyTo: q.Name,
Timestamp: time.Now(),
Body: body,
},
)
if err != nil {
return fmt.Errorf("publish: %w", err)
}
// 7. Ждём ответ с таймаутом
ctx, cancel := context.WithTimeout(ctx, c.timeout)
defer cancel()
for {
select {
case msg := <-msgs:
if msg.CorrelationId == fmt.Sprintf("%d", corrID) {
return json.Unmarshal(msg.Body, replyPayload)
}
// игнорируем другие сообщения (маловероятно, но возможно)
case <-ctx.Done():
return ctx.Err() // timeout or cancelled
}
}
}