1
0
mirror of https://github.com/mainflux/mainflux.git synced 2025-04-26 13:48:53 +08:00
Mainflux.mainflux/coap/adapter.go
Manuel Imperiale d8dca28072
MF-1078 - Add timestamp to published messages and use it in Transformer (#1106)
* MF-1078 - Add reception time to published messages and use it if SenML time is missing

Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com>

* Fix comments

Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com>

* Add message timestamp

Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com>

* Fix pub

Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com>

* Mv timestamp to service layer

Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com>

* Revert WS

Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com>

* Fix coap

Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com>

* Fix CI bot

Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com>

* Rename Timestamp Created

Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com>

* Mv Created field to message creation

Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com>

* Mv Created field to message creation

Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com>

* Use UnixNano

Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com>

* Use google.protobuf.Timestamp

Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com>

* Add protoc-gen-gogo and gogoproto to CI script

Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com>

* Use TimestampProto to generate timestamp

Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com>
2020-04-13 15:05:05 +02:00

153 lines
3.4 KiB
Go

// Copyright (c) Mainflux
// SPDX-License-Identifier: Apache-2.0
// Package coap contains the domain concept definitions needed to support
// Mainflux coap adapter service functionality. All constant values are taken
// from RFC, and could be adjusted based on specific use case.
package coap
import (
"context"
"fmt"
"sync"
"time"
"github.com/gogo/protobuf/proto"
"github.com/mainflux/mainflux"
"github.com/mainflux/mainflux/broker"
"github.com/mainflux/mainflux/logger"
"github.com/nats-io/nats.go"
)
const (
chanID = "id"
keyHeader = "key"
// AckRandomFactor is default ACK coefficient.
AckRandomFactor = 1.5
// AckTimeout is the amount of time to wait for a response.
AckTimeout = 2000 * time.Millisecond
// MaxRetransmit is the maximum number of times a message will be retransmitted.
MaxRetransmit = 4
)
// Service specifies coap service API.
type Service interface {
// Publish Messssage
Publish(ctx context.Context, token string, msg broker.Message) error
// Subscribes to channel with specified id, subtopic and adds subscription to
// service map of subscriptions under given ID.
Subscribe(chanID, subtopic, obsID string, obs *Observer) error
// Unsubscribe method is used to stop observing resource.
Unsubscribe(obsID string)
}
var _ Service = (*adapterService)(nil)
type adapterService struct {
auth mainflux.ThingsServiceClient
broker broker.Nats
log logger.Logger
obs map[string]*Observer
obsLock sync.Mutex
}
// New instantiates the CoAP adapter implementation.
func New(broker broker.Nats, log logger.Logger, auth mainflux.ThingsServiceClient, responses <-chan string) Service {
as := &adapterService{
auth: auth,
broker: broker,
log: log,
obs: make(map[string]*Observer),
obsLock: sync.Mutex{},
}
go as.listenResponses(responses)
return as
}
func (svc *adapterService) get(obsID string) (*Observer, bool) {
svc.obsLock.Lock()
defer svc.obsLock.Unlock()
val, ok := svc.obs[obsID]
return val, ok
}
func (svc *adapterService) put(obsID string, o *Observer) {
svc.obsLock.Lock()
defer svc.obsLock.Unlock()
val, ok := svc.obs[obsID]
if ok {
close(val.Cancel)
}
svc.obs[obsID] = o
}
func (svc *adapterService) remove(obsID string) {
svc.obsLock.Lock()
defer svc.obsLock.Unlock()
val, ok := svc.obs[obsID]
if ok {
close(val.Cancel)
delete(svc.obs, obsID)
}
}
// ListenResponses method handles ACK messages received from client.
func (svc *adapterService) listenResponses(responses <-chan string) {
for {
id := <-responses
val, ok := svc.get(id)
if ok {
val.StoreExpired(false)
}
}
}
func (svc *adapterService) Publish(ctx context.Context, token string, msg broker.Message) error {
return svc.broker.Publish(ctx, token, msg)
}
func (svc *adapterService) Subscribe(chanID, subtopic, obsID string, o *Observer) error {
subject := chanID
if subtopic != "" {
subject = fmt.Sprintf("%s.%s", chanID, subtopic)
}
sub, err := svc.broker.Subscribe(subject, func(msg *nats.Msg) {
if msg == nil {
return
}
var m broker.Message
if err := proto.Unmarshal(msg.Data, &m); err != nil {
return
}
o.Messages <- m
})
if err != nil {
return err
}
go func() {
<-o.Cancel
if err := sub.Unsubscribe(); err != nil {
svc.log.Error(fmt.Sprintf("Failed to unsubscribe from %s.%s", chanID, subtopic))
}
}()
// Put method removes Observer if already exists.
svc.put(obsID, o)
return nil
}
func (svc *adapterService) Unsubscribe(obsID string) {
svc.remove(obsID)
}