mirror of
https://github.com/eventials/goevents.git
synced 2025-04-24 13:48:53 +08:00
more options to declare queue
This commit is contained in:
parent
f4de19816d
commit
06a8443d6f
@ -52,6 +52,9 @@ type consumer struct {
|
||||
type ConsumerConfig struct {
|
||||
ConsumeRetryInterval time.Duration
|
||||
PrefetchCount int
|
||||
DurableQueue bool
|
||||
AutoDelete bool
|
||||
PrefixName string
|
||||
}
|
||||
|
||||
// NewConsumer returns a new AMQP Consumer.
|
||||
@ -60,6 +63,8 @@ func NewConsumer(c messaging.Connection, autoAck bool, exchange, queue string) (
|
||||
return NewConsumerConfig(c, autoAck, exchange, queue, ConsumerConfig{
|
||||
ConsumeRetryInterval: 2 * time.Second,
|
||||
PrefetchCount: 0,
|
||||
DurableQueue: true,
|
||||
AutoDelete: false,
|
||||
})
|
||||
}
|
||||
|
||||
@ -102,6 +107,10 @@ func (c *consumer) Close() {
|
||||
c.wg.Wait()
|
||||
}
|
||||
|
||||
func (c *consumer) uniqueNameWithPrefix() string {
|
||||
return fmt.Sprint("%s%d", c.config.PrefixName, time.Now().UnixNano())
|
||||
}
|
||||
|
||||
func (c *consumer) setupTopology() error {
|
||||
c.m.Lock()
|
||||
defer c.m.Unlock()
|
||||
@ -138,13 +147,17 @@ func (c *consumer) setupTopology() error {
|
||||
return err
|
||||
}
|
||||
|
||||
if c.queueName == "" && c.config.PrefixName != "" {
|
||||
c.queueName = c.uniqueNameWithPrefix()
|
||||
}
|
||||
|
||||
q, err := c.channel.QueueDeclare(
|
||||
c.queueName, // name
|
||||
true, // durable
|
||||
false, // auto-delete
|
||||
false, // exclusive
|
||||
false, // no-wait
|
||||
nil, // arguments
|
||||
c.queueName, // name
|
||||
c.config.DurableQueue, // durable
|
||||
c.config.AutoDelete, // auto-delete
|
||||
false, // exclusive
|
||||
false, // no-wait
|
||||
nil, // arguments
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
|
@ -626,3 +626,48 @@ func TestConsumePrefetch(t *testing.T) {
|
||||
wait <- true
|
||||
}
|
||||
}
|
||||
|
||||
func TestBlankQueueWithPrefix(t *testing.T) {
|
||||
myActionTimesCalled := 0
|
||||
|
||||
conn, err := NewConnection("amqp://guest:guest@broker:5672/")
|
||||
|
||||
assert.Nil(t, err)
|
||||
|
||||
defer conn.Close()
|
||||
|
||||
c, err := NewConsumerConfig(conn, false, "webhooks", "", ConsumerConfig{
|
||||
ConsumeRetryInterval: 2 * time.Second,
|
||||
PrefetchCount: 1,
|
||||
PrefixName: "test@",
|
||||
})
|
||||
|
||||
if assert.Nil(t, err) {
|
||||
defer c.Close()
|
||||
|
||||
// Clean all messages if any...
|
||||
c.channel.QueuePurge(c.queueName, false)
|
||||
|
||||
c.Subscribe("test1", func(e messaging.Event) error {
|
||||
defer func() {
|
||||
myActionTimesCalled++
|
||||
}()
|
||||
return nil
|
||||
}, nil)
|
||||
|
||||
go c.Consume()
|
||||
|
||||
p, err := NewProducer(conn, "webhooks")
|
||||
|
||||
assert.Nil(t, err)
|
||||
|
||||
p.Publish("test1", []byte(""))
|
||||
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
|
||||
select {
|
||||
case <-time.After(1 * time.Second):
|
||||
assert.Equal(t, 1, myActionTimesCalled, "Consumer got wrong quantity of messages.")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user