1
0
mirror of https://github.com/mainflux/mainflux.git synced 2025-05-01 13:48:56 +08:00
Mainflux.mainflux/coap/adapter.go
Aleksandar Novaković b9bf63e377 MF-475 - Replace increment ID with UUID (#490)
* Update increment ID to UUID in things service

Update increment ID to UUID for things and channels in things
service and proto files. Also, update ID type from uint to string.

Signed-off-by: Aleksandar Novakovic <aleksandar.novakovic@mainflux.com>

* Update increment ID to UUID in http adapter

Signed-off-by: Aleksandar Novakovic <aleksandar.novakovic@mainflux.com>

* Update increment ID to UUID in ws adapter

Signed-off-by: Aleksandar Novakovic <aleksandar.novakovic@mainflux.com>

* Update increment ID to UUID in CoAP adapter

Signed-off-by: Aleksandar Novakovic <aleksandar.novakovic@mainflux.com>

* Update increment ID to UUID in normalizer service

Signed-off-by: Aleksandar Novakovic <aleksandar.novakovic@mainflux.com>

* Update increment ID to UUID in writer services

Signed-off-by: Aleksandar Novakovic <aleksandar.novakovic@mainflux.com>

* Update increment ID to UUID in reader services

Signed-off-by: Aleksandar Novakovic <aleksandar.novakovic@mainflux.com>

* Update increment ID to UUID in SDK

Update increment ID to UUID in SDK. Update id type to string.
Update tests.

Signed-off-by: Aleksandar Novakovic <aleksandar.novakovic@mainflux.com>

* Update increment ID to UUID in mqtt adapter

Signed-off-by: Aleksandar Novakovic <aleksandar.novakovic@mainflux.com>

* Remove unnecessary case from influxdb reader

Signed-off-by: Aleksandar Novakovic <aleksandar.novakovic@mainflux.com>

* Update tests in order to increase code coverage

Signed-off-by: Aleksandar Novakovic <aleksandar.novakovic@mainflux.com>

* Update lora adapter to use string ID instead of unsigned int

Signed-off-by: Aleksandar Novakovic <aleksandar.novakovic@mainflux.com>
2018-12-05 13:09:25 +01:00

151 lines
3.4 KiB
Go

//
// Copyright (c) 2018
// Mainflux
//
// SPDX-License-Identifier: Apache-2.0
//
// Package coap contains the domain concept definitions needed to support
// Mainflux coap adapter service functionality. All constant values are taken
// from RFC, and could be adjusted based on specific use case.
package coap
import (
"errors"
"sync"
"time"
"github.com/mainflux/mainflux"
broker "github.com/nats-io/go-nats"
)
const (
chanID = "id"
keyHeader = "key"
// AckRandomFactor is default ACK coefficient.
AckRandomFactor = 1.5
// AckTimeout is the amount of time to wait for a response.
AckTimeout = 2000 * time.Millisecond
// MaxRetransmit is the maximum number of times a message will be retransmitted.
MaxRetransmit = 4
)
var (
errBadOption = errors.New("bad option")
// ErrFailedMessagePublish indicates that message publishing failed.
ErrFailedMessagePublish = errors.New("failed to publish message")
// ErrFailedSubscription indicates that client couldn't subscribe to specified channel.
ErrFailedSubscription = errors.New("failed to subscribe to a channel")
// ErrFailedConnection indicates that service couldn't connect to message broker.
ErrFailedConnection = errors.New("failed to connect to message broker")
)
// Broker represents NATS broker instance.
type Broker interface {
mainflux.MessagePublisher
// Subscribes to channel with specified id and adds subscription to
// service map of subscriptions under given ID.
Subscribe(string, string, *Observer) error
}
// Service specifies coap service API.
type Service interface {
Broker
// Unsubscribe method is used to stop observing resource.
Unsubscribe(string)
}
var _ Service = (*adapterService)(nil)
type adapterService struct {
pubsub Broker
obs map[string]*Observer
obsLock sync.Mutex
}
// New instantiates the CoAP adapter implementation.
func New(pubsub Broker, responses <-chan string) Service {
as := &adapterService{
pubsub: pubsub,
obs: make(map[string]*Observer),
obsLock: sync.Mutex{},
}
go as.listenResponses(responses)
return as
}
func (svc *adapterService) get(obsID string) (*Observer, bool) {
svc.obsLock.Lock()
defer svc.obsLock.Unlock()
val, ok := svc.obs[obsID]
return val, ok
}
func (svc *adapterService) put(obsID string, o *Observer) {
svc.obsLock.Lock()
defer svc.obsLock.Unlock()
val, ok := svc.obs[obsID]
if ok {
close(val.Cancel)
}
svc.obs[obsID] = o
}
func (svc *adapterService) remove(obsID string) {
svc.obsLock.Lock()
defer svc.obsLock.Unlock()
val, ok := svc.obs[obsID]
if ok {
close(val.Cancel)
delete(svc.obs, obsID)
}
}
// ListenResponses method handles ACK messages received from client.
func (svc *adapterService) listenResponses(responses <-chan string) {
for {
id := <-responses
val, ok := svc.get(id)
if ok {
val.StoreExpired(false)
}
}
}
func (svc *adapterService) Publish(msg mainflux.RawMessage) error {
if err := svc.pubsub.Publish(msg); err != nil {
switch err {
case broker.ErrConnectionClosed, broker.ErrInvalidConnection:
return ErrFailedConnection
default:
return ErrFailedMessagePublish
}
}
return nil
}
func (svc *adapterService) Subscribe(chanID, obsID string, o *Observer) error {
if err := svc.pubsub.Subscribe(chanID, obsID, o); err != nil {
return ErrFailedSubscription
}
// Put method removes Observer if already exists.
svc.put(obsID, o)
return nil
}
func (svc *adapterService) Unsubscribe(obsID string) {
svc.remove(obsID)
}