// Copyright (c) Mainflux // SPDX-License-Identifier: Apache-2.0 package cassandra import ( "fmt" "github.com/gocql/gocql" "github.com/mainflux/mainflux/errors" "github.com/mainflux/mainflux/readers" "github.com/mainflux/mainflux/transformers/senml" ) var errReadMessages = errors.New("faled to read messages from cassandra database") var _ readers.MessageRepository = (*cassandraRepository)(nil) type cassandraRepository struct { session *gocql.Session } // New instantiates Cassandra message repository. func New(session *gocql.Session) readers.MessageRepository { return cassandraRepository{ session: session, } } func (cr cassandraRepository) ReadAll(chanID string, offset, limit uint64, query map[string]string) (readers.MessagesPage, error) { names := []string{} vals := []interface{}{chanID} for name, val := range query { names = append(names, name) vals = append(vals, val) } vals = append(vals, offset+limit) selectCQL := buildSelectQuery(chanID, offset, limit, names) countCQL := buildCountQuery(chanID, names) iter := cr.session.Query(selectCQL, vals...).Iter() defer iter.Close() scanner := iter.Scanner() // skip first OFFSET rows for i := uint64(0); i < offset; i++ { if !scanner.Next() { break } } page := readers.MessagesPage{ Offset: offset, Limit: limit, Messages: []senml.Message{}, } for scanner.Next() { var msg senml.Message err := scanner.Scan(&msg.Channel, &msg.Subtopic, &msg.Publisher, &msg.Protocol, &msg.Name, &msg.Unit, &msg.Value, &msg.StringValue, &msg.BoolValue, &msg.DataValue, &msg.Sum, &msg.Time, &msg.UpdateTime) if err != nil { return readers.MessagesPage{}, errors.Wrap(errReadMessages, err) } page.Messages = append(page.Messages, msg) } if err := cr.session.Query(countCQL, vals[:len(vals)-1]...).Scan(&page.Total); err != nil { return readers.MessagesPage{}, errors.Wrap(errReadMessages, err) } return page, nil } func buildSelectQuery(chanID string, offset, limit uint64, names []string) string { var condCQL string cql := `SELECT channel, subtopic, publisher, protocol, name, unit, value, string_value, bool_value, data_value, sum, time, update_time FROM messages WHERE channel = ? %s LIMIT ? ALLOW FILTERING` for _, name := range names { switch name { case "channel", "subtopic", "publisher", "name", "protocol": condCQL = fmt.Sprintf(`%s AND %s = ?`, condCQL, name) } } return fmt.Sprintf(cql, condCQL) } func buildCountQuery(chanID string, names []string) string { var condCQL string cql := `SELECT COUNT(*) FROM messages WHERE channel = ? %s ALLOW FILTERING` for _, name := range names { switch name { case "channel", "subtopic", "publisher", "name", "protocol": condCQL = fmt.Sprintf(`%s AND %s = ?`, condCQL, name) } } return fmt.Sprintf(cql, condCQL) }