Compare commits
4 commits
f71c1f4f08
...
1308e265a6
| Author | SHA1 | Date | |
|---|---|---|---|
| 1308e265a6 | |||
| 77a63a1c74 | |||
| c6cebb0ed2 | |||
| 1756d61da4 |
9 changed files with 258 additions and 22 deletions
|
|
@ -16,6 +16,11 @@ paths:
|
||||||
schema:
|
schema:
|
||||||
type: boolean
|
type: boolean
|
||||||
default: true
|
default: true
|
||||||
|
- name: ext_search
|
||||||
|
in: query
|
||||||
|
schema:
|
||||||
|
type: boolean
|
||||||
|
default: false
|
||||||
- name: word
|
- name: word
|
||||||
in: query
|
in: query
|
||||||
schema:
|
schema:
|
||||||
|
|
|
||||||
|
|
@ -178,6 +178,7 @@ type GetTitlesParams struct {
|
||||||
Cursor *Cursor `form:"cursor,omitempty" json:"cursor,omitempty"`
|
Cursor *Cursor `form:"cursor,omitempty" json:"cursor,omitempty"`
|
||||||
Sort *TitleSort `form:"sort,omitempty" json:"sort,omitempty"`
|
Sort *TitleSort `form:"sort,omitempty" json:"sort,omitempty"`
|
||||||
SortForward *bool `form:"sort_forward,omitempty" json:"sort_forward,omitempty"`
|
SortForward *bool `form:"sort_forward,omitempty" json:"sort_forward,omitempty"`
|
||||||
|
ExtSearch *bool `form:"ext_search,omitempty" json:"ext_search,omitempty"`
|
||||||
Word *string `form:"word,omitempty" json:"word,omitempty"`
|
Word *string `form:"word,omitempty" json:"word,omitempty"`
|
||||||
|
|
||||||
// Status List of title statuses to filter
|
// Status List of title statuses to filter
|
||||||
|
|
@ -334,6 +335,14 @@ func (siw *ServerInterfaceWrapper) GetTitles(c *gin.Context) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ------------- Optional query parameter "ext_search" -------------
|
||||||
|
|
||||||
|
err = runtime.BindQueryParameter("form", true, false, "ext_search", c.Request.URL.Query(), ¶ms.ExtSearch)
|
||||||
|
if err != nil {
|
||||||
|
siw.ErrorHandler(c, fmt.Errorf("Invalid format for parameter ext_search: %w", err), http.StatusBadRequest)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
// ------------- Optional query parameter "word" -------------
|
// ------------- Optional query parameter "word" -------------
|
||||||
|
|
||||||
err = runtime.BindQueryParameter("form", true, false, "word", c.Request.URL.Query(), ¶ms.Word)
|
err = runtime.BindQueryParameter("form", true, false, "word", c.Request.URL.Query(), ¶ms.Word)
|
||||||
|
|
|
||||||
|
|
@ -8,6 +8,11 @@ get:
|
||||||
schema:
|
schema:
|
||||||
type: boolean
|
type: boolean
|
||||||
default: true
|
default: true
|
||||||
|
- in: query
|
||||||
|
name: ext_search
|
||||||
|
schema:
|
||||||
|
type: boolean
|
||||||
|
default: false
|
||||||
- in: query
|
- in: query
|
||||||
name: word
|
name: word
|
||||||
schema:
|
schema:
|
||||||
|
|
@ -21,7 +26,6 @@ get:
|
||||||
description: List of title statuses to filter
|
description: List of title statuses to filter
|
||||||
style: form
|
style: form
|
||||||
explode: false
|
explode: false
|
||||||
|
|
||||||
- in: query
|
- in: query
|
||||||
name: rating
|
name: rating
|
||||||
schema:
|
schema:
|
||||||
|
|
|
||||||
1
go.mod
1
go.mod
|
|
@ -37,6 +37,7 @@ require (
|
||||||
github.com/modern-go/reflect2 v1.0.2 // indirect
|
github.com/modern-go/reflect2 v1.0.2 // indirect
|
||||||
github.com/quic-go/qpack v0.5.1 // indirect
|
github.com/quic-go/qpack v0.5.1 // indirect
|
||||||
github.com/quic-go/quic-go v0.54.0 // indirect
|
github.com/quic-go/quic-go v0.54.0 // indirect
|
||||||
|
github.com/rabbitmq/amqp091-go v1.10.0 // indirect
|
||||||
github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
|
github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
|
||||||
github.com/ugorji/go/codec v1.3.0 // indirect
|
github.com/ugorji/go/codec v1.3.0 // indirect
|
||||||
go.uber.org/mock v0.5.0 // indirect
|
go.uber.org/mock v0.5.0 // indirect
|
||||||
|
|
|
||||||
4
go.sum
4
go.sum
|
|
@ -72,9 +72,13 @@ github.com/quic-go/qpack v0.5.1 h1:giqksBPnT/HDtZ6VhtFKgoLOWmlyo9Ei6u9PqzIMbhI=
|
||||||
github.com/quic-go/qpack v0.5.1/go.mod h1:+PC4XFrEskIVkcLzpEkbLqq1uCoxPhQuvK5rH1ZgaEg=
|
github.com/quic-go/qpack v0.5.1/go.mod h1:+PC4XFrEskIVkcLzpEkbLqq1uCoxPhQuvK5rH1ZgaEg=
|
||||||
github.com/quic-go/quic-go v0.54.0 h1:6s1YB9QotYI6Ospeiguknbp2Znb/jZYjZLRXn9kMQBg=
|
github.com/quic-go/quic-go v0.54.0 h1:6s1YB9QotYI6Ospeiguknbp2Znb/jZYjZLRXn9kMQBg=
|
||||||
github.com/quic-go/quic-go v0.54.0/go.mod h1:e68ZEaCdyviluZmy44P6Iey98v/Wfz6HCjQEm+l8zTY=
|
github.com/quic-go/quic-go v0.54.0/go.mod h1:e68ZEaCdyviluZmy44P6Iey98v/Wfz6HCjQEm+l8zTY=
|
||||||
|
github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw=
|
||||||
|
github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o=
|
||||||
github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ=
|
github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ=
|
||||||
github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
|
github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
|
||||||
github.com/spkg/bom v0.0.0-20160624110644-59b7046e48ad/go.mod h1:qLr4V1qq6nMqFKkMo8ZTx3f+BZEkzsRUY10Xsm2mwU0=
|
github.com/spkg/bom v0.0.0-20160624110644-59b7046e48ad/go.mod h1:qLr4V1qq6nMqFKkMo8ZTx3f+BZEkzsRUY10Xsm2mwU0=
|
||||||
|
github.com/streadway/amqp v1.1.0 h1:py12iX8XSyI7aN/3dUT8DFIDJazNJsVJdxNVEpnQTZM=
|
||||||
|
github.com/streadway/amqp v1.1.0/go.mod h1:WYSrTEYHOXHd0nwFeUXAe2G2hRnQT+deZJJf88uS9Bg=
|
||||||
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
||||||
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
|
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
|
||||||
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
|
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
|
||||||
|
|
|
||||||
|
|
@ -4,16 +4,29 @@ import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
oapi "nyanimedb/api"
|
oapi "nyanimedb/api"
|
||||||
|
"nyanimedb/modules/backend/rmq"
|
||||||
sqlc "nyanimedb/sql"
|
sqlc "nyanimedb/sql"
|
||||||
"strconv"
|
"strconv"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Server struct {
|
type Handler struct {
|
||||||
db *sqlc.Queries
|
publisher *rmq.Publisher
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewServer(db *sqlc.Queries) Server {
|
func New(publisher *rmq.Publisher) *Handler {
|
||||||
return Server{db: db}
|
return &Handler{publisher: publisher}
|
||||||
|
}
|
||||||
|
|
||||||
|
type Server struct {
|
||||||
|
db *sqlc.Queries
|
||||||
|
publisher *rmq.Publisher // ← добавьте это поле
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewServer(db *sqlc.Queries, publisher *rmq.Publisher) *Server {
|
||||||
|
return &Server{
|
||||||
|
db: db,
|
||||||
|
publisher: publisher,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func sql2StorageType(s *sqlc.StorageTypeT) (*oapi.StorageType, error) {
|
func sql2StorageType(s *sqlc.StorageTypeT) (*oapi.StorageType, error) {
|
||||||
|
|
|
||||||
|
|
@ -5,8 +5,10 @@ import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
oapi "nyanimedb/api"
|
oapi "nyanimedb/api"
|
||||||
|
"nyanimedb/modules/backend/rmq"
|
||||||
sqlc "nyanimedb/sql"
|
sqlc "nyanimedb/sql"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/jackc/pgx/v5"
|
"github.com/jackc/pgx/v5"
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
|
|
@ -154,6 +156,29 @@ func (s Server) GetTitle(ctx context.Context, request oapi.GetTitleRequestObject
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s Server) GetTitles(ctx context.Context, request oapi.GetTitlesRequestObject) (oapi.GetTitlesResponseObject, error) {
|
func (s Server) GetTitles(ctx context.Context, request oapi.GetTitlesRequestObject) (oapi.GetTitlesResponseObject, error) {
|
||||||
|
|
||||||
|
if request.Params.ExtSearch != nil && *request.Params.ExtSearch {
|
||||||
|
// Публикуем событие — как и просили
|
||||||
|
event := rmq.RabbitRequest{
|
||||||
|
Name: "Attack on titans",
|
||||||
|
// Status oapi.TitleStatus `json:"titlestatus,omitempty"`
|
||||||
|
// Rating float64 `json:"titleraring,omitempty"`
|
||||||
|
// Year int32 `json:"year,omitempty"`
|
||||||
|
// Season oapi.ReleaseSeason `json:"season,omitempty"`
|
||||||
|
Timestamp: time.Now(),
|
||||||
|
}
|
||||||
|
|
||||||
|
// Контекст с таймаутом (не блокируем ответ)
|
||||||
|
publishCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
if err := s.publisher.Publish(publishCtx, "events.user", event); err != nil {
|
||||||
|
log.Errorf("RMQ publish failed (non-critical): %v", err)
|
||||||
|
} else {
|
||||||
|
log.Infof("RMQ publish succeed %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
opai_titles := make([]oapi.Title, 0)
|
opai_titles := make([]oapi.Title, 0)
|
||||||
|
|
||||||
word := Word2Sqlc(request.Params.Word)
|
word := Word2Sqlc(request.Params.Word)
|
||||||
|
|
|
||||||
|
|
@ -3,6 +3,7 @@ package main
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"net/http"
|
||||||
sqlc "nyanimedb/sql"
|
sqlc "nyanimedb/sql"
|
||||||
"os"
|
"os"
|
||||||
"reflect"
|
"reflect"
|
||||||
|
|
@ -10,11 +11,14 @@ import (
|
||||||
|
|
||||||
oapi "nyanimedb/api"
|
oapi "nyanimedb/api"
|
||||||
handlers "nyanimedb/modules/backend/handlers"
|
handlers "nyanimedb/modules/backend/handlers"
|
||||||
|
"nyanimedb/modules/backend/rmq"
|
||||||
|
|
||||||
"github.com/gin-contrib/cors"
|
"github.com/gin-contrib/cors"
|
||||||
"github.com/gin-gonic/gin"
|
"github.com/gin-gonic/gin"
|
||||||
"github.com/jackc/pgx/v5/pgxpool"
|
"github.com/jackc/pgx/v5/pgxpool"
|
||||||
"github.com/pelletier/go-toml/v2"
|
"github.com/pelletier/go-toml/v2"
|
||||||
|
"github.com/rabbitmq/amqp091-go"
|
||||||
|
log "github.com/sirupsen/logrus"
|
||||||
)
|
)
|
||||||
|
|
||||||
var AppConfig Config
|
var AppConfig Config
|
||||||
|
|
@ -43,7 +47,21 @@ func main() {
|
||||||
|
|
||||||
queries := sqlc.New(pool)
|
queries := sqlc.New(pool)
|
||||||
|
|
||||||
server := handlers.NewServer(queries)
|
// === RabbitMQ setup ===
|
||||||
|
rmqURL := os.Getenv("RABBITMQ_URL")
|
||||||
|
if rmqURL == "" {
|
||||||
|
rmqURL = "amqp://guest:guest@10.1.0.65:5672/"
|
||||||
|
}
|
||||||
|
|
||||||
|
rmqConn, err := amqp091.Dial(rmqURL)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("Failed to connect to RabbitMQ: %v", err)
|
||||||
|
}
|
||||||
|
defer rmqConn.Close()
|
||||||
|
|
||||||
|
publisher := rmq.NewPublisher(rmqConn)
|
||||||
|
|
||||||
|
server := handlers.NewServer(queries, publisher)
|
||||||
// r.LoadHTMLGlob("templates/*")
|
// r.LoadHTMLGlob("templates/*")
|
||||||
|
|
||||||
r.Use(cors.New(cors.Config{
|
r.Use(cors.New(cors.Config{
|
||||||
|
|
@ -60,24 +78,15 @@ func main() {
|
||||||
// сюда можно добавить middlewares, если нужно
|
// сюда можно добавить middlewares, если нужно
|
||||||
[]oapi.StrictMiddlewareFunc{},
|
[]oapi.StrictMiddlewareFunc{},
|
||||||
))
|
))
|
||||||
// r.GET("/", func(c *gin.Context) {
|
|
||||||
// c.HTML(http.StatusOK, "index.html", gin.H{
|
|
||||||
// "title": "Welcome Page",
|
|
||||||
// "message": "Hello, Gin with HTML templates!",
|
|
||||||
// })
|
|
||||||
// })
|
|
||||||
|
|
||||||
// r.GET("/api", func(c *gin.Context) {
|
// Внедряем publisher в сервер
|
||||||
// items := []Item{
|
server = handlers.NewServer(queries, publisher)
|
||||||
// {ID: 1, Title: "First Item", Description: "This is the description of the first item."},
|
|
||||||
// {ID: 2, Title: "Second Item", Description: "This is the description of the second item."},
|
|
||||||
// {ID: 3, Title: "Third Item", Description: "This is the description of the third item."},
|
|
||||||
// }
|
|
||||||
|
|
||||||
// c.JSON(http.StatusOK, items)
|
// Запуск
|
||||||
// })
|
log.Infof("Server starting on :8080")
|
||||||
|
if err := r.Run(":8080"); err != nil && err != http.ErrServerClosed {
|
||||||
r.Run(":8080")
|
log.Fatalf("server failed: %v", err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func InitConfig() error {
|
func InitConfig() error {
|
||||||
|
|
|
||||||
166
modules/backend/rmq/rabbit.go
Normal file
166
modules/backend/rmq/rabbit.go
Normal file
|
|
@ -0,0 +1,166 @@
|
||||||
|
package rmq
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
oapi "nyanimedb/api"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
amqp "github.com/rabbitmq/amqp091-go"
|
||||||
|
)
|
||||||
|
|
||||||
|
type RabbitRequest struct {
|
||||||
|
Name string `json:"name"`
|
||||||
|
Status oapi.TitleStatus `json:"titlestatus,omitempty"`
|
||||||
|
Rating float64 `json:"titleraring,omitempty"`
|
||||||
|
Year int32 `json:"year,omitempty"`
|
||||||
|
Season oapi.ReleaseSeason `json:"season,omitempty"`
|
||||||
|
Timestamp time.Time `json:"timestamp"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// Publisher — потокобезопасный публикатор с пулом каналов.
|
||||||
|
type Publisher struct {
|
||||||
|
conn *amqp.Connection
|
||||||
|
pool *sync.Pool
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewPublisher создаёт новый Publisher.
|
||||||
|
// conn должен быть уже установленным и healthy.
|
||||||
|
// Рекомендуется передавать durable connection с reconnect-логикой.
|
||||||
|
func NewPublisher(conn *amqp.Connection) *Publisher {
|
||||||
|
return &Publisher{
|
||||||
|
conn: conn,
|
||||||
|
pool: &sync.Pool{
|
||||||
|
New: func() any {
|
||||||
|
ch, err := conn.Channel()
|
||||||
|
if err != nil {
|
||||||
|
// Паника уместна: невозможность открыть канал — критическая ошибка инициализации
|
||||||
|
panic(fmt.Errorf("rmqpool: failed to create channel: %w", err))
|
||||||
|
}
|
||||||
|
return ch
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Publish публикует сообщение в указанную очередь.
|
||||||
|
// Очередь объявляется как durable (если не существует).
|
||||||
|
// Поддерживает context для отмены/таймаута.
|
||||||
|
func (p *Publisher) Publish(
|
||||||
|
ctx context.Context,
|
||||||
|
queueName string,
|
||||||
|
payload RabbitRequest,
|
||||||
|
opts ...PublishOption,
|
||||||
|
) error {
|
||||||
|
// Применяем опции
|
||||||
|
options := &publishOptions{
|
||||||
|
contentType: "application/json",
|
||||||
|
deliveryMode: amqp.Persistent,
|
||||||
|
timestamp: time.Now(),
|
||||||
|
}
|
||||||
|
for _, opt := range opts {
|
||||||
|
opt(options)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Сериализуем payload
|
||||||
|
body, err := json.Marshal(payload)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("rmqpool: failed to marshal payload: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Берём канал из пула
|
||||||
|
ch := p.getChannel()
|
||||||
|
if ch == nil {
|
||||||
|
return fmt.Errorf("rmqpool: channel is nil (connection may be closed)")
|
||||||
|
}
|
||||||
|
defer p.returnChannel(ch)
|
||||||
|
|
||||||
|
// Объявляем очередь (idempotent)
|
||||||
|
q, err := ch.QueueDeclare(
|
||||||
|
queueName,
|
||||||
|
true, // durable
|
||||||
|
false, // auto-delete
|
||||||
|
false, // exclusive
|
||||||
|
false, // no-wait
|
||||||
|
nil, // args
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("rmqpool: failed to declare queue %q: %w", queueName, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Подготавливаем сообщение
|
||||||
|
msg := amqp.Publishing{
|
||||||
|
DeliveryMode: options.deliveryMode,
|
||||||
|
ContentType: options.contentType,
|
||||||
|
Timestamp: options.timestamp,
|
||||||
|
Body: body,
|
||||||
|
}
|
||||||
|
|
||||||
|
// Публикуем с учётом контекста
|
||||||
|
done := make(chan error, 1)
|
||||||
|
go func() {
|
||||||
|
err := ch.Publish(
|
||||||
|
"", // exchange (default)
|
||||||
|
q.Name, // routing key
|
||||||
|
false, // mandatory
|
||||||
|
false, // immediate
|
||||||
|
msg,
|
||||||
|
)
|
||||||
|
done <- err
|
||||||
|
}()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case err := <-done:
|
||||||
|
return err
|
||||||
|
case <-ctx.Done():
|
||||||
|
return ctx.Err()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *Publisher) getChannel() *amqp.Channel {
|
||||||
|
raw := p.pool.Get()
|
||||||
|
if raw == nil {
|
||||||
|
ch, _ := p.conn.Channel()
|
||||||
|
return ch
|
||||||
|
}
|
||||||
|
ch := raw.(*amqp.Channel)
|
||||||
|
if ch.IsClosed() { // ← теперь есть!
|
||||||
|
ch.Close() // освободить ресурсы
|
||||||
|
ch, _ = p.conn.Channel()
|
||||||
|
}
|
||||||
|
return ch
|
||||||
|
}
|
||||||
|
|
||||||
|
// returnChannel возвращает канал в пул, если он жив.
|
||||||
|
func (p *Publisher) returnChannel(ch *amqp.Channel) {
|
||||||
|
if ch != nil && !ch.IsClosed() {
|
||||||
|
p.pool.Put(ch)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// PublishOption позволяет кастомизировать публикацию.
|
||||||
|
type PublishOption func(*publishOptions)
|
||||||
|
|
||||||
|
type publishOptions struct {
|
||||||
|
contentType string
|
||||||
|
deliveryMode uint8
|
||||||
|
timestamp time.Time
|
||||||
|
}
|
||||||
|
|
||||||
|
// WithContentType устанавливает Content-Type (по умолчанию "application/json").
|
||||||
|
func WithContentType(ct string) PublishOption {
|
||||||
|
return func(o *publishOptions) { o.contentType = ct }
|
||||||
|
}
|
||||||
|
|
||||||
|
// WithTransient делает сообщение transient (не сохраняется на диск).
|
||||||
|
// По умолчанию — Persistent.
|
||||||
|
func WithTransient() PublishOption {
|
||||||
|
return func(o *publishOptions) { o.deliveryMode = amqp.Transient }
|
||||||
|
}
|
||||||
|
|
||||||
|
// WithTimestamp устанавливает кастомную метку времени.
|
||||||
|
func WithTimestamp(ts time.Time) PublishOption {
|
||||||
|
return func(o *publishOptions) { o.timestamp = ts }
|
||||||
|
}
|
||||||
Loading…
Add table
Add a link
Reference in a new issue