diff --git a/api/_build/openapi.yaml b/api/_build/openapi.yaml index 424e893..e85ddf9 100644 --- a/api/_build/openapi.yaml +++ b/api/_build/openapi.yaml @@ -16,6 +16,11 @@ paths: schema: type: boolean default: true + - name: ext_search + in: query + schema: + type: boolean + default: false - name: word in: query schema: diff --git a/api/api.gen.go b/api/api.gen.go index 32ab199..c8fd9aa 100644 --- a/api/api.gen.go +++ b/api/api.gen.go @@ -178,6 +178,7 @@ type GetTitlesParams struct { Cursor *Cursor `form:"cursor,omitempty" json:"cursor,omitempty"` Sort *TitleSort `form:"sort,omitempty" json:"sort,omitempty"` SortForward *bool `form:"sort_forward,omitempty" json:"sort_forward,omitempty"` + ExtSearch *bool `form:"ext_search,omitempty" json:"ext_search,omitempty"` Word *string `form:"word,omitempty" json:"word,omitempty"` // Status List of title statuses to filter @@ -334,6 +335,14 @@ func (siw *ServerInterfaceWrapper) GetTitles(c *gin.Context) { return } + // ------------- Optional query parameter "ext_search" ------------- + + err = runtime.BindQueryParameter("form", true, false, "ext_search", c.Request.URL.Query(), ¶ms.ExtSearch) + if err != nil { + siw.ErrorHandler(c, fmt.Errorf("Invalid format for parameter ext_search: %w", err), http.StatusBadRequest) + return + } + // ------------- Optional query parameter "word" ------------- err = runtime.BindQueryParameter("form", true, false, "word", c.Request.URL.Query(), ¶ms.Word) diff --git a/api/paths/titles.yaml b/api/paths/titles.yaml index af2d17b..4288417 100644 --- a/api/paths/titles.yaml +++ b/api/paths/titles.yaml @@ -8,6 +8,11 @@ get: schema: type: boolean default: true + - in: query + name: ext_search + schema: + type: boolean + default: false - in: query name: word schema: @@ -21,7 +26,6 @@ get: description: List of title statuses to filter style: form explode: false - - in: query name: rating schema: diff --git a/go.mod b/go.mod index 7b7cc71..6662bc1 100644 --- a/go.mod +++ b/go.mod @@ -37,6 +37,7 @@ require ( github.com/modern-go/reflect2 v1.0.2 // indirect github.com/quic-go/qpack v0.5.1 // indirect github.com/quic-go/quic-go v0.54.0 // indirect + github.com/rabbitmq/amqp091-go v1.10.0 // indirect github.com/twitchyliquid64/golang-asm v0.15.1 // indirect github.com/ugorji/go/codec v1.3.0 // indirect go.uber.org/mock v0.5.0 // indirect diff --git a/go.sum b/go.sum index cd197e6..520a22b 100644 --- a/go.sum +++ b/go.sum @@ -72,9 +72,13 @@ github.com/quic-go/qpack v0.5.1 h1:giqksBPnT/HDtZ6VhtFKgoLOWmlyo9Ei6u9PqzIMbhI= github.com/quic-go/qpack v0.5.1/go.mod h1:+PC4XFrEskIVkcLzpEkbLqq1uCoxPhQuvK5rH1ZgaEg= github.com/quic-go/quic-go v0.54.0 h1:6s1YB9QotYI6Ospeiguknbp2Znb/jZYjZLRXn9kMQBg= github.com/quic-go/quic-go v0.54.0/go.mod h1:e68ZEaCdyviluZmy44P6Iey98v/Wfz6HCjQEm+l8zTY= +github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw= +github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o= github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= github.com/spkg/bom v0.0.0-20160624110644-59b7046e48ad/go.mod h1:qLr4V1qq6nMqFKkMo8ZTx3f+BZEkzsRUY10Xsm2mwU0= +github.com/streadway/amqp v1.1.0 h1:py12iX8XSyI7aN/3dUT8DFIDJazNJsVJdxNVEpnQTZM= +github.com/streadway/amqp v1.1.0/go.mod h1:WYSrTEYHOXHd0nwFeUXAe2G2hRnQT+deZJJf88uS9Bg= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= diff --git a/modules/backend/handlers/common.go b/modules/backend/handlers/common.go index f820db6..aece414 100644 --- a/modules/backend/handlers/common.go +++ b/modules/backend/handlers/common.go @@ -4,16 +4,29 @@ import ( "encoding/json" "fmt" oapi "nyanimedb/api" + "nyanimedb/modules/backend/rmq" sqlc "nyanimedb/sql" "strconv" ) -type Server struct { - db *sqlc.Queries +type Handler struct { + publisher *rmq.Publisher } -func NewServer(db *sqlc.Queries) Server { - return Server{db: db} +func New(publisher *rmq.Publisher) *Handler { + return &Handler{publisher: publisher} +} + +type Server struct { + db *sqlc.Queries + publisher *rmq.Publisher // ← добавьте это поле +} + +func NewServer(db *sqlc.Queries, publisher *rmq.Publisher) *Server { + return &Server{ + db: db, + publisher: publisher, + } } func sql2StorageType(s *sqlc.StorageTypeT) (*oapi.StorageType, error) { diff --git a/modules/backend/handlers/titles.go b/modules/backend/handlers/titles.go index 77af7e4..9f11016 100644 --- a/modules/backend/handlers/titles.go +++ b/modules/backend/handlers/titles.go @@ -5,8 +5,10 @@ import ( "encoding/json" "fmt" oapi "nyanimedb/api" + "nyanimedb/modules/backend/rmq" sqlc "nyanimedb/sql" "strconv" + "time" "github.com/jackc/pgx/v5" log "github.com/sirupsen/logrus" @@ -154,6 +156,29 @@ func (s Server) GetTitle(ctx context.Context, request oapi.GetTitleRequestObject } func (s Server) GetTitles(ctx context.Context, request oapi.GetTitlesRequestObject) (oapi.GetTitlesResponseObject, error) { + + if request.Params.ExtSearch != nil && *request.Params.ExtSearch { + // Публикуем событие — как и просили + event := rmq.RabbitRequest{ + Name: "Attack on titans", + // Status oapi.TitleStatus `json:"titlestatus,omitempty"` + // Rating float64 `json:"titleraring,omitempty"` + // Year int32 `json:"year,omitempty"` + // Season oapi.ReleaseSeason `json:"season,omitempty"` + Timestamp: time.Now(), + } + + // Контекст с таймаутом (не блокируем ответ) + publishCtx, cancel := context.WithTimeout(ctx, 30*time.Second) + defer cancel() + + if err := s.publisher.Publish(publishCtx, "events.user", event); err != nil { + log.Errorf("RMQ publish failed (non-critical): %v", err) + } else { + log.Infof("RMQ publish succeed %v", err) + } + } + opai_titles := make([]oapi.Title, 0) word := Word2Sqlc(request.Params.Word) diff --git a/modules/backend/main.go b/modules/backend/main.go index 8f58ffe..7b995a5 100644 --- a/modules/backend/main.go +++ b/modules/backend/main.go @@ -3,6 +3,7 @@ package main import ( "context" "fmt" + "net/http" sqlc "nyanimedb/sql" "os" "reflect" @@ -10,11 +11,14 @@ import ( oapi "nyanimedb/api" handlers "nyanimedb/modules/backend/handlers" + "nyanimedb/modules/backend/rmq" "github.com/gin-contrib/cors" "github.com/gin-gonic/gin" "github.com/jackc/pgx/v5/pgxpool" "github.com/pelletier/go-toml/v2" + "github.com/rabbitmq/amqp091-go" + log "github.com/sirupsen/logrus" ) var AppConfig Config @@ -43,7 +47,21 @@ func main() { queries := sqlc.New(pool) - server := handlers.NewServer(queries) + // === RabbitMQ setup === + rmqURL := os.Getenv("RABBITMQ_URL") + if rmqURL == "" { + rmqURL = "amqp://guest:guest@10.1.0.65:5672/" + } + + rmqConn, err := amqp091.Dial(rmqURL) + if err != nil { + log.Fatalf("Failed to connect to RabbitMQ: %v", err) + } + defer rmqConn.Close() + + publisher := rmq.NewPublisher(rmqConn) + + server := handlers.NewServer(queries, publisher) // r.LoadHTMLGlob("templates/*") r.Use(cors.New(cors.Config{ @@ -60,24 +78,15 @@ func main() { // сюда можно добавить middlewares, если нужно []oapi.StrictMiddlewareFunc{}, )) - // r.GET("/", func(c *gin.Context) { - // c.HTML(http.StatusOK, "index.html", gin.H{ - // "title": "Welcome Page", - // "message": "Hello, Gin with HTML templates!", - // }) - // }) - // r.GET("/api", func(c *gin.Context) { - // items := []Item{ - // {ID: 1, Title: "First Item", Description: "This is the description of the first item."}, - // {ID: 2, Title: "Second Item", Description: "This is the description of the second item."}, - // {ID: 3, Title: "Third Item", Description: "This is the description of the third item."}, - // } + // Внедряем publisher в сервер + server = handlers.NewServer(queries, publisher) - // c.JSON(http.StatusOK, items) - // }) - - r.Run(":8080") + // Запуск + log.Infof("Server starting on :8080") + if err := r.Run(":8080"); err != nil && err != http.ErrServerClosed { + log.Fatalf("server failed: %v", err) + } } func InitConfig() error { diff --git a/modules/backend/rmq/rabbit.go b/modules/backend/rmq/rabbit.go new file mode 100644 index 0000000..85df89b --- /dev/null +++ b/modules/backend/rmq/rabbit.go @@ -0,0 +1,166 @@ +package rmq + +import ( + "context" + "encoding/json" + "fmt" + oapi "nyanimedb/api" + "sync" + "time" + + amqp "github.com/rabbitmq/amqp091-go" +) + +type RabbitRequest struct { + Name string `json:"name"` + Status oapi.TitleStatus `json:"titlestatus,omitempty"` + Rating float64 `json:"titleraring,omitempty"` + Year int32 `json:"year,omitempty"` + Season oapi.ReleaseSeason `json:"season,omitempty"` + Timestamp time.Time `json:"timestamp"` +} + +// Publisher — потокобезопасный публикатор с пулом каналов. +type Publisher struct { + conn *amqp.Connection + pool *sync.Pool +} + +// NewPublisher создаёт новый Publisher. +// conn должен быть уже установленным и healthy. +// Рекомендуется передавать durable connection с reconnect-логикой. +func NewPublisher(conn *amqp.Connection) *Publisher { + return &Publisher{ + conn: conn, + pool: &sync.Pool{ + New: func() any { + ch, err := conn.Channel() + if err != nil { + // Паника уместна: невозможность открыть канал — критическая ошибка инициализации + panic(fmt.Errorf("rmqpool: failed to create channel: %w", err)) + } + return ch + }, + }, + } +} + +// Publish публикует сообщение в указанную очередь. +// Очередь объявляется как durable (если не существует). +// Поддерживает context для отмены/таймаута. +func (p *Publisher) Publish( + ctx context.Context, + queueName string, + payload RabbitRequest, + opts ...PublishOption, +) error { + // Применяем опции + options := &publishOptions{ + contentType: "application/json", + deliveryMode: amqp.Persistent, + timestamp: time.Now(), + } + for _, opt := range opts { + opt(options) + } + + // Сериализуем payload + body, err := json.Marshal(payload) + if err != nil { + return fmt.Errorf("rmqpool: failed to marshal payload: %w", err) + } + + // Берём канал из пула + ch := p.getChannel() + if ch == nil { + return fmt.Errorf("rmqpool: channel is nil (connection may be closed)") + } + defer p.returnChannel(ch) + + // Объявляем очередь (idempotent) + q, err := ch.QueueDeclare( + queueName, + true, // durable + false, // auto-delete + false, // exclusive + false, // no-wait + nil, // args + ) + if err != nil { + return fmt.Errorf("rmqpool: failed to declare queue %q: %w", queueName, err) + } + + // Подготавливаем сообщение + msg := amqp.Publishing{ + DeliveryMode: options.deliveryMode, + ContentType: options.contentType, + Timestamp: options.timestamp, + Body: body, + } + + // Публикуем с учётом контекста + done := make(chan error, 1) + go func() { + err := ch.Publish( + "", // exchange (default) + q.Name, // routing key + false, // mandatory + false, // immediate + msg, + ) + done <- err + }() + + select { + case err := <-done: + return err + case <-ctx.Done(): + return ctx.Err() + } +} + +func (p *Publisher) getChannel() *amqp.Channel { + raw := p.pool.Get() + if raw == nil { + ch, _ := p.conn.Channel() + return ch + } + ch := raw.(*amqp.Channel) + if ch.IsClosed() { // ← теперь есть! + ch.Close() // освободить ресурсы + ch, _ = p.conn.Channel() + } + return ch +} + +// returnChannel возвращает канал в пул, если он жив. +func (p *Publisher) returnChannel(ch *amqp.Channel) { + if ch != nil && !ch.IsClosed() { + p.pool.Put(ch) + } +} + +// PublishOption позволяет кастомизировать публикацию. +type PublishOption func(*publishOptions) + +type publishOptions struct { + contentType string + deliveryMode uint8 + timestamp time.Time +} + +// WithContentType устанавливает Content-Type (по умолчанию "application/json"). +func WithContentType(ct string) PublishOption { + return func(o *publishOptions) { o.contentType = ct } +} + +// WithTransient делает сообщение transient (не сохраняется на диск). +// По умолчанию — Persistent. +func WithTransient() PublishOption { + return func(o *publishOptions) { o.deliveryMode = amqp.Transient } +} + +// WithTimestamp устанавливает кастомную метку времени. +func WithTimestamp(ts time.Time) PublishOption { + return func(o *publishOptions) { o.timestamp = ts } +}