1
0
mirror of https://github.com/mainflux/mainflux.git synced 2025-04-26 13:48:53 +08:00

Initialize properly NATS in CoAP adapter

Signed-off-by: Drasko DRASKOVIC <drasko.draskovic@gmail.com>
This commit is contained in:
Drasko DRASKOVIC 2017-10-01 18:33:55 +02:00
parent 9cec19896e
commit dd874008b2
4 changed files with 37 additions and 27 deletions

View File

@ -33,9 +33,8 @@ func main() {
nc := connectToNats(cfg, logger)
defer nc.Close()
nats.StoreConnection(nc)
ca := adapter.NewCoAPAdapter(logger)
repo := nats.NewMessageRepository(nc)
ca := adapter.NewCoAPAdapter(logger, repo)
nc.Subscribe("msg.http", ca.MsgHandler)
nc.Subscribe("msg.mqtt", ca.MsgHandler)

View File

@ -20,12 +20,14 @@ type Observer struct {
type CoAPAdapter struct {
obsMap map[string][]Observer
logger *zap.Logger
repo writer.MessageRepository
}
// NewCoAPAdapter creates new CoAP adapter struct
func NewCoAPAdapter(logger *zap.Logger) *CoAPAdapter {
func NewCoAPAdapter(logger *zap.Logger, repo writer.MessageRepository) *CoAPAdapter {
ca := &CoAPAdapter{
logger: logger,
repo: repo,
}
ca.obsMap = make(map[string][]Observer)

View File

@ -7,14 +7,13 @@ import (
mux "github.com/dereulenspiegel/coap-mux"
coap "github.com/dustin/go-coap"
"github.com/mainflux/mainflux/coap/nats"
"github.com/mainflux/mainflux/writer"
)
// sendMessage sends the message to NATS
func (ca *CoAPAdapter) sendMessage(l *net.UDPConn, a *net.UDPAddr, m *coap.Message) *coap.Message {
log.Printf("Got message in sendMessage: path=%q: %#v from %v", m.Path(), m, a)
var res *coap.Message = nil
var res *coap.Message
if m.IsConfirmable() {
res = &coap.Message{
Type: coap.Acknowledgement,
@ -43,15 +42,15 @@ func (ca *CoAPAdapter) sendMessage(l *net.UDPConn, a *net.UDPAddr, m *coap.Messa
n.Protocol = "coap"
n.Payload = m.Payload
if err := nats.Send(n); err != nil {
if err := ca.repo.Save(n); err != nil {
if m.IsConfirmable() {
res.Payload = []byte("{\"error\": \"cannot publish\"}")
res.Code = coap.InternalServerError
}
return res
}
if m.IsConfirmable() {
res.Payload = []byte("{\"res\": \"sent\"}")
res.Code = coap.Changed
}
return res
}
@ -86,7 +85,7 @@ func (ca *CoAPAdapter) deregisterObserver(o Observer, cid string) {
// observeMessage adds client to the observers map
func (ca *CoAPAdapter) observeMessage(l *net.UDPConn, a *net.UDPAddr, m *coap.Message) *coap.Message {
log.Printf("Got message in observeMessage: path=%q: %#v from %v", m.Path(), m, a)
var res *coap.Message = nil
var res *coap.Message
if m.IsConfirmable() {
res = &coap.Message{
@ -109,30 +108,33 @@ func (ca *CoAPAdapter) observeMessage(l *net.UDPConn, a *net.UDPAddr, m *coap.Me
message: m,
}
if m.Option(coap.Observe) != nil {
if value, ok := m.Option(coap.Observe).(uint32); ok {
if value == 0 {
ca.registerObserver(o, cid)
} else {
ca.deregisterObserver(o, cid)
}
if m.Option(coap.Observe) == nil {
if m.IsConfirmable() {
res.Code = coap.BadRequest
}
return res
}
if value, ok := m.Option(coap.Observe).(uint32); ok {
if value == 0 {
ca.registerObserver(o, cid)
} else {
// Interop - old deregister was when there was no Observe option provided
ca.deregisterObserver(o, cid)
}
} else {
// Interop - old deregister was when there was no Observe option provided
ca.deregisterObserver(o, cid)
}
if m.IsConfirmable() {
res.Payload = []byte("{\"res\": \"observing\"}")
res.Code = coap.Valid
}
return res
}
// obsTransmit transmits the message to observing clients
func (ca *CoAPAdapter) obsTransmit(n writer.RawMessage) {
for _, v := range ca.obsMap[n.Channel] {
msg := *(v.message)
msg.Payload = n.Payload

View File

@ -10,18 +10,25 @@ import (
const topic string = "msg.coap"
// Stored NATS connection
var snc *broker.Conn
var _ writer.MessageRepository = (*natsRepository)(nil)
func StoreConnection(nc *broker.Conn) {
snc = nc
type natsRepository struct {
nc *broker.Conn
}
func Send(msg writer.RawMessage) error {
// NewMessageRepository instantiates NATS message repository. Note that the
// repository will not truly persist messages, but instead they will be
// published to the topic and made available for persisting by all interested
// parties, i.e. the message-writer service.
func NewMessageRepository(nc *broker.Conn) writer.MessageRepository {
return &natsRepository{nc}
}
func (repo *natsRepository) Save(msg writer.RawMessage) error {
b, err := json.Marshal(msg)
if err != nil {
return err
}
return snc.Publish(topic, b)
return repo.nc.Publish(topic, b)
}