diff --git a/cmd/opcua/main.go b/cmd/opcua/main.go index a6625872..3b526958 100644 --- a/cmd/opcua/main.go +++ b/cmd/opcua/main.go @@ -5,9 +5,7 @@ package main import ( "context" - "encoding/csv" "fmt" - "io" "log" "net/http" "os" @@ -20,6 +18,7 @@ import ( "github.com/mainflux/mainflux/logger" "github.com/mainflux/mainflux/opcua" "github.com/mainflux/mainflux/opcua/api" + "github.com/mainflux/mainflux/opcua/db" "github.com/mainflux/mainflux/opcua/gopcua" pub "github.com/mainflux/mainflux/opcua/nats" "github.com/mainflux/mainflux/opcua/redis" @@ -44,7 +43,6 @@ const ( defRouteMapURL = "localhost:6379" defRouteMapPass = "" defRouteMapDB = "0" - defNodesConfig = "/config/nodes.csv" envHTTPPort = "MF_OPCUA_ADAPTER_HTTP_PORT" envLogLevel = "MF_OPCUA_ADAPTER_LOG_LEVEL" @@ -60,13 +58,10 @@ const ( envRouteMapURL = "MF_OPCUA_ADAPTER_ROUTE_MAP_URL" envRouteMapPass = "MF_OPCUA_ADAPTER_ROUTE_MAP_PASS" envRouteMapDB = "MF_OPCUA_ADAPTER_ROUTE_MAP_DB" - envNodesConfig = "MF_OPCUA_ADAPTER_CONFIG_FILE" thingsRMPrefix = "thing" channelsRMPrefix = "channel" connectionRMPrefix = "connection" - - columns = 2 ) type config struct { @@ -81,7 +76,6 @@ type config struct { routeMapURL string routeMapPass string routeMapDB string - nodesConfig string } func main() { @@ -129,7 +123,7 @@ func main() { }, []string{"method"}), ) - go subscribeToNodesFromFile(sub, cfg.nodesConfig, cfg.opcuaConfig, logger) + go subscribeToStoredSubs(sub, cfg.opcuaConfig, logger) go subscribeToThingsES(svc, esConn, cfg.esConsumerName, logger) errs := make(chan error, 2) @@ -165,7 +159,6 @@ func loadConfig() config { routeMapURL: mainflux.Env(envRouteMapURL, defRouteMapURL), routeMapPass: mainflux.Env(envRouteMapPass, defRouteMapPass), routeMapDB: mainflux.Env(envRouteMapDB, defRouteMapDB), - nodesConfig: mainflux.Env(envNodesConfig, defNodesConfig), } } @@ -194,44 +187,21 @@ func connectToRedis(redisURL, redisPass, redisDB string, logger logger.Logger) * }) } -func subscribeToNodesFromFile(sub opcua.Subscriber, nodes string, cfg opcua.Config, logger logger.Logger) { - if _, err := os.Stat(nodes); os.IsNotExist(err) { - logger.Warn(fmt.Sprintf("Config file not found: %s", err)) - return - } - - file, err := os.OpenFile(nodes, os.O_RDONLY, os.ModePerm) +func subscribeToStoredSubs(sub opcua.Subscriber, cfg opcua.Config, logger logger.Logger) { + // Get all stored subscriptions + nodes, err := db.ReadAll() if err != nil { - logger.Warn(fmt.Sprintf("Failed to open config file: %s", err)) - return + logger.Warn(fmt.Sprintf("Read stored subscriptions failed: %s", err)) } - defer file.Close() - reader := csv.NewReader(file) - for { - l, err := reader.Read() - if err == io.EOF { - break - } - if err != nil { - logger.Warn(fmt.Sprintf("Failed to read config file: %s", err)) - return - } - - if len(l) < columns { - logger.Warn("Empty or incomplete line found in file") - return - } - - cfg.ServerURI = l[0] - cfg.NodeID = l[1] - go subscribe(sub, cfg, logger) - } -} - -func subscribe(sub opcua.Subscriber, cfg opcua.Config, logger logger.Logger) { - if err := sub.Subscribe(cfg); err != nil { - logger.Warn(fmt.Sprintf("Subscription failed: %s", err)) + for _, n := range nodes { + cfg.ServerURI = n.ServerURI + cfg.NodeID = n.NodeID + go func() { + if err := sub.Subscribe(cfg); err != nil { + logger.Warn(fmt.Sprintf("Subscription failed: %s", err)) + } + }() } } diff --git a/docker/addons/opcua-adapter/docker-compose.yml b/docker/addons/opcua-adapter/docker-compose.yml index 74059f4c..0d37b359 100644 --- a/docker/addons/opcua-adapter/docker-compose.yml +++ b/docker/addons/opcua-adapter/docker-compose.yml @@ -12,6 +12,9 @@ networks: docker_mainflux-base-net: external: true +volumes: + mainflux-opcua-adapter-volume: + services: opcua-redis: image: redis:5.0-alpine @@ -47,4 +50,4 @@ services: networks: - docker_mainflux-base-net volumes: - - ./nodes.csv:/config/nodes.csv + - mainflux-opcua-adapter-volume:/store diff --git a/docker/addons/opcua-adapter/nodes.csv b/docker/addons/opcua-adapter/nodes.csv deleted file mode 100644 index e65be3a9..00000000 --- a/docker/addons/opcua-adapter/nodes.csv +++ /dev/null @@ -1 +0,0 @@ -opc.tcp://opcua.rocks:4840,ns=0;i=2256 diff --git a/opcua/db/subs.go b/opcua/db/subs.go new file mode 100644 index 00000000..2e2279a0 --- /dev/null +++ b/opcua/db/subs.go @@ -0,0 +1,75 @@ +// Copyright (c) Mainflux +// SPDX-License-Identifier: Apache-2.0 + +package db + +import ( + "encoding/csv" + "io" + "os" + + "github.com/mainflux/mainflux/errors" +) + +const columns = 2 +const path = "/store/nodes.csv" + +var ( + errNotFound = errors.New("file not found") + errWriteFile = errors.New("failed de write file") + errOpenFile = errors.New("failed to open file") + errReadFile = errors.New("failed to read file") + errEmptyLine = errors.New("empty or incomplete line found in file") +) + +// Node represents an OPC-UA node +type Node struct { + ServerURI string + NodeID string +} + +// Save stores a successfull subscription +func Save(serverURI, nodeID string) error { + file, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY, os.ModePerm) + if err != nil { + errors.Wrap(errWriteFile, err) + } + csvWriter := csv.NewWriter(file) + csvWriter.Write([]string{serverURI, nodeID}) + csvWriter.Flush() + + return nil +} + +// ReadAll returns all stored subscriptions +func ReadAll() ([]Node, error) { + if _, err := os.Stat(path); os.IsNotExist(err) { + return nil, errors.Wrap(errNotFound, err) + } + + file, err := os.OpenFile(path, os.O_RDONLY, os.ModePerm) + if err != nil { + return nil, errors.Wrap(errOpenFile, err) + } + defer file.Close() + + reader := csv.NewReader(file) + nodes := []Node{} + for { + l, err := reader.Read() + if err == io.EOF { + break + } + if err != nil { + return nil, errors.Wrap(errReadFile, err) + } + + if len(l) < columns { + return nil, errEmptyLine + } + + nodes = append(nodes, Node{l[0], l[1]}) + } + + return nodes, nil +} diff --git a/opcua/gopcua/subscribe.go b/opcua/gopcua/subscribe.go index b38ce426..d1e6baf8 100644 --- a/opcua/gopcua/subscribe.go +++ b/opcua/gopcua/subscribe.go @@ -14,6 +14,7 @@ import ( "github.com/mainflux/mainflux/errors" "github.com/mainflux/mainflux/logger" "github.com/mainflux/mainflux/opcua" + "github.com/mainflux/mainflux/opcua/db" ) const protocol = "opcua" @@ -124,7 +125,13 @@ func (c client) runHandler(sub *opcuaGopcua.Subscription, cfg opcua.Config) erro go sub.Run(c.ctx) - c.logger.Info(fmt.Sprintf("subscribe to server %s and node_id %s", cfg.ServerURI, cfg.NodeID)) + // Store subscription details + if err := db.Save(cfg.ServerURI, cfg.NodeID); err != nil { + return err + } + + c.logger.Info(fmt.Sprintf("subscribed to server %s and node_id %s", cfg.ServerURI, cfg.NodeID)) + for { select { case <-c.ctx.Done(): diff --git a/opcua/service.go b/opcua/service.go index 6413aada..7ac0e49f 100644 --- a/opcua/service.go +++ b/opcua/service.go @@ -120,7 +120,12 @@ func (as *adapterService) ConnectThing(mfxChanID, mfxThingID string) error { as.cfg.NodeID = nodeID as.cfg.ServerURI = serverURI - go as.subscribe(as.cfg) + + go func() { + if err := as.subscriber.Subscribe(as.cfg); err != nil { + as.logger.Warn(fmt.Sprintf("subscription failed: %s", err)) + } + }() c := fmt.Sprintf("%s:%s", mfxChanID, mfxThingID) return as.connectRM.Save(c, c) @@ -138,9 +143,3 @@ func (as *adapterService) DisconnectThing(mfxChanID, mfxThingID string) error { c := fmt.Sprintf("%s:%s", mfxChanID, mfxThingID) return as.connectRM.Remove(c) } - -func (as *adapterService) subscribe(cfg Config) { - if err := as.subscriber.Subscribe(cfg); err != nil { - as.logger.Warn(fmt.Sprintf("subscription failed: %s", err)) - } -}