diff --git a/cmd/influxdb-writer/main.go b/cmd/influxdb-writer/main.go index 752c3a59..416d210c 100644 --- a/cmd/influxdb-writer/main.go +++ b/cmd/influxdb-writer/main.go @@ -12,7 +12,9 @@ import ( "net/http" "os" "os/signal" + "strconv" "syscall" + "time" kitprometheus "github.com/go-kit/kit/metrics/prometheus" influxdata "github.com/influxdata/influxdb/client/v2" @@ -27,37 +29,42 @@ import ( const ( queue = "influxdb-writer" - defNatsURL = nats.DefaultURL - defPort = "8180" - defPointName = "messages" - defDBName = "mainflux" - defDBHost = "localhost" - defDBPort = "8086" - defDBUser = "mainflux" - defDBPass = "mainflux" + defNatsURL = nats.DefaultURL + defPort = "8180" + defBatchSize = "5000" + defBatchTimeout = "5" + defDBName = "mainflux" + defDBHost = "localhost" + defDBPort = "8086" + defDBUser = "mainflux" + defDBPass = "mainflux" - envNatsURL = "MF_NATS_URL" - envPort = "MF_INFLUX_WRITER_PORT" - envDBName = "MF_INFLUX_WRITER_DB_NAME" - envDBHost = "MF_INFLUX_WRITER_DB_HOST" - envDBPort = "MF_INFLUX_WRITER_DB_PORT" - envDBUser = "MF_INFLUX_WRITER_DB_USER" - envDBPass = "MF_INFLUX_WRITER_DB_PASS" + envNatsURL = "MF_NATS_URL" + envPort = "MF_INFLUX_WRITER_PORT" + envBatchSize = "MF_INFLUX_WRITER_BATCH_SIZE" + envBatchTimeout = "MF_INFLUX_WRITER_BATCH_TIMEOUT" + envDBName = "MF_INFLUX_WRITER_DB_NAME" + envDBHost = "MF_INFLUX_WRITER_DB_HOST" + envDBPort = "MF_INFLUX_WRITER_DB_PORT" + envDBUser = "MF_INFLUX_WRITER_DB_USER" + envDBPass = "MF_INFLUX_WRITER_DB_PASS" ) type config struct { - NatsURL string - Port string - DBName string - DBHost string - DBPort string - DBUser string - DBPass string + NatsURL string + Port string + BatchSize int + BatchTimeout int + DBName string + DBHost string + DBPort string + DBUser string + DBPass string } func main() { - cfg, clientCfg := loadConfigs() logger := log.New(os.Stdout) + cfg, clientCfg := loadConfigs(logger) nc, err := nats.Connect(cfg.NatsURL) if err != nil { @@ -73,7 +80,8 @@ func main() { } defer client.Close() - 
repo, err := influxdb.New(client, cfg.DBName) + timeout := time.Duration(cfg.BatchTimeout) * time.Second + repo, err := influxdb.New(client, cfg.DBName, cfg.BatchSize, timeout) if err != nil { logger.Error(fmt.Sprintf("Failed to create InfluxDB writer: %s", err)) os.Exit(1) @@ -100,7 +108,7 @@ func main() { logger.Error(fmt.Sprintf("InfluxDB writer service terminated: %s", err)) } -func loadConfigs() (config, influxdata.HTTPConfig) { +func loadConfigs(logger log.Logger) (config, influxdata.HTTPConfig) { cfg := config{ NatsURL: mainflux.Env(envNatsURL, defNatsURL), Port: mainflux.Env(envPort, defPort), @@ -111,6 +119,19 @@ func loadConfigs() (config, influxdata.HTTPConfig) { DBPass: mainflux.Env(envDBPass, defDBPass), } + var err error + cfg.BatchSize, err = strconv.Atoi(mainflux.Env(envBatchSize, defBatchSize)) + if err != nil { + logger.Error(fmt.Sprintf("Invalid value of batch size: %s", err)) + os.Exit(1) + } + + cfg.BatchTimeout, err = strconv.Atoi(mainflux.Env(envBatchTimeout, defBatchTimeout)) + if err != nil { + logger.Error(fmt.Sprintf("Invalid value for batch timeout: %s", err)) + os.Exit(1) + } + clientCfg := influxdata.HTTPConfig{ Addr: fmt.Sprintf("http://%s:%s", cfg.DBHost, cfg.DBPort), Username: cfg.DBUser, @@ -140,6 +161,6 @@ func makeMetrics() (*kitprometheus.Counter, *kitprometheus.Summary) { func startHTTPService(port string, logger log.Logger, errs chan error) { p := fmt.Sprintf(":%s", port) - logger.Info(fmt.Sprintf("Influxdb writer service started, exposed port %s", p)) + logger.Info(fmt.Sprintf("InfluxDB writer service started, exposed port %s", p)) errs <- http.ListenAndServe(p, influxdb.MakeHandler()) } diff --git a/docker/addons/influxdb-writer/docker-compose.yml b/docker/addons/influxdb-writer/docker-compose.yml index c7dd189a..bfd3b7d1 100644 --- a/docker/addons/influxdb-writer/docker-compose.yml +++ b/docker/addons/influxdb-writer/docker-compose.yml @@ -35,6 +35,8 @@ services: environment: MF_NATS_URL: nats://nats:4222 
MF_INFLUX_WRITER_PORT: 8900 + MF_INFLUX_WRITER_BATCH_SIZE: 5000 + MF_INFLUX_WRITER_BATCH_TIMEOUT: 5 MF_INFLUX_WRITER_DB_NAME: mainflux MF_INFLUX_WRITER_DB_HOST: mainflux-influxdb MF_INFLUX_WRITER_DB_PORT: 8086 diff --git a/k8s/addons/influxdb/writer.yml b/k8s/addons/influxdb/writer.yml index a354bd52..4d85c9ed 100644 --- a/k8s/addons/influxdb/writer.yml +++ b/k8s/addons/influxdb/writer.yml @@ -12,8 +12,8 @@ spec: component: influxdb-writer spec: containers: - - name: mainflux-influxdb - image: mainflux/influxdb:latest + - name: mainflux-influxdb-writer + image: mainflux/influxdb-writer:latest imagePullPolicy: Always ports: - containerPort: 8900 @@ -22,6 +22,10 @@ spec: value: "nats://nats:4222" - name: MF_INFLUX_WRITER_PORT value: "8900" + - name: MF_INFLUX_WRITER_BATCH_SIZE + value: "5000" + - name: MF_INFLUX_WRITER_BATCH_TIMEOUT + value: "5" - name: MF_INFLUX_WRITER_DB_NAME value: "mainflux" - name: MF_INFLUX_WRITER_DB_HOST diff --git a/readers/influxdb/messages_test.go b/readers/influxdb/messages_test.go index 85077ea9..c5942335 100644 --- a/readers/influxdb/messages_test.go +++ b/readers/influxdb/messages_test.go @@ -4,6 +4,7 @@ import ( "fmt" "os" "testing" + "time" influxdata "github.com/influxdata/influxdb/client/v2" "github.com/mainflux/mainflux" @@ -40,7 +41,7 @@ func TestReadAll(t *testing.T) { client, err := influxdata.NewHTTPClient(clientCfg) require.Nil(t, err, fmt.Sprintf("Creating new InfluxDB client expected to succeed: %s.\n", err)) - writer, err := writer.New(client, testDB) + writer, err := writer.New(client, testDB, 1, time.Second) require.Nil(t, err, fmt.Sprintf("Creating new InfluxDB writer expected to succeed: %s.\n", err)) messages := []mainflux.Message{} diff --git a/writers/README.md b/writers/README.md index 43ad3773..34f5de17 100644 --- a/writers/README.md +++ b/writers/README.md @@ -4,7 +4,13 @@ Writers provide an implementation of various `message writers`. 
Message writers are services that consume normalized (in `SenML` format) Mainflux messages and store them in specific data store. +Writers are optional services and are treated as plugins. In order to +run writer services, core services must be up and running. For more info +on the platform core services with their dependencies, please check out +the [Docker Compose][compose] file. + For an in-depth explanation of the usage of `writers`, as well as thorough understanding of Mainflux, please check out the [official documentation][doc]. [doc]: http://mainflux.readthedocs.io +[compose]: ../docker/docker-compose.yml diff --git a/writers/influxdb/README.md b/writers/influxdb/README.md index a2d5d04a..563e4f98 100644 --- a/writers/influxdb/README.md +++ b/writers/influxdb/README.md @@ -8,15 +8,17 @@ The service is configured using the environment variables presented in the following table. Note that any unset variables will be replaced with their default values. -| Variable | Description | Default | -|---------------------------|-----------------------------------|-----------------------| -| MF_NATS_URL | NATS instance URL | nats://localhost:4222 | -| MF_INFLUX_WRITER_PORT | Service HTTP port | 8180 | -| MF_INFLUX_WRITER_DB_NAME | InfluxDB database name | mainflux | -| MF_INFLUX_WRITER_DB_HOST | InfluxDB host | localhost | -| MF_INFLUX_WRITER_DB_PORT | Default port of InfluxDB database | 8086 | -| MF_INFLUX_WRITER_DB_USER | Default user of InfluxDB database | mainflux | -| MF_INFLUX_WRITER_DB_PASS | Default password of InfluxDB user | mainflux | +| Variable | Description | Default | +|--------------------------------|---------------------------------------------|-----------------------| +| MF_NATS_URL | NATS instance URL | nats://localhost:4222 | +| MF_INFLUX_WRITER_PORT | Service HTTP port | 8180 | +| MF_INFLUX_WRITER_BATCH_SIZE | Size of the writer points batch | 5000 | +| MF_INFLUX_WRITER_BATCH_TIMEOUT | Time interval in seconds to flush the batch | 5 | +| 
MF_INFLUX_WRITER_DB_NAME | InfluxDB database name | mainflux | +| MF_INFLUX_WRITER_DB_HOST | InfluxDB host | localhost | +| MF_INFLUX_WRITER_DB_PORT | Default port of InfluxDB database | 8086 | +| MF_INFLUX_WRITER_DB_USER | Default user of InfluxDB database | mainflux | +| MF_INFLUX_WRITER_DB_PASS | Default password of InfluxDB user | mainflux | ## Deployment @@ -31,6 +33,8 @@ default values. environment: MF_NATS_URL: [NATS instance URL] MF_INFLUX_WRITER_PORT: [Service HTTP port] + MF_INFLUX_WRITER_BATCH_SIZE: [Size of the writer points batch] + MF_INFLUX_WRITER_BATCH_TIMEOUT: [Time interval in seconds to flush the batch] MF_INFLUX_WRITER_DB_NAME: [InfluxDB name] MF_INFLUX_WRITER_DB_HOST: [InfluxDB host] MF_INFLUX_WRITER_DB_PORT: [InfluxDB port] @@ -56,7 +60,7 @@ make influxdb make install # Set the environment variables and run the service -MF_NATS_URL=[NATS instance URL] MF_INFLUX_WRITER_PORT=[Service HTTP port] MF_INFLUX_WRITER_DB_NAME=[InfluxDB database name] MF_INFLUX_WRITER_DB_HOST=[InfluxDB database host] MF_INFLUX_WRITER_DB_PORT=[InfluxDB database port] MF_INFLUX_WRITER_DB_USER=[InfluxDB admin user] MF_INFLUX_WRITER_DB_PASS=[InfluxDB admin password] $GOBIN/mainflux-influxdb +MF_NATS_URL=[NATS instance URL] MF_INFLUX_WRITER_PORT=[Service HTTP port] MF_INFLUX_WRITER_BATCH_SIZE=[Size of the writer points batch] MF_INFLUX_WRITER_BATCH_TIMEOUT=[Time interval in seconds to flush the batch] MF_INFLUX_WRITER_DB_NAME=[InfluxDB database name] MF_INFLUX_WRITER_DB_HOST=[InfluxDB database host] MF_INFLUX_WRITER_DB_PORT=[InfluxDB database port] MF_INFLUX_WRITER_DB_USER=[InfluxDB admin user] MF_INFLUX_WRITER_DB_PASS=[InfluxDB admin password] $GOBIN/mainflux-influxdb ``` @@ -65,13 +69,13 @@ MF_NATS_URL=[NATS instance URL] MF_INFLUX_WRITER_PORT=[Service HTTP port] MF_INF This service can be deployed using docker containers. Docker compose file is available in `/docker/addons/influxdb-writer/docker-compose.yml`. 
Besides database and writer service, it contains [Grafana platform](https://grafana.com/) which can be used for database -exploration and data visualization and analytics. In order to run all Mainflux core services, as well as mentioned optional ones, execute following command: +exploration and data visualization and analytics. In order to run Mainflux InfluxDB writer, execute the following command: ```bash -docker-compose -f docker/docker-compose.yml -f docker/addons/influxdb-writer/docker-compose.yml up -d +docker-compose -f docker/addons/influxdb-writer/docker-compose.yml up -d ``` -_Please note that order matters here. You need to start core services before additional ones, i. e. core services compose file needs to be the first param of the command. Since all services need to be in the same network and writer services are dependent of core ones, you need to start all of them using single command._ +_Please note that you need to start core services before the additional ones._ ## Usage diff --git a/writers/influxdb/messages.go b/writers/influxdb/messages.go index fa647a80..a2d0dbbf 100644 --- a/writers/influxdb/messages.go +++ b/writers/influxdb/messages.go @@ -9,6 +9,7 @@ package influxdb import ( "strconv" + "sync" "time" "github.com/mainflux/mainflux/writers" @@ -22,34 +23,75 @@ const pointName = "messages" var _ writers.MessageRepository = (*influxRepo)(nil) type influxRepo struct { - database string - client influxdata.Client + client influxdata.Client + batch []*influxdata.Point + batchSize int + mu sync.Mutex + tick <-chan time.Time + cfg influxdata.BatchPointsConfig } type fields map[string]interface{} type tags map[string]string // New returns new InfluxDB writer. 
-func New(client influxdata.Client, database string) (writers.MessageRepository, error) { - return &influxRepo{database, client}, nil +func New(client influxdata.Client, database string, batchSize int, batchTimeout time.Duration) (writers.MessageRepository, error) { + repo := &influxRepo{ + client: client, + cfg: influxdata.BatchPointsConfig{ + Database: database, + }, + batchSize: batchSize, + batch: []*influxdata.Point{}, + } + + repo.tick = time.NewTicker(batchTimeout).C + go func() { + for { + <-repo.tick + repo.save() + } + }() + + return repo, nil } -func (repo *influxRepo) Save(msg mainflux.Message) error { - bp, err := influxdata.NewBatchPoints(influxdata.BatchPointsConfig{ - Database: repo.database, - }) +func (repo *influxRepo) save() error { + repo.mu.Lock() + defer repo.mu.Unlock() + bp, err := influxdata.NewBatchPoints(repo.cfg) if err != nil { return err } + bp.AddPoints(repo.batch) + + if err := repo.client.Write(bp); err != nil { + return err + } + + // It would be nice to reset ticker at this point, which + // implies creating a new ticker and goroutine. It would + // introduce unnecessary complexity with no justified benefits. 
+ repo.batch = []*influxdata.Point{} + return nil +} + +func (repo *influxRepo) Save(msg mainflux.Message) error { tags, fields := repo.tagsOf(&msg), repo.fieldsOf(&msg) pt, err := influxdata.NewPoint(pointName, tags, fields, time.Now()) if err != nil { return err } - bp.AddPoint(pt) - return repo.client.Write(bp) + repo.mu.Lock() + repo.batch = append(repo.batch, pt) + repo.mu.Unlock() + + if len(repo.batch)%repo.batchSize == 0 { + return repo.save() + } + return nil } func (repo *influxRepo) tagsOf(msg *mainflux.Message) tags { @@ -57,6 +99,7 @@ func (repo *influxRepo) tagsOf(msg *mainflux.Message) tags { update := strconv.FormatFloat(msg.UpdateTime, 'f', -1, 64) channel := strconv.FormatUint(msg.Channel, 10) publisher := strconv.FormatUint(msg.Publisher, 10) + return tags{ "Channel": channel, "Publisher": publisher, diff --git a/writers/influxdb/messages_test.go b/writers/influxdb/messages_test.go index f2ebe95c..3936425a 100644 --- a/writers/influxdb/messages_test.go +++ b/writers/influxdb/messages_test.go @@ -11,47 +11,34 @@ import ( "fmt" "os" "testing" + "time" influxdata "github.com/influxdata/influxdb/client/v2" - "github.com/influxdata/influxdb/models" "github.com/mainflux/mainflux" log "github.com/mainflux/mainflux/logger" + "github.com/mainflux/mainflux/writers" writer "github.com/mainflux/mainflux/writers/influxdb" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) var ( - port string - testLog = log.New(os.Stdout) - testDB = "test" - client influxdata.Client + port string + testLog = log.New(os.Stdout) + testDB = "test" + saveTimeout = 2 * time.Second + saveBatchSize = 20 + streamsSize = 250 + client influxdata.Client + selectMsgs = fmt.Sprintf("SELECT * FROM test..messages") + dropMsgs = fmt.Sprintf("DROP SERIES FROM messages") + clientCfg = influxdata.HTTPConfig{ Username: "test", Password: "test", } -) -// This is utility function to query the database. 
-func queryDB(cmd string) ([]models.Row, error) { - q := influxdata.Query{ - Command: cmd, - Database: testDB, - } - response, err := client.Query(q) - if err != nil { - return nil, err - } - if response.Error() != nil { - return nil, response.Error() - } - // There is only one query, so only one result and - // all data are stored in the same series. - return response.Results[0].Series, nil -} - -func TestSave(t *testing.T) { - msg := mainflux.Message{ + msg = mainflux.Message{ Channel: 45, Publisher: 2580, Protocol: "http", @@ -66,21 +53,89 @@ func TestSave(t *testing.T) { UpdateTime: 5456565466, Link: "link", } +) - q := fmt.Sprintf("SELECT * FROM test..messages\n") +// This is utility function to query the database. +func queryDB(cmd string) ([][]interface{}, error) { + q := influxdata.Query{ + Command: cmd, + Database: testDB, + } + response, err := client.Query(q) + if err != nil { + return nil, err + } + if response.Error() != nil { + return nil, response.Error() + } + if len(response.Results[0].Series) == 0 { + return nil, nil + } + // There is only one query, so only one result and + // all data are stored in the same series. + return response.Results[0].Series[0].Values, nil +} +func TestSave(t *testing.T) { client, err := influxdata.NewHTTPClient(clientCfg) require.Nil(t, err, fmt.Sprintf("Creating new InfluxDB client expected to succeed: %s.\n", err)) - repo, err := writer.New(client, testDB) + // Set batch size to 1 to simulate single point insert. + repo, err := writer.New(client, testDB, 1, saveTimeout) require.Nil(t, err, fmt.Sprintf("Creating new InfluxDB repo expected to succeed: %s.\n", err)) - err = repo.Save(msg) - assert.Nil(t, err, fmt.Sprintf("Save operation expected to succeed: %s.\n", err)) + // Set batch size to value > 1 to simulate real batch. 
+ repo1, err := writer.New(client, testDB, saveBatchSize, saveTimeout) + require.Nil(t, err, fmt.Sprintf("Creating new InfluxDB repo expected to succeed: %s.\n", err)) - row, err := queryDB(q) - assert.Nil(t, err, fmt.Sprintf("Querying InfluxDB to retrieve data count expected to succeed: %s.\n", err)) + cases := []struct { + desc string + repo writers.MessageRepository + previousMsgs int + numOfMsg int + expectedSize int + isBatch bool + }{ + { + desc: "save a single message", + repo: repo, + numOfMsg: 1, + expectedSize: 1, + isBatch: false, + }, + { + desc: "save a batch of messages", + repo: repo1, + numOfMsg: streamsSize, + expectedSize: streamsSize - (streamsSize % saveBatchSize), + isBatch: true, + }, + } - count := len(row) - assert.Equal(t, 1, count, fmt.Sprintf("Expected to have 1 value, found %d instead.\n", count)) + for _, tc := range cases { + // Clean previously saved messages. + row, err := queryDB(dropMsgs) + require.Nil(t, err, fmt.Sprintf("Cleaning data from InfluxDB expected to succeed: %s.\n", err)) + + for i := 0; i < tc.numOfMsg; i++ { + err := tc.repo.Save(msg) + assert.Nil(t, err, fmt.Sprintf("Save operation expected to succeed: %s.\n", err)) + } + + row, err = queryDB(selectMsgs) + assert.Nil(t, err, fmt.Sprintf("Querying InfluxDB to retrieve data expected to succeed: %s.\n", err)) + + count := len(row) + assert.Equal(t, tc.expectedSize, count, fmt.Sprintf("Expected to have %d messages saved, found %d instead.\n", tc.expectedSize, count)) + + if tc.isBatch { + // Sleep for `saveBatchTime` to trigger ticker and check if the reset of the data is saved. + time.Sleep(saveTimeout) + + row, err = queryDB(selectMsgs) + assert.Nil(t, err, fmt.Sprintf("Querying InfluxDB to retrieve data count expected to succeed: %s.\n", err)) + count = len(row) + assert.Equal(t, tc.numOfMsg, count, fmt.Sprintf("Expected to have %d messages, found %d instead.\n", tc.numOfMsg, count)) + } + } }