From 2b7637fd7523840ad52e5bccc6360c67f3cb6c26 Mon Sep 17 00:00:00 2001 From: Manuel Imperiale Date: Wed, 5 Feb 2020 17:16:45 +0100 Subject: [PATCH] NOISSUE - Add opc-ua type handling and unsubscription (#1029) * NOISSUE - Add opc-ua type handling and unsubscription Signed-off-by: Manuel Imperiale * Create const for max childrens Signed-off-by: Manuel Imperiale --- opcua/browser.go | 2 +- opcua/gopcua/browser.go | 7 +++-- opcua/gopcua/subscribe.go | 58 ++++++++++++++++++++++++++------------- opcua/message.go | 13 --------- 4 files changed, 45 insertions(+), 35 deletions(-) delete mode 100644 opcua/message.go diff --git a/opcua/browser.go b/opcua/browser.go index bc676c24..523cc170 100644 --- a/opcua/browser.go +++ b/opcua/browser.go @@ -6,7 +6,7 @@ package opcua // BrowsedNode represents the details of a browsed OPC-UA node. type BrowsedNode struct { NodeID string - Type string + DataType string Description string Unit string Scale string diff --git a/opcua/gopcua/browser.go b/opcua/gopcua/browser.go index 108a6b29..8e187d7e 100644 --- a/opcua/gopcua/browser.go +++ b/opcua/gopcua/browser.go @@ -14,6 +14,9 @@ import ( "github.com/mainflux/mainflux/opcua" ) +const maxChildrens = 7 + +// NodeDef represents the node browser responnse type NodeDef struct { NodeID *uaGopcua.NodeID NodeClass uaGopcua.NodeClass @@ -69,7 +72,7 @@ func (c browser) Browse(serverURI, nodeID string) ([]opcua.BrowsedNode, error) { for _, s := range nodeList { node := opcua.BrowsedNode{ NodeID: s.NodeID.String(), - Type: s.DataType, + DataType: s.DataType, Description: s.Description, Unit: s.Unit, Scale: s.Scale, @@ -82,7 +85,7 @@ func (c browser) Browse(serverURI, nodeID string) ([]opcua.BrowsedNode, error) { } func browse(n *opcuaGopcua.Node, path string, level int) ([]NodeDef, error) { - if level > 10 { + if level > maxChildrens { return nil, nil } diff --git a/opcua/gopcua/subscribe.go b/opcua/gopcua/subscribe.go index 9e2799b9..bef3b4e3 100644 --- a/opcua/gopcua/subscribe.go +++ b/opcua/gopcua/subscribe.go @@ -46,6 +46,15 @@ type client struct { logger logger.Logger } +type message struct { + ServerURI string + NodeID string + Type string + Time int64 + DataKey string + Data interface{} +} + // NewSubscriber returns new OPC-UA client instance. func NewSubscriber(ctx context.Context, pub mainflux.MessagePublisher, thingsRM, channelsRM, connectRM opcua.RouteMapRepository, log logger.Logger) opcua.Subscriber { return client{ @@ -99,15 +108,15 @@ func (c client) Subscribe(cfg opcua.Config) error { } defer sub.Cancel() - if err := c.runHandler(sub, cfg); err != nil { - return err + if err := c.runHandler(sub, cfg.ServerURI, cfg.NodeID); err != nil { + c.logger.Warn(fmt.Sprintf("Unsubscribed from OPC-UA node %s.%s: %s", cfg.ServerURI, cfg.NodeID, err)) } return nil } -func (c client) runHandler(sub *opcuaGopcua.Subscription, cfg opcua.Config) error { - nodeID, err := uaGopcua.ParseNodeID(cfg.NodeID) +func (c client) runHandler(sub *opcuaGopcua.Subscription, uri, node string) error { + nodeID, err := uaGopcua.ParseNodeID(node) if err != nil { return errors.Wrap(errFailedParseNodeID, err) } @@ -126,11 +135,11 @@ func (c client) runHandler(sub *opcuaGopcua.Subscription, cfg opcua.Config) erro go sub.Run(c.ctx) // Store subscription details - if err := db.Save(cfg.ServerURI, cfg.NodeID); err != nil { + if err := db.Save(uri, node); err != nil { return err } - c.logger.Info(fmt.Sprintf("subscribed to server %s and node_id %s", cfg.ServerURI, cfg.NodeID)) + c.logger.Info(fmt.Sprintf("subscribed to server %s and node_id %s", uri, node)) for { select { @@ -145,32 +154,43 @@ func (c client) runHandler(sub *opcuaGopcua.Subscription, cfg opcua.Config) erro switch x := res.Value.(type) { case *uaGopcua.DataChangeNotification: for _, item := range x.MonitoredItems { - msg := opcua.Message{ - ServerURI: cfg.ServerURI, - NodeID: cfg.NodeID, + msg := message{ + ServerURI: uri, + NodeID: node, Type: item.Value.Value.Type().String(), Time: item.Value.SourceTimestamp.Unix(), + DataKey: "v", } switch item.Value.Value.Type() { case uaGopcua.TypeIDBoolean: + msg.DataKey = "vb" msg.Data = item.Value.Value.Bool() + case uaGopcua.TypeIDString: + msg.DataKey = "vs" + msg.Data = item.Value.Value.String() case uaGopcua.TypeIDInt64, uaGopcua.TypeIDInt32, uaGopcua.TypeIDInt16: - msg.Data = item.Value.Value.Int() + msg.Data = float64(item.Value.Value.Int()) case uaGopcua.TypeIDUint64: - msg.Data = item.Value.Value.Uint() + msg.Data = float64(item.Value.Value.Uint()) case uaGopcua.TypeIDFloat, uaGopcua.TypeIDDouble: msg.Data = item.Value.Value.Float() - case uaGopcua.TypeIDString: - msg.Data = item.Value.Value.String() case uaGopcua.TypeIDByte: - msg.Data = item.Value.Value.EncodingMask() + msg.DataKey = "vs" + msg.Data = string(item.Value.Value.EncodingMask()) + case uaGopcua.TypeIDDateTime: + msg.Data = item.Value.Value.Time() default: msg.Data = 0 } if err := c.publish(token, msg); err != nil { - c.logger.Warn(fmt.Sprintf("failed to publish: %s", err)) + switch err { + case errNotFoundServerURI, errNotFoundNodeID, errNotFoundConn: + return err + default: + c.logger.Error(fmt.Sprintf("Failed to publish: %s", err)) + } } } @@ -182,17 +202,17 @@ func (c client) runHandler(sub *opcuaGopcua.Subscription, cfg opcua.Config) erro } // Publish forwards messages from the OPC-UA Server to Mainflux NATS broker -func (c client) publish(token string, m opcua.Message) error { +func (c client) publish(token string, m message) error { // Get route-map of the OPC-UA ServerURI chanID, err := c.channelsRM.Get(m.ServerURI) if err != nil { - return fmt.Errorf("%s %s", errNotFoundServerURI, m.ServerURI) + return errNotFoundServerURI } // Get route-map of the OPC-UA NodeID thingID, err := c.thingsRM.Get(m.NodeID) if err != nil { - return fmt.Errorf("%s %s", errNotFoundNodeID, m.NodeID) + return errNotFoundNodeID } // Check connection between ServerURI and NodeID @@ -202,7 +222,7 @@ func (c client) publish(token string, m opcua.Message) error { } // Publish on Mainflux NATS broker - SenML := fmt.Sprintf(`[{"n":"%s", "t": %d, "v":%v}]`, m.Type, m.Time, m.Data) + SenML := fmt.Sprintf(`[{"n":"%s", "t": %d, "%s":%v}]`, m.Type, m.Time, m.DataKey, m.Data) payload := []byte(SenML) msg := mainflux.Message{ Publisher: thingID, diff --git a/opcua/message.go b/opcua/message.go deleted file mode 100644 index 3e8d0047..00000000 --- a/opcua/message.go +++ /dev/null @@ -1,13 +0,0 @@ -// Copyright (c) Mainflux -// SPDX-License-Identifier: Apache-2.0 - -package opcua - -// Message represent an OPC-UA message -type Message struct { - ServerURI string - NodeID string - Type string - Time int64 - Data interface{} -}