1
0
mirror of https://github.com/mainflux/mainflux.git synced 2025-04-24 13:48:49 +08:00

NOISSUE - Store successfull OPC-UA subscriptions (#999)

Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com>
This commit is contained in:
Manuel Imperiale 2020-01-10 17:56:03 +01:00 committed by GitHub
parent b3991b8497
commit 4ba1717582
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 107 additions and 54 deletions

View File

@ -5,9 +5,7 @@ package main
import (
"context"
"encoding/csv"
"fmt"
"io"
"log"
"net/http"
"os"
@ -20,6 +18,7 @@ import (
"github.com/mainflux/mainflux/logger"
"github.com/mainflux/mainflux/opcua"
"github.com/mainflux/mainflux/opcua/api"
"github.com/mainflux/mainflux/opcua/db"
"github.com/mainflux/mainflux/opcua/gopcua"
pub "github.com/mainflux/mainflux/opcua/nats"
"github.com/mainflux/mainflux/opcua/redis"
@ -44,7 +43,6 @@ const (
defRouteMapURL = "localhost:6379"
defRouteMapPass = ""
defRouteMapDB = "0"
defNodesConfig = "/config/nodes.csv"
envHTTPPort = "MF_OPCUA_ADAPTER_HTTP_PORT"
envLogLevel = "MF_OPCUA_ADAPTER_LOG_LEVEL"
@ -60,13 +58,10 @@ const (
envRouteMapURL = "MF_OPCUA_ADAPTER_ROUTE_MAP_URL"
envRouteMapPass = "MF_OPCUA_ADAPTER_ROUTE_MAP_PASS"
envRouteMapDB = "MF_OPCUA_ADAPTER_ROUTE_MAP_DB"
envNodesConfig = "MF_OPCUA_ADAPTER_CONFIG_FILE"
thingsRMPrefix = "thing"
channelsRMPrefix = "channel"
connectionRMPrefix = "connection"
columns = 2
)
type config struct {
@ -81,7 +76,6 @@ type config struct {
routeMapURL string
routeMapPass string
routeMapDB string
nodesConfig string
}
func main() {
@ -129,7 +123,7 @@ func main() {
}, []string{"method"}),
)
go subscribeToNodesFromFile(sub, cfg.nodesConfig, cfg.opcuaConfig, logger)
go subscribeToStoredSubs(sub, cfg.opcuaConfig, logger)
go subscribeToThingsES(svc, esConn, cfg.esConsumerName, logger)
errs := make(chan error, 2)
@ -165,7 +159,6 @@ func loadConfig() config {
routeMapURL: mainflux.Env(envRouteMapURL, defRouteMapURL),
routeMapPass: mainflux.Env(envRouteMapPass, defRouteMapPass),
routeMapDB: mainflux.Env(envRouteMapDB, defRouteMapDB),
nodesConfig: mainflux.Env(envNodesConfig, defNodesConfig),
}
}
@ -194,44 +187,21 @@ func connectToRedis(redisURL, redisPass, redisDB string, logger logger.Logger) *
})
}
func subscribeToNodesFromFile(sub opcua.Subscriber, nodes string, cfg opcua.Config, logger logger.Logger) {
if _, err := os.Stat(nodes); os.IsNotExist(err) {
logger.Warn(fmt.Sprintf("Config file not found: %s", err))
return
}
file, err := os.OpenFile(nodes, os.O_RDONLY, os.ModePerm)
func subscribeToStoredSubs(sub opcua.Subscriber, cfg opcua.Config, logger logger.Logger) {
// Get all stored subscriptions
nodes, err := db.ReadAll()
if err != nil {
logger.Warn(fmt.Sprintf("Failed to open config file: %s", err))
return
logger.Warn(fmt.Sprintf("Read stored subscriptions failed: %s", err))
}
defer file.Close()
reader := csv.NewReader(file)
for {
l, err := reader.Read()
if err == io.EOF {
break
}
if err != nil {
logger.Warn(fmt.Sprintf("Failed to read config file: %s", err))
return
}
if len(l) < columns {
logger.Warn("Empty or incomplete line found in file")
return
}
cfg.ServerURI = l[0]
cfg.NodeID = l[1]
go subscribe(sub, cfg, logger)
}
}
func subscribe(sub opcua.Subscriber, cfg opcua.Config, logger logger.Logger) {
if err := sub.Subscribe(cfg); err != nil {
logger.Warn(fmt.Sprintf("Subscription failed: %s", err))
for _, n := range nodes {
cfg.ServerURI = n.ServerURI
cfg.NodeID = n.NodeID
go func() {
if err := sub.Subscribe(cfg); err != nil {
logger.Warn(fmt.Sprintf("Subscription failed: %s", err))
}
}()
}
}

View File

@ -12,6 +12,9 @@ networks:
docker_mainflux-base-net:
external: true
volumes:
mainflux-opcua-adapter-volume:
services:
opcua-redis:
image: redis:5.0-alpine
@ -47,4 +50,4 @@ services:
networks:
- docker_mainflux-base-net
volumes:
- ./nodes.csv:/config/nodes.csv
- mainflux-opcua-adapter-volume:/store

View File

@ -1 +0,0 @@
opc.tcp://opcua.rocks:4840,ns=0;i=2256
1 opc.tcp://opcua.rocks:4840 ns=0;i=2256

75
opcua/db/subs.go Normal file
View File

@ -0,0 +1,75 @@
// Copyright (c) Mainflux
// SPDX-License-Identifier: Apache-2.0
package db
import (
"encoding/csv"
"io"
"os"
"github.com/mainflux/mainflux/errors"
)
const columns = 2
const path = "/store/nodes.csv"
var (
errNotFound = errors.New("file not found")
errWriteFile = errors.New("failed de write file")
errOpenFile = errors.New("failed to open file")
errReadFile = errors.New("failed to read file")
errEmptyLine = errors.New("empty or incomplete line found in file")
)
// Node represents an OPC-UA node
type Node struct {
ServerURI string
NodeID string
}
// Save stores a successfull subscription
func Save(serverURI, nodeID string) error {
file, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY, os.ModePerm)
if err != nil {
errors.Wrap(errWriteFile, err)
}
csvWriter := csv.NewWriter(file)
csvWriter.Write([]string{serverURI, nodeID})
csvWriter.Flush()
return nil
}
// ReadAll returns all stored subscriptions
func ReadAll() ([]Node, error) {
if _, err := os.Stat(path); os.IsNotExist(err) {
return nil, errors.Wrap(errNotFound, err)
}
file, err := os.OpenFile(path, os.O_RDONLY, os.ModePerm)
if err != nil {
return nil, errors.Wrap(errOpenFile, err)
}
defer file.Close()
reader := csv.NewReader(file)
nodes := []Node{}
for {
l, err := reader.Read()
if err == io.EOF {
break
}
if err != nil {
return nil, errors.Wrap(errReadFile, err)
}
if len(l) < columns {
return nil, errEmptyLine
}
nodes = append(nodes, Node{l[0], l[1]})
}
return nodes, nil
}

View File

@ -14,6 +14,7 @@ import (
"github.com/mainflux/mainflux/errors"
"github.com/mainflux/mainflux/logger"
"github.com/mainflux/mainflux/opcua"
"github.com/mainflux/mainflux/opcua/db"
)
const protocol = "opcua"
@ -124,7 +125,13 @@ func (c client) runHandler(sub *opcuaGopcua.Subscription, cfg opcua.Config) erro
go sub.Run(c.ctx)
c.logger.Info(fmt.Sprintf("subscribe to server %s and node_id %s", cfg.ServerURI, cfg.NodeID))
// Store subscription details
if err := db.Save(cfg.ServerURI, cfg.NodeID); err != nil {
return err
}
c.logger.Info(fmt.Sprintf("subscribed to server %s and node_id %s", cfg.ServerURI, cfg.NodeID))
for {
select {
case <-c.ctx.Done():

View File

@ -120,7 +120,12 @@ func (as *adapterService) ConnectThing(mfxChanID, mfxThingID string) error {
as.cfg.NodeID = nodeID
as.cfg.ServerURI = serverURI
go as.subscribe(as.cfg)
go func() {
if err := as.subscriber.Subscribe(as.cfg); err != nil {
as.logger.Warn(fmt.Sprintf("subscription failed: %s", err))
}
}()
c := fmt.Sprintf("%s:%s", mfxChanID, mfxThingID)
return as.connectRM.Save(c, c)
@ -138,9 +143,3 @@ func (as *adapterService) DisconnectThing(mfxChanID, mfxThingID string) error {
c := fmt.Sprintf("%s:%s", mfxChanID, mfxThingID)
return as.connectRM.Remove(c)
}
func (as *adapterService) subscribe(cfg Config) {
if err := as.subscriber.Subscribe(cfg); err != nil {
as.logger.Warn(fmt.Sprintf("subscription failed: %s", err))
}
}