1
0
mirror of https://github.com/mainflux/mainflux.git synced 2025-04-27 13:48:49 +08:00
Mainflux.mainflux/coap/observer.go
Dušan Borovčanin f10e49e6b5
MF-928 - Change CoAP lib (#1233)
* Switch CoAP lib

Signed-off-by: dusanb94 <dusan.borovcanin@mainflux.com>

* Revert removed adapter code

Signed-off-by: dusanb94 <dusan.borovcanin@mainflux.com>

* WIP CoAP refactor

Signed-off-by: dusanb94 <dusan.borovcanin@mainflux.com>

* Add auth key

Signed-off-by: dusanb94 <dusan.borovcanin@mainflux.com>

* Fix observers map

Signed-off-by: dusanb94 <dusan.borovcanin@mainflux.com>

* Fix reading message body

Signed-off-by: dusanb94 <dusan.borovcanin@mainflux.com>

* Fix subtopic parsing

Signed-off-by: dusanb94 <dusan.borovcanin@mainflux.com>

* Fix error handling

Signed-off-by: dusanb94 <dusan.borovcanin@mainflux.com>

* Fix multi-protocol communication

Signed-off-by: dusanb94 <dusan.borovcanin@mainflux.com>

* Separate client from observer

Signed-off-by: dusanb94 <dusan.borovcanin@mainflux.com>

* Remove unused config

Signed-off-by: dusanb94 <dusan.borovcanin@mainflux.com>

* Remove TCP option

Signed-off-by: dusanb94 <dusan.borovcanin@mainflux.com>

* Inline error check

Signed-off-by: dusanb94 <dusan.borovcanin@mainflux.com>

* Add logging client errors

Signed-off-by: dusanb94 <dusan.borovcanin@mainflux.com>

* Replace RWMutex since we're not using RLock

Signed-off-by: dusanb94 <dusan.borovcanin@mainflux.com>

* Inline error handling

Signed-off-by: dusanb94 <dusan.borovcanin@mainflux.com>

* Inline error handling

Signed-off-by: dusanb94 <dusan.borovcanin@mainflux.com>
2020-09-22 11:59:10 +02:00

48 lines
1.0 KiB
Go

// Copyright (c) Mainflux
// SPDX-License-Identifier: Apache-2.0
package coap
import (
"github.com/gogo/protobuf/proto"
"github.com/mainflux/mainflux/pkg/messaging"
broker "github.com/nats-io/nats.go"
)
// Observer represents an internal observer used to handle CoAP observe messages.
type Observer interface {
// Cancel ends the observation and reports any failure while doing so.
// The implementation in this file unsubscribes from the broker first and
// then calls Cancel on the client it was created with.
Cancel() error
}
// NewObserver subscribes to subject on conn and returns an Observer that
// wraps the resulting subscription together with the client c.
//
// Each broker message whose payload decodes as a messaging.Message is passed
// to c.SendMessage; payloads that fail to decode are dropped silently.
// If the subscription cannot be created, the error from conn.Subscribe is
// returned and the Observer is nil.
func NewObserver(subject string, c Client, conn *broker.Conn) (Observer, error) {
	// forward is the broker callback: decode the payload and hand it to the client.
	forward := func(m *broker.Msg) {
		var decoded messaging.Message
		if proto.Unmarshal(m.Data, &decoded) == nil {
			// There is no error handling, but the client takes care to log the error.
			c.SendMessage(decoded)
		}
	}

	subscription, err := conn.Subscribe(subject, forward)
	if err != nil {
		return nil, err
	}

	return &observer{client: c, sub: subscription}, nil
}
// observer is the Observer implementation built by NewObserver. It keeps the
// client and the broker subscription together so Cancel can release both.
type observer struct {
// client is the Client that decoded broker messages are sent to; Cancel
// calls its Cancel method after unsubscribing.
client Client
// sub is the broker subscription created for the observed subject.
sub *broker.Subscription
}
// Cancel removes the broker subscription and then cancels the client.
//
// An already closed broker connection (broker.ErrConnectionClosed) is
// treated the same as a successful unsubscribe. Any other unsubscribe error
// is returned as is, and in that case the client is not cancelled.
func (o *observer) Cancel() error {
	switch err := o.sub.Unsubscribe(); err {
	case nil, broker.ErrConnectionClosed:
		// Nothing left to undo on the broker side; finish by cancelling the client.
		return o.client.Cancel()
	default:
		return err
	}
}