From 62e0633e69a5bd4b658847155bb808beb34b821b Mon Sep 17 00:00:00 2001 From: Iron_Felix Date: Fri, 5 Dec 2025 21:20:51 +0300 Subject: [PATCH] fix: rmq --- modules/backend/handlers/common.go | 22 +-- modules/backend/handlers/titles.go | 1 - modules/backend/main.go | 3 +- modules/backend/rmq/rabbit.go | 214 ++++++----------------------- 4 files changed, 53 insertions(+), 187 deletions(-) diff --git a/modules/backend/handlers/common.go b/modules/backend/handlers/common.go index cad4f0f..58862e1 100644 --- a/modules/backend/handlers/common.go +++ b/modules/backend/handlers/common.go @@ -9,24 +9,24 @@ import ( "strconv" ) -type Handler struct { - publisher *rmq.Publisher -} +// type Handler struct { +// publisher *rmq.Publisher +// } -func New(publisher *rmq.Publisher) *Handler { - return &Handler{publisher: publisher} -} +// func New(publisher *rmq.Publisher) *Handler { +// return &Handler{publisher: publisher} +// } type Server struct { - db *sqlc.Queries - publisher *rmq.Publisher + db *sqlc.Queries + // publisher *rmq.Publisher RPCclient *rmq.RPCClient } -func NewServer(db *sqlc.Queries, publisher *rmq.Publisher, rpcclient *rmq.RPCClient) *Server { +func NewServer(db *sqlc.Queries, rpcclient *rmq.RPCClient) *Server { return &Server{ - db: db, - publisher: publisher, + db: db, + // publisher: publisher, RPCclient: rpcclient, } } diff --git a/modules/backend/handlers/titles.go b/modules/backend/handlers/titles.go index 300cc87..7aeeb11 100644 --- a/modules/backend/handlers/titles.go +++ b/modules/backend/handlers/titles.go @@ -197,7 +197,6 @@ func (s Server) GetTitles(ctx context.Context, request oapi.GetTitlesRequestObje // Делаем RPC-вызов — и ЖДЁМ ответа err := s.RPCclient.Call( ctx, - "svc.media.process.requests", // ← очередь микросервиса mqreq, &reply, ) diff --git a/modules/backend/main.go b/modules/backend/main.go index 755e3ef..e7e6ec8 100644 --- a/modules/backend/main.go +++ b/modules/backend/main.go @@ -59,10 +59,9 @@ func main() { } defer rmqConn.Close() - publisher := rmq.NewPublisher(rmqConn) rpcClient := rmq.NewRPCClient(rmqConn, 30*time.Second) - server := handlers.NewServer(queries, publisher, rpcClient) + server := handlers.NewServer(queries, rpcClient) r.Use(cors.New(cors.Config{ AllowOrigins: []string{AppConfig.ServiceAddress}, diff --git a/modules/backend/rmq/rabbit.go b/modules/backend/rmq/rabbit.go index 52c1979..25abbdb 100644 --- a/modules/backend/rmq/rabbit.go +++ b/modules/backend/rmq/rabbit.go @@ -4,13 +4,16 @@ import ( "context" "encoding/json" "fmt" - oapi "nyanimedb/api" - "sync" "time" + oapi "nyanimedb/api" + amqp "github.com/rabbitmq/amqp091-go" ) +const RPCQueueName = "anime_import_rpc" + +// RabbitRequest не меняем type RabbitRequest struct { Name string `json:"name"` Statuses []oapi.TitleStatus `json:"statuses,omitempty"` @@ -20,151 +23,6 @@ type RabbitRequest struct { Timestamp time.Time `json:"timestamp"` } -// Publisher — потокобезопасный публикатор с пулом каналов. -type Publisher struct { - conn *amqp.Connection - pool *sync.Pool -} - -// NewPublisher создаёт новый Publisher. -// conn должен быть уже установленным и healthy. -// Рекомендуется передавать durable connection с reconnect-логикой. -func NewPublisher(conn *amqp.Connection) *Publisher { - return &Publisher{ - conn: conn, - pool: &sync.Pool{ - New: func() any { - ch, err := conn.Channel() - if err != nil { - // Паника уместна: невозможность открыть канал — критическая ошибка инициализации - panic(fmt.Errorf("rmqpool: failed to create channel: %w", err)) - } - return ch - }, - }, - } -} - -// Publish публикует сообщение в указанную очередь. -// Очередь объявляется как durable (если не существует). -// Поддерживает context для отмены/таймаута. -func (p *Publisher) Publish( - ctx context.Context, - queueName string, - payload RabbitRequest, - opts ...PublishOption, -) error { - // Применяем опции - options := &publishOptions{ - contentType: "application/json", - deliveryMode: amqp.Persistent, - timestamp: time.Now(), - } - for _, opt := range opts { - opt(options) - } - - // Сериализуем payload - body, err := json.Marshal(payload) - if err != nil { - return fmt.Errorf("rmqpool: failed to marshal payload: %w", err) - } - - // Берём канал из пула - ch := p.getChannel() - if ch == nil { - return fmt.Errorf("rmqpool: channel is nil (connection may be closed)") - } - defer p.returnChannel(ch) - - // Объявляем очередь (idempotent) - q, err := ch.QueueDeclare( - queueName, - true, // durable - false, // auto-delete - false, // exclusive - false, // no-wait - nil, // args - ) - if err != nil { - return fmt.Errorf("rmqpool: failed to declare queue %q: %w", queueName, err) - } - - // Подготавливаем сообщение - msg := amqp.Publishing{ - DeliveryMode: options.deliveryMode, - ContentType: options.contentType, - Timestamp: options.timestamp, - Body: body, - } - - // Публикуем с учётом контекста - done := make(chan error, 1) - go func() { - err := ch.Publish( - "", // exchange (default) - q.Name, // routing key - false, // mandatory - false, // immediate - msg, - ) - done <- err - }() - - select { - case err := <-done: - return err - case <-ctx.Done(): - return ctx.Err() - } -} - -func (p *Publisher) getChannel() *amqp.Channel { - raw := p.pool.Get() - if raw == nil { - ch, _ := p.conn.Channel() - return ch - } - ch := raw.(*amqp.Channel) - if ch.IsClosed() { // ← теперь есть! - ch.Close() // освободить ресурсы - ch, _ = p.conn.Channel() - } - return ch -} - -// returnChannel возвращает канал в пул, если он жив. -func (p *Publisher) returnChannel(ch *amqp.Channel) { - if ch != nil && !ch.IsClosed() { - p.pool.Put(ch) - } -} - -// PublishOption позволяет кастомизировать публикацию. -type PublishOption func(*publishOptions) - -type publishOptions struct { - contentType string - deliveryMode uint8 - timestamp time.Time -} - -// WithContentType устанавливает Content-Type (по умолчанию "application/json"). -func WithContentType(ct string) PublishOption { - return func(o *publishOptions) { o.contentType = ct } -} - -// WithTransient делает сообщение transient (не сохраняется на диск). -// По умолчанию — Persistent. -func WithTransient() PublishOption { - return func(o *publishOptions) { o.deliveryMode = amqp.Transient } -} - -// WithTimestamp устанавливает кастомную метку времени. -func WithTimestamp(ts time.Time) PublishOption { - return func(o *publishOptions) { o.timestamp = ts } -} - type RPCClient struct { conn *amqp.Connection timeout time.Duration @@ -174,37 +32,48 @@ func NewRPCClient(conn *amqp.Connection, timeout time.Duration) *RPCClient { return &RPCClient{conn: conn, timeout: timeout} } -// Call отправляет запрос в очередь и ждёт ответа. -// replyPayload — указатель на структуру, в которую раскодировать ответ (например, &MediaResponse{}). func (c *RPCClient) Call( ctx context.Context, - requestQueue string, request RabbitRequest, replyPayload any, ) error { - // 1. Создаём временный канал (не из пула!) + + // 1. Канал для запроса и ответа ch, err := c.conn.Channel() if err != nil { return fmt.Errorf("channel: %w", err) } defer ch.Close() - // 2. Создаём временную очередь для ответов - q, err := ch.QueueDeclare( - "", // auto name - false, // not durable - true, // exclusive - true, // auto-delete + // 2. Декларируем фиксированную очередь RPC (идемпотентно) + _, err = ch.QueueDeclare( + RPCQueueName, + true, // durable + false, // auto-delete + false, // exclusive + false, // no-wait + nil, + ) + if err != nil { + return fmt.Errorf("declare rpc queue: %w", err) + } + + // 3. Создаём временную очередь ДЛЯ ОТВЕТА + replyQueue, err := ch.QueueDeclare( + "", + false, + true, + true, false, nil, ) if err != nil { - return fmt.Errorf("reply queue: %w", err) + return fmt.Errorf("declare reply queue: %w", err) } - // 3. Подписываемся на ответы + // 4. Подписываемся на очередь ответов msgs, err := ch.Consume( - q.Name, + replyQueue.Name, "", true, // auto-ack true, // exclusive @@ -213,28 +82,28 @@ func (c *RPCClient) Call( nil, ) if err != nil { - return fmt.Errorf("consume: %w", err) + return fmt.Errorf("consume reply: %w", err) } - // 4. Готовим correlation ID - corrID := time.Now().UnixNano() + // correlation ID + corrID := fmt.Sprintf("%d", time.Now().UnixNano()) - // 5. Сериализуем запрос + // 5. сериализация запроса body, err := json.Marshal(request) if err != nil { return fmt.Errorf("marshal request: %w", err) } - // 6. Публикуем запрос + // 6. Публикация RPC-запроса err = ch.Publish( "", - requestQueue, + RPCQueueName, // ← фиксированная очередь! false, false, amqp.Publishing{ ContentType: "application/json", - CorrelationId: fmt.Sprintf("%d", corrID), - ReplyTo: q.Name, + CorrelationId: corrID, + ReplyTo: replyQueue.Name, Timestamp: time.Now(), Body: body, }, @@ -244,18 +113,17 @@ func (c *RPCClient) Call( } // 7. Ждём ответ с таймаутом - ctx, cancel := context.WithTimeout(ctx, c.timeout) + timeoutCtx, cancel := context.WithTimeout(ctx, c.timeout) defer cancel() for { select { case msg := <-msgs: - if msg.CorrelationId == fmt.Sprintf("%d", corrID) { + if msg.CorrelationId == corrID { return json.Unmarshal(msg.Body, replyPayload) } - // игнорируем другие сообщения (маловероятно, но возможно) - case <-ctx.Done(): - return ctx.Err() // timeout or cancelled + case <-timeoutCtx.Done(): + return fmt.Errorf("rpc timeout: %w", timeoutCtx.Err()) } } }