1
0
mirror of https://github.com/mainflux/mainflux.git synced 2025-04-26 13:48:53 +08:00

NOISSUE - Add Node IdentifierType config in opcua-adapter (#967)

* NOISSUE - Add Node IdentifierType config in opcua-adapter

Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com>

* Rm Read from subscribeToOpcServers

Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com>
This commit is contained in:
Manuel Imperiale 2019-11-26 20:07:04 +01:00 committed by Nikola Marčetić
parent 87dd91e328
commit 277342b8ca
6 changed files with 34 additions and 27 deletions

View File

@ -44,7 +44,7 @@ const (
defRouteMapURL = "localhost:6379"
defRouteMapPass = ""
defRouteMapDB = "0"
defNodesConfig = "/nodes.csv"
defNodesConfig = "/config/nodes.csv"
envHTTPPort = "MF_OPCUA_ADAPTER_HTTP_PORT"
envLogLevel = "MF_OPCUA_ADAPTER_LOG_LEVEL"
@ -60,7 +60,7 @@ const (
envRouteMapURL = "MF_OPCUA_ADAPTER_ROUTE_MAP_URL"
envRouteMapPass = "MF_OPCUA_ADAPTER_ROUTE_MAP_PASS"
envRouteMapDB = "MF_OPCUA_ADAPTER_ROUTE_MAP_DB"
envNodesConfig = "/nodes.csv"
envNodesConfig = "MF_OPCUA_ADAPTER_CONFIG_FILE"
thingsRMPrefix = "thing"
channelsRMPrefix = "channel"
@ -222,14 +222,15 @@ func subscribeToOpcuaServers(svc opcua.Service, nodes string, cfg opcua.Config,
return
}
if len(l) < 3 {
if len(l) < 4 {
logger.Warn(fmt.Sprintf("Empty or incomplete line found in file"))
return
}
cfg.ServerURI = l[0]
cfg.NodeNamespace = l[1]
cfg.NodeIdintifier = l[2]
cfg.NodeIdentifierType = l[2]
cfg.NodeIdentifier = l[3]
go subscribeToOpcuaServer(gc, cfg, logger)
}

View File

@ -47,4 +47,4 @@ services:
networks:
- docker_mainflux-base-net
volumes:
- ./nodes.csv:/nodes.csv
- ./nodes.csv:/config/nodes.csv

View File

@ -1,4 +1,4 @@
opc.tcp://opcua.rocks:4840,0,2255
opc.tcp://opcua.rocks:4840,0,2256
opc.tcp://opcua.rocks:4840,1,2255
opc.tcp://opcua.rocks:4840,1,2256
opc.tcp://opcua.rocks:4840,0,i,2255
opc.tcp://opcua.rocks:4840,0,i,2256
opc.tcp://opcua.rocks:4840,1,i,2255
opc.tcp://opcua.rocks:4840,1,i,2256

1 opc.tcp://opcua.rocks:4840 0 i 2255
2 opc.tcp://opcua.rocks:4840 0 i 2256
3 opc.tcp://opcua.rocks:4840 1 i 2255
4 opc.tcp://opcua.rocks:4840 1 i 2256

View File

@ -6,10 +6,10 @@ package gopcua
import (
"context"
"fmt"
"log"
opcuaGopcua "github.com/gopcua/opcua"
uaGopcua "github.com/gopcua/opcua/ua"
"github.com/mainflux/mainflux/errors"
"github.com/mainflux/mainflux/logger"
"github.com/mainflux/mainflux/opcua"
)
@ -35,14 +35,14 @@ func NewReader(ctx context.Context, svc opcua.Service, log logger.Logger) opcua.
func (r reader) Read(cfg opcua.Config) error {
c := opcuaGopcua.NewClient(cfg.ServerURI, opcuaGopcua.SecurityMode(uaGopcua.MessageSecurityModeNone))
if err := c.Connect(r.ctx); err != nil {
log.Fatal(err)
return errors.Wrap(errFailedConn, err)
}
defer c.Close()
nid := fmt.Sprintf("ns=%s;i=%s", cfg.NodeNamespace, cfg.NodeIdintifier)
nid := fmt.Sprintf("ns=%s;%s=%s", cfg.NodeNamespace, cfg.NodeIdentifierType, cfg.NodeIdentifier)
id, err := uaGopcua.ParseNodeID(nid)
if err != nil {
r.logger.Error(fmt.Sprintf("invalid node id: %v", err))
return errors.Wrap(errFailedParseNodeID, err)
}
req := &uaGopcua.ReadRequest{
@ -55,16 +55,16 @@ func (r reader) Read(cfg opcua.Config) error {
resp, err := c.Read(req)
if err != nil {
r.logger.Error(fmt.Sprintf("Read failed: %s", err))
return errors.Wrap(errFailedRead, err)
}
if resp.Results[0].Status != uaGopcua.StatusOK {
r.logger.Error(fmt.Sprintf("Status not OK: %v", resp.Results[0].Status))
return errResponseStatus
}
// Publish on Mainflux NATS broker
msg := opcua.Message{
Namespace: cfg.NodeNamespace,
ID: cfg.NodeIdintifier,
ID: cfg.NodeIdentifier,
Data: resp.Results[0].Value.Float(),
}
r.svc.Publish(r.ctx, "", msg)

View File

@ -17,11 +17,13 @@ import (
var (
errFailedConn = errors.New("Failed to connect")
errFailedRead = errors.New("Failed to read")
errFailedSub = errors.New("Failed to subscribe")
errFailedFindEndpoint = errors.New("Failed to find suitable endpoint")
errFailedFetchEndpoint = errors.New("Failed to fetch OPC-UA server endpoints")
errFailedParseNodeID = errors.New("Failed to parse NodeID")
errFailedCreateReq = errors.New("Failed to creeate request")
errFailedCreateReq = errors.New("Failed to create request")
errResponseStatus = errors.New("Response status not OK")
)
var _ opcua.Subscriber = (*client)(nil)
@ -87,7 +89,7 @@ func (b client) Subscribe(cfg opcua.Config) error {
}
func (b client) runHandler(sub *opcuaGopcua.Subscription, cfg opcua.Config) error {
nid := fmt.Sprintf("ns=%s;i=%s", cfg.NodeNamespace, cfg.NodeIdintifier)
nid := fmt.Sprintf("ns=%s;%s=%s", cfg.NodeNamespace, cfg.NodeIdentifierType, cfg.NodeIdentifier)
nodeID, err := uaGopcua.ParseNodeID(nid)
if err != nil {
return errors.Wrap(errFailedParseNodeID, err)
@ -97,9 +99,12 @@ func (b client) runHandler(sub *opcuaGopcua.Subscription, cfg opcua.Config) erro
handle := uint32(42)
miCreateRequest := opcuaGopcua.NewMonitoredItemCreateRequestWithDefaults(nodeID, uaGopcua.AttributeIDValue, handle)
res, err := sub.Monitor(uaGopcua.TimestampsToReturnBoth, miCreateRequest)
if err != nil || res.Results[0].StatusCode != uaGopcua.StatusOK {
if err != nil {
return errors.Wrap(errFailedCreateReq, err)
}
if res.Results[0].StatusCode != uaGopcua.StatusOK {
return errResponseStatus
}
go sub.Run(b.ctx)
@ -119,7 +124,7 @@ func (b client) runHandler(sub *opcuaGopcua.Subscription, cfg opcua.Config) erro
// Publish on Mainflux NATS broker
msg := opcua.Message{
Namespace: cfg.NodeNamespace,
ID: cfg.NodeIdintifier,
ID: cfg.NodeIdentifier,
Data: item.Value.Value.Float(),
}
b.svc.Publish(b.ctx, "", msg)

View File

@ -59,13 +59,14 @@ type Service interface {
// Config OPC-UA Server
type Config struct {
ServerURI string
NodeNamespace string
NodeIdintifier string
Policy string
Mode string
CertFile string
KeyFile string
ServerURI string
NodeNamespace string
NodeIdentifier string
NodeIdentifierType string
Policy string
Mode string
CertFile string
KeyFile string
}
var _ Service = (*adapterService)(nil)