2019-08-30 14:09:12 +02:00
|
|
|
// Copyright (c) Mainflux
|
|
|
|
// SPDX-License-Identifier: Apache-2.0
|
|
|
|
|
|
|
|
package bench
|
|
|
|
|
|
|
|
import (
|
|
|
|
"crypto/tls"
|
|
|
|
"fmt"
|
|
|
|
"io/ioutil"
|
|
|
|
"log"
|
|
|
|
"os"
|
|
|
|
"strconv"
|
|
|
|
"sync"
|
|
|
|
"time"
|
|
|
|
|
|
|
|
"github.com/BurntSushi/toml"
|
|
|
|
)
|
|
|
|
|
|
|
|
// Keep struct names exported, otherwise Viper unmarshaling won't work
|
|
|
|
type mqttBrokerConfig struct {
|
|
|
|
URL string `toml:"url" mapstructure:"url"`
|
|
|
|
}
|
|
|
|
|
|
|
|
type mqttMessageConfig struct {
|
|
|
|
Size int `toml:"size" mapstructure:"size"`
|
|
|
|
Format string `toml:"format" mapstructure:"format"`
|
|
|
|
QoS int `toml:"qos" mapstructure:"qos"`
|
|
|
|
Retain bool `toml:"retain" mapstructure:"retain"`
|
|
|
|
}
|
|
|
|
|
|
|
|
type mqttTLSConfig struct {
|
|
|
|
MTLS bool `toml:"mtls" mapstructure:"mtls"`
|
|
|
|
SkipTLSVer bool `toml:"skiptlsver" mapstructure:"skiptlsver"`
|
|
|
|
CA string `toml:"ca" mapstructure:"ca"`
|
|
|
|
}
|
|
|
|
|
|
|
|
type mqttConfig struct {
|
|
|
|
Broker mqttBrokerConfig `toml:"broker" mapstructure:"broker"`
|
|
|
|
Message mqttMessageConfig `toml:"message" mapstructure:"message"`
|
|
|
|
TLS mqttTLSConfig `toml:"tls" mapstructure:"tls"`
|
|
|
|
}
|
|
|
|
|
|
|
|
type testConfig struct {
|
|
|
|
Count int `toml:"count" mapstructure:"count"`
|
|
|
|
Pubs int `toml:"pubs" mapstructure:"pubs"`
|
|
|
|
Subs int `toml:"subs" mapstructure:"subs"`
|
|
|
|
}
|
|
|
|
|
|
|
|
type logConfig struct {
|
|
|
|
Quiet bool `toml:"quiet" mapstructure:"quiet"`
|
|
|
|
}
|
|
|
|
|
|
|
|
type mainfluxFile struct {
|
|
|
|
ConnFile string `toml:"connections_file" mapstructure:"connections_file"`
|
|
|
|
}
|
|
|
|
|
2019-09-05 11:39:58 +00:00
|
|
|
type mfThing struct {
|
|
|
|
ThingID string `toml:"thing_id" mapstructure:"thing_id"`
|
|
|
|
ThingKey string `toml:"thing_key" mapstructure:"thing_key"`
|
|
|
|
MTLSCert string `toml:"mtls_cert" mapstructure:"mtls_cert"`
|
|
|
|
MTLSKey string `toml:"mtls_key" mapstructure:"mtls_key"`
|
|
|
|
}
|
|
|
|
|
|
|
|
type mfChannel struct {
|
2019-08-30 14:09:12 +02:00
|
|
|
ChannelID string `toml:"channel_id" mapstructure:"channel_id"`
|
|
|
|
}
|
|
|
|
|
|
|
|
type mainflux struct {
|
2019-09-05 11:39:58 +00:00
|
|
|
Things []mfThing `toml:"things" mapstructure:"things"`
|
|
|
|
Channels []mfChannel `toml:"channels" mapstructure:"channels"`
|
2019-08-30 14:09:12 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
// Config struct holds benchmark configuration
|
|
|
|
type Config struct {
|
|
|
|
MQTT mqttConfig `toml:"mqtt" mapstructure:"mqtt"`
|
|
|
|
Test testConfig `toml:"test" mapstructure:"test"`
|
|
|
|
Log logConfig `toml:"log" mapstructure:"log"`
|
|
|
|
Mf mainfluxFile `toml:"mainflux" mapstructure:"mainflux"`
|
|
|
|
}
|
|
|
|
|
|
|
|
// JSONResults are used to export results as a JSON document
|
|
|
|
type JSONResults struct {
|
2019-09-05 11:39:58 +00:00
|
|
|
Runs []*runResults `json:"runs"`
|
|
|
|
Totals *totalResults `json:"totals"`
|
2019-08-30 14:09:12 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
// Benchmark - main benckhmarking function
|
|
|
|
func Benchmark(cfg Config) {
|
|
|
|
var wg sync.WaitGroup
|
|
|
|
var err error
|
|
|
|
|
|
|
|
checkConnection(cfg.MQTT.Broker.URL, 1)
|
2019-09-05 11:39:58 +00:00
|
|
|
subTimes := make(subTimes)
|
2019-08-30 14:09:12 +02:00
|
|
|
var caByte []byte
|
|
|
|
if cfg.MQTT.TLS.MTLS {
|
|
|
|
caFile, err := os.Open(cfg.MQTT.TLS.CA)
|
|
|
|
defer caFile.Close()
|
|
|
|
if err != nil {
|
|
|
|
fmt.Println(err)
|
|
|
|
}
|
|
|
|
|
|
|
|
caByte, _ = ioutil.ReadAll(caFile)
|
|
|
|
}
|
|
|
|
|
|
|
|
payload := string(make([]byte, cfg.MQTT.Message.Size))
|
|
|
|
|
|
|
|
mf := mainflux{}
|
|
|
|
if _, err := toml.DecodeFile(cfg.Mf.ConnFile, &mf); err != nil {
|
|
|
|
log.Fatalf("Cannot load Mainflux connections config %s \nuse tools/provision to create file", cfg.Mf.ConnFile)
|
|
|
|
}
|
|
|
|
|
2019-09-05 11:39:58 +00:00
|
|
|
resCh := make(chan *runResults)
|
2019-08-30 14:09:12 +02:00
|
|
|
done := make(chan bool)
|
|
|
|
|
2019-09-05 11:39:58 +00:00
|
|
|
n := len(mf.Channels)
|
2019-08-30 14:09:12 +02:00
|
|
|
var cert tls.Certificate
|
|
|
|
|
|
|
|
// Subscribers
|
|
|
|
for i := 0; i < cfg.Test.Subs; i++ {
|
2019-09-05 11:39:58 +00:00
|
|
|
mfChann := mf.Channels[i%n]
|
|
|
|
mfThing := mf.Things[i%n]
|
2019-08-30 14:09:12 +02:00
|
|
|
|
|
|
|
if cfg.MQTT.TLS.MTLS {
|
2019-09-05 11:39:58 +00:00
|
|
|
cert, err = tls.X509KeyPair([]byte(mfThing.MTLSCert), []byte(mfThing.MTLSKey))
|
2019-08-30 14:09:12 +02:00
|
|
|
if err != nil {
|
|
|
|
log.Fatal(err)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
c := &Client{
|
|
|
|
ID: strconv.Itoa(i),
|
|
|
|
BrokerURL: cfg.MQTT.Broker.URL,
|
2019-09-05 11:39:58 +00:00
|
|
|
BrokerUser: mfThing.ThingID,
|
|
|
|
BrokerPass: mfThing.ThingKey,
|
|
|
|
MsgTopic: fmt.Sprintf("channels/%s/messages/test", mfChann.ChannelID),
|
2019-08-30 14:09:12 +02:00
|
|
|
MsgSize: cfg.MQTT.Message.Size,
|
|
|
|
MsgCount: cfg.Test.Count,
|
|
|
|
MsgQoS: byte(cfg.MQTT.Message.QoS),
|
|
|
|
Quiet: cfg.Log.Quiet,
|
|
|
|
MTLS: cfg.MQTT.TLS.MTLS,
|
|
|
|
SkipTLSVer: cfg.MQTT.TLS.SkipTLSVer,
|
|
|
|
CA: caByte,
|
|
|
|
ClientCert: cert,
|
|
|
|
Retain: cfg.MQTT.Message.Retain,
|
|
|
|
Message: payload,
|
|
|
|
}
|
|
|
|
|
|
|
|
wg.Add(1)
|
|
|
|
|
|
|
|
go c.runSubscriber(&wg, &subTimes, &done)
|
|
|
|
}
|
|
|
|
|
|
|
|
wg.Wait()
|
|
|
|
|
2019-09-05 11:39:58 +00:00
|
|
|
start := time.Now()
|
2019-08-30 14:09:12 +02:00
|
|
|
// Publishers
|
|
|
|
for i := 0; i < cfg.Test.Pubs; i++ {
|
2019-09-05 11:39:58 +00:00
|
|
|
mfChann := mf.Channels[i%n]
|
|
|
|
mfThing := mf.Things[i%n]
|
2019-08-30 14:09:12 +02:00
|
|
|
|
|
|
|
if cfg.MQTT.TLS.MTLS {
|
2019-09-05 11:39:58 +00:00
|
|
|
cert, err = tls.X509KeyPair([]byte(mfThing.MTLSCert), []byte(mfThing.MTLSKey))
|
2019-08-30 14:09:12 +02:00
|
|
|
if err != nil {
|
|
|
|
log.Fatal(err)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
c := &Client{
|
|
|
|
ID: strconv.Itoa(i),
|
|
|
|
BrokerURL: cfg.MQTT.Broker.URL,
|
2019-09-05 11:39:58 +00:00
|
|
|
BrokerUser: mfThing.ThingID,
|
|
|
|
BrokerPass: mfThing.ThingKey,
|
|
|
|
MsgTopic: fmt.Sprintf("channels/%s/messages/test", mfChann.ChannelID),
|
2019-08-30 14:09:12 +02:00
|
|
|
MsgSize: cfg.MQTT.Message.Size,
|
|
|
|
MsgCount: cfg.Test.Count,
|
|
|
|
MsgQoS: byte(cfg.MQTT.Message.QoS),
|
|
|
|
Quiet: cfg.Log.Quiet,
|
|
|
|
MTLS: cfg.MQTT.TLS.MTLS,
|
|
|
|
SkipTLSVer: cfg.MQTT.TLS.SkipTLSVer,
|
|
|
|
CA: caByte,
|
|
|
|
ClientCert: cert,
|
|
|
|
Retain: cfg.MQTT.Message.Retain,
|
|
|
|
Message: payload,
|
|
|
|
}
|
|
|
|
|
|
|
|
go c.runPublisher(resCh)
|
|
|
|
}
|
|
|
|
|
|
|
|
// Collect the results
|
2019-09-05 11:39:58 +00:00
|
|
|
var results []*runResults
|
2019-08-30 14:09:12 +02:00
|
|
|
if cfg.Test.Pubs > 0 {
|
2019-09-05 11:39:58 +00:00
|
|
|
results = make([]*runResults, cfg.Test.Pubs)
|
2019-08-30 14:09:12 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
for i := 0; i < cfg.Test.Pubs; i++ {
|
|
|
|
results[i] = <-resCh
|
|
|
|
}
|
|
|
|
|
|
|
|
totalTime := time.Now().Sub(start)
|
|
|
|
totals := calculateTotalResults(results, totalTime, &subTimes)
|
|
|
|
if totals == nil {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
// Print sats
|
|
|
|
printResults(results, totals, cfg.MQTT.Message.Format, cfg.Log.Quiet)
|
|
|
|
}
|