1
0
mirror of https://github.com/mainflux/mainflux.git synced 2025-04-24 13:48:49 +08:00

NOISSUE - Add opc-ua type handling and unsubscription (#1029)

* NOISSUE - Add opc-ua type handling and unsubscription

Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com>

* Create const for max childrens

Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com>
This commit is contained in:
Manuel Imperiale 2020-02-05 17:16:45 +01:00 committed by GitHub
parent f785116a6f
commit 2b7637fd75
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 45 additions and 35 deletions

View File

@ -6,7 +6,7 @@ package opcua
// BrowsedNode represents the details of a browsed OPC-UA node.
type BrowsedNode struct {
NodeID string
Type string
DataType string
Description string
Unit string
Scale string

View File

@ -14,6 +14,9 @@ import (
"github.com/mainflux/mainflux/opcua"
)
const maxChildrens = 7
// NodeDef represents the node browser responnse
type NodeDef struct {
NodeID *uaGopcua.NodeID
NodeClass uaGopcua.NodeClass
@ -69,7 +72,7 @@ func (c browser) Browse(serverURI, nodeID string) ([]opcua.BrowsedNode, error) {
for _, s := range nodeList {
node := opcua.BrowsedNode{
NodeID: s.NodeID.String(),
Type: s.DataType,
DataType: s.DataType,
Description: s.Description,
Unit: s.Unit,
Scale: s.Scale,
@ -82,7 +85,7 @@ func (c browser) Browse(serverURI, nodeID string) ([]opcua.BrowsedNode, error) {
}
func browse(n *opcuaGopcua.Node, path string, level int) ([]NodeDef, error) {
if level > 10 {
if level > maxChildrens {
return nil, nil
}

View File

@ -46,6 +46,15 @@ type client struct {
logger logger.Logger
}
type message struct {
ServerURI string
NodeID string
Type string
Time int64
DataKey string
Data interface{}
}
// NewSubscriber returns new OPC-UA client instance.
func NewSubscriber(ctx context.Context, pub mainflux.MessagePublisher, thingsRM, channelsRM, connectRM opcua.RouteMapRepository, log logger.Logger) opcua.Subscriber {
return client{
@ -99,15 +108,15 @@ func (c client) Subscribe(cfg opcua.Config) error {
}
defer sub.Cancel()
if err := c.runHandler(sub, cfg); err != nil {
return err
if err := c.runHandler(sub, cfg.ServerURI, cfg.NodeID); err != nil {
c.logger.Warn(fmt.Sprintf("Unsubscribed from OPC-UA node %s.%s: %s", cfg.ServerURI, cfg.NodeID, err))
}
return nil
}
func (c client) runHandler(sub *opcuaGopcua.Subscription, cfg opcua.Config) error {
nodeID, err := uaGopcua.ParseNodeID(cfg.NodeID)
func (c client) runHandler(sub *opcuaGopcua.Subscription, uri, node string) error {
nodeID, err := uaGopcua.ParseNodeID(node)
if err != nil {
return errors.Wrap(errFailedParseNodeID, err)
}
@ -126,11 +135,11 @@ func (c client) runHandler(sub *opcuaGopcua.Subscription, cfg opcua.Config) erro
go sub.Run(c.ctx)
// Store subscription details
if err := db.Save(cfg.ServerURI, cfg.NodeID); err != nil {
if err := db.Save(uri, node); err != nil {
return err
}
c.logger.Info(fmt.Sprintf("subscribed to server %s and node_id %s", cfg.ServerURI, cfg.NodeID))
c.logger.Info(fmt.Sprintf("subscribed to server %s and node_id %s", uri, node))
for {
select {
@ -145,32 +154,43 @@ func (c client) runHandler(sub *opcuaGopcua.Subscription, cfg opcua.Config) erro
switch x := res.Value.(type) {
case *uaGopcua.DataChangeNotification:
for _, item := range x.MonitoredItems {
msg := opcua.Message{
ServerURI: cfg.ServerURI,
NodeID: cfg.NodeID,
msg := message{
ServerURI: uri,
NodeID: node,
Type: item.Value.Value.Type().String(),
Time: item.Value.SourceTimestamp.Unix(),
DataKey: "v",
}
switch item.Value.Value.Type() {
case uaGopcua.TypeIDBoolean:
msg.DataKey = "vb"
msg.Data = item.Value.Value.Bool()
case uaGopcua.TypeIDString:
msg.DataKey = "vs"
msg.Data = item.Value.Value.String()
case uaGopcua.TypeIDInt64, uaGopcua.TypeIDInt32, uaGopcua.TypeIDInt16:
msg.Data = item.Value.Value.Int()
msg.Data = float64(item.Value.Value.Int())
case uaGopcua.TypeIDUint64:
msg.Data = item.Value.Value.Uint()
msg.Data = float64(item.Value.Value.Uint())
case uaGopcua.TypeIDFloat, uaGopcua.TypeIDDouble:
msg.Data = item.Value.Value.Float()
case uaGopcua.TypeIDString:
msg.Data = item.Value.Value.String()
case uaGopcua.TypeIDByte:
msg.Data = item.Value.Value.EncodingMask()
msg.DataKey = "vs"
msg.Data = string(item.Value.Value.EncodingMask())
case uaGopcua.TypeIDDateTime:
msg.Data = item.Value.Value.Time()
default:
msg.Data = 0
}
if err := c.publish(token, msg); err != nil {
c.logger.Warn(fmt.Sprintf("failed to publish: %s", err))
switch err {
case errNotFoundServerURI, errNotFoundNodeID, errNotFoundConn:
return err
default:
c.logger.Error(fmt.Sprintf("Failed to publish: %s", err))
}
}
}
@ -182,17 +202,17 @@ func (c client) runHandler(sub *opcuaGopcua.Subscription, cfg opcua.Config) erro
}
// Publish forwards messages from the OPC-UA Server to Mainflux NATS broker
func (c client) publish(token string, m opcua.Message) error {
func (c client) publish(token string, m message) error {
// Get route-map of the OPC-UA ServerURI
chanID, err := c.channelsRM.Get(m.ServerURI)
if err != nil {
return fmt.Errorf("%s %s", errNotFoundServerURI, m.ServerURI)
return errNotFoundServerURI
}
// Get route-map of the OPC-UA NodeID
thingID, err := c.thingsRM.Get(m.NodeID)
if err != nil {
return fmt.Errorf("%s %s", errNotFoundNodeID, m.NodeID)
return errNotFoundNodeID
}
// Check connection between ServerURI and NodeID
@ -202,7 +222,7 @@ func (c client) publish(token string, m opcua.Message) error {
}
// Publish on Mainflux NATS broker
SenML := fmt.Sprintf(`[{"n":"%s", "t": %d, "v":%v}]`, m.Type, m.Time, m.Data)
SenML := fmt.Sprintf(`[{"n":"%s", "t": %d, "%s":%v}]`, m.Type, m.Time, m.DataKey, m.Data)
payload := []byte(SenML)
msg := mainflux.Message{
Publisher: thingID,

View File

@ -1,13 +0,0 @@
// Copyright (c) Mainflux
// SPDX-License-Identifier: Apache-2.0
package opcua
// Message represent an OPC-UA message
type Message struct {
ServerURI string
NodeID string
Type string
Time int64
Data interface{}
}