nyanimedb/modules/backend/rabbit.go

103 lines
3.1 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package main
import (
"context"
"encoding/json"
"fmt"
oapi "nyanimedb/api"
"time"
"github.com/google/uuid"
"github.com/sirupsen/logrus"
"github.com/streadway/amqp"
)
type RabbitRequest struct {
Name string `json:"name"`
Status oapi.TitleStatus `json:"titlestatus,omitempty"`
Rating float64 `json:"titleraring,omitempty"`
Year int32 `json:"year,omitempty"`
Season oapi.ReleaseSeason `json:"season,omitempty"`
Timestamp time.Time `json:"timestamp"`
}
// PublishAndAwaitReply отправляет запрос и ждёт ответа от workerа.
// Возвращает раскодированный ответ или ошибку.
func PublishAndAwaitReply(
ctx context.Context,
ch *amqp.Channel,
requestQueue string, // например: "svc.media.process.requests"
request RabbitRequest, // ваша структура запроса
replyCh chan<- any, // куда положить ответ (вы читаете извне)
) error {
// 1. Создаём временную очередь для ответов
replyQueue, err := ch.QueueDeclare(
"", // auto-generated name
false, // not durable
true, // exclusive
true, // auto-delete
false, // no-wait
nil,
)
if err != nil {
return fmt.Errorf("failed to declare reply queue: %w", err)
}
// 2. Готовим корреляционный ID
corrID := uuid.New().String() // ← используйте github.com/google/uuid
logrus.Infof("New CorrID: %s", corrID)
// 3. Сериализуем запрос
body, err := json.Marshal(request)
if err != nil {
return fmt.Errorf("failed to marshal request: %w", err)
}
// 4. Публикуем запрос
err = ch.Publish(
"", // default exchange (или свой, если используете)
requestQueue,
false,
false,
amqp.Publishing{
ContentType: "application/json",
CorrelationId: corrID,
ReplyTo: replyQueue.Name,
DeliveryMode: amqp.Persistent,
Timestamp: time.Now(),
Body: body,
},
)
if err != nil {
return fmt.Errorf("failed to publish request, corrID: %s : %w", corrID, err)
}
// 5. Подписываемся на ответы
msgs, err := ch.Consume(
replyQueue.Name,
"", // consumer tag
true, // auto-ack
true, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
if err != nil {
return fmt.Errorf("failed to consume from reply queue: %w", err)
}
// 6. Ожидаем ответ с таймаутом
select {
case msg := <-msgs:
if msg.CorrelationId != corrID {
return fmt.Errorf("correlation ID mismatch: got %s, expected %s", msg.CorrelationId, corrID)
}
// Десериализуем — тут можно передать target-структуру или использовать interface{}
// В данном случае просто возвращаем байты или пусть вызывающая сторона парсит
replyCh <- msg.Body // или json.Unmarshal → и отправить структуру в канал
return nil
case <-ctx.Done():
return ctx.Err()
}
}