fix: rmq
This commit is contained in:
parent
6a5994e33e
commit
62e0633e69
4 changed files with 53 additions and 187 deletions
|
|
@ -9,24 +9,24 @@ import (
|
||||||
"strconv"
|
"strconv"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Handler struct {
|
// type Handler struct {
|
||||||
publisher *rmq.Publisher
|
// publisher *rmq.Publisher
|
||||||
}
|
// }
|
||||||
|
|
||||||
func New(publisher *rmq.Publisher) *Handler {
|
// func New(publisher *rmq.Publisher) *Handler {
|
||||||
return &Handler{publisher: publisher}
|
// return &Handler{publisher: publisher}
|
||||||
}
|
// }
|
||||||
|
|
||||||
type Server struct {
|
type Server struct {
|
||||||
db *sqlc.Queries
|
db *sqlc.Queries
|
||||||
publisher *rmq.Publisher
|
// publisher *rmq.Publisher
|
||||||
RPCclient *rmq.RPCClient
|
RPCclient *rmq.RPCClient
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewServer(db *sqlc.Queries, publisher *rmq.Publisher, rpcclient *rmq.RPCClient) *Server {
|
func NewServer(db *sqlc.Queries, rpcclient *rmq.RPCClient) *Server {
|
||||||
return &Server{
|
return &Server{
|
||||||
db: db,
|
db: db,
|
||||||
publisher: publisher,
|
// publisher: publisher,
|
||||||
RPCclient: rpcclient,
|
RPCclient: rpcclient,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -197,7 +197,6 @@ func (s Server) GetTitles(ctx context.Context, request oapi.GetTitlesRequestObje
|
||||||
// Делаем RPC-вызов — и ЖДЁМ ответа
|
// Делаем RPC-вызов — и ЖДЁМ ответа
|
||||||
err := s.RPCclient.Call(
|
err := s.RPCclient.Call(
|
||||||
ctx,
|
ctx,
|
||||||
"svc.media.process.requests", // ← очередь микросервиса
|
|
||||||
mqreq,
|
mqreq,
|
||||||
&reply,
|
&reply,
|
||||||
)
|
)
|
||||||
|
|
|
||||||
|
|
@ -59,10 +59,9 @@ func main() {
|
||||||
}
|
}
|
||||||
defer rmqConn.Close()
|
defer rmqConn.Close()
|
||||||
|
|
||||||
publisher := rmq.NewPublisher(rmqConn)
|
|
||||||
rpcClient := rmq.NewRPCClient(rmqConn, 30*time.Second)
|
rpcClient := rmq.NewRPCClient(rmqConn, 30*time.Second)
|
||||||
|
|
||||||
server := handlers.NewServer(queries, publisher, rpcClient)
|
server := handlers.NewServer(queries, rpcClient)
|
||||||
|
|
||||||
r.Use(cors.New(cors.Config{
|
r.Use(cors.New(cors.Config{
|
||||||
AllowOrigins: []string{AppConfig.ServiceAddress},
|
AllowOrigins: []string{AppConfig.ServiceAddress},
|
||||||
|
|
|
||||||
|
|
@ -4,13 +4,16 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
oapi "nyanimedb/api"
|
|
||||||
"sync"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
oapi "nyanimedb/api"
|
||||||
|
|
||||||
amqp "github.com/rabbitmq/amqp091-go"
|
amqp "github.com/rabbitmq/amqp091-go"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const RPCQueueName = "anime_import_rpc"
|
||||||
|
|
||||||
|
// RabbitRequest не меняем
|
||||||
type RabbitRequest struct {
|
type RabbitRequest struct {
|
||||||
Name string `json:"name"`
|
Name string `json:"name"`
|
||||||
Statuses []oapi.TitleStatus `json:"statuses,omitempty"`
|
Statuses []oapi.TitleStatus `json:"statuses,omitempty"`
|
||||||
|
|
@ -20,151 +23,6 @@ type RabbitRequest struct {
|
||||||
Timestamp time.Time `json:"timestamp"`
|
Timestamp time.Time `json:"timestamp"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// Publisher — потокобезопасный публикатор с пулом каналов.
|
|
||||||
type Publisher struct {
|
|
||||||
conn *amqp.Connection
|
|
||||||
pool *sync.Pool
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewPublisher создаёт новый Publisher.
|
|
||||||
// conn должен быть уже установленным и healthy.
|
|
||||||
// Рекомендуется передавать durable connection с reconnect-логикой.
|
|
||||||
func NewPublisher(conn *amqp.Connection) *Publisher {
|
|
||||||
return &Publisher{
|
|
||||||
conn: conn,
|
|
||||||
pool: &sync.Pool{
|
|
||||||
New: func() any {
|
|
||||||
ch, err := conn.Channel()
|
|
||||||
if err != nil {
|
|
||||||
// Паника уместна: невозможность открыть канал — критическая ошибка инициализации
|
|
||||||
panic(fmt.Errorf("rmqpool: failed to create channel: %w", err))
|
|
||||||
}
|
|
||||||
return ch
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Publish публикует сообщение в указанную очередь.
|
|
||||||
// Очередь объявляется как durable (если не существует).
|
|
||||||
// Поддерживает context для отмены/таймаута.
|
|
||||||
func (p *Publisher) Publish(
|
|
||||||
ctx context.Context,
|
|
||||||
queueName string,
|
|
||||||
payload RabbitRequest,
|
|
||||||
opts ...PublishOption,
|
|
||||||
) error {
|
|
||||||
// Применяем опции
|
|
||||||
options := &publishOptions{
|
|
||||||
contentType: "application/json",
|
|
||||||
deliveryMode: amqp.Persistent,
|
|
||||||
timestamp: time.Now(),
|
|
||||||
}
|
|
||||||
for _, opt := range opts {
|
|
||||||
opt(options)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Сериализуем payload
|
|
||||||
body, err := json.Marshal(payload)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("rmqpool: failed to marshal payload: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Берём канал из пула
|
|
||||||
ch := p.getChannel()
|
|
||||||
if ch == nil {
|
|
||||||
return fmt.Errorf("rmqpool: channel is nil (connection may be closed)")
|
|
||||||
}
|
|
||||||
defer p.returnChannel(ch)
|
|
||||||
|
|
||||||
// Объявляем очередь (idempotent)
|
|
||||||
q, err := ch.QueueDeclare(
|
|
||||||
queueName,
|
|
||||||
true, // durable
|
|
||||||
false, // auto-delete
|
|
||||||
false, // exclusive
|
|
||||||
false, // no-wait
|
|
||||||
nil, // args
|
|
||||||
)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("rmqpool: failed to declare queue %q: %w", queueName, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Подготавливаем сообщение
|
|
||||||
msg := amqp.Publishing{
|
|
||||||
DeliveryMode: options.deliveryMode,
|
|
||||||
ContentType: options.contentType,
|
|
||||||
Timestamp: options.timestamp,
|
|
||||||
Body: body,
|
|
||||||
}
|
|
||||||
|
|
||||||
// Публикуем с учётом контекста
|
|
||||||
done := make(chan error, 1)
|
|
||||||
go func() {
|
|
||||||
err := ch.Publish(
|
|
||||||
"", // exchange (default)
|
|
||||||
q.Name, // routing key
|
|
||||||
false, // mandatory
|
|
||||||
false, // immediate
|
|
||||||
msg,
|
|
||||||
)
|
|
||||||
done <- err
|
|
||||||
}()
|
|
||||||
|
|
||||||
select {
|
|
||||||
case err := <-done:
|
|
||||||
return err
|
|
||||||
case <-ctx.Done():
|
|
||||||
return ctx.Err()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *Publisher) getChannel() *amqp.Channel {
|
|
||||||
raw := p.pool.Get()
|
|
||||||
if raw == nil {
|
|
||||||
ch, _ := p.conn.Channel()
|
|
||||||
return ch
|
|
||||||
}
|
|
||||||
ch := raw.(*amqp.Channel)
|
|
||||||
if ch.IsClosed() { // ← теперь есть!
|
|
||||||
ch.Close() // освободить ресурсы
|
|
||||||
ch, _ = p.conn.Channel()
|
|
||||||
}
|
|
||||||
return ch
|
|
||||||
}
|
|
||||||
|
|
||||||
// returnChannel возвращает канал в пул, если он жив.
|
|
||||||
func (p *Publisher) returnChannel(ch *amqp.Channel) {
|
|
||||||
if ch != nil && !ch.IsClosed() {
|
|
||||||
p.pool.Put(ch)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// PublishOption позволяет кастомизировать публикацию.
|
|
||||||
type PublishOption func(*publishOptions)
|
|
||||||
|
|
||||||
type publishOptions struct {
|
|
||||||
contentType string
|
|
||||||
deliveryMode uint8
|
|
||||||
timestamp time.Time
|
|
||||||
}
|
|
||||||
|
|
||||||
// WithContentType устанавливает Content-Type (по умолчанию "application/json").
|
|
||||||
func WithContentType(ct string) PublishOption {
|
|
||||||
return func(o *publishOptions) { o.contentType = ct }
|
|
||||||
}
|
|
||||||
|
|
||||||
// WithTransient делает сообщение transient (не сохраняется на диск).
|
|
||||||
// По умолчанию — Persistent.
|
|
||||||
func WithTransient() PublishOption {
|
|
||||||
return func(o *publishOptions) { o.deliveryMode = amqp.Transient }
|
|
||||||
}
|
|
||||||
|
|
||||||
// WithTimestamp устанавливает кастомную метку времени.
|
|
||||||
func WithTimestamp(ts time.Time) PublishOption {
|
|
||||||
return func(o *publishOptions) { o.timestamp = ts }
|
|
||||||
}
|
|
||||||
|
|
||||||
type RPCClient struct {
|
type RPCClient struct {
|
||||||
conn *amqp.Connection
|
conn *amqp.Connection
|
||||||
timeout time.Duration
|
timeout time.Duration
|
||||||
|
|
@ -174,37 +32,48 @@ func NewRPCClient(conn *amqp.Connection, timeout time.Duration) *RPCClient {
|
||||||
return &RPCClient{conn: conn, timeout: timeout}
|
return &RPCClient{conn: conn, timeout: timeout}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Call отправляет запрос в очередь и ждёт ответа.
|
|
||||||
// replyPayload — указатель на структуру, в которую раскодировать ответ (например, &MediaResponse{}).
|
|
||||||
func (c *RPCClient) Call(
|
func (c *RPCClient) Call(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
requestQueue string,
|
|
||||||
request RabbitRequest,
|
request RabbitRequest,
|
||||||
replyPayload any,
|
replyPayload any,
|
||||||
) error {
|
) error {
|
||||||
// 1. Создаём временный канал (не из пула!)
|
|
||||||
|
// 1. Канал для запроса и ответа
|
||||||
ch, err := c.conn.Channel()
|
ch, err := c.conn.Channel()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("channel: %w", err)
|
return fmt.Errorf("channel: %w", err)
|
||||||
}
|
}
|
||||||
defer ch.Close()
|
defer ch.Close()
|
||||||
|
|
||||||
// 2. Создаём временную очередь для ответов
|
// 2. Декларируем фиксированную очередь RPC (идемпотентно)
|
||||||
q, err := ch.QueueDeclare(
|
_, err = ch.QueueDeclare(
|
||||||
"", // auto name
|
RPCQueueName,
|
||||||
false, // not durable
|
true, // durable
|
||||||
true, // exclusive
|
false, // auto-delete
|
||||||
true, // auto-delete
|
false, // exclusive
|
||||||
|
false, // no-wait
|
||||||
|
nil,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("declare rpc queue: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// 3. Создаём временную очередь ДЛЯ ОТВЕТА
|
||||||
|
replyQueue, err := ch.QueueDeclare(
|
||||||
|
"",
|
||||||
|
false,
|
||||||
|
true,
|
||||||
|
true,
|
||||||
false,
|
false,
|
||||||
nil,
|
nil,
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("reply queue: %w", err)
|
return fmt.Errorf("declare reply queue: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// 3. Подписываемся на ответы
|
// 4. Подписываемся на очередь ответов
|
||||||
msgs, err := ch.Consume(
|
msgs, err := ch.Consume(
|
||||||
q.Name,
|
replyQueue.Name,
|
||||||
"",
|
"",
|
||||||
true, // auto-ack
|
true, // auto-ack
|
||||||
true, // exclusive
|
true, // exclusive
|
||||||
|
|
@ -213,28 +82,28 @@ func (c *RPCClient) Call(
|
||||||
nil,
|
nil,
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("consume: %w", err)
|
return fmt.Errorf("consume reply: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// 4. Готовим correlation ID
|
// correlation ID
|
||||||
corrID := time.Now().UnixNano()
|
corrID := fmt.Sprintf("%d", time.Now().UnixNano())
|
||||||
|
|
||||||
// 5. Сериализуем запрос
|
// 5. сериализация запроса
|
||||||
body, err := json.Marshal(request)
|
body, err := json.Marshal(request)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("marshal request: %w", err)
|
return fmt.Errorf("marshal request: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// 6. Публикуем запрос
|
// 6. Публикация RPC-запроса
|
||||||
err = ch.Publish(
|
err = ch.Publish(
|
||||||
"",
|
"",
|
||||||
requestQueue,
|
RPCQueueName, // ← фиксированная очередь!
|
||||||
false,
|
false,
|
||||||
false,
|
false,
|
||||||
amqp.Publishing{
|
amqp.Publishing{
|
||||||
ContentType: "application/json",
|
ContentType: "application/json",
|
||||||
CorrelationId: fmt.Sprintf("%d", corrID),
|
CorrelationId: corrID,
|
||||||
ReplyTo: q.Name,
|
ReplyTo: replyQueue.Name,
|
||||||
Timestamp: time.Now(),
|
Timestamp: time.Now(),
|
||||||
Body: body,
|
Body: body,
|
||||||
},
|
},
|
||||||
|
|
@ -244,18 +113,17 @@ func (c *RPCClient) Call(
|
||||||
}
|
}
|
||||||
|
|
||||||
// 7. Ждём ответ с таймаутом
|
// 7. Ждём ответ с таймаутом
|
||||||
ctx, cancel := context.WithTimeout(ctx, c.timeout)
|
timeoutCtx, cancel := context.WithTimeout(ctx, c.timeout)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case msg := <-msgs:
|
case msg := <-msgs:
|
||||||
if msg.CorrelationId == fmt.Sprintf("%d", corrID) {
|
if msg.CorrelationId == corrID {
|
||||||
return json.Unmarshal(msg.Body, replyPayload)
|
return json.Unmarshal(msg.Body, replyPayload)
|
||||||
}
|
}
|
||||||
// игнорируем другие сообщения (маловероятно, но возможно)
|
case <-timeoutCtx.Done():
|
||||||
case <-ctx.Done():
|
return fmt.Errorf("rpc timeout: %w", timeoutCtx.Err())
|
||||||
return ctx.Err() // timeout or cancelled
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue