
MF-313 - Implement basic Cassandra reader (#331)

* Fix logger message in http service

Signed-off-by: Aleksandar Novakovic <aleksandar.novakovic@mainflux.com>

* Inline query and error handling in cassandra writer

Signed-off-by: Aleksandar Novakovic <aleksandar.novakovic@mainflux.com>

* Fix comments and import statement in writer interface

Signed-off-by: Aleksandar Novakovic <aleksandar.novakovic@mainflux.com>

* Add reader common interface and shared HTTP API

Signed-off-by: Aleksandar Novakovic <aleksandar.novakovic@mainflux.com>

* Add Cassandra reader implementation

Signed-off-by: Aleksandar Novakovic <aleksandar.novakovic@mainflux.com>

* Add tests for cassandra reader

Signed-off-by: Aleksandar Novakovic <aleksandar.novakovic@mainflux.com>

* Add swagger doc and readme for readers

Signed-off-by: Aleksandar Novakovic <aleksandar.novakovic@mainflux.com>

* Update make file

Signed-off-by: Aleksandar Novakovic <aleksandar.novakovic@mainflux.com>

* Add docker-compose configuration for cassandra reader

Signed-off-by: Aleksandar Novakovic <aleksandar.novakovic@mainflux.com>

* Add readme file to cassandra reader

Signed-off-by: Aleksandar Novakovic <aleksandar.novakovic@mainflux.com>
Aleksandar Novaković 2018-08-06 17:06:55 +02:00, committed by GitHub
parent 29d146636a
commit 6d48446c16
30 changed files with 1173 additions and 21 deletions


@ -1,5 +1,5 @@
BUILD_DIR = build
SERVICES = users things http normalizer ws influxdb mongodb cassandra
SERVICES = users things http normalizer ws influxdb mongodb cassandra-writer cassandra-reader
DOCKERS = $(addprefix docker_,$(SERVICES))
CGO_ENABLED ?= 0
GOOS ?= linux


@ -0,0 +1,127 @@
package main
import (
"fmt"
"net/http"
"os"
"os/signal"
"strings"
"syscall"
kitprometheus "github.com/go-kit/kit/metrics/prometheus"
"github.com/gocql/gocql"
"github.com/mainflux/mainflux"
log "github.com/mainflux/mainflux/logger"
"github.com/mainflux/mainflux/readers"
"github.com/mainflux/mainflux/readers/api"
"github.com/mainflux/mainflux/readers/cassandra"
thingsapi "github.com/mainflux/mainflux/things/api/grpc"
stdprometheus "github.com/prometheus/client_golang/prometheus"
"google.golang.org/grpc"
)
const (
sep = ","
defPort = "8180"
defCluster = "127.0.0.1"
defKeyspace = "mainflux"
defThingsURL = "localhost:8181"
envPort = "MF_CASSANDRA_READER_PORT"
envCluster = "MF_CASSANDRA_READER_DB_CLUSTER"
envKeyspace = "MF_CASSANDRA_READER_DB_KEYSPACE"
envThingsURL = "MF_THINGS_URL"
)
type config struct {
port string
cluster string
keyspace string
thingsURL string
}
func main() {
cfg := loadConfig()
logger := log.New(os.Stdout)
session := connectToCassandra(cfg.cluster, cfg.keyspace, logger)
defer session.Close()
conn := connectToThings(cfg.thingsURL, logger)
defer conn.Close()
tc := thingsapi.NewClient(conn)
repo := newService(session, logger)
errs := make(chan error, 2)
go startHTTPServer(repo, tc, cfg.port, errs, logger)
go func() {
c := make(chan os.Signal)
signal.Notify(c, syscall.SIGINT)
errs <- fmt.Errorf("%s", <-c)
}()
err := <-errs
logger.Error(fmt.Sprintf("Cassandra writer service terminated: %s", err))
}
func loadConfig() config {
return config{
port: mainflux.Env(envPort, defPort),
cluster: mainflux.Env(envCluster, defCluster),
keyspace: mainflux.Env(envKeyspace, defKeyspace),
thingsURL: mainflux.Env(envThingsURL, defThingsURL),
}
}
func connectToCassandra(cluster, keyspace string, logger log.Logger) *gocql.Session {
session, err := cassandra.Connect(strings.Split(cluster, sep), keyspace)
if err != nil {
logger.Error(fmt.Sprintf("Failed to connect to Cassandra cluster: %s", err))
os.Exit(1)
}
return session
}
func connectToThings(url string, logger log.Logger) *grpc.ClientConn {
conn, err := grpc.Dial(url, grpc.WithInsecure())
if err != nil {
logger.Error(fmt.Sprintf("Failed to connect to things service: %s", err))
os.Exit(1)
}
return conn
}
func newService(session *gocql.Session, logger log.Logger) readers.MessageRepository {
repo := cassandra.New(session)
repo = api.LoggingMiddleware(repo, logger)
repo = api.MetricsMiddleware(
repo,
kitprometheus.NewCounterFrom(stdprometheus.CounterOpts{
Namespace: "cassandra",
Subsystem: "message_reader",
Name: "request_count",
Help: "Number of requests received.",
}, []string{"method"}),
kitprometheus.NewSummaryFrom(stdprometheus.SummaryOpts{
Namespace: "cassandra",
Subsystem: "message_reader",
Name: "request_latency_microseconds",
Help: "Total duration of requests in microseconds.",
}, []string{"method"}),
)
return repo
}
func startHTTPServer(repo readers.MessageRepository, tc mainflux.ThingsServiceClient, port string, errs chan error, logger log.Logger) {
p := fmt.Sprintf(":%s", port)
logger.Info(fmt.Sprintf("Cassandra writer service started, exposed port %s", port))
errs <- http.ListenAndServe(p, api.MakeHandler(repo, tc, "cassandra-reader"))
}


@ -52,7 +52,7 @@ func main() {
conn, err := grpc.Dial(cfg.ThingsURL, grpc.WithInsecure())
if err != nil {
logger.Error(fmt.Sprintf("Failed to connect to users service: %s", err))
logger.Error(fmt.Sprintf("Failed to connect to things service: %s", err))
os.Exit(1)
}
defer conn.Close()


@ -0,0 +1,28 @@
###
# This docker-compose file contains the optional cassandra-reader service. Since this service is
# optional, this file depends on the docker-compose file from <project_root>/docker. In order to run
# this optional service, execute the command:
# docker-compose -f docker/docker-compose.yml -f docker/addons/cassandra-reader/docker-compose.yml up
# from the project root.
###
version: "3"
networks:
docker_mainflux-base-net:
external: true
services:
cassandra-reader:
image: mainflux/cassandra-reader:latest
container_name: mainflux-cassandra-reader
restart: on-failure
environment:
MF_THINGS_URL: things:8183
MF_CASSANDRA_READER_PORT: 8903
MF_CASSANDRA_READER_DB_CLUSTER: mainflux-cassandra
MF_CASSANDRA_READER_DB_KEYSPACE: mainflux
ports:
- 8903:8903
networks:
- docker_mainflux-base-net


@ -2,24 +2,29 @@
# This docker-compose file contains optional Cassandra and cassandra-writer. Since these are optional, this file is
# dependent of docker-compose file from <project_root>/docker. In order to run
# these optional service, execute command:
# docker-compose -f docker/docker-compose.yml -f docker/addons/cassandra/docker-compose.yml up
# docker-compose -f docker/docker-compose.yml -f docker/addons/cassandra-writer/docker-compose.yml up
# from project root.
###
version: "3"
networks:
docker_mainflux-base-net:
external: true
services:
cassandra:
image: cassandra:3.11.2
container_name: mainflux-cassandra
restart: on-failure
networks:
- docker_mainflux-base-net
cassandra-writer:
image: mainflux/cassandra:latest
image: mainflux/cassandra-writer:latest
container_name: mainflux-cassandra-writer
depends_on:
- cassandra
- nats
expose:
- 8902
restart: on-failure
@ -30,3 +35,5 @@ services:
MF_CASSANDRA_WRITER_DB_KEYSPACE: mainflux
ports:
- 8902:8902
networks:
- docker_mainflux-base-net


@ -1,3 +1,3 @@
docker-compose -f docker/docker-compose.yml -f docker/addons/cassandra/docker-compose.yml up -d
docker-compose -f docker/addons/cassandra-writer/docker-compose.yml up -d
sleep 20
docker exec mainflux-cassandra cqlsh -e "CREATE KEYSPACE IF NOT EXISTS mainflux WITH replication = {'class':'SimpleStrategy','replication_factor':'1'};"


@ -8,6 +8,10 @@
version: "3"
networks:
mainflux-base-net:
driver: bridge
services:
nginx:
image: nginx:1.13-alpine
@ -22,11 +26,15 @@ services:
- 80:80
- 443:443
- 8883:8883
networks:
- mainflux-base-net
nats:
image: nats:1.1.0
container_name: mainflux-nats
restart: on-failure
networks:
- mainflux-base-net
users-db:
image: postgres:10.2-alpine
@ -36,6 +44,8 @@ services:
POSTGRES_USER: mainflux
POSTGRES_PASSWORD: mainflux
POSTGRES_DB: users
networks:
- mainflux-base-net
users:
image: mainflux/users:latest
@ -57,6 +67,8 @@ services:
MF_USERS_SECRET: secret
ports:
- 8180:8180
networks:
- mainflux-base-net
things-db:
image: postgres:10.2-alpine
@ -66,6 +78,8 @@ services:
POSTGRES_USER: mainflux
POSTGRES_PASSWORD: mainflux
POSTGRES_DB: things
networks:
- mainflux-base-net
things:
image: mainflux/things:latest
@ -89,6 +103,8 @@ services:
MF_THINGS_SECRET: secret
ports:
- 8182:8182
networks:
- mainflux-base-net
normalizer:
image: mainflux/normalizer:latest
@ -101,6 +117,8 @@ services:
environment:
MF_NATS_URL: nats://nats:4222
MF_NORMALIZER_PORT: 8184
networks:
- mainflux-base-net
dashflux:
image: mainflux/dashflux:latest
@ -108,6 +126,8 @@ services:
restart: on-failure
ports:
- 3000:3000
networks:
- mainflux-base-net
ws-adapter:
image: mainflux/ws:latest
@ -124,6 +144,8 @@ services:
MF_THINGS_URL: things:8183
ports:
- 8186:8186
networks:
- mainflux-base-net
http-adapter:
image: mainflux/http:latest
@ -140,11 +162,15 @@ services:
MF_THINGS_URL: things:8183
ports:
- 8185:8185
networks:
- mainflux-base-net
redis:
image: redis:4.0.9-alpine
container_name: mainflux-redis
restart: on-failure
networks:
- mainflux-base-net
mqtt-adapter:
image: mainflux/mqtt:latest
@ -163,3 +189,5 @@ services:
ports:
- 1883:1883
- 8880:8880
networks:
- mainflux-base-net

readers/README.md

@ -0,0 +1,10 @@
# Readers
Readers provide implementations of various `message readers`.
Message readers are services that consume normalized (in `SenML` format)
Mainflux messages from data storage and open an HTTP API for message consumption.
For an in-depth explanation of the usage of `readers`, as well as a thorough
understanding of Mainflux, please check out the [official documentation][doc].
[doc]: http://mainflux.readthedocs.io

readers/api/endpoint.go

@ -0,0 +1,22 @@
package api
import (
"context"
"github.com/go-kit/kit/endpoint"
"github.com/mainflux/mainflux/readers"
)
func listMessagesEndpoint(svc readers.MessageRepository) endpoint.Endpoint {
return func(_ context.Context, request interface{}) (interface{}, error) {
req := request.(listMessagesReq)
if err := req.validate(); err != nil {
return nil, err
}
messages := svc.ReadAll(req.chanID, req.offset, req.limit)
return listMessagesRes{Messages: messages}, nil
}
}


@ -0,0 +1,155 @@
package api_test
import (
"fmt"
"net/http"
"net/http/httptest"
"strconv"
"testing"
"github.com/mainflux/mainflux"
"github.com/mainflux/mainflux/readers"
"github.com/mainflux/mainflux/readers/api"
"github.com/mainflux/mainflux/readers/mocks"
"github.com/stretchr/testify/assert"
)
const (
svcName = "test-service"
token = "1"
invalid = "invalid"
numOfMessages = 42
chanID uint64 = 1
)
func newService() readers.MessageRepository {
messages := []mainflux.Message{}
for i := 0; i < numOfMessages; i++ {
messages = append(messages, mainflux.Message{
Channel: chanID,
Publisher: 1,
Protocol: "mqtt",
})
}
return mocks.NewMessageRepository(map[uint64][]mainflux.Message{
chanID: messages,
})
}
func newServer(repo readers.MessageRepository, tc mainflux.ThingsServiceClient) *httptest.Server {
mux := api.MakeHandler(repo, tc, svcName)
return httptest.NewServer(mux)
}
type testRequest struct {
client *http.Client
method string
url string
token string
}
func (tr testRequest) make() (*http.Response, error) {
req, err := http.NewRequest(tr.method, tr.url, nil)
if err != nil {
return nil, err
}
if tr.token != "" {
req.Header.Set("Authorization", tr.token)
}
return tr.client.Do(req)
}
func TestReadAll(t *testing.T) {
svc := newService()
tc := mocks.NewThingsService()
ts := newServer(svc, tc)
defer ts.Close()
id := strconv.FormatUint(chanID, 10)
cases := map[string]struct {
url string
token string
status int
}{
"read page with valid offset and limit": {
url: fmt.Sprintf("%s/channels/%s/messages?offset=0&limit=10", ts.URL, id),
token: token,
status: http.StatusOK,
},
"read page with negative offset": {
url: fmt.Sprintf("%s/channels/%s/messages?offset=-1&limit=10", ts.URL, id),
token: token,
status: http.StatusBadRequest,
},
"read page with negative limit": {
url: fmt.Sprintf("%s/channels/%s/messages?offset=0&limit=-10", ts.URL, id),
token: token,
status: http.StatusBadRequest,
},
"read page with zero limit": {
url: fmt.Sprintf("%s/channels/%s/messages?offset=0&limit=0", ts.URL, id),
token: token,
status: http.StatusBadRequest,
},
"read page with non-integer offset": {
url: fmt.Sprintf("%s/channels/%s/messages?offset=abc&limit=10", ts.URL, id),
token: token,
status: http.StatusBadRequest,
},
"read page with non-integer limit": {
url: fmt.Sprintf("%s/channels/%s/messages?offset=0&limit=abc", ts.URL, id),
token: token,
status: http.StatusBadRequest,
},
"read page with invalid channel id": {
url: fmt.Sprintf("%s/channels/abc/messages?offset=0&limit=10", ts.URL),
token: token,
status: http.StatusBadRequest,
},
"read page with invalid token": {
url: fmt.Sprintf("%s/channels/%s/messages?offset=0&limit=10", ts.URL, id),
token: invalid,
status: http.StatusForbidden,
},
"read page with multiple offset": {
url: fmt.Sprintf("%s/channels/%s/messages?offset=0&offset=1&limit=10", ts.URL, id),
token: token,
status: http.StatusBadRequest,
},
"read page with multiple limit": {
url: fmt.Sprintf("%s/channels/%s/messages?offset=0&limit=20&limit=10", ts.URL, id),
token: token,
status: http.StatusBadRequest,
},
"read page with empty token": {
url: fmt.Sprintf("%s/channels/%s/messages?offset=0&limit=10", ts.URL, id),
token: "",
status: http.StatusForbidden,
},
"read page with default offset": {
url: fmt.Sprintf("%s/channels/%s/messages?limit=10", ts.URL, id),
token: token,
status: http.StatusOK,
},
"read page with default limit": {
url: fmt.Sprintf("%s/channels/%s/messages?offset=0", ts.URL, id),
token: token,
status: http.StatusOK,
},
}
for desc, tc := range cases {
req := testRequest{
client: ts.Client(),
method: http.MethodGet,
url: tc.url,
token: tc.token,
}
res, err := req.make()
assert.Nil(t, err, fmt.Sprintf("%s: unexpected error %s", desc, err))
assert.Equal(t, tc.status, res.StatusCode, fmt.Sprintf("%s: expected %d got %d", desc, tc.status, res.StatusCode))
}
}

readers/api/logging.go

@ -0,0 +1,36 @@
// +build !test
package api
import (
"fmt"
"time"
"github.com/mainflux/mainflux"
log "github.com/mainflux/mainflux/logger"
"github.com/mainflux/mainflux/readers"
)
var _ readers.MessageRepository = (*loggingMiddleware)(nil)
type loggingMiddleware struct {
logger log.Logger
svc readers.MessageRepository
}
// LoggingMiddleware adds logging facilities to the core service.
func LoggingMiddleware(svc readers.MessageRepository, logger log.Logger) readers.MessageRepository {
return &loggingMiddleware{
logger: logger,
svc: svc,
}
}
func (lm *loggingMiddleware) ReadAll(chanID, offset, limit uint64) []mainflux.Message {
defer func(begin time.Time) {
lm.logger.Info(fmt.Sprintf(`Method read_all for offset %d and limit %d took
%s to complete without errors.`, offset, limit, time.Since(begin)))
}(time.Now())
return lm.svc.ReadAll(chanID, offset, limit)
}

readers/api/metrics.go

@ -0,0 +1,38 @@
// +build !test
package api
import (
"time"
"github.com/go-kit/kit/metrics"
"github.com/mainflux/mainflux"
"github.com/mainflux/mainflux/readers"
)
var _ readers.MessageRepository = (*metricsMiddleware)(nil)
type metricsMiddleware struct {
counter metrics.Counter
latency metrics.Histogram
svc readers.MessageRepository
}
// MetricsMiddleware instruments core service by tracking request count and
// latency.
func MetricsMiddleware(svc readers.MessageRepository, counter metrics.Counter, latency metrics.Histogram) readers.MessageRepository {
return &metricsMiddleware{
counter: counter,
latency: latency,
svc: svc,
}
}
func (mm *metricsMiddleware) ReadAll(chanID, offset, limit uint64) []mainflux.Message {
defer func(begin time.Time) {
mm.counter.With("method", "read_all").Add(1)
mm.latency.With("method", "read_all").Observe(time.Since(begin).Seconds())
}(time.Now())
return mm.svc.ReadAll(chanID, offset, limit)
}

readers/api/requests.go

@ -0,0 +1,19 @@
package api
type apiReq interface {
validate() error
}
type listMessagesReq struct {
chanID uint64
offset uint64
limit uint64
}
func (req listMessagesReq) validate() error {
if req.limit < 1 {
return errInvalidRequest
}
return nil
}

readers/api/responses.go

@ -0,0 +1,25 @@
package api
import (
"net/http"
"github.com/mainflux/mainflux"
)
var _ mainflux.Response = (*listMessagesRes)(nil)
type listMessagesRes struct {
Messages []mainflux.Message `json:"messages"`
}
func (res listMessagesRes) Headers() map[string]string {
return map[string]string{}
}
func (res listMessagesRes) Code() int {
return http.StatusOK
}
func (res listMessagesRes) Empty() bool {
return false
}

readers/api/transport.go

@ -0,0 +1,150 @@
package api
import (
"context"
"encoding/json"
"errors"
"net/http"
"strconv"
"time"
kithttp "github.com/go-kit/kit/transport/http"
"github.com/go-zoo/bone"
"github.com/mainflux/mainflux"
"github.com/mainflux/mainflux/readers"
"github.com/prometheus/client_golang/prometheus/promhttp"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
const (
contentType = "application/json"
defLimit = 10
defOffset = 0
)
var (
errInvalidRequest = errors.New("received invalid request")
errUnauthorizedAccess = errors.New("missing or invalid credentials provided")
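// auth is set once by MakeHandler and then used by the transport layer to
// check channel access against the things service.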
auth mainflux.ThingsServiceClient
)
// MakeHandler returns an HTTP handler for API endpoints.
func MakeHandler(svc readers.MessageRepository, tc mainflux.ThingsServiceClient, svcName string) http.Handler {
auth = tc
opts := []kithttp.ServerOption{
kithttp.ServerErrorEncoder(encodeError),
}
mux := bone.New()
mux.Get("/channels/:chanID/messages", kithttp.NewServer(
listMessagesEndpoint(svc),
decodeList,
encodeResponse,
opts...,
))
mux.GetFunc("/version", mainflux.Version(svcName))
mux.Handle("/metrics", promhttp.Handler())
return mux
}
func decodeList(_ context.Context, r *http.Request) (interface{}, error) {
chanID, err := strconv.ParseUint(bone.GetValue(r, "chanID"), 10, 64)
if err != nil {
return nil, errInvalidRequest
}
if err := authorize(r, chanID); err != nil {
return nil, err
}
offset, err := getQuery(r, "offset", defOffset)
if err != nil {
return nil, err
}
limit, err := getQuery(r, "limit", defLimit)
if err != nil {
return nil, err
}
req := listMessagesReq{
chanID: chanID,
offset: offset,
limit: limit,
}
return req, nil
}
func encodeResponse(_ context.Context, w http.ResponseWriter, response interface{}) error {
w.Header().Set("Content-Type", contentType)
if ar, ok := response.(mainflux.Response); ok {
for k, v := range ar.Headers() {
w.Header().Set(k, v)
}
w.WriteHeader(ar.Code())
if ar.Empty() {
return nil
}
}
return json.NewEncoder(w).Encode(response)
}
func encodeError(_ context.Context, err error, w http.ResponseWriter) {
switch err {
case nil:
case errInvalidRequest:
w.WriteHeader(http.StatusBadRequest)
case errUnauthorizedAccess:
w.WriteHeader(http.StatusForbidden)
default:
w.WriteHeader(http.StatusInternalServerError)
}
}
func authorize(r *http.Request, chanID uint64) error {
token := r.Header.Get("Authorization")
if token == "" {
return errUnauthorizedAccess
}
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
_, err := auth.CanAccess(ctx, &mainflux.AccessReq{Token: token, ChanID: chanID})
if err != nil {
e, ok := status.FromError(err)
if ok && e.Code() == codes.PermissionDenied {
return errUnauthorizedAccess
}
return err
}
return nil
}
func getQuery(req *http.Request, name string, fallback uint64) (uint64, error) {
vals := bone.GetQuery(req, name)
if len(vals) == 0 {
return fallback, nil
}
if len(vals) > 1 {
return 0, errInvalidRequest
}
val, err := strconv.ParseUint(vals[0], 10, 64)
if err != nil {
return 0, errInvalidRequest
}
return val, nil
}


@ -0,0 +1,72 @@
# Cassandra reader
Cassandra reader provides a message repository implementation for Cassandra.
## Configuration
The service is configured using the environment variables presented in the
following table. Note that any unset variables will be replaced with their
default values.
| Variable | Description | Default |
|---------------------------------|---------------------------------------------|----------------|
| MF_CASSANDRA_READER_PORT | Service HTTP port | 8180 |
| MF_CASSANDRA_READER_DB_CLUSTER | Cassandra cluster comma separated addresses | 127.0.0.1 |
| MF_CASSANDRA_READER_DB_KEYSPACE | Cassandra keyspace name | mainflux |
| MF_THINGS_URL | Things service URL | localhost:8181 |
## Deployment
```yaml
version: "2"
cassandra-reader:
image: mainflux/cassandra-reader:[version]
container_name: [instance name]
expose:
- [Service HTTP port]
restart: on-failure
environment:
MF_THINGS_URL: [Things service URL]
MF_CASSANDRA_READER_PORT: [Service HTTP port]
MF_CASSANDRA_READER_DB_CLUSTER: [Cassandra cluster comma separated addresses]
MF_CASSANDRA_READER_DB_KEYSPACE: [Cassandra keyspace name]
ports:
- [host machine port]:[configured HTTP port]
```
To start the service, execute the following shell script:
```bash
# download the latest version of the service
go get github.com/mainflux/mainflux
cd $GOPATH/src/github.com/mainflux/mainflux
# compile the cassandra reader
make cassandra-reader
# copy binary to bin
make install
# Set the environment variables and run the service
MF_THINGS_URL=[Things service URL] MF_CASSANDRA_READER_PORT=[Service HTTP port] MF_CASSANDRA_READER_DB_CLUSTER=[Cassandra cluster comma separated addresses] MF_CASSANDRA_READER_DB_KEYSPACE=[Cassandra keyspace name] $GOBIN/mainflux-cassandra-reader
```
### Using docker-compose
This service can be deployed using docker containers. A docker-compose file is
available in `<project_root>/docker/addons/cassandra-reader/docker-compose.yml`.
In order to run all Mainflux core services, as well as the mentioned optional ones,
execute the following command:
```bash
./docker/addons/cassandra-reader/init.sh
```
## Usage
The service exposes an HTTP API for fetching messages; see the [API documentation][doc] for details.
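An illustrative request, assuming the service runs on its default port (8180) and `<thing_token>` is an access token of a thing connected to channel `1`:

```bash
curl -H "Authorization: <thing_token>" \
  "http://localhost:8180/channels/1/messages?offset=0&limit=10"
```

The response is a JSON object containing a `messages` array.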
[doc]: ../swagger.yml

readers/cassandra/doc.go

@ -0,0 +1,2 @@
// Package cassandra contains Cassandra specific reader implementation.
package cassandra

readers/cassandra/init.go

@ -0,0 +1,12 @@
package cassandra
import "github.com/gocql/gocql"
// Connect establishes connection to the Cassandra cluster.
func Connect(hosts []string, keyspace string) (*gocql.Session, error) {
cluster := gocql.NewCluster(hosts...)
cluster.Keyspace = keyspace
cluster.Consistency = gocql.Quorum
return cluster.CreateSession()
}


@ -0,0 +1,50 @@
package cassandra
import (
"github.com/gocql/gocql"
"github.com/mainflux/mainflux"
"github.com/mainflux/mainflux/readers"
)
var _ readers.MessageRepository = (*cassandraRepository)(nil)
type cassandraRepository struct {
session *gocql.Session
}
// New instantiates Cassandra message repository.
func New(session *gocql.Session) readers.MessageRepository {
return cassandraRepository{session: session}
}
func (cr cassandraRepository) ReadAll(chanID, offset, limit uint64) []mainflux.Message {
cql := `SELECT channel, publisher, protocol, name, unit,
value, string_value, bool_value, data_value, value_sum, time,
update_time, link FROM messages WHERE channel = ? LIMIT ?
ALLOW FILTERING`
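// Cassandra has no OFFSET clause, so the query fetches the first offset+limit
// rows for the channel and the first offset rows are skipped client-side below.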
iter := cr.session.Query(cql, chanID, offset+limit).Iter()
scanner := iter.Scanner()
// skip first OFFSET rows
for i := uint64(0); i < offset; i++ {
if !scanner.Next() {
break
}
}
page := []mainflux.Message{}
for scanner.Next() {
var msg mainflux.Message
scanner.Scan(&msg.Channel, &msg.Publisher, &msg.Protocol,
&msg.Name, &msg.Unit, &msg.Value, &msg.StringValue, &msg.BoolValue,
&msg.DataValue, &msg.ValueSum, &msg.Time, &msg.UpdateTime, &msg.Link)
page = append(page, msg)
}
if err := iter.Close(); err != nil {
return []mainflux.Message{}
}
return page
}


@ -0,0 +1,74 @@
package cassandra_test
import (
"fmt"
"testing"
"github.com/mainflux/mainflux"
readers "github.com/mainflux/mainflux/readers/cassandra"
writers "github.com/mainflux/mainflux/writers/cassandra"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
const (
keyspace = "mainflux"
chanID = 1
numOfMessages = 42
)
var (
addr = "localhost"
msg = mainflux.Message{
Channel: chanID,
Publisher: 1,
Protocol: "mqtt",
}
)
func TestReadAll(t *testing.T) {
session, err := readers.Connect([]string{addr}, keyspace)
require.Nil(t, err, fmt.Sprintf("failed to connect to Cassandra: %s", err))
defer session.Close()
writer := writers.New(session)
messages := []mainflux.Message{}
for i := 0; i < numOfMessages; i++ {
err := writer.Save(msg)
require.Nil(t, err, fmt.Sprintf("failed to store message to Cassandra: %s", err))
messages = append(messages, msg)
}
reader := readers.New(session)
cases := map[string]struct {
chanID uint64
offset uint64
limit uint64
messages []mainflux.Message
}{
"read message page for existing channel": {
chanID: chanID,
offset: 0,
limit: 10,
messages: messages[0:10],
},
"read message page for non-existent channel": {
chanID: 2,
offset: 0,
limit: 10,
messages: []mainflux.Message{},
},
"read message last page": {
chanID: chanID,
offset: 40,
limit: 10,
messages: messages[40:42],
},
}
for desc, tc := range cases {
result := reader.ReadAll(tc.chanID, tc.offset, tc.limit)
assert.ElementsMatch(t, tc.messages, result, fmt.Sprintf("%s: expected %v got %v", desc, tc.messages, result))
}
}


@ -0,0 +1,72 @@
package cassandra_test
import (
"fmt"
"os"
"testing"
"github.com/gocql/gocql"
log "github.com/mainflux/mainflux/logger"
"github.com/mainflux/mainflux/writers/cassandra"
dockertest "gopkg.in/ory-am/dockertest.v3"
)
var logger = log.New(os.Stdout)
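// TestMain starts a disposable Cassandra container via dockertest, creates the
// test keyspace, runs the test suite, and purges the container afterwards.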
func TestMain(m *testing.M) {
pool, err := dockertest.NewPool("")
if err != nil {
logger.Error(fmt.Sprintf("Could not connect to docker: %s", err))
}
container, err := pool.Run("cassandra", "3.11.2", []string{})
if err != nil {
logger.Error(fmt.Sprintf("Could not start container: %s", err))
}
port := container.GetPort("9042/tcp")
addr = fmt.Sprintf("%s:%s", addr, port)
err = pool.Retry(func() error {
if err := createKeyspace([]string{addr}); err != nil {
return err
}
session, err := cassandra.Connect([]string{addr}, keyspace)
if err != nil {
return err
}
defer session.Close()
return nil
})
if err != nil {
logger.Error(fmt.Sprintf("Could not connect to docker: %s", err))
os.Exit(1)
}
code := m.Run()
if err := pool.Purge(container); err != nil {
logger.Error(fmt.Sprintf("Could not purge container: %s", err))
}
os.Exit(code)
}
func createKeyspace(hosts []string) error {
cluster := gocql.NewCluster(hosts...)
cluster.Consistency = gocql.Quorum
session, err := cluster.CreateSession()
if err != nil {
return err
}
defer session.Close()
keyspaceCQL := fmt.Sprintf(`CREATE KEYSPACE IF NOT EXISTS %s WITH replication =
{'class':'SimpleStrategy','replication_factor':'1'}`, keyspace)
return session.Query(keyspaceCQL).Exec()
}

readers/messages.go

@ -0,0 +1,17 @@
package readers
import (
"errors"
"github.com/mainflux/mainflux"
)
// ErrNotFound indicates that requested entity doesn't exist.
var ErrNotFound = errors.New("entity not found")
// MessageRepository specifies message reader API.
type MessageRepository interface {
// ReadAll skips given number of messages for given channel and returns next
// limited number of messages.
ReadAll(uint64, uint64, uint64) []mainflux.Message
}

readers/mocks/messages.go

@ -0,0 +1,45 @@
package mocks
import (
"sync"
"github.com/mainflux/mainflux"
"github.com/mainflux/mainflux/readers"
)
var _ readers.MessageRepository = (*messageRepositoryMock)(nil)
type messageRepositoryMock struct {
mutex sync.Mutex
messages map[uint64][]mainflux.Message
}
// NewMessageRepository returns mock implementation of message repository.
func NewMessageRepository(messages map[uint64][]mainflux.Message) readers.MessageRepository {
return &messageRepositoryMock{
mutex: sync.Mutex{},
messages: messages,
}
}
func (repo *messageRepositoryMock) ReadAll(chanID, offset, limit uint64) []mainflux.Message {
repo.mutex.Lock()
defer repo.mutex.Unlock()
end := offset + limit
numOfMessages := uint64(len(repo.messages[chanID]))
if offset < 0 || offset >= numOfMessages {
return []mainflux.Message{}
}
if limit < 1 {
return []mainflux.Message{}
}
if offset+limit > numOfMessages {
end = numOfMessages
}
return repo.messages[chanID][offset:end]
}

readers/mocks/things.go

@ -0,0 +1,40 @@
package mocks
import (
"context"
"strconv"
"github.com/mainflux/mainflux"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
var errUnathorized = status.Error(codes.PermissionDenied, "missing or invalid credentials provided")
var _ mainflux.ThingsServiceClient = (*thingsServiceMock)(nil)
type thingsServiceMock struct{}
// NewThingsService returns mock implementation of things service
func NewThingsService() mainflux.ThingsServiceClient {
return thingsServiceMock{}
}
func (svc thingsServiceMock) CanAccess(ctx context.Context, in *mainflux.AccessReq, opts ...grpc.CallOption) (*mainflux.ThingID, error) {
token := in.GetToken()
if token == "invalid" {
return nil, errUnathorized
}
id, err := strconv.ParseUint(token, 10, 64)
if err != nil {
return nil, errUnathorized
}
return &mainflux.ThingID{Value: id}, nil
}
func (svc thingsServiceMock) Identify(_ context.Context, _ *mainflux.Token, _ ...grpc.CallOption) (*mainflux.ThingID, error) {
return nil, nil
}

readers/swagger.yml

@ -0,0 +1,122 @@
swagger: "2.0"
info:
title: Mainflux reader service
description: HTTP API for reading messages.
version: "1.0.0"
consumes:
- "application/json"
produces:
- "application/json"
paths:
/channels/{chanId}/messages:
get:
summary: Retrieves messages sent to a single channel
description: |
Retrieves a list of messages sent to a specific channel. Due to
performance concerns, data is retrieved in subsets. API consumers must
ensure that the entire dataset is consumed either by making subsequent
requests, or by increasing the subset size of the initial request.
tags:
- messages
parameters:
- $ref: "#/parameters/Authorization"
- $ref: "#/parameters/Limit"
- $ref: "#/parameters/Offset"
- $ref: "#/parameters/ChanId"
responses:
200:
description: Data retrieved.
schema:
$ref: "#/definitions/MessageList"
400:
description: Failed due to malformed query parameters.
403:
description: Missing or invalid access token provided.
500:
$ref: "#/responses/ServiceError"
responses:
ServiceError:
description: Unexpected server-side error occurred.
definitions:
MessageList:
type: object
properties:
messages:
type: array
minItems: 0
uniqueItems: true
items:
type: object
properties:
Channel:
type: integer
description: Unique channel id.
Publisher:
type: integer
description: Unique publisher id.
Protocol:
type: string
description: Protocol name.
Name:
type: string
description: Measured parameter name.
Unit:
type: string
description: Value unit.
Value:
type: number
description: Measured value in number.
StringValue:
type: string
description: Measured value in string format.
BoolValue:
type: boolean
description: Measured value in boolean format.
DataValue:
type: string
description: Measured value in binary format.
ValueSum:
type: number
description: Sum value.
Time:
type: number
description: Time of measurement.
UpdateTime:
type: number
description: Time of updating measurement.
Link:
type: string
parameters:
Authorization:
name: Authorization
description: Thing access token.
in: header
type: string
required: true
ChanId:
name: chanId
description: Unique channel identifier.
in: path
type: integer
minimum: 1
required: true
Limit:
name: limit
description: Size of the subset to retrieve.
in: query
type: integer
default: 10
maximum: 100
minimum: 1
required: false
Offset:
name: offset
description: Number of items to skip during retrieval.
in: query
type: integer
default: 0
minimum: 0
required: false


@ -20,7 +20,7 @@ default values.
```yaml
version: "2"
cassandra-writer:
image: mainflux/cassandra:[version]
image: mainflux/cassandra-writer:[version]
container_name: [instance name]
expose:
- [Service HTTP port]
@ -43,26 +43,26 @@ go get github.com/mainflux/mainflux
cd $GOPATH/src/github.com/mainflux/mainflux
# compile the cassandra
make cassandra
# compile the cassandra writer
make cassandra-writer
# copy binary to bin
make install
# Set the environment variables and run the service
MF_NATS_URL=[NATS instance URL] MF_CASSANDRA_WRITER_PORT=[Service HTTP port] MF_CASSANDRA_WRITER_DB_CLUSTER=[Cassandra cluster comma separated addresses] MF_CASSANDRA_WRITER_DB_KEYSPACE=[Cassandra keyspace name] $GOBIN/mainflux-cassandra
MF_NATS_URL=[NATS instance URL] MF_CASSANDRA_WRITER_PORT=[Service HTTP port] MF_CASSANDRA_WRITER_DB_CLUSTER=[Cassandra cluster comma separated addresses] MF_CASSANDRA_WRITER_DB_KEYSPACE=[Cassandra keyspace name] $GOBIN/mainflux-cassandra-writer
```
### Using docker-compose
This service can be deployed using docker containers. Docker compose file is
available in `<project_root>/docker/addons/cassandra/docker-compose.yml`. In
order to run all Mainflux core services, as well as mentioned optional ones,
available in `<project_root>/docker/addons/cassandra-writer/docker-compose.yml`.
In order to run all Mainflux core services, as well as mentioned optional ones,
execute following command:
```bash
./docker/addons/cassandra/init.sh
./docker/addons/cassandra-writer/init.sh
```
## Usage


@ -16,7 +16,7 @@ const table = `CREATE TABLE IF NOT EXISTS messages (
value_sum double,
time double,
update_time double,
link text,
link text
)`
// Connect establishes connection to the Cassandra cluster.
@ -30,8 +30,7 @@ func Connect(hosts []string, keyspace string) (*gocql.Session, error) {
return nil, err
}
err = session.Query(table).Exec()
if err != nil {
if err := session.Query(table).Exec(); err != nil {
return nil, err
}


@ -13,8 +13,12 @@ import (
const keyspace = "mainflux"
var (
msg = mainflux.Message{}
addr = "localhost"
msg = mainflux.Message{
Channel: 1,
Publisher: 1,
Protocol: "mqtt",
}
)
func TestSave(t *testing.T) {


@ -1,10 +1,8 @@
package writers
import (
"github.com/mainflux/mainflux"
)
import "github.com/mainflux/mainflux"
// MessageRepository specifies message reading API.
// MessageRepository specifies message writing API.
type MessageRepository interface {
// Save method is used to save published message. A non-nil