From 77a63a1c748e073470ed19b355e5fd1860955597 Mon Sep 17 00:00:00 2001 From: Iron_Felix Date: Sun, 30 Nov 2025 02:57:11 +0300 Subject: [PATCH] feat: rabbitMQ is now calling from seatchtitles --- go.mod | 1 + go.sum | 2 + modules/backend/handlers/common.go | 21 +++- modules/backend/handlers/titles.go | 25 +++++ modules/backend/main.go | 43 +++++--- modules/backend/rabbit.go | 103 ------------------ modules/backend/rmq/rabbit.go | 166 +++++++++++++++++++++++++++++ 7 files changed, 237 insertions(+), 124 deletions(-) delete mode 100644 modules/backend/rabbit.go create mode 100644 modules/backend/rmq/rabbit.go diff --git a/go.mod b/go.mod index 7fc0e5f..f29cbb1 100644 --- a/go.mod +++ b/go.mod @@ -36,6 +36,7 @@ require ( github.com/modern-go/reflect2 v1.0.2 // indirect github.com/quic-go/qpack v0.5.1 // indirect github.com/quic-go/quic-go v0.54.0 // indirect + github.com/rabbitmq/amqp091-go v1.10.0 // indirect github.com/streadway/amqp v1.1.0 // indirect github.com/twitchyliquid64/golang-asm v0.15.1 // indirect github.com/ugorji/go/codec v1.3.0 // indirect diff --git a/go.sum b/go.sum index e52e5c9..59cc7ba 100644 --- a/go.sum +++ b/go.sum @@ -70,6 +70,8 @@ github.com/quic-go/qpack v0.5.1 h1:giqksBPnT/HDtZ6VhtFKgoLOWmlyo9Ei6u9PqzIMbhI= github.com/quic-go/qpack v0.5.1/go.mod h1:+PC4XFrEskIVkcLzpEkbLqq1uCoxPhQuvK5rH1ZgaEg= github.com/quic-go/quic-go v0.54.0 h1:6s1YB9QotYI6Ospeiguknbp2Znb/jZYjZLRXn9kMQBg= github.com/quic-go/quic-go v0.54.0/go.mod h1:e68ZEaCdyviluZmy44P6Iey98v/Wfz6HCjQEm+l8zTY= +github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw= +github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o= github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= github.com/spkg/bom v0.0.0-20160624110644-59b7046e48ad/go.mod h1:qLr4V1qq6nMqFKkMo8ZTx3f+BZEkzsRUY10Xsm2mwU0= diff --git a/modules/backend/handlers/common.go b/modules/backend/handlers/common.go index f820db6..aece414 100644 --- a/modules/backend/handlers/common.go +++ b/modules/backend/handlers/common.go @@ -4,16 +4,29 @@ import ( "encoding/json" "fmt" oapi "nyanimedb/api" + "nyanimedb/modules/backend/rmq" sqlc "nyanimedb/sql" "strconv" ) -type Server struct { - db *sqlc.Queries +type Handler struct { + publisher *rmq.Publisher } -func NewServer(db *sqlc.Queries) Server { - return Server{db: db} +func New(publisher *rmq.Publisher) *Handler { + return &Handler{publisher: publisher} +} + +type Server struct { + db *sqlc.Queries + publisher *rmq.Publisher // ← добавьте это поле +} + +func NewServer(db *sqlc.Queries, publisher *rmq.Publisher) *Server { + return &Server{ + db: db, + publisher: publisher, + } } func sql2StorageType(s *sqlc.StorageTypeT) (*oapi.StorageType, error) { diff --git a/modules/backend/handlers/titles.go b/modules/backend/handlers/titles.go index 77af7e4..9f11016 100644 --- a/modules/backend/handlers/titles.go +++ b/modules/backend/handlers/titles.go @@ -5,8 +5,10 @@ import ( "encoding/json" "fmt" oapi "nyanimedb/api" + "nyanimedb/modules/backend/rmq" sqlc "nyanimedb/sql" "strconv" + "time" "github.com/jackc/pgx/v5" log "github.com/sirupsen/logrus" @@ -154,6 +156,29 @@ func (s Server) GetTitle(ctx context.Context, request oapi.GetTitleRequestObject } func (s Server) GetTitles(ctx context.Context, request oapi.GetTitlesRequestObject) (oapi.GetTitlesResponseObject, error) { + + if request.Params.ExtSearch != nil && *request.Params.ExtSearch { + // Публикуем событие — как и просили + event := rmq.RabbitRequest{ + Name: "Attack on titans", + // Status oapi.TitleStatus `json:"titlestatus,omitempty"` + // Rating float64 `json:"titleraring,omitempty"` + // Year int32 `json:"year,omitempty"` + // Season oapi.ReleaseSeason `json:"season,omitempty"` + Timestamp: time.Now(), + } + + // Контекст с таймаутом (не блокируем ответ) + publishCtx, cancel := context.WithTimeout(ctx, 30*time.Second) + defer cancel() + + if err := s.publisher.Publish(publishCtx, "events.user", event); err != nil { + log.Errorf("RMQ publish failed (non-critical): %v", err) + } else { + log.Infof("RMQ publish succeed %v", err) + } + } + opai_titles := make([]oapi.Title, 0) word := Word2Sqlc(request.Params.Word) diff --git a/modules/backend/main.go b/modules/backend/main.go index 3ac6603..25f175a 100644 --- a/modules/backend/main.go +++ b/modules/backend/main.go @@ -3,6 +3,7 @@ package main import ( "context" "fmt" + "net/http" sqlc "nyanimedb/sql" "os" "reflect" @@ -10,11 +11,14 @@ import ( oapi "nyanimedb/api" handlers "nyanimedb/modules/backend/handlers" + "nyanimedb/modules/backend/rmq" "github.com/gin-contrib/cors" "github.com/gin-gonic/gin" "github.com/jackc/pgx/v5/pgxpool" "github.com/pelletier/go-toml/v2" + "github.com/rabbitmq/amqp091-go" + log "github.com/sirupsen/logrus" ) var AppConfig Config @@ -43,7 +47,21 @@ func main() { queries := sqlc.New(pool) - server := handlers.NewServer(queries) + // === RabbitMQ setup === + rmqURL := os.Getenv("RABBITMQ_URL") + if rmqURL == "" { + rmqURL = "amqp://guest:guest@10.1.0.65:5672/" + } + + rmqConn, err := amqp091.Dial(rmqURL) + if err != nil { + log.Fatalf("Failed to connect to RabbitMQ: %v", err) + } + defer rmqConn.Close() + + publisher := rmq.NewPublisher(rmqConn) + + server := handlers.NewServer(queries, publisher) // r.LoadHTMLGlob("templates/*") r.Use(cors.New(cors.Config{ @@ -60,24 +78,15 @@ func main() { // сюда можно добавить middlewares, если нужно []oapi.StrictMiddlewareFunc{}, )) - // r.GET("/", func(c *gin.Context) { - // c.HTML(http.StatusOK, "index.html", gin.H{ - // "title": "Welcome Page", - // "message": "Hello, Gin with HTML templates!", - // }) - // }) - // r.GET("/api", func(c *gin.Context) { - // items := []Item{ - // {ID: 1, Title: "First Item", Description: "This is the description of the first item."}, - // {ID: 2, Title: "Second Item", Description: "This is the description of the second item."}, - // {ID: 3, Title: "Third Item", Description: "This is the description of the third item."}, - // } + // Внедряем publisher в сервер + server = handlers.NewServer(queries, publisher) - // c.JSON(http.StatusOK, items) - // }) - - r.Run(":8080") + // Запуск + log.Infof("Server starting on :8080") + if err := r.Run(":8080"); err != nil && err != http.ErrServerClosed { + log.Fatalf("server failed: %v", err) + } } func InitConfig() error { diff --git a/modules/backend/rabbit.go b/modules/backend/rabbit.go deleted file mode 100644 index f08bf39..0000000 --- a/modules/backend/rabbit.go +++ /dev/null @@ -1,103 +0,0 @@ -package main - -import ( - "context" - "encoding/json" - "fmt" - oapi "nyanimedb/api" - "time" - - "github.com/google/uuid" - "github.com/sirupsen/logrus" - "github.com/streadway/amqp" -) - -type RabbitRequest struct { - Name string `json:"name"` - Status oapi.TitleStatus `json:"titlestatus,omitempty"` - Rating float64 `json:"titleraring,omitempty"` - Year int32 `json:"year,omitempty"` - Season oapi.ReleaseSeason `json:"season,omitempty"` - Timestamp time.Time `json:"timestamp"` -} - -// PublishAndAwaitReply отправляет запрос и ждёт ответа от worker’а. -// Возвращает раскодированный ответ или ошибку. -func PublishAndAwaitReply( - ctx context.Context, - ch *amqp.Channel, - requestQueue string, // например: "svc.media.process.requests" - request RabbitRequest, // ваша структура запроса - replyCh chan<- any, // куда положить ответ (вы читаете извне) -) error { - // 1. Создаём временную очередь для ответов - replyQueue, err := ch.QueueDeclare( - "", // auto-generated name - false, // not durable - true, // exclusive - true, // auto-delete - false, // no-wait - nil, - ) - if err != nil { - return fmt.Errorf("failed to declare reply queue: %w", err) - } - - // 2. Готовим корреляционный ID - corrID := uuid.New().String() // ← используйте github.com/google/uuid - logrus.Infof("New CorrID: %s", corrID) - - // 3. Сериализуем запрос - body, err := json.Marshal(request) - if err != nil { - return fmt.Errorf("failed to marshal request: %w", err) - } - - // 4. Публикуем запрос - err = ch.Publish( - "", // default exchange (или свой, если используете) - requestQueue, - false, - false, - amqp.Publishing{ - ContentType: "application/json", - CorrelationId: corrID, - ReplyTo: replyQueue.Name, - DeliveryMode: amqp.Persistent, - Timestamp: time.Now(), - Body: body, - }, - ) - if err != nil { - return fmt.Errorf("failed to publish request, corrID: %s : %w", corrID, err) - } - - // 5. Подписываемся на ответы - msgs, err := ch.Consume( - replyQueue.Name, - "", // consumer tag - true, // auto-ack - true, // exclusive - false, // no-local - false, // no-wait - nil, // args - ) - if err != nil { - return fmt.Errorf("failed to consume from reply queue: %w", err) - } - - // 6. Ожидаем ответ с таймаутом - select { - case msg := <-msgs: - if msg.CorrelationId != corrID { - return fmt.Errorf("correlation ID mismatch: got %s, expected %s", msg.CorrelationId, corrID) - } - // Десериализуем — тут можно передать target-структуру или использовать interface{} - // В данном случае просто возвращаем байты или пусть вызывающая сторона парсит - replyCh <- msg.Body // или json.Unmarshal → и отправить структуру в канал - return nil - - case <-ctx.Done(): - return ctx.Err() - } -} diff --git a/modules/backend/rmq/rabbit.go b/modules/backend/rmq/rabbit.go new file mode 100644 index 0000000..85df89b --- /dev/null +++ b/modules/backend/rmq/rabbit.go @@ -0,0 +1,166 @@ +package rmq + +import ( + "context" + "encoding/json" + "fmt" + oapi "nyanimedb/api" + "sync" + "time" + + amqp "github.com/rabbitmq/amqp091-go" +) + +type RabbitRequest struct { + Name string `json:"name"` + Status oapi.TitleStatus `json:"titlestatus,omitempty"` + Rating float64 `json:"titleraring,omitempty"` + Year int32 `json:"year,omitempty"` + Season oapi.ReleaseSeason `json:"season,omitempty"` + Timestamp time.Time `json:"timestamp"` +} + +// Publisher — потокобезопасный публикатор с пулом каналов. +type Publisher struct { + conn *amqp.Connection + pool *sync.Pool +} + +// NewPublisher создаёт новый Publisher. +// conn должен быть уже установленным и healthy. +// Рекомендуется передавать durable connection с reconnect-логикой. +func NewPublisher(conn *amqp.Connection) *Publisher { + return &Publisher{ + conn: conn, + pool: &sync.Pool{ + New: func() any { + ch, err := conn.Channel() + if err != nil { + // Паника уместна: невозможность открыть канал — критическая ошибка инициализации + panic(fmt.Errorf("rmqpool: failed to create channel: %w", err)) + } + return ch + }, + }, + } +} + +// Publish публикует сообщение в указанную очередь. +// Очередь объявляется как durable (если не существует). +// Поддерживает context для отмены/таймаута. +func (p *Publisher) Publish( + ctx context.Context, + queueName string, + payload RabbitRequest, + opts ...PublishOption, +) error { + // Применяем опции + options := &publishOptions{ + contentType: "application/json", + deliveryMode: amqp.Persistent, + timestamp: time.Now(), + } + for _, opt := range opts { + opt(options) + } + + // Сериализуем payload + body, err := json.Marshal(payload) + if err != nil { + return fmt.Errorf("rmqpool: failed to marshal payload: %w", err) + } + + // Берём канал из пула + ch := p.getChannel() + if ch == nil { + return fmt.Errorf("rmqpool: channel is nil (connection may be closed)") + } + defer p.returnChannel(ch) + + // Объявляем очередь (idempotent) + q, err := ch.QueueDeclare( + queueName, + true, // durable + false, // auto-delete + false, // exclusive + false, // no-wait + nil, // args + ) + if err != nil { + return fmt.Errorf("rmqpool: failed to declare queue %q: %w", queueName, err) + } + + // Подготавливаем сообщение + msg := amqp.Publishing{ + DeliveryMode: options.deliveryMode, + ContentType: options.contentType, + Timestamp: options.timestamp, + Body: body, + } + + // Публикуем с учётом контекста + done := make(chan error, 1) + go func() { + err := ch.Publish( + "", // exchange (default) + q.Name, // routing key + false, // mandatory + false, // immediate + msg, + ) + done <- err + }() + + select { + case err := <-done: + return err + case <-ctx.Done(): + return ctx.Err() + } +} + +func (p *Publisher) getChannel() *amqp.Channel { + raw := p.pool.Get() + if raw == nil { + ch, _ := p.conn.Channel() + return ch + } + ch := raw.(*amqp.Channel) + if ch.IsClosed() { // ← теперь есть! + ch.Close() // освободить ресурсы + ch, _ = p.conn.Channel() + } + return ch +} + +// returnChannel возвращает канал в пул, если он жив. +func (p *Publisher) returnChannel(ch *amqp.Channel) { + if ch != nil && !ch.IsClosed() { + p.pool.Put(ch) + } +} + +// PublishOption позволяет кастомизировать публикацию. +type PublishOption func(*publishOptions) + +type publishOptions struct { + contentType string + deliveryMode uint8 + timestamp time.Time +} + +// WithContentType устанавливает Content-Type (по умолчанию "application/json"). +func WithContentType(ct string) PublishOption { + return func(o *publishOptions) { o.contentType = ct } +} + +// WithTransient делает сообщение transient (не сохраняется на диск). +// По умолчанию — Persistent. +func WithTransient() PublishOption { + return func(o *publishOptions) { o.deliveryMode = amqp.Transient } +} + +// WithTimestamp устанавливает кастомную метку времени. +func WithTimestamp(ts time.Time) PublishOption { + return func(o *publishOptions) { o.timestamp = ts } +}