1
0
mirror of https://github.com/mainflux/mainflux.git synced 2025-04-26 13:48:53 +08:00

NOISSUE - Influxdb batch add (#394)

* Add batch of streams

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>

* Add env variables for batch point setup

Fix InfluxDB reader tests.

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>

* Update Compose and Kubernetes config

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>

* Update env variables

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>

* Update docs

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>

* Remove unused const

Refactor code and docs.

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>

* Handle NewBatchPoints error

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>

* Reduce wait time in batch save test

Fix typos.

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>

* Update tests

Use single test with multiple cases to test single point save as well as
a batch of points.

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>

* Add an explanation for not resetting ticker

Signed-off-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>
This commit is contained in:
Dušan Borovčanin 2018-09-23 01:53:03 +02:00 committed by Drasko DRASKOVIC
parent 8e9d7b4fdb
commit 3de34062db
8 changed files with 222 additions and 86 deletions

View File

@ -12,7 +12,9 @@ import (
"net/http"
"os"
"os/signal"
"strconv"
"syscall"
"time"
kitprometheus "github.com/go-kit/kit/metrics/prometheus"
influxdata "github.com/influxdata/influxdb/client/v2"
@ -27,37 +29,42 @@ import (
const (
queue = "influxdb-writer"
defNatsURL = nats.DefaultURL
defPort = "8180"
defPointName = "messages"
defDBName = "mainflux"
defDBHost = "localhost"
defDBPort = "8086"
defDBUser = "mainflux"
defDBPass = "mainflux"
defNatsURL = nats.DefaultURL
defPort = "8180"
defBatchSize = "5000"
defBatchTimeout = "5"
defDBName = "mainflux"
defDBHost = "localhost"
defDBPort = "8086"
defDBUser = "mainflux"
defDBPass = "mainflux"
envNatsURL = "MF_NATS_URL"
envPort = "MF_INFLUX_WRITER_PORT"
envDBName = "MF_INFLUX_WRITER_DB_NAME"
envDBHost = "MF_INFLUX_WRITER_DB_HOST"
envDBPort = "MF_INFLUX_WRITER_DB_PORT"
envDBUser = "MF_INFLUX_WRITER_DB_USER"
envDBPass = "MF_INFLUX_WRITER_DB_PASS"
envNatsURL = "MF_NATS_URL"
envPort = "MF_INFLUX_WRITER_PORT"
envBatchSize = "MF_INFLUX_WRITER_BATCH_SIZE"
envBatchTimeout = "MF_INFLUX_WRITER_BATCH_TIMEOUT"
envDBName = "MF_INFLUX_WRITER_DB_NAME"
envDBHost = "MF_INFLUX_WRITER_DB_HOST"
envDBPort = "MF_INFLUX_WRITER_DB_PORT"
envDBUser = "MF_INFLUX_WRITER_DB_USER"
envDBPass = "MF_INFLUX_WRITER_DB_PASS"
)
type config struct {
NatsURL string
Port string
DBName string
DBHost string
DBPort string
DBUser string
DBPass string
NatsURL string
Port string
BatchSize int
BatchTimeout int
DBName string
DBHost string
DBPort string
DBUser string
DBPass string
}
func main() {
cfg, clientCfg := loadConfigs()
logger := log.New(os.Stdout)
cfg, clientCfg := loadConfigs(logger)
nc, err := nats.Connect(cfg.NatsURL)
if err != nil {
@ -73,7 +80,8 @@ func main() {
}
defer client.Close()
repo, err := influxdb.New(client, cfg.DBName)
timeout := time.Duration(cfg.BatchTimeout) * time.Second
repo, err := influxdb.New(client, cfg.DBName, cfg.BatchSize, timeout)
if err != nil {
logger.Error(fmt.Sprintf("Failed to create InfluxDB writer: %s", err))
os.Exit(1)
@ -100,7 +108,7 @@ func main() {
logger.Error(fmt.Sprintf("InfluxDB writer service terminated: %s", err))
}
func loadConfigs() (config, influxdata.HTTPConfig) {
func loadConfigs(logger log.Logger) (config, influxdata.HTTPConfig) {
cfg := config{
NatsURL: mainflux.Env(envNatsURL, defNatsURL),
Port: mainflux.Env(envPort, defPort),
@ -111,6 +119,19 @@ func loadConfigs() (config, influxdata.HTTPConfig) {
DBPass: mainflux.Env(envDBPass, defDBPass),
}
var err error
cfg.BatchSize, err = strconv.Atoi(mainflux.Env(envBatchSize, defBatchSize))
if err != nil {
logger.Error(fmt.Sprintf("Invalid value of batch size: %s", err))
os.Exit(1)
}
cfg.BatchTimeout, err = strconv.Atoi(mainflux.Env(envBatchTimeout, defBatchTimeout))
if err != nil {
logger.Error(fmt.Sprintf("Invalid value for batch timeout: %s", err))
os.Exit(1)
}
clientCfg := influxdata.HTTPConfig{
Addr: fmt.Sprintf("http://%s:%s", cfg.DBHost, cfg.DBPort),
Username: cfg.DBUser,
@ -140,6 +161,6 @@ func makeMetrics() (*kitprometheus.Counter, *kitprometheus.Summary) {
func startHTTPService(port string, logger log.Logger, errs chan error) {
p := fmt.Sprintf(":%s", port)
logger.Info(fmt.Sprintf("Influxdb writer service started, exposed port %s", p))
logger.Info(fmt.Sprintf("InfluxDB writer service started, exposed port %s", p))
errs <- http.ListenAndServe(p, influxdb.MakeHandler())
}

View File

@ -35,6 +35,8 @@ services:
environment:
MF_NATS_URL: nats://nats:4222
MF_INFLUX_WRITER_PORT: 8900
MF_INFLUX_WRITER_BATCH_SIZE: 5000
MF_INFLUX_WRITER_BATCH_TIMEOUT: 5
MF_INFLUX_WRITER_DB_NAME: mainflux
MF_INFLUX_WRITER_DB_HOST: mainflux-influxdb
MF_INFLUX_WRITER_DB_PORT: 8086

View File

@ -12,8 +12,8 @@ spec:
component: influxdb-writer
spec:
containers:
- name: mainflux-influxdb
image: mainflux/influxdb:latest
- name: mainflux-influxdb-writer
image: mainflux/influxdb-writer:latest
imagePullPolicy: Always
ports:
- containerPort: 8900
@ -22,6 +22,10 @@ spec:
value: "nats://nats:4222"
- name: MF_INFLUX_WRITER_PORT
value: "8900"
- name: MF_INFLUX_WRITER_BATCH_SIZE
value: "5000"
- name: MF_INFLUX_WRITER_BATCH_TIMEOUT
value: "5"
- name: MF_INFLUX_WRITER_DB_NAME
value: "mainflux"
- name: MF_INFLUX_WRITER_DB_HOST

View File

@ -4,6 +4,7 @@ import (
"fmt"
"os"
"testing"
"time"
influxdata "github.com/influxdata/influxdb/client/v2"
"github.com/mainflux/mainflux"
@ -40,7 +41,7 @@ func TestReadAll(t *testing.T) {
client, err := influxdata.NewHTTPClient(clientCfg)
require.Nil(t, err, fmt.Sprintf("Creating new InfluxDB client expected to succeed: %s.\n", err))
writer, err := writer.New(client, testDB)
writer, err := writer.New(client, testDB, 1, time.Second)
require.Nil(t, err, fmt.Sprintf("Creating new InfluxDB writer expected to succeed: %s.\n", err))
messages := []mainflux.Message{}

View File

@ -4,7 +4,13 @@ Writers provide an implementation of various `message writers`.
Message writers are services that consume normalized (in `SenML` format)
Mainflux messages and store them in specific data store.
Writers are optional services and are treated as a plugins. In order to
run writer services, core services must be up and running. For more info
on the platform core services with its dependencies, please check out
the [Docker Compose][compose] file.
For an in-depth explanation of the usage of `writers`, as well as thorough
understanding of Mainflux, please check out the [official documentation][doc].
[doc]: http://mainflux.readthedocs.io
[compose]: ../docker/docker-compose.yml

View File

@ -8,15 +8,17 @@ The service is configured using the environment variables presented in the
following table. Note that any unset variables will be replaced with their
default values.
| Variable | Description | Default |
|---------------------------|-----------------------------------|-----------------------|
| MF_NATS_URL | NATS instance URL | nats://localhost:4222 |
| MF_INFLUX_WRITER_PORT | Service HTTP port | 8180 |
| MF_INFLUX_WRITER_DB_NAME | InfluxDB database name | mainflux |
| MF_INFLUX_WRITER_DB_HOST | InfluxDB host | localhost |
| MF_INFLUX_WRITER_DB_PORT | Default port of InfluxDB database | 8086 |
| MF_INFLUX_WRITER_DB_USER | Default user of InfluxDB database | mainflux |
| MF_INFLUX_WRITER_DB_PASS | Default password of InfluxDB user | mainflux |
| Variable | Description | Default |
|--------------------------------|---------------------------------------------|-----------------------|
| MF_NATS_URL | NATS instance URL | nats://localhost:4222 |
| MF_INFLUX_WRITER_PORT | Service HTTP port | 8180 |
| MF_INFLUX_WRITER_BATCH_SIZE | Size of the writer points batch | 5000 |
| MF_INFLUX_WRITER_BATCH_TIMEOUT | Time interval in seconds to flush the batch | 1 second |
| MF_INFLUX_WRITER_DB_NAME | InfluxDB database name | mainflux |
| MF_INFLUX_WRITER_DB_HOST | InfluxDB host | localhost |
| MF_INFLUX_WRITER_DB_PORT | Default port of InfluxDB database | 8086 |
| MF_INFLUX_WRITER_DB_USER | Default user of InfluxDB database | mainflux |
| MF_INFLUX_WRITER_DB_PASS | Default password of InfluxDB user | mainflux |
## Deployment
@ -31,6 +33,8 @@ default values.
environment:
MF_NATS_URL: [NATS instance URL]
MF_INFLUX_WRITER_PORT: [Service HTTP port]
MF_INFLUX_WRITER_BATCH_SIZE: [Size of the writer points batch]
MF_INFLUX_WRITER_BATCH_TIMEOUT: [Time interval in seconds to flush the batch]
MF_INFLUX_WRITER_DB_NAME: [InfluxDB name]
MF_INFLUX_WRITER_DB_HOST: [InfluxDB host]
MF_INFLUX_WRITER_DB_PORT: [InfluxDB port]
@ -56,7 +60,7 @@ make influxdb
make install
# Set the environment variables and run the service
MF_NATS_URL=[NATS instance URL] MF_INFLUX_WRITER_PORT=[Service HTTP port] MF_INFLUX_WRITER_DB_NAME=[InfluxDB database name] MF_INFLUX_WRITER_DB_HOST=[InfluxDB database host] MF_INFLUX_WRITER_DB_PORT=[InfluxDB database port] MF_INFLUX_WRITER_DB_USER=[InfluxDB admin user] MF_INFLUX_WRITER_DB_PASS=[InfluxDB admin password] $GOBIN/mainflux-influxdb
MF_NATS_URL=[NATS instance URL] MF_INFLUX_WRITER_PORT=[Service HTTP port] MF_INFLUX_WRITER_BATCH_SIZE=[Size of the writer points batch] MF_INFLUX_WRITER_BATCH_TIMEOUT=[Time interval in seconds to flush the batch] MF_INFLUX_WRITER_DB_NAME=[InfluxDB database name] MF_INFLUX_WRITER_DB_HOST=[InfluxDB database host] MF_INFLUX_WRITER_DB_PORT=[InfluxDB database port] MF_INFLUX_WRITER_DB_USER=[InfluxDB admin user] MF_INFLUX_WRITER_DB_PASS=[InfluxDB admin password] $GOBIN/mainflux-influxdb
```
@ -65,13 +69,13 @@ MF_NATS_URL=[NATS instance URL] MF_INFLUX_WRITER_PORT=[Service HTTP port] MF_INF
This service can be deployed using docker containers.
Docker compose file is available in `<project_root>/docker/addons/influxdb-writer/docker-compose.yml`. Besides database
and writer service, it contains [Grafana platform](https://grafana.com/) which can be used for database
exploration and data visualization and analytics. In order to run all Mainflux core services, as well as mentioned optional ones, execute following command:
exploration and data visualization and analytics. In order to run Mainflux InfluxDB writer, execute the following command:
```bash
docker-compose -f docker/docker-compose.yml -f docker/addons/influxdb-writer/docker-compose.yml up -d
docker-compose -f docker/addons/influxdb-writer/docker-compose.yml up -d
```
_Please note that order matters here. You need to start core services before additional ones, i. e. core services compose file needs to be the first param of the command. Since all services need to be in the same network and writer services are dependent of core ones, you need to start all of them using single command._
_Please note that you need to start core services before the additional ones._
## Usage

View File

@ -9,6 +9,7 @@ package influxdb
import (
"strconv"
"sync"
"time"
"github.com/mainflux/mainflux/writers"
@ -22,34 +23,75 @@ const pointName = "messages"
var _ writers.MessageRepository = (*influxRepo)(nil)
type influxRepo struct {
database string
client influxdata.Client
client influxdata.Client
batch []*influxdata.Point
batchSize int
mu sync.Mutex
tick <-chan time.Time
cfg influxdata.BatchPointsConfig
}
type fields map[string]interface{}
type tags map[string]string
// New returns new InfluxDB writer.
func New(client influxdata.Client, database string) (writers.MessageRepository, error) {
return &influxRepo{database, client}, nil
func New(client influxdata.Client, database string, batchSize int, batchTimeout time.Duration) (writers.MessageRepository, error) {
repo := &influxRepo{
client: client,
cfg: influxdata.BatchPointsConfig{
Database: database,
},
batchSize: batchSize,
batch: []*influxdata.Point{},
}
repo.tick = time.NewTicker(batchTimeout).C
go func() {
for {
<-repo.tick
repo.save()
}
}()
return repo, nil
}
func (repo *influxRepo) Save(msg mainflux.Message) error {
bp, err := influxdata.NewBatchPoints(influxdata.BatchPointsConfig{
Database: repo.database,
})
// save flushes the accumulated batch of points to InfluxDB and, on success,
// resets the batch. It is invoked both from Save (once the batch fills up)
// and from the ticker goroutine started in New, so access to the shared
// batch is serialized with repo.mu.
func (repo *influxRepo) save() error {
repo.mu.Lock()
defer repo.mu.Unlock()
// A write failure leaves the batch intact, so the points are retried
// on the next flush.
bp, err := influxdata.NewBatchPoints(repo.cfg)
if err != nil {
return err
}
bp.AddPoints(repo.batch)
if err := repo.client.Write(bp); err != nil {
return err
}
// It would be nice to reset ticker at this point, which
// implies creating a new ticker and goroutine. It would
// introduce unnecessary complexity with no justified benefits.
repo.batch = []*influxdata.Point{}
return nil
}
// Save converts the message to an InfluxDB point, appends it to the
// in-memory batch, and flushes via save() once the batch size is reached.
func (repo *influxRepo) Save(msg mainflux.Message) error {
tags, fields := repo.tagsOf(&msg), repo.fieldsOf(&msg)
pt, err := influxdata.NewPoint(pointName, tags, fields, time.Now())
if err != nil {
return err
}
bp.AddPoint(pt)
return repo.client.Write(bp)
repo.mu.Lock()
repo.batch = append(repo.batch, pt)
repo.mu.Unlock()
// NOTE(review): len(repo.batch) is read after the mutex is released, so a
// concurrent Save or ticker flush may change it in between — verify whether
// this check should happen while still holding repo.mu.
if len(repo.batch)%repo.batchSize == 0 {
return repo.save()
}
return nil
}
func (repo *influxRepo) tagsOf(msg *mainflux.Message) tags {
@ -57,6 +99,7 @@ func (repo *influxRepo) tagsOf(msg *mainflux.Message) tags {
update := strconv.FormatFloat(msg.UpdateTime, 'f', -1, 64)
channel := strconv.FormatUint(msg.Channel, 10)
publisher := strconv.FormatUint(msg.Publisher, 10)
return tags{
"Channel": channel,
"Publisher": publisher,

View File

@ -11,47 +11,34 @@ import (
"fmt"
"os"
"testing"
"time"
influxdata "github.com/influxdata/influxdb/client/v2"
"github.com/influxdata/influxdb/models"
"github.com/mainflux/mainflux"
log "github.com/mainflux/mainflux/logger"
"github.com/mainflux/mainflux/writers"
writer "github.com/mainflux/mainflux/writers/influxdb"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
var (
port string
testLog = log.New(os.Stdout)
testDB = "test"
client influxdata.Client
port string
testLog = log.New(os.Stdout)
testDB = "test"
saveTimeout = 2 * time.Second
saveBatchSize = 20
streamsSize = 250
client influxdata.Client
selectMsgs = fmt.Sprintf("SELECT * FROM test..messages")
dropMsgs = fmt.Sprintf("DROP SERIES FROM messages")
clientCfg = influxdata.HTTPConfig{
Username: "test",
Password: "test",
}
)
// This is utility function to query the database.
func queryDB(cmd string) ([]models.Row, error) {
q := influxdata.Query{
Command: cmd,
Database: testDB,
}
response, err := client.Query(q)
if err != nil {
return nil, err
}
if response.Error() != nil {
return nil, response.Error()
}
// There is only one query, so only one result and
// all data are stored in the same series.
return response.Results[0].Series, nil
}
func TestSave(t *testing.T) {
msg := mainflux.Message{
msg = mainflux.Message{
Channel: 45,
Publisher: 2580,
Protocol: "http",
@ -66,21 +53,89 @@ func TestSave(t *testing.T) {
UpdateTime: 5456565466,
Link: "link",
}
)
q := fmt.Sprintf("SELECT * FROM test..messages\n")
// queryDB is a test utility that runs cmd against the test database and
// returns the raw row values of the first series, or nil (with nil error)
// when the query matched no series.
func queryDB(cmd string) ([][]interface{}, error) {
q := influxdata.Query{
Command: cmd,
Database: testDB,
}
response, err := client.Query(q)
if err != nil {
return nil, err
}
if response.Error() != nil {
return nil, response.Error()
}
if len(response.Results[0].Series) == 0 {
return nil, nil
}
// There is only one query, so only one result and
// all data are stored in the same series.
return response.Results[0].Series[0].Values, nil
}
func TestSave(t *testing.T) {
client, err := influxdata.NewHTTPClient(clientCfg)
require.Nil(t, err, fmt.Sprintf("Creating new InfluxDB client expected to succeed: %s.\n", err))
repo, err := writer.New(client, testDB)
// Set batch size to 1 to simulate single point insert.
repo, err := writer.New(client, testDB, 1, saveTimeout)
require.Nil(t, err, fmt.Sprintf("Creating new InfluxDB repo expected to succeed: %s.\n", err))
err = repo.Save(msg)
assert.Nil(t, err, fmt.Sprintf("Save operation expected to succeed: %s.\n", err))
// Set batch size to value > 1 to simulate real batch.
repo1, err := writer.New(client, testDB, saveBatchSize, saveTimeout)
require.Nil(t, err, fmt.Sprintf("Creating new InfluxDB repo expected to succeed: %s.\n", err))
row, err := queryDB(q)
assert.Nil(t, err, fmt.Sprintf("Querying InfluxDB to retrieve data count expected to succeed: %s.\n", err))
cases := []struct {
desc string
repo writers.MessageRepository
previousMsgs int
numOfMsg int
expectedSize int
isBatch bool
}{
{
desc: "save a single message",
repo: repo,
numOfMsg: 1,
expectedSize: 1,
isBatch: false,
},
{
desc: "save a batch of messages",
repo: repo1,
numOfMsg: streamsSize,
// Only full batches are flushed synchronously; the remainder
// stays buffered until the ticker fires.
expectedSize: streamsSize - (streamsSize % saveBatchSize),
isBatch: true,
},
}
count := len(row)
assert.Equal(t, 1, count, fmt.Sprintf("Expected to have 1 value, found %d instead.\n", count))
for _, tc := range cases {
// Clean previously saved messages.
row, err := queryDB(dropMsgs)
require.Nil(t, err, fmt.Sprintf("Cleaning data from InfluxDB expected to succeed: %s.\n", err))
for i := 0; i < tc.numOfMsg; i++ {
err := tc.repo.Save(msg)
assert.Nil(t, err, fmt.Sprintf("Save operation expected to succeed: %s.\n", err))
}
row, err = queryDB(selectMsgs)
assert.Nil(t, err, fmt.Sprintf("Querying InfluxDB to retrieve data expected to succeed: %s.\n", err))
count := len(row)
assert.Equal(t, tc.expectedSize, count, fmt.Sprintf("Expected to have %d messages saved, found %d instead.\n", tc.expectedSize, count))
if tc.isBatch {
// Sleep for `saveTimeout` to trigger the ticker and check that the rest of the data is saved.
time.Sleep(saveTimeout)
row, err = queryDB(selectMsgs)
assert.Nil(t, err, fmt.Sprintf("Querying InfluxDB to retrieve data count expected to succeed: %s.\n", err))
count = len(row)
assert.Equal(t, tc.numOfMsg, count, fmt.Sprintf("Expected to have %d messages, found %d instead.\n", tc.numOfMsg, count))
}
}
}