feat: now back wait for RMQ answer
This commit is contained in:
parent
a29aefbe97
commit
ab29c33f5b
4 changed files with 143 additions and 30 deletions
|
|
@ -13,8 +13,8 @@ import (
|
|||
|
||||
type RabbitRequest struct {
|
||||
Name string `json:"name"`
|
||||
Status oapi.TitleStatus `json:"titlestatus,omitempty"`
|
||||
Rating float64 `json:"titleraring,omitempty"`
|
||||
Statuses []oapi.TitleStatus `json:"statuses,omitempty"`
|
||||
Rating float64 `json:"rating,omitempty"`
|
||||
Year int32 `json:"year,omitempty"`
|
||||
Season oapi.ReleaseSeason `json:"season,omitempty"`
|
||||
Timestamp time.Time `json:"timestamp"`
|
||||
|
|
@ -164,3 +164,98 @@ func WithTransient() PublishOption {
|
|||
func WithTimestamp(ts time.Time) PublishOption {
|
||||
return func(o *publishOptions) { o.timestamp = ts }
|
||||
}
|
||||
|
||||
type RPCClient struct {
|
||||
conn *amqp.Connection
|
||||
timeout time.Duration
|
||||
}
|
||||
|
||||
func NewRPCClient(conn *amqp.Connection, timeout time.Duration) *RPCClient {
|
||||
return &RPCClient{conn: conn, timeout: timeout}
|
||||
}
|
||||
|
||||
// Call отправляет запрос в очередь и ждёт ответа.
|
||||
// replyPayload — указатель на структуру, в которую раскодировать ответ (например, &MediaResponse{}).
|
||||
func (c *RPCClient) Call(
|
||||
ctx context.Context,
|
||||
requestQueue string,
|
||||
request RabbitRequest,
|
||||
replyPayload any,
|
||||
) error {
|
||||
// 1. Создаём временный канал (не из пула!)
|
||||
ch, err := c.conn.Channel()
|
||||
if err != nil {
|
||||
return fmt.Errorf("channel: %w", err)
|
||||
}
|
||||
defer ch.Close()
|
||||
|
||||
// 2. Создаём временную очередь для ответов
|
||||
q, err := ch.QueueDeclare(
|
||||
"", // auto name
|
||||
false, // not durable
|
||||
true, // exclusive
|
||||
true, // auto-delete
|
||||
false,
|
||||
nil,
|
||||
)
|
||||
if err != nil {
|
||||
return fmt.Errorf("reply queue: %w", err)
|
||||
}
|
||||
|
||||
// 3. Подписываемся на ответы
|
||||
msgs, err := ch.Consume(
|
||||
q.Name,
|
||||
"",
|
||||
true, // auto-ack
|
||||
true, // exclusive
|
||||
false,
|
||||
false,
|
||||
nil,
|
||||
)
|
||||
if err != nil {
|
||||
return fmt.Errorf("consume: %w", err)
|
||||
}
|
||||
|
||||
// 4. Готовим correlation ID
|
||||
corrID := time.Now().UnixNano()
|
||||
|
||||
// 5. Сериализуем запрос
|
||||
body, err := json.Marshal(request)
|
||||
if err != nil {
|
||||
return fmt.Errorf("marshal request: %w", err)
|
||||
}
|
||||
|
||||
// 6. Публикуем запрос
|
||||
err = ch.Publish(
|
||||
"",
|
||||
requestQueue,
|
||||
false,
|
||||
false,
|
||||
amqp.Publishing{
|
||||
ContentType: "application/json",
|
||||
CorrelationId: fmt.Sprintf("%d", corrID),
|
||||
ReplyTo: q.Name,
|
||||
Timestamp: time.Now(),
|
||||
Body: body,
|
||||
},
|
||||
)
|
||||
if err != nil {
|
||||
return fmt.Errorf("publish: %w", err)
|
||||
}
|
||||
|
||||
// 7. Ждём ответ с таймаутом
|
||||
ctx, cancel := context.WithTimeout(ctx, c.timeout)
|
||||
defer cancel()
|
||||
|
||||
for {
|
||||
select {
|
||||
case msg := <-msgs:
|
||||
if msg.CorrelationId == fmt.Sprintf("%d", corrID) {
|
||||
return json.Unmarshal(msg.Body, replyPayload)
|
||||
}
|
||||
// игнорируем другие сообщения (маловероятно, но возможно)
|
||||
case <-ctx.Done():
|
||||
return ctx.Err() // timeout or cancelled
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue