package rmq import ( "context" "encoding/json" "fmt" oapi "nyanimedb/api" "sync" "time" amqp "github.com/rabbitmq/amqp091-go" ) type RabbitRequest struct { Name string `json:"name"` Statuses []oapi.TitleStatus `json:"statuses,omitempty"` Rating float64 `json:"rating,omitempty"` Year int32 `json:"year,omitempty"` Season oapi.ReleaseSeason `json:"season,omitempty"` Timestamp time.Time `json:"timestamp"` } // Publisher — потокобезопасный публикатор с пулом каналов. type Publisher struct { conn *amqp.Connection pool *sync.Pool } // NewPublisher создаёт новый Publisher. // conn должен быть уже установленным и healthy. // Рекомендуется передавать durable connection с reconnect-логикой. func NewPublisher(conn *amqp.Connection) *Publisher { return &Publisher{ conn: conn, pool: &sync.Pool{ New: func() any { ch, err := conn.Channel() if err != nil { // Паника уместна: невозможность открыть канал — критическая ошибка инициализации panic(fmt.Errorf("rmqpool: failed to create channel: %w", err)) } return ch }, }, } } // Publish публикует сообщение в указанную очередь. // Очередь объявляется как durable (если не существует). // Поддерживает context для отмены/таймаута. func (p *Publisher) Publish( ctx context.Context, queueName string, payload RabbitRequest, opts ...PublishOption, ) error { // Применяем опции options := &publishOptions{ contentType: "application/json", deliveryMode: amqp.Persistent, timestamp: time.Now(), } for _, opt := range opts { opt(options) } // Сериализуем payload body, err := json.Marshal(payload) if err != nil { return fmt.Errorf("rmqpool: failed to marshal payload: %w", err) } // Берём канал из пула ch := p.getChannel() if ch == nil { return fmt.Errorf("rmqpool: channel is nil (connection may be closed)") } defer p.returnChannel(ch) // Объявляем очередь (idempotent) q, err := ch.QueueDeclare( queueName, true, // durable false, // auto-delete false, // exclusive false, // no-wait nil, // args ) if err != nil { return fmt.Errorf("rmqpool: failed to declare queue %q: %w", queueName, err) } // Подготавливаем сообщение msg := amqp.Publishing{ DeliveryMode: options.deliveryMode, ContentType: options.contentType, Timestamp: options.timestamp, Body: body, } // Публикуем с учётом контекста done := make(chan error, 1) go func() { err := ch.Publish( "", // exchange (default) q.Name, // routing key false, // mandatory false, // immediate msg, ) done <- err }() select { case err := <-done: return err case <-ctx.Done(): return ctx.Err() } } func (p *Publisher) getChannel() *amqp.Channel { raw := p.pool.Get() if raw == nil { ch, _ := p.conn.Channel() return ch } ch := raw.(*amqp.Channel) if ch.IsClosed() { // ← теперь есть! ch.Close() // освободить ресурсы ch, _ = p.conn.Channel() } return ch } // returnChannel возвращает канал в пул, если он жив. func (p *Publisher) returnChannel(ch *amqp.Channel) { if ch != nil && !ch.IsClosed() { p.pool.Put(ch) } } // PublishOption позволяет кастомизировать публикацию. type PublishOption func(*publishOptions) type publishOptions struct { contentType string deliveryMode uint8 timestamp time.Time } // WithContentType устанавливает Content-Type (по умолчанию "application/json"). func WithContentType(ct string) PublishOption { return func(o *publishOptions) { o.contentType = ct } } // WithTransient делает сообщение transient (не сохраняется на диск). // По умолчанию — Persistent. func WithTransient() PublishOption { return func(o *publishOptions) { o.deliveryMode = amqp.Transient } } // WithTimestamp устанавливает кастомную метку времени. func WithTimestamp(ts time.Time) PublishOption { return func(o *publishOptions) { o.timestamp = ts } } type RPCClient struct { conn *amqp.Connection timeout time.Duration } func NewRPCClient(conn *amqp.Connection, timeout time.Duration) *RPCClient { return &RPCClient{conn: conn, timeout: timeout} } // Call отправляет запрос в очередь и ждёт ответа. // replyPayload — указатель на структуру, в которую раскодировать ответ (например, &MediaResponse{}). func (c *RPCClient) Call( ctx context.Context, requestQueue string, request RabbitRequest, replyPayload any, ) error { // 1. Создаём временный канал (не из пула!) ch, err := c.conn.Channel() if err != nil { return fmt.Errorf("channel: %w", err) } defer ch.Close() // 2. Создаём временную очередь для ответов q, err := ch.QueueDeclare( "", // auto name false, // not durable true, // exclusive true, // auto-delete false, nil, ) if err != nil { return fmt.Errorf("reply queue: %w", err) } // 3. Подписываемся на ответы msgs, err := ch.Consume( q.Name, "", true, // auto-ack true, // exclusive false, false, nil, ) if err != nil { return fmt.Errorf("consume: %w", err) } // 4. Готовим correlation ID corrID := time.Now().UnixNano() // 5. Сериализуем запрос body, err := json.Marshal(request) if err != nil { return fmt.Errorf("marshal request: %w", err) } // 6. Публикуем запрос err = ch.Publish( "", requestQueue, false, false, amqp.Publishing{ ContentType: "application/json", CorrelationId: fmt.Sprintf("%d", corrID), ReplyTo: q.Name, Timestamp: time.Now(), Body: body, }, ) if err != nil { return fmt.Errorf("publish: %w", err) } // 7. Ждём ответ с таймаутом ctx, cancel := context.WithTimeout(ctx, c.timeout) defer cancel() for { select { case msg := <-msgs: if msg.CorrelationId == fmt.Sprintf("%d", corrID) { return json.Unmarshal(msg.Body, replyPayload) } // игнорируем другие сообщения (маловероятно, но возможно) case <-ctx.Done(): return ctx.Err() // timeout or cancelled } } }