261 lines
6.6 KiB
Go
261 lines
6.6 KiB
Go
package rmq
|
||
|
||
import (
|
||
"context"
|
||
"encoding/json"
|
||
"fmt"
|
||
oapi "nyanimedb/api"
|
||
"sync"
|
||
"time"
|
||
|
||
amqp "github.com/rabbitmq/amqp091-go"
|
||
)
|
||
|
||
type RabbitRequest struct {
|
||
Name string `json:"name"`
|
||
Statuses []oapi.TitleStatus `json:"statuses,omitempty"`
|
||
Rating float64 `json:"rating,omitempty"`
|
||
Year int32 `json:"year,omitempty"`
|
||
Season oapi.ReleaseSeason `json:"season,omitempty"`
|
||
Timestamp time.Time `json:"timestamp"`
|
||
}
|
||
|
||
// Publisher — потокобезопасный публикатор с пулом каналов.
|
||
type Publisher struct {
|
||
conn *amqp.Connection
|
||
pool *sync.Pool
|
||
}
|
||
|
||
// NewPublisher создаёт новый Publisher.
|
||
// conn должен быть уже установленным и healthy.
|
||
// Рекомендуется передавать durable connection с reconnect-логикой.
|
||
func NewPublisher(conn *amqp.Connection) *Publisher {
|
||
return &Publisher{
|
||
conn: conn,
|
||
pool: &sync.Pool{
|
||
New: func() any {
|
||
ch, err := conn.Channel()
|
||
if err != nil {
|
||
// Паника уместна: невозможность открыть канал — критическая ошибка инициализации
|
||
panic(fmt.Errorf("rmqpool: failed to create channel: %w", err))
|
||
}
|
||
return ch
|
||
},
|
||
},
|
||
}
|
||
}
|
||
|
||
// Publish публикует сообщение в указанную очередь.
|
||
// Очередь объявляется как durable (если не существует).
|
||
// Поддерживает context для отмены/таймаута.
|
||
func (p *Publisher) Publish(
|
||
ctx context.Context,
|
||
queueName string,
|
||
payload RabbitRequest,
|
||
opts ...PublishOption,
|
||
) error {
|
||
// Применяем опции
|
||
options := &publishOptions{
|
||
contentType: "application/json",
|
||
deliveryMode: amqp.Persistent,
|
||
timestamp: time.Now(),
|
||
}
|
||
for _, opt := range opts {
|
||
opt(options)
|
||
}
|
||
|
||
// Сериализуем payload
|
||
body, err := json.Marshal(payload)
|
||
if err != nil {
|
||
return fmt.Errorf("rmqpool: failed to marshal payload: %w", err)
|
||
}
|
||
|
||
// Берём канал из пула
|
||
ch := p.getChannel()
|
||
if ch == nil {
|
||
return fmt.Errorf("rmqpool: channel is nil (connection may be closed)")
|
||
}
|
||
defer p.returnChannel(ch)
|
||
|
||
// Объявляем очередь (idempotent)
|
||
q, err := ch.QueueDeclare(
|
||
queueName,
|
||
true, // durable
|
||
false, // auto-delete
|
||
false, // exclusive
|
||
false, // no-wait
|
||
nil, // args
|
||
)
|
||
if err != nil {
|
||
return fmt.Errorf("rmqpool: failed to declare queue %q: %w", queueName, err)
|
||
}
|
||
|
||
// Подготавливаем сообщение
|
||
msg := amqp.Publishing{
|
||
DeliveryMode: options.deliveryMode,
|
||
ContentType: options.contentType,
|
||
Timestamp: options.timestamp,
|
||
Body: body,
|
||
}
|
||
|
||
// Публикуем с учётом контекста
|
||
done := make(chan error, 1)
|
||
go func() {
|
||
err := ch.Publish(
|
||
"", // exchange (default)
|
||
q.Name, // routing key
|
||
false, // mandatory
|
||
false, // immediate
|
||
msg,
|
||
)
|
||
done <- err
|
||
}()
|
||
|
||
select {
|
||
case err := <-done:
|
||
return err
|
||
case <-ctx.Done():
|
||
return ctx.Err()
|
||
}
|
||
}
|
||
|
||
func (p *Publisher) getChannel() *amqp.Channel {
|
||
raw := p.pool.Get()
|
||
if raw == nil {
|
||
ch, _ := p.conn.Channel()
|
||
return ch
|
||
}
|
||
ch := raw.(*amqp.Channel)
|
||
if ch.IsClosed() { // ← теперь есть!
|
||
ch.Close() // освободить ресурсы
|
||
ch, _ = p.conn.Channel()
|
||
}
|
||
return ch
|
||
}
|
||
|
||
// returnChannel возвращает канал в пул, если он жив.
|
||
func (p *Publisher) returnChannel(ch *amqp.Channel) {
|
||
if ch != nil && !ch.IsClosed() {
|
||
p.pool.Put(ch)
|
||
}
|
||
}
|
||
|
||
// PublishOption позволяет кастомизировать публикацию.
|
||
type PublishOption func(*publishOptions)
|
||
|
||
type publishOptions struct {
|
||
contentType string
|
||
deliveryMode uint8
|
||
timestamp time.Time
|
||
}
|
||
|
||
// WithContentType устанавливает Content-Type (по умолчанию "application/json").
|
||
func WithContentType(ct string) PublishOption {
|
||
return func(o *publishOptions) { o.contentType = ct }
|
||
}
|
||
|
||
// WithTransient делает сообщение transient (не сохраняется на диск).
|
||
// По умолчанию — Persistent.
|
||
func WithTransient() PublishOption {
|
||
return func(o *publishOptions) { o.deliveryMode = amqp.Transient }
|
||
}
|
||
|
||
// WithTimestamp устанавливает кастомную метку времени.
|
||
func WithTimestamp(ts time.Time) PublishOption {
|
||
return func(o *publishOptions) { o.timestamp = ts }
|
||
}
|
||
|
||
type RPCClient struct {
|
||
conn *amqp.Connection
|
||
timeout time.Duration
|
||
}
|
||
|
||
func NewRPCClient(conn *amqp.Connection, timeout time.Duration) *RPCClient {
|
||
return &RPCClient{conn: conn, timeout: timeout}
|
||
}
|
||
|
||
// Call отправляет запрос в очередь и ждёт ответа.
|
||
// replyPayload — указатель на структуру, в которую раскодировать ответ (например, &MediaResponse{}).
|
||
func (c *RPCClient) Call(
|
||
ctx context.Context,
|
||
requestQueue string,
|
||
request RabbitRequest,
|
||
replyPayload any,
|
||
) error {
|
||
// 1. Создаём временный канал (не из пула!)
|
||
ch, err := c.conn.Channel()
|
||
if err != nil {
|
||
return fmt.Errorf("channel: %w", err)
|
||
}
|
||
defer ch.Close()
|
||
|
||
// 2. Создаём временную очередь для ответов
|
||
q, err := ch.QueueDeclare(
|
||
"", // auto name
|
||
false, // not durable
|
||
true, // exclusive
|
||
true, // auto-delete
|
||
false,
|
||
nil,
|
||
)
|
||
if err != nil {
|
||
return fmt.Errorf("reply queue: %w", err)
|
||
}
|
||
|
||
// 3. Подписываемся на ответы
|
||
msgs, err := ch.Consume(
|
||
q.Name,
|
||
"",
|
||
true, // auto-ack
|
||
true, // exclusive
|
||
false,
|
||
false,
|
||
nil,
|
||
)
|
||
if err != nil {
|
||
return fmt.Errorf("consume: %w", err)
|
||
}
|
||
|
||
// 4. Готовим correlation ID
|
||
corrID := time.Now().UnixNano()
|
||
|
||
// 5. Сериализуем запрос
|
||
body, err := json.Marshal(request)
|
||
if err != nil {
|
||
return fmt.Errorf("marshal request: %w", err)
|
||
}
|
||
|
||
// 6. Публикуем запрос
|
||
err = ch.Publish(
|
||
"",
|
||
requestQueue,
|
||
false,
|
||
false,
|
||
amqp.Publishing{
|
||
ContentType: "application/json",
|
||
CorrelationId: fmt.Sprintf("%d", corrID),
|
||
ReplyTo: q.Name,
|
||
Timestamp: time.Now(),
|
||
Body: body,
|
||
},
|
||
)
|
||
if err != nil {
|
||
return fmt.Errorf("publish: %w", err)
|
||
}
|
||
|
||
// 7. Ждём ответ с таймаутом
|
||
ctx, cancel := context.WithTimeout(ctx, c.timeout)
|
||
defer cancel()
|
||
|
||
for {
|
||
select {
|
||
case msg := <-msgs:
|
||
if msg.CorrelationId == fmt.Sprintf("%d", corrID) {
|
||
return json.Unmarshal(msg.Body, replyPayload)
|
||
}
|
||
// игнорируем другие сообщения (маловероятно, но возможно)
|
||
case <-ctx.Done():
|
||
return ctx.Err() // timeout or cancelled
|
||
}
|
||
}
|
||
}
|