1
0
mirror of https://github.com/eventials/goevents.git synced 2025-04-24 13:48:53 +08:00

Add Docs.

This commit is contained in:
Alexandre Vicenzi 2016-12-01 16:17:55 -02:00
parent 78809de308
commit 180f26f054
7 changed files with 128 additions and 4 deletions

View File

@ -7,6 +7,5 @@ RUN tar -C /usr/local/bin -xzvf dockerize-linux-amd64-v0.2.0.tar.gz
RUN mkdir -p /go/src/github.com/eventials/goevents
WORKDIR /go/src/github.com/eventials/goevents
RUN go get github.com/eventials/golog
RUN go get github.com/streadway/amqp
RUN go get github.com/stretchr/testify

View File

@ -1,2 +1,61 @@
# goevents
Go messaging library based on AMQP
# goevents [![Build Status](https://travis-ci.org/eventials/goevents.svg?branch=master)](https://travis-ci.org/eventials/goevents) [![GoDoc](https://godoc.org/github.com/eventials/goevents?status.svg)](http://godoc.org/github.com/eventials/goevents) [![Go Report Card](https://goreportcard.com/badge/github.com/eventials/goevents)](https://goreportcard.com/report/github.com/eventials/goevents)
Go messaging library
## About
`goevents` allows to dispatch events between applications.
An application produces events based on actions.
Another application consume these events and maybe create new events.
*Scenario:* If an application produces an events "payment-received", another application may want to delivery the product to the buyer.
## Supported Transport
- AMQP
## How to use
**The consumer**
```go
conn, err := NewConnection("amqp://guest:guest@127.0.0.1:5672/", "events-queue", "events-exchange")
if err != nil {
panic(err)
}
c, err := NewConsumer(conn, false)
if err != nil {
panic(err)
}
c.Subscribe("my_action", func(body []byte) bool {
fmt.Println(body)
return true
})
```
**The producer**
```go
conn, err := NewConnection("amqp://guest:guest@127.0.0.1:5672/", "events-queue", "events-exchange")
if err != nil {
panic(err)
}
p, err := NewProducer(conn)
if err != nil {
panic(err)
}
err = p.Publish("my_action", []byte("message"))
if err != nil {
panic(err)
}
```

View File

@ -15,6 +15,7 @@ type Connection struct {
queueName string
}
// NewConnection returns an AMQP Connection.
func NewConnection(url, exchange, queue string) (messaging.Connection, error) {
conn, err := amqplib.Dial(url)
@ -64,14 +65,17 @@ func NewConnection(url, exchange, queue string) (messaging.Connection, error) {
}, nil
}
// Consumer returns an AMQP Consumer.
func (c *Connection) Consumer(autoAck bool) (messaging.Consumer, error) {
return NewConsumer(c, autoAck)
}
// Producer returns an AMQP Producer.
func (c *Connection) Producer() (messaging.Producer, error) {
return NewProducer(c)
}
// Close closes the AMQP connection.
func (c *Connection) Close() {
c.channel.Close()
c.connection.Close()

View File

@ -21,6 +21,7 @@ type Consumer struct {
handlers []handler
}
// NewConsumer returns a new AMQP Consumer.
func NewConsumer(c messaging.Connection, autoAck bool) (messaging.Consumer, error) {
amqpConn := c.(*Connection)
@ -67,6 +68,7 @@ func (c *Consumer) getHandler(action string) (messaging.EventHandler, bool) {
return nil, false
}
// Subscribe allow to subscribe an action handler.
func (c *Consumer) Subscribe(action string, handlerFn messaging.EventHandler) error {
// TODO: Replace # pattern too.
pattern := strings.Replace(action, "*", "(.*)", 0)
@ -103,6 +105,7 @@ func (c *Consumer) Subscribe(action string, handlerFn messaging.EventHandler) er
return nil
}
// Unsubscribe allows to unsubscribe an action handler.
func (c *Consumer) Unsubscribe(action string) error {
err := c.conn.channel.QueueUnbind(
c.conn.queueName, // queue name
@ -131,6 +134,7 @@ func (c *Consumer) Unsubscribe(action string) error {
return nil
}
// Listen start to listen for new messages.
func (c *Consumer) Listen() error {
msgs, err := c.conn.channel.Consume(
c.conn.queueName, // queue
@ -155,6 +159,7 @@ func (c *Consumer) Listen() error {
return nil
}
// ListenForever start to listen for new messages and locks the current thread.
func (c *Consumer) ListenForever() error {
err := c.Listen()

View File

@ -11,7 +11,7 @@ func TestPublishConsume(t *testing.T) {
func1 := make(chan bool)
func2 := make(chan bool)
conn, err := NewConnection("amqp://guest:guest@broker:5672/", "event_PublishConsumeer", "webhooks")
conn, err := NewConnection("amqp://guest:guest@broker:5672/", "events-queue", "events-exchange")
assert.Nil(t, err)

View File

@ -12,6 +12,7 @@ type Producer struct {
conn *Connection
}
// NewProducer returns a new AMQP Producer.
func NewProducer(c messaging.Connection) (messaging.Producer, error) {
amqpConn := c.(*Connection)
@ -20,6 +21,7 @@ func NewProducer(c messaging.Connection) (messaging.Producer, error) {
}, nil
}
// Publish publishes an action.
func (p *Producer) Publish(action string, data []byte) error {
msg := amqplib.Publishing{
DeliveryMode: amqplib.Persistent,

55
doc.go Normal file
View File

@ -0,0 +1,55 @@
// Package events implements a messaging library
//
// About
//
// goevents allows to dispatch events between applications.
//
// An application produces events based on actions.
// Another application consume these events and maybe create new events.
//
// Supported Transport
//
// AMQP
//
// How to use
//
// The consumer
//
// conn, err := NewConnection("amqp://guest:guest@127.0.0.1:5672/", "events-queue", "events-exchange")
//
// if err != nil {
// panic(err)
// }
//
// c, err := NewConsumer(conn, false)
//
// if err != nil {
// panic(err)
// }
//
// c.Subscribe("my_action", func(body []byte) bool {
// fmt.Println(body)
// return true
// })
//
// The producer
//
// conn, err := NewConnection("amqp://guest:guest@127.0.0.1:5672/", "events-queue", "events-exchange")
//
// if err != nil {
// panic(err)
// }
//
// p, err := NewProducer(conn)
//
// if err != nil {
// panic(err)
// }
//
// err = p.Publish("my_action", []byte("message"))
//
// if err != nil {
// panic(err)
// }
//
package events