1
0
mirror of https://github.com/mainflux/mainflux.git synced 2025-04-28 13:48:49 +08:00
Mainflux.mainflux/coap/api/transport.go

297 lines
7.4 KiB
Go
Raw Normal View History

MF-374 - Bring back CoAP adapter (#413) * Bring old CoAP code back Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Fix channel ID formatting due to type change Uncomment error handling for authorization. Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Update CoAP adapter docs Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> * Add copyright headers Signed-off-by: Dusan Borovcanin <dusan.borovcanin@mainflux.com> * Remove redundant type declaration Signed-off-by: Dusan Borovcanin <dusan.borovcanin@mainflux.com> * Add CoAP adapter to the list of services Add CoAp adapter in Makefile services list and fix corresponding documentation. Signed-off-by: Dusan Borovcanin <dusan.borovcanin@mainflux.com> * Refactor CoAP code Merge multipe `const` block int single and declare consts before vars. Un-export notFound handler since there is no need to export it. Signed-off-by: Dusan Borovcanin <dusan.borovcanin@mainflux.com> * Update http version endpoint This separates CoAP and HTTP APIs. Signed-off-by: Dusan Borovcanin <dusan.borovcanin@mainflux.com> * Refactor CoAP POST method handling This PR is a part of CoAP adapter refactoring that will simplify adapter implementation. Signed-off-by: Dusan Borovcanin <dusan.borovcanin@mainflux.com> * Refactor CoAP adapter Change CoAP message handling to simplify adapter implementation. Signed-off-by: Dusan Borovcanin <dusan.borovcanin@mainflux.com> * Add backoff timeout for server ping to client Update CoAP adapter to provide subset of necessary features from protocol specification. Signed-off-by: Dusan Borovcanin <dusan.borovcanin@mainflux.com> * Fix leaking locked goroutine In case of the stopped ticker, its channel is NOT closed, so pinging might be left stuck waiting for the stopped ticker to send a notification. Signed-off-by: Dusan Borovcanin <dusan.borovcanin@mainflux.com> * Format code Use more meaningful name for Handlers map. Signed-off-by: Dusan Borovcanin <dusan.borovcanin@mainflux.com> * Use and stop ticker from the same goroutine Stop handler Ticker from ping goroutine rather than the cancel goroutine. Signed-off-by: Dusan Borovcanin <dusan.borovcanin@mainflux.com> * Check if subscription already exists in put method Fix potential leak of handlers providing check inside of put method. Signed-off-by: Dusan Borovcanin <dusan.borovcanin@mainflux.com> * Use MessageID as Observe option Since MessageID satisfies observe option behaviour, use Message ID instead of local timestamp. Remove Thicker from handler and use it on transport layer. Signed-off-by: Dusan Borovcanin <dusan.borovcanin@mainflux.com> * Use name Observer insted of Handler Name `Observer` is used in protocol specification, so this naming makes code more self-documenting. Signed-off-by: Dusan Borovcanin <dusan.borovcanin@mainflux.com> * Add CoAP adapter to docker-compose.yml Signed-off-by: Dusan Borovcanin <dusan.borovcanin@mainflux.com> * Add copyright headers Signed-off-by: Dusan Borovcanin <dusan.borovcanin@mainflux.com> * Remove unused constants Fix service name in startup log message. Signed-off-by: Dusan Borovcanin <dusan.borovcanin@mainflux.com> * Add metrics endpoint Signed-off-by: Dusan Borovcanin <dusan.borovcanin@mainflux.com> * Refactor code Config fields from main.go should not be exported; minor style changes. Signed-off-by: Dusan Borovcanin <dusan.borovcanin@mainflux.com> * Update authorization URI-Query option Use `authorization` value in URI-Query option instead of `key`. This mimics Authorization header in some other protocols (e.g. HTTP). Please note that this value can be replaced with simple `auth` to save space, due to constrained URI-Query option size. Signed-off-by: Dusan Borovcanin <dusan.borovcanin@mainflux.com>
2018-10-31 18:53:25 +01:00
//
// Copyright (c) 2018
// Mainflux
//
// SPDX-License-Identifier: Apache-2.0
//
package api
import (
"bytes"
"context"
"encoding/binary"
"encoding/json"
"errors"
"fmt"
"net"
"net/http"
"strconv"
"time"
"github.com/go-zoo/bone"
"github.com/mainflux/mainflux/coap"
log "github.com/mainflux/mainflux/logger"
"github.com/prometheus/client_golang/prometheus/promhttp"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
mux "github.com/dereulenspiegel/coap-mux"
gocoap "github.com/dustin/go-coap"
"github.com/mainflux/mainflux"
)
const protocol = "coap"
var (
errBadRequest = errors.New("bad request")
errBadOption = errors.New("bad option")
auth mainflux.ThingsServiceClient
logger log.Logger
)
type handler func(conn *net.UDPConn, addr *net.UDPAddr, msg *gocoap.Message) *gocoap.Message
func notFoundHandler(l *net.UDPConn, a *net.UDPAddr, m *gocoap.Message) *gocoap.Message {
if m.IsConfirmable() {
return &gocoap.Message{
Type: gocoap.Acknowledgement,
Code: gocoap.NotFound,
}
}
return nil
}
//MakeHTTPHandler creates handler for version endpoint.
func MakeHTTPHandler() http.Handler {
b := bone.New()
b.GetFunc("/version", mainflux.Version(protocol))
b.Handle("/metrics", promhttp.Handler())
return b
}
// MakeCOAPHandler creates handler for CoAP messages.
func MakeCOAPHandler(svc coap.Service, tc mainflux.ThingsServiceClient, l log.Logger, responses chan<- string) gocoap.Handler {
auth = tc
logger = l
r := mux.NewRouter()
r.Handle("/channels/{id}/messages", gocoap.FuncHandler(receive(svc))).Methods(gocoap.POST)
r.Handle("/channels/{id}/messages", gocoap.FuncHandler(observe(svc, responses)))
r.NotFoundHandler = gocoap.FuncHandler(notFoundHandler)
return r
}
func authorize(msg *gocoap.Message, res *gocoap.Message, cid uint64) (uint64, error) {
// Device Key is passed as Uri-Query parameter, which option ID is 15 (0xf).
key, err := authKey(msg.Option(gocoap.URIQuery))
if err != nil {
switch err {
case errBadOption:
res.Code = gocoap.BadOption
case errBadRequest:
res.Code = gocoap.BadRequest
}
return 0, err
}
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
id, err := auth.CanAccess(ctx, &mainflux.AccessReq{Token: key, ChanID: cid})
if err != nil {
e, ok := status.FromError(err)
if ok {
switch e.Code() {
case codes.PermissionDenied:
res.Code = gocoap.Forbidden
default:
res.Code = gocoap.ServiceUnavailable
}
return 0, err
}
res.Code = gocoap.InternalServerError
}
return id.GetValue(), nil
}
func receive(svc coap.Service) handler {
return func(conn *net.UDPConn, addr *net.UDPAddr, msg *gocoap.Message) *gocoap.Message {
// By default message is NonConfirmable, so
// NonConfirmable response is sent back.
res := &gocoap.Message{
Type: gocoap.NonConfirmable,
// According to https://tools.ietf.org/html/rfc7252#page-47: If the POST
// succeeds but does not result in a new resource being created on the
// server, the response SHOULD have a 2.04 (Changed) Response Code.
Code: gocoap.Changed,
MessageID: msg.MessageID,
Token: msg.Token,
Payload: []byte{},
}
if msg.IsConfirmable() {
res.Type = gocoap.Acknowledgement
res.SetOption(gocoap.ContentFormat, gocoap.AppJSON)
if len(msg.Payload) == 0 {
res.Code = gocoap.BadRequest
return res
}
}
channelID := mux.Var(msg, "id")
cid, err := strconv.ParseUint(channelID, 10, 64)
if err != nil {
res.Code = gocoap.NotFound
return res
}
publisher, err := authorize(msg, res, cid)
if err != nil {
res.Code = gocoap.Forbidden
return res
}
rawMsg := mainflux.RawMessage{
Channel: cid,
Publisher: publisher,
Protocol: protocol,
Payload: msg.Payload,
}
if err := svc.Publish(rawMsg); err != nil {
res.Code = gocoap.InternalServerError
}
return res
}
}
func observe(svc coap.Service, responses chan<- string) handler {
return func(conn *net.UDPConn, addr *net.UDPAddr, msg *gocoap.Message) *gocoap.Message {
res := &gocoap.Message{
Type: gocoap.Acknowledgement,
Code: gocoap.Content,
MessageID: msg.MessageID,
Token: msg.Token,
Payload: []byte{},
}
res.SetOption(gocoap.ContentFormat, gocoap.AppJSON)
id := mux.Var(msg, "id")
chanID, err := strconv.ParseUint(id, 10, 64)
if err != nil {
res.Code = gocoap.NotFound
return res
}
publisher, err := authorize(msg, res, chanID)
if err != nil {
res.Code = gocoap.Forbidden
logger.Warn(fmt.Sprintf("Failed to authorize: %s", err))
return res
}
obsID := fmt.Sprintf("%x-%d-%d", msg.Token, publisher, chanID)
if msg.Type == gocoap.Acknowledgement {
responses <- obsID
return nil
}
if value, ok := msg.Option(gocoap.Observe).(uint32); (ok && value == 1) || msg.Type == gocoap.Reset {
svc.Unsubscribe(obsID)
}
if value, ok := msg.Option(gocoap.Observe).(uint32); ok && value == 0 {
res.AddOption(gocoap.Observe, 1)
o := coap.NewObserver()
if err := svc.Subscribe(chanID, obsID, o); err != nil {
logger.Warn(fmt.Sprintf("Failed to subscribe to NATS subject: %s", err))
res.Code = gocoap.InternalServerError
return res
}
go handleMessage(conn, addr, o, msg)
go ping(svc, obsID, conn, addr, o, msg)
go cancel(o)
}
return res
}
}
func cancel(observer *coap.Observer) {
<-observer.Cancel
close(observer.Messages)
observer.StoreExpired(true)
}
func handleMessage(conn *net.UDPConn, addr *net.UDPAddr, o *coap.Observer, msg *gocoap.Message) {
notifyMsg := *msg
notifyMsg.Type = gocoap.NonConfirmable
notifyMsg.Code = gocoap.Content
notifyMsg.RemoveOption(gocoap.URIQuery)
for {
msg, ok := <-o.Messages
if !ok {
return
}
payload, err := json.Marshal(msg)
if err != nil {
logger.Warn(fmt.Sprintf("Failed to parse received message: %s", err))
continue
}
notifyMsg.Payload = payload
notifyMsg.MessageID = o.LoadMessageID()
buff := new(bytes.Buffer)
observe := uint64(notifyMsg.MessageID)
if err := binary.Write(buff, binary.BigEndian, observe); err != nil {
logger.Warn(fmt.Sprintf("Failed to generate Observe option value: %s", err))
continue
}
observeVal := buff.Bytes()
notifyMsg.SetOption(gocoap.Observe, observeVal[len(observeVal)-3:])
if err := gocoap.Transmit(conn, addr, notifyMsg); err != nil {
logger.Warn(fmt.Sprintf("Failed to send message to observer: %s", err))
}
}
}
func ping(svc coap.Service, obsID string, conn *net.UDPConn, addr *net.UDPAddr, o *coap.Observer, msg *gocoap.Message) {
pingMsg := *msg
pingMsg.Payload = []byte{}
pingMsg.Type = gocoap.Confirmable
pingMsg.RemoveOption(gocoap.URIQuery)
// According to RFC (https://tools.ietf.org/html/rfc7641#page-18), CON message must be sent at least every
// 24 hours. Since 24 hours is too long for our purposes, we use 12.
t := time.NewTicker(12 * time.Hour)
defer t.Stop()
for {
select {
case _, ok := <-t.C:
if !ok || o.LoadExpired() {
return
}
o.StoreExpired(true)
timeout := float64(coap.AckTimeout)
logger.Info(fmt.Sprintf("Ping client %s.", obsID))
for i := 0; i < coap.MaxRetransmit; i++ {
pingMsg.MessageID = o.LoadMessageID()
gocoap.Transmit(conn, addr, pingMsg)
time.Sleep(time.Duration(timeout * coap.AckRandomFactor))
if !o.LoadExpired() {
break
}
timeout = 2 * timeout
}
if o.LoadExpired() {
svc.Unsubscribe(obsID)
return
}
case <-o.Cancel:
return
}
}
}