package rmq import ( "context" "encoding/json" "fmt" "time" oapi "nyanimedb/api" amqp "github.com/rabbitmq/amqp091-go" ) const RPCQueueName = "anime_import_rpc" // RabbitRequest не меняем type RabbitRequest struct { Name string `json:"name"` Statuses []oapi.TitleStatus `json:"statuses,omitempty"` Rating float64 `json:"rating,omitempty"` Year int32 `json:"year,omitempty"` Season oapi.ReleaseSeason `json:"season,omitempty"` Timestamp time.Time `json:"timestamp"` } type RPCClient struct { conn *amqp.Connection timeout time.Duration } func NewRPCClient(conn *amqp.Connection, timeout time.Duration) *RPCClient { return &RPCClient{conn: conn, timeout: timeout} } func (c *RPCClient) Call( ctx context.Context, request RabbitRequest, replyPayload any, ) error { // 1. Канал для запроса и ответа ch, err := c.conn.Channel() if err != nil { return fmt.Errorf("channel: %w", err) } defer ch.Close() // 2. Декларируем фиксированную очередь RPC (идемпотентно) _, err = ch.QueueDeclare( RPCQueueName, true, // durable false, // auto-delete false, // exclusive false, // no-wait nil, ) if err != nil { return fmt.Errorf("declare rpc queue: %w", err) } // 3. Создаём временную очередь ДЛЯ ОТВЕТА replyQueue, err := ch.QueueDeclare( "", false, true, true, false, nil, ) if err != nil { return fmt.Errorf("declare reply queue: %w", err) } // 4. Подписываемся на очередь ответов msgs, err := ch.Consume( replyQueue.Name, "", true, // auto-ack true, // exclusive false, false, nil, ) if err != nil { return fmt.Errorf("consume reply: %w", err) } // correlation ID corrID := fmt.Sprintf("%d", time.Now().UnixNano()) // 5. сериализация запроса body, err := json.Marshal(request) if err != nil { return fmt.Errorf("marshal request: %w", err) } // 6. Публикация RPC-запроса err = ch.Publish( "", RPCQueueName, // ← фиксированная очередь! false, false, amqp.Publishing{ ContentType: "application/json", CorrelationId: corrID, ReplyTo: replyQueue.Name, Timestamp: time.Now(), Body: body, }, ) if err != nil { return fmt.Errorf("publish: %w", err) } // 7. Ждём ответ с таймаутом timeoutCtx, cancel := context.WithTimeout(ctx, c.timeout) defer cancel() for { select { case msg := <-msgs: if msg.CorrelationId == corrID { return json.Unmarshal(msg.Body, replyPayload) } case <-timeoutCtx.Done(): return fmt.Errorf("rpc timeout: %w", timeoutCtx.Err()) } } }