From 148c1aca0a22ae2fe9ce3d61c91b9428430baedc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Du=C5=A1an=20Borov=C4=8Danin?= Date: Mon, 1 Oct 2018 17:36:53 +0200 Subject: [PATCH] MF-404 - InfluxDB batch size and timeout zero values (#411) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Add zero value check for batch size and timeout Signed-off-by: Dušan Borovčanin * Update InfluxDB writer tests Test creation of InfluxDB writer with invalid (zero) batch size and timeout. Signed-off-by: Dušan Borovčanin * Fix config load Loading batch size and timeout from env variables were missing in loadConfigs method. Signed-off-by: Dušan Borovčanin --- cmd/influxdb-writer/main.go | 18 ++++++++++-------- writers/influxdb/messages.go | 14 ++++++++++++++ writers/influxdb/messages_test.go | 31 +++++++++++++++++++++++++++++++ 3 files changed, 55 insertions(+), 8 deletions(-) diff --git a/cmd/influxdb-writer/main.go b/cmd/influxdb-writer/main.go index d51efeb9..19564287 100644 --- a/cmd/influxdb-writer/main.go +++ b/cmd/influxdb-writer/main.go @@ -128,14 +128,16 @@ func main() { func loadConfigs() (config, influxdata.HTTPConfig) { cfg := config{ - NatsURL: mainflux.Env(envNatsURL, defNatsURL), - LogLevel: mainflux.Env(envLogLevel, defLogLevel), - Port: mainflux.Env(envPort, defPort), - DBName: mainflux.Env(envDBName, defDBName), - DBHost: mainflux.Env(envDBHost, defDBHost), - DBPort: mainflux.Env(envDBPort, defDBPort), - DBUser: mainflux.Env(envDBUser, defDBUser), - DBPass: mainflux.Env(envDBPass, defDBPass), + NatsURL: mainflux.Env(envNatsURL, defNatsURL), + LogLevel: mainflux.Env(envLogLevel, defLogLevel), + Port: mainflux.Env(envPort, defPort), + BatchSize: mainflux.Env(envBatchSize, defBatchSize), + BatchTimeout: mainflux.Env(envBatchTimeout, defBatchTimeout), + DBName: mainflux.Env(envDBName, defDBName), + DBHost: mainflux.Env(envDBHost, defDBHost), + DBPort: mainflux.Env(envDBPort, defDBPort), + DBUser: mainflux.Env(envDBUser, defDBUser), + DBPass: mainflux.Env(envDBPass, defDBPass), } clientCfg := influxdata.HTTPConfig{ diff --git a/writers/influxdb/messages.go b/writers/influxdb/messages.go index a2d0dbbf..40fec9d8 100644 --- a/writers/influxdb/messages.go +++ b/writers/influxdb/messages.go @@ -8,6 +8,7 @@ package influxdb import ( + "errors" "strconv" "sync" "time" @@ -22,6 +23,11 @@ const pointName = "messages" var _ writers.MessageRepository = (*influxRepo)(nil) +var ( + errZeroValueSize = errors.New("zero value batch size") + errZeroValueTimeout = errors.New("zero value batch timeout") +) + type influxRepo struct { client influxdata.Client batch []*influxdata.Point @@ -36,6 +42,14 @@ type tags map[string]string // New returns new InfluxDB writer. func New(client influxdata.Client, database string, batchSize int, batchTimeout time.Duration) (writers.MessageRepository, error) { + if batchSize == 0 { + return &influxRepo{}, errZeroValueSize + } + + if batchTimeout == 0 { + return &influxRepo{}, errZeroValueTimeout + } + repo := &influxRepo{ client: client, cfg: influxdata.BatchPointsConfig{ diff --git a/writers/influxdb/messages_test.go b/writers/influxdb/messages_test.go index e28a49bb..65c1e706 100644 --- a/writers/influxdb/messages_test.go +++ b/writers/influxdb/messages_test.go @@ -75,6 +75,37 @@ func queryDB(cmd string) ([][]interface{}, error) { return response.Results[0].Series[0].Values, nil } +func TestNewWriter(t *testing.T) { + client, err := influxdata.NewHTTPClient(clientCfg) + require.Nil(t, err, fmt.Sprintf("Creating new InfluxDB client expected to succeed: %s.\n", err)) + + cases := []struct { + desc string + batchSize int + err error + batchTimeout time.Duration + errText string + }{ + { + desc: "Create writer with zero value batch size", + batchSize: 0, + batchTimeout: time.Duration(5 * time.Second), + errText: "zero value batch size", + }, + { + desc: "Create writer with zero value batch timeout", + batchSize: 5, + batchTimeout: time.Duration(0 * time.Second), + errText: "zero value batch timeout", + }, + } + + for _, tc := range cases { + _, err := writer.New(client, testDB, tc.batchSize, tc.batchTimeout) + assert.Equal(t, tc.errText, err.Error(), fmt.Sprintf("%s expected to have error \"%s\", but got \"%s\"", tc.desc, tc.errText, err)) + } +} + func TestSave(t *testing.T) { client, err := influxdata.NewHTTPClient(clientCfg) require.Nil(t, err, fmt.Sprintf("Creating new InfluxDB client expected to succeed: %s.\n", err))