diff --git a/Dockerfile b/Dockerfile index dcab353..6776af0 100644 --- a/Dockerfile +++ b/Dockerfile @@ -7,6 +7,5 @@ RUN tar -C /usr/local/bin -xzvf dockerize-linux-amd64-v0.2.0.tar.gz RUN mkdir -p /go/src/github.com/eventials/goevents WORKDIR /go/src/github.com/eventials/goevents -RUN go get github.com/eventials/golog RUN go get github.com/streadway/amqp RUN go get github.com/stretchr/testify diff --git a/README.md b/README.md index a8f8908..4dd0551 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,61 @@ -# goevents -Go messaging library based on AMQP +# goevents [![Build Status](https://travis-ci.org/eventials/goevents.svg?branch=master)](https://travis-ci.org/eventials/goevents) [![GoDoc](https://godoc.org/github.com/eventials/goevents?status.svg)](http://godoc.org/github.com/eventials/goevents) [![Go Report Card](https://goreportcard.com/badge/github.com/eventials/goevents)](https://goreportcard.com/report/github.com/eventials/goevents) + +Go messaging library + +## About + +`goevents` allows to dispatch events between applications. + +An application produces events based on actions. +Another application consume these events and maybe create new events. + +*Scenario:* If an application produces an events "payment-received", another application may want to delivery the product to the buyer. + +## Supported Transport + +- AMQP + +## How to use + +**The consumer** + +```go +conn, err := NewConnection("amqp://guest:guest@127.0.0.1:5672/", "events-queue", "events-exchange") + +if err != nil { + panic(err) +} + +c, err := NewConsumer(conn, false) + +if err != nil { + panic(err) +} + +c.Subscribe("my_action", func(body []byte) bool { + fmt.Println(body) + return true +}) +``` + +**The producer** + +```go +conn, err := NewConnection("amqp://guest:guest@127.0.0.1:5672/", "events-queue", "events-exchange") + +if err != nil { + panic(err) +} + +p, err := NewProducer(conn) + +if err != nil { + panic(err) +} + +err = p.Publish("my_action", []byte("message")) + +if err != nil { + panic(err) +} +``` diff --git a/amqp/connection.go b/amqp/connection.go index 5a69e57..4cf7562 100644 --- a/amqp/connection.go +++ b/amqp/connection.go @@ -15,6 +15,7 @@ type Connection struct { queueName string } +// NewConnection returns an AMQP Connection. func NewConnection(url, exchange, queue string) (messaging.Connection, error) { conn, err := amqplib.Dial(url) @@ -64,14 +65,17 @@ func NewConnection(url, exchange, queue string) (messaging.Connection, error) { }, nil } +// Consumer returns an AMQP Consumer. func (c *Connection) Consumer(autoAck bool) (messaging.Consumer, error) { return NewConsumer(c, autoAck) } +// Producer returns an AMQP Producer. func (c *Connection) Producer() (messaging.Producer, error) { return NewProducer(c) } +// Close closes the AMQP connection. func (c *Connection) Close() { c.channel.Close() c.connection.Close() diff --git a/amqp/consumer.go b/amqp/consumer.go index 3b41ba7..162116a 100644 --- a/amqp/consumer.go +++ b/amqp/consumer.go @@ -21,6 +21,7 @@ type Consumer struct { handlers []handler } +// NewConsumer returns a new AMQP Consumer. func NewConsumer(c messaging.Connection, autoAck bool) (messaging.Consumer, error) { amqpConn := c.(*Connection) @@ -67,6 +68,7 @@ func (c *Consumer) getHandler(action string) (messaging.EventHandler, bool) { return nil, false } +// Subscribe allow to subscribe an action handler. func (c *Consumer) Subscribe(action string, handlerFn messaging.EventHandler) error { // TODO: Replace # pattern too. pattern := strings.Replace(action, "*", "(.*)", 0) @@ -103,6 +105,7 @@ func (c *Consumer) Subscribe(action string, handlerFn messaging.EventHandler) er return nil } +// Unsubscribe allows to unsubscribe an action handler. func (c *Consumer) Unsubscribe(action string) error { err := c.conn.channel.QueueUnbind( c.conn.queueName, // queue name @@ -131,6 +134,7 @@ func (c *Consumer) Unsubscribe(action string) error { return nil } +// Listen start to listen for new messages. func (c *Consumer) Listen() error { msgs, err := c.conn.channel.Consume( c.conn.queueName, // queue @@ -155,6 +159,7 @@ func (c *Consumer) Listen() error { return nil } +// ListenForever start to listen for new messages and locks the current thread. func (c *Consumer) ListenForever() error { err := c.Listen() diff --git a/amqp/integration_test.go b/amqp/integration_test.go index 13f117f..e6e5a61 100644 --- a/amqp/integration_test.go +++ b/amqp/integration_test.go @@ -11,7 +11,7 @@ func TestPublishConsume(t *testing.T) { func1 := make(chan bool) func2 := make(chan bool) - conn, err := NewConnection("amqp://guest:guest@broker:5672/", "event_PublishConsumeer", "webhooks") + conn, err := NewConnection("amqp://guest:guest@broker:5672/", "events-queue", "events-exchange") assert.Nil(t, err) diff --git a/amqp/producer.go b/amqp/producer.go index 9e666ed..3c1819c 100644 --- a/amqp/producer.go +++ b/amqp/producer.go @@ -12,6 +12,7 @@ type Producer struct { conn *Connection } +// NewProducer returns a new AMQP Producer. func NewProducer(c messaging.Connection) (messaging.Producer, error) { amqpConn := c.(*Connection) @@ -20,6 +21,7 @@ func NewProducer(c messaging.Connection) (messaging.Producer, error) { }, nil } +// Publish publishes an action. func (p *Producer) Publish(action string, data []byte) error { msg := amqplib.Publishing{ DeliveryMode: amqplib.Persistent, diff --git a/doc.go b/doc.go new file mode 100644 index 0000000..6076f2a --- /dev/null +++ b/doc.go @@ -0,0 +1,55 @@ +// Package events implements a messaging library +// +// About +// +// goevents allows to dispatch events between applications. +// +// An application produces events based on actions. +// Another application consume these events and maybe create new events. +// +// Supported Transport +// +// AMQP +// +// How to use +// +// The consumer +// +// conn, err := NewConnection("amqp://guest:guest@127.0.0.1:5672/", "events-queue", "events-exchange") +// +// if err != nil { +// panic(err) +// } +// +// c, err := NewConsumer(conn, false) +// +// if err != nil { +// panic(err) +// } +// +// c.Subscribe("my_action", func(body []byte) bool { +// fmt.Println(body) +// return true +// }) +// +// The producer +// +// conn, err := NewConnection("amqp://guest:guest@127.0.0.1:5672/", "events-queue", "events-exchange") +// +// if err != nil { +// panic(err) +// } +// +// p, err := NewProducer(conn) +// +// if err != nil { +// panic(err) +// } +// +// err = p.Publish("my_action", []byte("message")) +// +// if err != nil { +// panic(err) +// } +// +package events