mirror of
https://github.com/eventials/goevents.git
synced 2025-04-24 13:48:53 +08:00
Merge pull request #35 from skrater/master
methods to bind actions after consume is started
This commit is contained in:
commit
4001180607
@ -1,8 +1,8 @@
|
||||
language: go
|
||||
go:
|
||||
- 1.8
|
||||
- 1.9
|
||||
- tip
|
||||
- "1.9"
|
||||
- "1.10"
|
||||
- "tip"
|
||||
install:
|
||||
- go get github.com/streadway/amqp
|
||||
- go get github.com/stretchr/testify
|
||||
|
@ -34,7 +34,7 @@ type handler struct {
|
||||
type consumer struct {
|
||||
config ConsumerConfig
|
||||
|
||||
m sync.Mutex
|
||||
m sync.RWMutex
|
||||
wg sync.WaitGroup
|
||||
|
||||
conn *connection
|
||||
@ -282,8 +282,8 @@ func (c *consumer) retryMessage(msg amqplib.Delivery, h *handler, retryCount int
|
||||
}
|
||||
|
||||
func (c *consumer) getHandler(msg amqplib.Delivery) (*handler, bool) {
|
||||
c.m.Lock()
|
||||
defer c.m.Unlock()
|
||||
c.m.RLock()
|
||||
defer c.m.RUnlock()
|
||||
|
||||
action := getAction(msg)
|
||||
|
||||
@ -314,6 +314,9 @@ func (c *consumer) Subscribe(action string, handlerFn messaging.EventHandler, op
|
||||
}
|
||||
}
|
||||
|
||||
c.m.Lock()
|
||||
defer c.m.Unlock()
|
||||
|
||||
c.handlers = append(c.handlers, handler{
|
||||
action: action,
|
||||
fn: handlerFn,
|
||||
@ -328,27 +331,11 @@ func (c *consumer) Subscribe(action string, handlerFn messaging.EventHandler, op
|
||||
|
||||
// Unsubscribe allows to unsubscribe an action handler.
|
||||
func (c *consumer) Unsubscribe(action string) error {
|
||||
channel, err := c.conn.openChannel()
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
defer channel.Close()
|
||||
|
||||
err = channel.QueueUnbind(
|
||||
c.queueName, // queue name
|
||||
action, // routing key
|
||||
c.exchangeName, // exchange
|
||||
nil, // arguments
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
idx := -1
|
||||
|
||||
c.m.Lock()
|
||||
defer c.m.Unlock()
|
||||
|
||||
for i, h := range c.handlers {
|
||||
if h.action == action {
|
||||
idx = i
|
||||
@ -363,6 +350,51 @@ func (c *consumer) Unsubscribe(action string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *consumer) BindActions(actions ...string) error {
|
||||
channel, err := c.conn.openChannel()
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
defer channel.Close()
|
||||
|
||||
for _, action := range actions {
|
||||
err := c.bindActionToQueue(channel, c.queueName, action)
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *consumer) UnbindActions(actions ...string) error {
|
||||
channel, err := c.conn.openChannel()
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
defer channel.Close()
|
||||
|
||||
for _, action := range actions {
|
||||
err := channel.QueueUnbind(
|
||||
c.queueName, // queue name
|
||||
action, // routing key
|
||||
c.exchangeName, // exchange
|
||||
nil, // arguments
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *consumer) bindActionToQueue(channel *amqplib.Channel, queueName string, action string) error {
|
||||
return channel.QueueBind(
|
||||
queueName, // queue name
|
||||
|
@ -86,6 +86,94 @@ func TestSubscribeActions(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestSubscribeActionsByBindAfterConsume(t *testing.T) {
|
||||
func1 := make(chan bool)
|
||||
func2 := make(chan bool)
|
||||
|
||||
c, err := NewConsumer(conn, false, "webhooks", "TestSubscribeActionsByBindAfterConsume")
|
||||
|
||||
if assert.Nil(t, err) {
|
||||
defer c.Close()
|
||||
|
||||
clearQueue(conn, c.queueName)
|
||||
|
||||
go c.Consume()
|
||||
|
||||
// take a time to setup topology
|
||||
time.Sleep(SleepSetupTopology)
|
||||
|
||||
c.Subscribe("my_action_1", func(e messaging.Event) error {
|
||||
func1 <- true
|
||||
return nil
|
||||
}, nil)
|
||||
|
||||
c.Subscribe("my_action_2", func(e messaging.Event) error {
|
||||
func2 <- true
|
||||
return nil
|
||||
}, nil)
|
||||
|
||||
assert.NoError(t, c.BindActions("my_action_1", "my_action_2"))
|
||||
|
||||
p, err := NewProducer(conn, "webhooks")
|
||||
|
||||
assert.Nil(t, err)
|
||||
|
||||
p.Publish("my_action_1", []byte(""))
|
||||
|
||||
select {
|
||||
case <-func1:
|
||||
case <-func2:
|
||||
assert.Fail(t, "called wrong action")
|
||||
case <-time.After(3 * time.Second):
|
||||
assert.Fail(t, "timed out")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestSubscribeActionsUnbindAfterConsume(t *testing.T) {
|
||||
func1 := make(chan bool)
|
||||
func2 := make(chan bool)
|
||||
|
||||
c, err := NewConsumer(conn, false, "webhooks", "TestSubscribeActionsUnbindAfterConsume")
|
||||
|
||||
if assert.Nil(t, err) {
|
||||
defer c.Close()
|
||||
|
||||
clearQueue(conn, c.queueName)
|
||||
|
||||
c.Subscribe("my_action_1", func(e messaging.Event) error {
|
||||
func1 <- true
|
||||
return nil
|
||||
}, nil)
|
||||
|
||||
c.Subscribe("my_action_2", func(e messaging.Event) error {
|
||||
func2 <- true
|
||||
return nil
|
||||
}, nil)
|
||||
|
||||
go c.Consume()
|
||||
|
||||
// take a time to setup topology
|
||||
time.Sleep(SleepSetupTopology)
|
||||
|
||||
assert.NoError(t, c.UnbindActions("my_action_2"))
|
||||
|
||||
p, err := NewProducer(conn, "webhooks")
|
||||
|
||||
assert.Nil(t, err)
|
||||
|
||||
p.Publish("my_action_2", []byte(""))
|
||||
|
||||
select {
|
||||
case <-func1:
|
||||
assert.Fail(t, "called wrong action")
|
||||
case <-func2:
|
||||
assert.Fail(t, "called wrong action")
|
||||
case <-time.After(3 * time.Second):
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestSubscribeWildcardActions(t *testing.T) {
|
||||
func1 := make(chan bool)
|
||||
func2 := make(chan bool)
|
||||
|
@ -25,6 +25,8 @@ type EventHandler func(Event) error
|
||||
type Consumer interface {
|
||||
Subscribe(action string, handler EventHandler, options *SubscribeOptions) error
|
||||
Unsubscribe(action string) error
|
||||
BindActions(actions ...string) error
|
||||
UnbindActions(actions ...string) error
|
||||
Consume()
|
||||
Close()
|
||||
}
|
||||
|
@ -30,3 +30,13 @@ func (c *Consumer) Consume() {
|
||||
func (c *Consumer) Close() {
|
||||
c.Called()
|
||||
}
|
||||
|
||||
func (c *Consumer) BindActions(actions ...string) error {
|
||||
args := c.Called(actions)
|
||||
return args.Error(0)
|
||||
}
|
||||
|
||||
func (c *Consumer) UnbindActions(actions ...string) error {
|
||||
args := c.Called(actions)
|
||||
return args.Error(0)
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user