nyanimedb/modules/backend/rmq/rabbit.go

166 lines
4.4 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package rmq
import (
"context"
"encoding/json"
"fmt"
oapi "nyanimedb/api"
"sync"
"time"
amqp "github.com/rabbitmq/amqp091-go"
)
type RabbitRequest struct {
Name string `json:"name"`
Status oapi.TitleStatus `json:"titlestatus,omitempty"`
Rating float64 `json:"titleraring,omitempty"`
Year int32 `json:"year,omitempty"`
Season oapi.ReleaseSeason `json:"season,omitempty"`
Timestamp time.Time `json:"timestamp"`
}
// Publisher — потокобезопасный публикатор с пулом каналов.
type Publisher struct {
conn *amqp.Connection
pool *sync.Pool
}
// NewPublisher создаёт новый Publisher.
// conn должен быть уже установленным и healthy.
// Рекомендуется передавать durable connection с reconnect-логикой.
func NewPublisher(conn *amqp.Connection) *Publisher {
return &Publisher{
conn: conn,
pool: &sync.Pool{
New: func() any {
ch, err := conn.Channel()
if err != nil {
// Паника уместна: невозможность открыть канал — критическая ошибка инициализации
panic(fmt.Errorf("rmqpool: failed to create channel: %w", err))
}
return ch
},
},
}
}
// Publish публикует сообщение в указанную очередь.
// Очередь объявляется как durable (если не существует).
// Поддерживает context для отмены/таймаута.
func (p *Publisher) Publish(
ctx context.Context,
queueName string,
payload RabbitRequest,
opts ...PublishOption,
) error {
// Применяем опции
options := &publishOptions{
contentType: "application/json",
deliveryMode: amqp.Persistent,
timestamp: time.Now(),
}
for _, opt := range opts {
opt(options)
}
// Сериализуем payload
body, err := json.Marshal(payload)
if err != nil {
return fmt.Errorf("rmqpool: failed to marshal payload: %w", err)
}
// Берём канал из пула
ch := p.getChannel()
if ch == nil {
return fmt.Errorf("rmqpool: channel is nil (connection may be closed)")
}
defer p.returnChannel(ch)
// Объявляем очередь (idempotent)
q, err := ch.QueueDeclare(
queueName,
true, // durable
false, // auto-delete
false, // exclusive
false, // no-wait
nil, // args
)
if err != nil {
return fmt.Errorf("rmqpool: failed to declare queue %q: %w", queueName, err)
}
// Подготавливаем сообщение
msg := amqp.Publishing{
DeliveryMode: options.deliveryMode,
ContentType: options.contentType,
Timestamp: options.timestamp,
Body: body,
}
// Публикуем с учётом контекста
done := make(chan error, 1)
go func() {
err := ch.Publish(
"", // exchange (default)
q.Name, // routing key
false, // mandatory
false, // immediate
msg,
)
done <- err
}()
select {
case err := <-done:
return err
case <-ctx.Done():
return ctx.Err()
}
}
func (p *Publisher) getChannel() *amqp.Channel {
raw := p.pool.Get()
if raw == nil {
ch, _ := p.conn.Channel()
return ch
}
ch := raw.(*amqp.Channel)
if ch.IsClosed() { // ← теперь есть!
ch.Close() // освободить ресурсы
ch, _ = p.conn.Channel()
}
return ch
}
// returnChannel возвращает канал в пул, если он жив.
func (p *Publisher) returnChannel(ch *amqp.Channel) {
if ch != nil && !ch.IsClosed() {
p.pool.Put(ch)
}
}
// PublishOption позволяет кастомизировать публикацию.
type PublishOption func(*publishOptions)
type publishOptions struct {
contentType string
deliveryMode uint8
timestamp time.Time
}
// WithContentType устанавливает Content-Type (по умолчанию "application/json").
func WithContentType(ct string) PublishOption {
return func(o *publishOptions) { o.contentType = ct }
}
// WithTransient делает сообщение transient (не сохраняется на диск).
// По умолчанию — Persistent.
func WithTransient() PublishOption {
return func(o *publishOptions) { o.deliveryMode = amqp.Transient }
}
// WithTimestamp устанавливает кастомную метку времени.
func WithTimestamp(ts time.Time) PublishOption {
return func(o *publishOptions) { o.timestamp = ts }
}