diff --git a/cmd/coap/main.go b/cmd/coap/main.go index 160b7881..d4a3e789 100644 --- a/cmd/coap/main.go +++ b/cmd/coap/main.go @@ -33,9 +33,8 @@ func main() { nc := connectToNats(cfg, logger) defer nc.Close() - nats.StoreConnection(nc) - - ca := adapter.NewCoAPAdapter(logger) + repo := nats.NewMessageRepository(nc) + ca := adapter.NewCoAPAdapter(logger, repo) nc.Subscribe("msg.http", ca.MsgHandler) nc.Subscribe("msg.mqtt", ca.MsgHandler) diff --git a/coap/adapter.go b/coap/adapter.go index e0a911c3..7f0b8a49 100644 --- a/coap/adapter.go +++ b/coap/adapter.go @@ -20,12 +20,14 @@ type Observer struct { type CoAPAdapter struct { obsMap map[string][]Observer logger *zap.Logger + repo writer.MessageRepository } // NewCoAPAdapter creates new CoAP adapter struct -func NewCoAPAdapter(logger *zap.Logger) *CoAPAdapter { +func NewCoAPAdapter(logger *zap.Logger, repo writer.MessageRepository) *CoAPAdapter { ca := &CoAPAdapter{ logger: logger, + repo: repo, } ca.obsMap = make(map[string][]Observer) diff --git a/coap/messages.go b/coap/messages.go index 91abfbc7..add648ee 100644 --- a/coap/messages.go +++ b/coap/messages.go @@ -7,14 +7,13 @@ import ( mux "github.com/dereulenspiegel/coap-mux" coap "github.com/dustin/go-coap" - "github.com/mainflux/mainflux/coap/nats" "github.com/mainflux/mainflux/writer" ) // sendMessage sends the message to NATS func (ca *CoAPAdapter) sendMessage(l *net.UDPConn, a *net.UDPAddr, m *coap.Message) *coap.Message { log.Printf("Got message in sendMessage: path=%q: %#v from %v", m.Path(), m, a) - var res *coap.Message = nil + var res *coap.Message if m.IsConfirmable() { res = &coap.Message{ Type: coap.Acknowledgement, @@ -43,15 +42,15 @@ func (ca *CoAPAdapter) sendMessage(l *net.UDPConn, a *net.UDPAddr, m *coap.Messa n.Protocol = "coap" n.Payload = m.Payload - if err := nats.Send(n); err != nil { + if err := ca.repo.Save(n); err != nil { if m.IsConfirmable() { - res.Payload = []byte("{\"error\": \"cannot publish\"}") + res.Code = coap.InternalServerError } return res } if m.IsConfirmable() { - res.Payload = []byte("{\"res\": \"sent\"}") + res.Code = coap.Changed } return res } @@ -86,7 +85,7 @@ func (ca *CoAPAdapter) deregisterObserver(o Observer, cid string) { // observeMessage adds client to the observers map func (ca *CoAPAdapter) observeMessage(l *net.UDPConn, a *net.UDPAddr, m *coap.Message) *coap.Message { log.Printf("Got message in observeMessage: path=%q: %#v from %v", m.Path(), m, a) - var res *coap.Message = nil + var res *coap.Message if m.IsConfirmable() { res = &coap.Message{ @@ -109,30 +108,33 @@ func (ca *CoAPAdapter) observeMessage(l *net.UDPConn, a *net.UDPAddr, m *coap.Me message: m, } - if m.Option(coap.Observe) != nil { - if value, ok := m.Option(coap.Observe).(uint32); ok { - if value == 0 { - ca.registerObserver(o, cid) - } else { - ca.deregisterObserver(o, cid) - } + if m.Option(coap.Observe) == nil { + if m.IsConfirmable() { + res.Code = coap.BadRequest + } + return res + } + + if value, ok := m.Option(coap.Observe).(uint32); ok { + if value == 0 { + ca.registerObserver(o, cid) } else { - // Interop - old deregister was when there was no Observe option provided ca.deregisterObserver(o, cid) } + } else { + // Interop - old deregister was when there was no Observe option provided + ca.deregisterObserver(o, cid) } if m.IsConfirmable() { - res.Payload = []byte("{\"res\": \"observing\"}") + res.Code = coap.Valid } return res } // obsTransmit transmits the message to observing clients func (ca *CoAPAdapter) obsTransmit(n writer.RawMessage) { - for _, v := range ca.obsMap[n.Channel] { - msg := *(v.message) msg.Payload = n.Payload diff --git a/coap/nats/messages.go b/coap/nats/messages.go index 42dc62c3..eedb7353 100644 --- a/coap/nats/messages.go +++ b/coap/nats/messages.go @@ -10,18 +10,25 @@ import ( const topic string = "msg.coap" -// Stored NATS connection -var snc *broker.Conn +var _ writer.MessageRepository = (*natsRepository)(nil) -func StoreConnection(nc *broker.Conn) { - snc = nc +type natsRepository struct { + nc *broker.Conn } -func Send(msg writer.RawMessage) error { +// NewMessageRepository instantiates NATS message repository. Note that the +// repository will not truly persist messages, but instead they will be +// published to the topic and made available for persisting by all interested +// parties, i.e. the message-writer service. +func NewMessageRepository(nc *broker.Conn) writer.MessageRepository { + return &natsRepository{nc} +} + +func (repo *natsRepository) Save(msg writer.RawMessage) error { b, err := json.Marshal(msg) if err != nil { return err } - return snc.Publish(topic, b) + return repo.nc.Publish(topic, b) }