1
0
mirror of https://github.com/mainflux/mainflux.git synced 2025-04-29 13:49:28 +08:00
Mainflux.mainflux/coap/observer.go
b1ackd0t cc5d5195ab
NOISSUE - Add nats wrapper for COAP (#1569)
* Add nats wrapper for COAP

Signed-off-by: 0x6f736f646f <blackd0t@protonmail.com>

* Pass pubsub as argument

Signed-off-by: 0x6f736f646f <blackd0t@protonmail.com>

* Defer close connection

Signed-off-by: 0x6f736f646f <blackd0t@protonmail.com>

* Defer close connection

Signed-off-by: 0x6f736f646f <blackd0t@protonmail.com>

* Rename endpoint to topic

Signed-off-by: 0x6f736f646f <blackd0t@protonmail.com>
2022-03-02 23:37:54 +01:00

40 lines
853 B
Go

// Copyright (c) Mainflux
// SPDX-License-Identifier: Apache-2.0
package coap
import (
"github.com/mainflux/mainflux/pkg/messaging/nats"
broker "github.com/nats-io/nats.go"
)
// Observer represents an internal observer used to handle CoAP observe messages.
type Observer interface {
Cancel(topic string) error
}
// NewObserver returns a new Observer instance.
func NewObserver(subject string, c Client, pubsub nats.PubSub) (Observer, error) {
err := pubsub.Subscribe(subject, c.SendMessage)
if err != nil {
return nil, err
}
ret := &observer{
client: c,
pubsub: pubsub,
}
return ret, nil
}
type observer struct {
client Client
pubsub nats.PubSub
}
func (o *observer) Cancel(topic string) error {
if err := o.pubsub.Unsubscribe(topic); err != nil && err != broker.ErrConnectionClosed {
return err
}
return o.client.Cancel()
}