From 6d48446c169f0bf13264a959d8b4d0d67f459abf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleksandar=20Novakovi=C4=87?= Date: Mon, 6 Aug 2018 17:06:55 +0200 Subject: [PATCH] MF-313 - Implement basic Cassandra reader (#331) * Fix logger message in http service Signed-off-by: Aleksandar Novakovic * Inline query and error handling in cassandra writer Signed-off-by: Aleksandar Novakovic * Fix comments and import statement in writer interface Signed-off-by: Aleksandar Novakovic * Add reader common interface and shared HTTP API Signed-off-by: Aleksandar Novakovic * Add Cassandra reader implementation Signed-off-by: Aleksandar Novakovic * Add tests for cassandra reader Signed-off-by: Aleksandar Novakovic * Add swagger doc and readme for readers Signed-off-by: Aleksandar Novakovic * Update make file Signed-off-by: Aleksandar Novakovic * Add docker-compose configuration for cassandra reader Signed-off-by: Aleksandar Novakovic * Add readme file to cassandra reader Signed-off-by: Aleksandar Novakovic --- Makefile | 2 +- cmd/cassandra-reader/main.go | 127 ++++++++++++++ cmd/{cassandra => cassandra-writer}/main.go | 0 cmd/http/main.go | 2 +- .../cassandra-reader/docker-compose.yml | 28 ++++ .../docker-compose.yml | 13 +- .../{cassandra => cassandra-writer}/init.sh | 2 +- docker/docker-compose.yml | 28 ++++ readers/README.md | 10 ++ readers/api/endpoint.go | 22 +++ readers/api/endpoint_test.go | 155 ++++++++++++++++++ readers/api/logging.go | 36 ++++ readers/api/metrics.go | 38 +++++ readers/api/requests.go | 19 +++ readers/api/responses.go | 25 +++ readers/api/transport.go | 150 +++++++++++++++++ readers/cassandra/README.md | 72 ++++++++ readers/cassandra/doc.go | 2 + readers/cassandra/init.go | 12 ++ readers/cassandra/messages.go | 50 ++++++ readers/cassandra/messages_test.go | 74 +++++++++ readers/cassandra/setup_test.go | 72 ++++++++ readers/messages.go | 17 ++ readers/mocks/messages.go | 45 +++++ readers/mocks/things.go | 40 +++++ readers/swagger.yml | 122 ++++++++++++++ writers/cassandra/README.md | 14 +- writers/cassandra/init.go | 5 +- writers/cassandra/messages_test.go | 6 +- writers/messages.go | 6 +- 30 files changed, 1173 insertions(+), 21 deletions(-) create mode 100644 cmd/cassandra-reader/main.go rename cmd/{cassandra => cassandra-writer}/main.go (100%) create mode 100644 docker/addons/cassandra-reader/docker-compose.yml rename docker/addons/{cassandra => cassandra-writer}/docker-compose.yml (78%) rename docker/addons/{cassandra => cassandra-writer}/init.sh (62%) create mode 100644 readers/README.md create mode 100644 readers/api/endpoint.go create mode 100644 readers/api/endpoint_test.go create mode 100644 readers/api/logging.go create mode 100644 readers/api/metrics.go create mode 100644 readers/api/requests.go create mode 100644 readers/api/responses.go create mode 100644 readers/api/transport.go create mode 100644 readers/cassandra/README.md create mode 100644 readers/cassandra/doc.go create mode 100644 readers/cassandra/init.go create mode 100644 readers/cassandra/messages.go create mode 100644 readers/cassandra/messages_test.go create mode 100644 readers/cassandra/setup_test.go create mode 100644 readers/messages.go create mode 100644 readers/mocks/messages.go create mode 100644 readers/mocks/things.go create mode 100644 readers/swagger.yml diff --git a/Makefile b/Makefile index ecbdbd03..026fa35a 100644 --- a/Makefile +++ b/Makefile @@ -1,5 +1,5 @@ BUILD_DIR = build -SERVICES = users things http normalizer ws influxdb mongodb cassandra +SERVICES = users things http normalizer ws influxdb mongodb cassandra-writer cassandra-reader DOCKERS = $(addprefix docker_,$(SERVICES)) CGO_ENABLED ?= 0 GOOS ?= linux diff --git a/cmd/cassandra-reader/main.go b/cmd/cassandra-reader/main.go new file mode 100644 index 00000000..56ec06e2 --- /dev/null +++ b/cmd/cassandra-reader/main.go @@ -0,0 +1,127 @@ +package main + +import ( + "fmt" + "net/http" + "os" + "os/signal" + "strings" + "syscall" + + kitprometheus "github.com/go-kit/kit/metrics/prometheus" + "github.com/gocql/gocql" + "github.com/mainflux/mainflux" + log "github.com/mainflux/mainflux/logger" + "github.com/mainflux/mainflux/readers" + "github.com/mainflux/mainflux/readers/api" + "github.com/mainflux/mainflux/readers/cassandra" + thingsapi "github.com/mainflux/mainflux/things/api/grpc" + stdprometheus "github.com/prometheus/client_golang/prometheus" + "google.golang.org/grpc" +) + +const ( + sep = "," + + defPort = "8180" + defCluster = "127.0.0.1" + defKeyspace = "mainflux" + defThingsURL = "localhost:8181" + + envPort = "MF_CASSANDRA_READER_PORT" + envCluster = "MF_CASSANDRA_READER_DB_CLUSTER" + envKeyspace = "MF_CASSANDRA_READER_DB_KEYSPACE" + envThingsURL = "MF_THINGS_URL" +) + +type config struct { + port string + cluster string + keyspace string + thingsURL string +} + +func main() { + cfg := loadConfig() + + logger := log.New(os.Stdout) + + session := connectToCassandra(cfg.cluster, cfg.keyspace, logger) + defer session.Close() + + conn := connectToThings(cfg.thingsURL, logger) + defer conn.Close() + + tc := thingsapi.NewClient(conn) + repo := newService(session, logger) + + errs := make(chan error, 2) + + go startHTTPServer(repo, tc, cfg.port, errs, logger) + + go func() { + c := make(chan os.Signal) + signal.Notify(c, syscall.SIGINT) + errs <- fmt.Errorf("%s", <-c) + }() + + err := <-errs + logger.Error(fmt.Sprintf("Cassandra writer service terminated: %s", err)) +} + +func loadConfig() config { + return config{ + port: mainflux.Env(envPort, defPort), + cluster: mainflux.Env(envCluster, defCluster), + keyspace: mainflux.Env(envKeyspace, defKeyspace), + thingsURL: mainflux.Env(envThingsURL, defThingsURL), + } +} + +func connectToCassandra(cluster, keyspace string, logger log.Logger) *gocql.Session { + session, err := cassandra.Connect(strings.Split(cluster, sep), keyspace) + if err != nil { + logger.Error(fmt.Sprintf("Failed to connect to Cassandra cluster: %s", err)) + os.Exit(1) + } + + return session +} + +func connectToThings(url string, logger log.Logger) *grpc.ClientConn { + conn, err := grpc.Dial(url, grpc.WithInsecure()) + if err != nil { + logger.Error(fmt.Sprintf("Failed to connect to things service: %s", err)) + os.Exit(1) + } + + return conn +} + +func newService(session *gocql.Session, logger log.Logger) readers.MessageRepository { + repo := cassandra.New(session) + repo = api.LoggingMiddleware(repo, logger) + repo = api.MetricsMiddleware( + repo, + kitprometheus.NewCounterFrom(stdprometheus.CounterOpts{ + Namespace: "cassandra", + Subsystem: "message_reader", + Name: "request_count", + Help: "Number of requests received.", + }, []string{"method"}), + kitprometheus.NewSummaryFrom(stdprometheus.SummaryOpts{ + Namespace: "cassandra", + Subsystem: "message_reader", + Name: "request_latency_microseconds", + Help: "Total duration of requests in microseconds.", + }, []string{"method"}), + ) + + return repo +} + +func startHTTPServer(repo readers.MessageRepository, tc mainflux.ThingsServiceClient, port string, errs chan error, logger log.Logger) { + p := fmt.Sprintf(":%s", port) + logger.Info(fmt.Sprintf("Cassandra writer service started, exposed port %s", port)) + errs <- http.ListenAndServe(p, api.MakeHandler(repo, tc, "cassandra-reader")) +} diff --git a/cmd/cassandra/main.go b/cmd/cassandra-writer/main.go similarity index 100% rename from cmd/cassandra/main.go rename to cmd/cassandra-writer/main.go diff --git a/cmd/http/main.go b/cmd/http/main.go index f57106b5..87363207 100644 --- a/cmd/http/main.go +++ b/cmd/http/main.go @@ -52,7 +52,7 @@ func main() { conn, err := grpc.Dial(cfg.ThingsURL, grpc.WithInsecure()) if err != nil { - logger.Error(fmt.Sprintf("Failed to connect to users service: %s", err)) + logger.Error(fmt.Sprintf("Failed to connect to things service: %s", err)) os.Exit(1) } defer conn.Close() diff --git a/docker/addons/cassandra-reader/docker-compose.yml b/docker/addons/cassandra-reader/docker-compose.yml new file mode 100644 index 00000000..6c96930f --- /dev/null +++ b/docker/addons/cassandra-reader/docker-compose.yml @@ -0,0 +1,28 @@ +### +# This docker-compose file contains optional Cassandra and cassandra-reader. Since these are optional, this file is +# dependent of docker-compose file from /docker. In order to run +# these optional service, execute command: +# docker-compose -f docker/docker-compose.yml -f docker/addons/cassandra-reader/docker-compose.yml up +# from project root. +### + +version: "3" + +networks: + docker_mainflux-base-net: + external: true + +services: + cassandra-reader: + image: mainflux/cassandra-reader:latest + container_name: mainflux-cassandra-reader + restart: on-failure + environment: + MF_THINGS_URL: things:8183 + MF_CASSANDRA_READER_PORT: 8903 + MF_CASSANDRA_READER_DB_CLUSTER: mainflux-cassandra + MF_CASSANDRA_READER_DB_KEYSPACE: mainflux + ports: + - 8903:8903 + networks: + - docker_mainflux-base-net diff --git a/docker/addons/cassandra/docker-compose.yml b/docker/addons/cassandra-writer/docker-compose.yml similarity index 78% rename from docker/addons/cassandra/docker-compose.yml rename to docker/addons/cassandra-writer/docker-compose.yml index 45b7f2cb..23e37707 100644 --- a/docker/addons/cassandra/docker-compose.yml +++ b/docker/addons/cassandra-writer/docker-compose.yml @@ -2,24 +2,29 @@ # This docker-compose file contains optional Cassandra and cassandra-writer. Since these are optional, this file is # dependent of docker-compose file from /docker. In order to run # these optional service, execute command: -# docker-compose -f docker/docker-compose.yml -f docker/addons/cassandra/docker-compose.yml up +# docker-compose -f docker/docker-compose.yml -f docker/addons/cassandra-writer/docker-compose.yml up # from project root. ### version: "3" +networks: + docker_mainflux-base-net: + external: true + services: cassandra: image: cassandra:3.11.2 container_name: mainflux-cassandra restart: on-failure + networks: + - docker_mainflux-base-net cassandra-writer: - image: mainflux/cassandra:latest + image: mainflux/cassandra-writer:latest container_name: mainflux-cassandra-writer depends_on: - cassandra - - nats expose: - 8902 restart: on-failure @@ -30,3 +35,5 @@ services: MF_CASSANDRA_WRITER_DB_KEYSPACE: mainflux ports: - 8902:8902 + networks: + - docker_mainflux-base-net diff --git a/docker/addons/cassandra/init.sh b/docker/addons/cassandra-writer/init.sh similarity index 62% rename from docker/addons/cassandra/init.sh rename to docker/addons/cassandra-writer/init.sh index cf97b6b5..d5c19149 100755 --- a/docker/addons/cassandra/init.sh +++ b/docker/addons/cassandra-writer/init.sh @@ -1,3 +1,3 @@ -docker-compose -f docker/docker-compose.yml -f docker/addons/cassandra/docker-compose.yml up -d +docker-compose -f docker/addons/cassandra-writer/docker-compose.yml up -d sleep 20 docker exec mainflux-cassandra cqlsh -e "CREATE KEYSPACE IF NOT EXISTS mainflux WITH replication = {'class':'SimpleStrategy','replication_factor':'1'};" diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index 71399668..55c1aa18 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -8,6 +8,10 @@ version: "3" +networks: + mainflux-base-net: + driver: bridge + services: nginx: image: nginx:1.13-alpine @@ -22,11 +26,15 @@ services: - 80:80 - 443:443 - 8883:8883 + networks: + - mainflux-base-net nats: image: nats:1.1.0 container_name: mainflux-nats restart: on-failure + networks: + - mainflux-base-net users-db: image: postgres:10.2-alpine @@ -36,6 +44,8 @@ services: POSTGRES_USER: mainflux POSTGRES_PASSWORD: mainflux POSTGRES_DB: users + networks: + - mainflux-base-net users: image: mainflux/users:latest @@ -57,6 +67,8 @@ services: MF_USERS_SECRET: secret ports: - 8180:8180 + networks: + - mainflux-base-net things-db: image: postgres:10.2-alpine @@ -66,6 +78,8 @@ services: POSTGRES_USER: mainflux POSTGRES_PASSWORD: mainflux POSTGRES_DB: things + networks: + - mainflux-base-net things: image: mainflux/things:latest @@ -89,6 +103,8 @@ services: MF_THINGS_SECRET: secret ports: - 8182:8182 + networks: + - mainflux-base-net normalizer: image: mainflux/normalizer:latest @@ -101,6 +117,8 @@ services: environment: MF_NATS_URL: nats://nats:4222 MF_NORMALIZER_PORT: 8184 + networks: + - mainflux-base-net dashflux: image: mainflux/dashflux:latest @@ -108,6 +126,8 @@ services: restart: on-failure ports: - 3000:3000 + networks: + - mainflux-base-net ws-adapter: image: mainflux/ws:latest @@ -124,6 +144,8 @@ services: MF_THINGS_URL: things:8183 ports: - 8186:8186 + networks: + - mainflux-base-net http-adapter: image: mainflux/http:latest @@ -140,11 +162,15 @@ services: MF_THINGS_URL: things:8183 ports: - 8185:8185 + networks: + - mainflux-base-net redis: image: redis:4.0.9-alpine container_name: mainflux-redis restart: on-failure + networks: + - mainflux-base-net mqtt-adapter: image: mainflux/mqtt:latest @@ -163,3 +189,5 @@ services: ports: - 1883:1883 - 8880:8880 + networks: + - mainflux-base-net diff --git a/readers/README.md b/readers/README.md new file mode 100644 index 00000000..f145ac99 --- /dev/null +++ b/readers/README.md @@ -0,0 +1,10 @@ +# Readers + +Readers provide an implementation of various `message readers`. +Message readers are services that consume normalized (in `SenML` format) +Mainflux messages from data storage and opens HTTP API for message consumption. + +For an in-depth explanation of the usage of `reader`, as well as thorough +understanding of Mainflux, please check out the [official documentation][doc]. + +[doc]: http://mainflux.readthedocs.io diff --git a/readers/api/endpoint.go b/readers/api/endpoint.go new file mode 100644 index 00000000..cb30f19c --- /dev/null +++ b/readers/api/endpoint.go @@ -0,0 +1,22 @@ +package api + +import ( + "context" + + "github.com/go-kit/kit/endpoint" + "github.com/mainflux/mainflux/readers" +) + +func listMessagesEndpoint(svc readers.MessageRepository) endpoint.Endpoint { + return func(_ context.Context, request interface{}) (interface{}, error) { + req := request.(listMessagesReq) + + if err := req.validate(); err != nil { + return nil, err + } + + messages := svc.ReadAll(req.chanID, req.offset, req.limit) + + return listMessagesRes{Messages: messages}, nil + } +} diff --git a/readers/api/endpoint_test.go b/readers/api/endpoint_test.go new file mode 100644 index 00000000..2203484e --- /dev/null +++ b/readers/api/endpoint_test.go @@ -0,0 +1,155 @@ +package api_test + +import ( + "fmt" + "net/http" + "net/http/httptest" + "strconv" + "testing" + + "github.com/mainflux/mainflux" + "github.com/mainflux/mainflux/readers" + "github.com/mainflux/mainflux/readers/api" + "github.com/mainflux/mainflux/readers/mocks" + "github.com/stretchr/testify/assert" +) + +const ( + svcName = "test-service" + token = "1" + invalid = "invalid" + numOfMessages = 42 + chanID uint64 = 1 +) + +func newService() readers.MessageRepository { + messages := []mainflux.Message{} + for i := 0; i < numOfMessages; i++ { + messages = append(messages, mainflux.Message{ + Channel: chanID, + Publisher: 1, + Protocol: "mqtt", + }) + } + + return mocks.NewMessageRepository(map[uint64][]mainflux.Message{ + chanID: messages, + }) +} + +func newServer(repo readers.MessageRepository, tc mainflux.ThingsServiceClient) *httptest.Server { + mux := api.MakeHandler(repo, tc, svcName) + return httptest.NewServer(mux) +} + +type testRequest struct { + client *http.Client + method string + url string + token string +} + +func (tr testRequest) make() (*http.Response, error) { + req, err := http.NewRequest(tr.method, tr.url, nil) + if err != nil { + return nil, err + } + if tr.token != "" { + req.Header.Set("Authorization", tr.token) + } + + return tr.client.Do(req) +} + +func TestReadAll(t *testing.T) { + svc := newService() + tc := mocks.NewThingsService() + ts := newServer(svc, tc) + defer ts.Close() + + id := strconv.FormatUint(chanID, 10) + + cases := map[string]struct { + url string + token string + status int + }{ + "read page with valid offset and limit": { + url: fmt.Sprintf("%s/channels/%s/messages?offset=0&limit=10", ts.URL, id), + token: token, + status: http.StatusOK, + }, + "read page with negative offset": { + url: fmt.Sprintf("%s/channels/%s/messages?offset=-1&limit=10", ts.URL, id), + token: token, + status: http.StatusBadRequest, + }, + "read page with negative limit": { + url: fmt.Sprintf("%s/channels/%s/messages?offset=0&limit=-10", ts.URL, id), + token: token, + status: http.StatusBadRequest, + }, + "read page with zero limit": { + url: fmt.Sprintf("%s/channels/%s/messages?offset=0&limit=0", ts.URL, id), + token: token, + status: http.StatusBadRequest, + }, + "read page with non-integer offset": { + url: fmt.Sprintf("%s/channels/%s/messages?offset=abc&limit=10", ts.URL, id), + token: token, + status: http.StatusBadRequest, + }, + "read page with non-integer limit": { + url: fmt.Sprintf("%s/channels/%s/messages?offset=0&limit=abc", ts.URL, id), + token: token, + status: http.StatusBadRequest, + }, + "read page with invalid channel id": { + url: fmt.Sprintf("%s/channels/abc/messages?offset=0&limit=10", ts.URL), + token: token, + status: http.StatusBadRequest, + }, + "read page with invalid token": { + url: fmt.Sprintf("%s/channels/%s/messages?offset=0&limit=10", ts.URL, id), + token: invalid, + status: http.StatusForbidden, + }, + "read page with multiple offset": { + url: fmt.Sprintf("%s/channels/%s/messages?offset=0&offset=1&limit=10", ts.URL, id), + token: token, + status: http.StatusBadRequest, + }, + "read page with multiple limit": { + url: fmt.Sprintf("%s/channels/%s/messages?offset=0&limit=20&limit=10", ts.URL, id), + token: token, + status: http.StatusBadRequest, + }, + "read page with empty token": { + url: fmt.Sprintf("%s/channels/%s/messages?offset=0&limit=10", ts.URL, id), + token: "", + status: http.StatusForbidden, + }, + "read page with default offset": { + url: fmt.Sprintf("%s/channels/%s/messages?limit=10", ts.URL, id), + token: token, + status: http.StatusOK, + }, + "read page with default limit": { + url: fmt.Sprintf("%s/channels/%s/messages?offset=0", ts.URL, id), + token: token, + status: http.StatusOK, + }, + } + + for desc, tc := range cases { + req := testRequest{ + client: ts.Client(), + method: http.MethodGet, + url: tc.url, + token: tc.token, + } + res, err := req.make() + assert.Nil(t, err, fmt.Sprintf("%s: unexpected error %s", desc, err)) + assert.Equal(t, tc.status, res.StatusCode, fmt.Sprintf("%s: expected %d got %d", desc, tc.status, res.StatusCode)) + } +} diff --git a/readers/api/logging.go b/readers/api/logging.go new file mode 100644 index 00000000..4b511e11 --- /dev/null +++ b/readers/api/logging.go @@ -0,0 +1,36 @@ +// +build !test + +package api + +import ( + "fmt" + "time" + + "github.com/mainflux/mainflux" + log "github.com/mainflux/mainflux/logger" + "github.com/mainflux/mainflux/readers" +) + +var _ readers.MessageRepository = (*loggingMiddleware)(nil) + +type loggingMiddleware struct { + logger log.Logger + svc readers.MessageRepository +} + +// LoggingMiddleware adds logging facilities to the core service. +func LoggingMiddleware(svc readers.MessageRepository, logger log.Logger) readers.MessageRepository { + return &loggingMiddleware{ + logger: logger, + svc: svc, + } +} + +func (lm *loggingMiddleware) ReadAll(chanID, offset, limit uint64) []mainflux.Message { + defer func(begin time.Time) { + lm.logger.Info(fmt.Sprintf(`Method read_all for offset %d and limit %d took + %s to complete without errors.`, offset, limit, time.Since(begin))) + }(time.Now()) + + return lm.svc.ReadAll(chanID, offset, limit) +} diff --git a/readers/api/metrics.go b/readers/api/metrics.go new file mode 100644 index 00000000..fa8e00f4 --- /dev/null +++ b/readers/api/metrics.go @@ -0,0 +1,38 @@ +// +build !test + +package api + +import ( + "time" + + "github.com/go-kit/kit/metrics" + "github.com/mainflux/mainflux" + "github.com/mainflux/mainflux/readers" +) + +var _ readers.MessageRepository = (*metricsMiddleware)(nil) + +type metricsMiddleware struct { + counter metrics.Counter + latency metrics.Histogram + svc readers.MessageRepository +} + +// MetricsMiddleware instruments core service by tracking request count and +// latency. +func MetricsMiddleware(svc readers.MessageRepository, counter metrics.Counter, latency metrics.Histogram) readers.MessageRepository { + return &metricsMiddleware{ + counter: counter, + latency: latency, + svc: svc, + } +} + +func (mm *metricsMiddleware) ReadAll(chanID, offset, limit uint64) []mainflux.Message { + defer func(begin time.Time) { + mm.counter.With("method", "read_all").Add(1) + mm.latency.With("method", "read_all").Observe(time.Since(begin).Seconds()) + }(time.Now()) + + return mm.svc.ReadAll(chanID, offset, limit) +} diff --git a/readers/api/requests.go b/readers/api/requests.go new file mode 100644 index 00000000..886ff1fb --- /dev/null +++ b/readers/api/requests.go @@ -0,0 +1,19 @@ +package api + +type apiReq interface { + validate() error +} + +type listMessagesReq struct { + chanID uint64 + offset uint64 + limit uint64 +} + +func (req listMessagesReq) validate() error { + if req.limit < 1 { + return errInvalidRequest + } + + return nil +} diff --git a/readers/api/responses.go b/readers/api/responses.go new file mode 100644 index 00000000..8221496d --- /dev/null +++ b/readers/api/responses.go @@ -0,0 +1,25 @@ +package api + +import ( + "net/http" + + "github.com/mainflux/mainflux" +) + +var _ mainflux.Response = (*listMessagesRes)(nil) + +type listMessagesRes struct { + Messages []mainflux.Message `json:"messages"` +} + +func (res listMessagesRes) Headers() map[string]string { + return map[string]string{} +} + +func (res listMessagesRes) Code() int { + return http.StatusOK +} + +func (res listMessagesRes) Empty() bool { + return false +} diff --git a/readers/api/transport.go b/readers/api/transport.go new file mode 100644 index 00000000..505adc71 --- /dev/null +++ b/readers/api/transport.go @@ -0,0 +1,150 @@ +package api + +import ( + "context" + "encoding/json" + "errors" + "net/http" + "strconv" + "time" + + kithttp "github.com/go-kit/kit/transport/http" + "github.com/go-zoo/bone" + "github.com/mainflux/mainflux" + "github.com/mainflux/mainflux/readers" + "github.com/prometheus/client_golang/prometheus/promhttp" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +const ( + contentType = "application/json" + defLimit = 10 + defOffset = 0 +) + +var ( + errInvalidRequest = errors.New("received invalid request") + errUnauthorizedAccess = errors.New("missing or invalid credentials provided") + auth mainflux.ThingsServiceClient +) + +// MakeHandler returns a HTTP handler for API endpoints. +func MakeHandler(svc readers.MessageRepository, tc mainflux.ThingsServiceClient, svcName string) http.Handler { + auth = tc + + opts := []kithttp.ServerOption{ + kithttp.ServerErrorEncoder(encodeError), + } + + mux := bone.New() + mux.Get("/channels/:chanID/messages", kithttp.NewServer( + listMessagesEndpoint(svc), + decodeList, + encodeResponse, + opts..., + )) + + mux.GetFunc("/version", mainflux.Version(svcName)) + mux.Handle("/metrics", promhttp.Handler()) + + return mux +} + +func decodeList(_ context.Context, r *http.Request) (interface{}, error) { + chanID, err := strconv.ParseUint(bone.GetValue(r, "chanID"), 10, 64) + if err != nil { + return nil, errInvalidRequest + } + + if err := authorize(r, chanID); err != nil { + return nil, err + } + + offset, err := getQuery(r, "offset", defOffset) + if err != nil { + return nil, err + } + + limit, err := getQuery(r, "limit", defLimit) + if err != nil { + return nil, err + } + + req := listMessagesReq{ + chanID: chanID, + offset: offset, + limit: limit, + } + + return req, nil +} + +func encodeResponse(_ context.Context, w http.ResponseWriter, response interface{}) error { + w.Header().Set("Content-Type", contentType) + + if ar, ok := response.(mainflux.Response); ok { + for k, v := range ar.Headers() { + w.Header().Set(k, v) + } + + w.WriteHeader(ar.Code()) + + if ar.Empty() { + return nil + } + } + + return json.NewEncoder(w).Encode(response) +} + +func encodeError(_ context.Context, err error, w http.ResponseWriter) { + switch err { + case nil: + case errInvalidRequest: + w.WriteHeader(http.StatusBadRequest) + case errUnauthorizedAccess: + w.WriteHeader(http.StatusForbidden) + default: + w.WriteHeader(http.StatusInternalServerError) + } +} + +func authorize(r *http.Request, chanID uint64) error { + token := r.Header.Get("Authorization") + if token == "" { + return errUnauthorizedAccess + } + + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + + _, err := auth.CanAccess(ctx, &mainflux.AccessReq{Token: token, ChanID: chanID}) + if err != nil { + e, ok := status.FromError(err) + if ok && e.Code() == codes.PermissionDenied { + return errUnauthorizedAccess + } + return err + } + + return nil +} + +func getQuery(req *http.Request, name string, fallback uint64) (uint64, error) { + vals := bone.GetQuery(req, name) + if len(vals) == 0 { + return fallback, nil + } + + if len(vals) > 1 { + return 0, errInvalidRequest + } + + val, err := strconv.ParseUint(vals[0], 10, 64) + if err != nil { + return 0, errInvalidRequest + } + + return uint64(val), nil +} diff --git a/readers/cassandra/README.md b/readers/cassandra/README.md new file mode 100644 index 00000000..5fa7a7f6 --- /dev/null +++ b/readers/cassandra/README.md @@ -0,0 +1,72 @@ +# Cassandra reader + +Cassandra reader provides message repository implementation for Cassandra. + +## Configuration + +The service is configured using the environment variables presented in the +following table. Note that any unset variables will be replaced with their +default values. + +| Variable | Description | Default | +|---------------------------------|---------------------------------------------|----------------| +| MF_CASSANDRA_READER_PORT | Service HTTP port | 8180 | +| MF_CASSANDRA_READER_DB_CLUSTER | Cassandra cluster comma separated addresses | 127.0.0.1 | +| MF_CASSANDRA_READER_DB_KEYSPACE | Cassandra keyspace name | mainflux | +| MF_THINGS_URL | Things service URL | localhost:8181 | + +## Deployment + +```yaml + version: "2" + cassandra-reader: + image: mainflux/cassandra-reader:[version] + container_name: [instance name] + expose: + - [Service HTTP port] + restart: on-failure + environment: + MF_THINGS_URL: [Things service URL] + MF_CASSANDRA_READER_PORT: [Service HTTP port] + MF_CASSANDRA_READER_DB_CLUSTER: [Cassandra cluster comma separated addresses] + MF_CASSANDRA_READER_DB_KEYSPACE: [Cassandra keyspace name] + ports: + - [host machine port]:[configured HTTP port] +``` + +To start the service, execute the following shell script: + +```bash +# download the latest version of the service +go get github.com/mainflux/mainflux + + +cd $GOPATH/src/github.com/mainflux/mainflux + +# compile the cassandra +make cassandra-reader + +# copy binary to bin +make install + +# Set the environment variables and run the service +MF_THINGS_URL=[Things service URL] MF_CASSANDRA_READER_PORT=[Service HTTP port] MF_CASSANDRA_READER_DB_CLUSTER=[Cassandra cluster comma separated addresses] MF_CASSANDRA_READER_DB_KEYSPACE=[Cassandra keyspace name] $GOBIN/mainflux-cassandra-reader + +``` + +### Using docker-compose + +This service can be deployed using docker containers. Docker compose file is +available in `/docker/addons/cassandra-reader/docker-compose.yml`. +In order to run all Mainflux core services, as well as mentioned optional ones, +execute following command: + +```bash +./docker/addons/cassandra-reader/init.sh +``` + +## Usage + +Service exposes HTTP API for fetching messages. + +[doc]: ../swagger.yml diff --git a/readers/cassandra/doc.go b/readers/cassandra/doc.go new file mode 100644 index 00000000..cbdf4a5d --- /dev/null +++ b/readers/cassandra/doc.go @@ -0,0 +1,2 @@ +// Package cassandra contains Cassandra specific reader implementation. +package cassandra diff --git a/readers/cassandra/init.go b/readers/cassandra/init.go new file mode 100644 index 00000000..7e1b0d62 --- /dev/null +++ b/readers/cassandra/init.go @@ -0,0 +1,12 @@ +package cassandra + +import "github.com/gocql/gocql" + +// Connect establishes connection to the Cassandra cluster. +func Connect(hosts []string, keyspace string) (*gocql.Session, error) { + cluster := gocql.NewCluster(hosts...) + cluster.Keyspace = keyspace + cluster.Consistency = gocql.Quorum + + return cluster.CreateSession() +} diff --git a/readers/cassandra/messages.go b/readers/cassandra/messages.go new file mode 100644 index 00000000..7dbd580b --- /dev/null +++ b/readers/cassandra/messages.go @@ -0,0 +1,50 @@ +package cassandra + +import ( + "github.com/gocql/gocql" + "github.com/mainflux/mainflux" + "github.com/mainflux/mainflux/readers" +) + +var _ readers.MessageRepository = (*cassandraRepository)(nil) + +type cassandraRepository struct { + session *gocql.Session +} + +// New instantiates Cassandra message repository. +func New(session *gocql.Session) readers.MessageRepository { + return cassandraRepository{session: session} +} + +func (cr cassandraRepository) ReadAll(chanID, offset, limit uint64) []mainflux.Message { + cql := `SELECT channel, publisher, protocol, name, unit, + value, string_value, bool_value, data_value, value_sum, time, + update_time, link FROM messages WHERE channel = ? LIMIT ? + ALLOW FILTERING` + + iter := cr.session.Query(cql, chanID, offset+limit).Iter() + scanner := iter.Scanner() + + // skip first OFFSET rows + for i := uint64(0); i < offset; i++ { + if !scanner.Next() { + break + } + } + + page := []mainflux.Message{} + for scanner.Next() { + var msg mainflux.Message + scanner.Scan(&msg.Channel, &msg.Publisher, &msg.Protocol, + &msg.Name, &msg.Unit, &msg.Value, &msg.StringValue, &msg.BoolValue, + &msg.DataValue, &msg.ValueSum, &msg.Time, &msg.UpdateTime, &msg.Link) + page = append(page, msg) + } + + if err := iter.Close(); err != nil { + return []mainflux.Message{} + } + + return page +} diff --git a/readers/cassandra/messages_test.go b/readers/cassandra/messages_test.go new file mode 100644 index 00000000..dfcdd879 --- /dev/null +++ b/readers/cassandra/messages_test.go @@ -0,0 +1,74 @@ +package cassandra_test + +import ( + "fmt" + "testing" + + "github.com/mainflux/mainflux" + readers "github.com/mainflux/mainflux/readers/cassandra" + writers "github.com/mainflux/mainflux/writers/cassandra" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +const ( + keyspace = "mainflux" + chanID = 1 + numOfMessages = 42 +) + +var ( + addr = "localhost" + msg = mainflux.Message{ + Channel: chanID, + Publisher: 1, + Protocol: "mqtt", + } +) + +func TestReadAll(t *testing.T) { + session, err := readers.Connect([]string{addr}, keyspace) + require.Nil(t, err, fmt.Sprintf("failed to connect to Cassandra: %s", err)) + defer session.Close() + writer := writers.New(session) + + messages := []mainflux.Message{} + for i := 0; i < numOfMessages; i++ { + err := writer.Save(msg) + require.Nil(t, err, fmt.Sprintf("failed to store message to Cassandra: %s", err)) + messages = append(messages, msg) + } + + reader := readers.New(session) + + cases := map[string]struct { + chanID uint64 + offset uint64 + limit uint64 + messages []mainflux.Message + }{ + "read message page for existing channel": { + chanID: chanID, + offset: 0, + limit: 10, + messages: messages[0:10], + }, + "read message page for non-existent channel": { + chanID: 2, + offset: 0, + limit: 10, + messages: []mainflux.Message{}, + }, + "read message last page": { + chanID: chanID, + offset: 40, + limit: 10, + messages: messages[40:42], + }, + } + + for desc, tc := range cases { + result := reader.ReadAll(tc.chanID, tc.offset, tc.limit) + assert.ElementsMatch(t, tc.messages, result, fmt.Sprintf("%s: expected %v got %v", desc, tc.messages, result)) + } +} diff --git a/readers/cassandra/setup_test.go b/readers/cassandra/setup_test.go new file mode 100644 index 00000000..65f32307 --- /dev/null +++ b/readers/cassandra/setup_test.go @@ -0,0 +1,72 @@ +package cassandra_test + +import ( + "fmt" + "os" + "testing" + + "github.com/gocql/gocql" + log "github.com/mainflux/mainflux/logger" + "github.com/mainflux/mainflux/writers/cassandra" + dockertest "gopkg.in/ory-am/dockertest.v3" +) + +var logger = log.New(os.Stdout) + +func TestMain(m *testing.M) { + pool, err := dockertest.NewPool("") + if err != nil { + logger.Error(fmt.Sprintf("Could not connect to docker: %s", err)) + } + + container, err := pool.Run("cassandra", "3.11.2", []string{}) + if err != nil { + logger.Error(fmt.Sprintf("Could not start container: %s", err)) + } + + port := container.GetPort("9042/tcp") + addr = fmt.Sprintf("%s:%s", addr, port) + + err = pool.Retry(func() error { + if err := createKeyspace([]string{addr}); err != nil { + return err + } + + session, err := cassandra.Connect([]string{addr}, keyspace) + if err != nil { + return err + } + defer session.Close() + + return nil + }) + + if err != nil { + logger.Error(fmt.Sprintf("Could not connect to docker: %s", err)) + os.Exit(1) + } + + code := m.Run() + + if err := pool.Purge(container); err != nil { + logger.Error(fmt.Sprintf("Could not purge container: %s", err)) + } + + os.Exit(code) +} + +func createKeyspace(hosts []string) error { + cluster := gocql.NewCluster(hosts...) + cluster.Consistency = gocql.Quorum + + session, err := cluster.CreateSession() + if err != nil { + return err + } + defer session.Close() + + keyspaceCQL := fmt.Sprintf(`CREATE KEYSPACE IF NOT EXISTS %s WITH replication = + {'class':'SimpleStrategy','replication_factor':'1'}`, keyspace) + + return session.Query(keyspaceCQL).Exec() +} diff --git a/readers/messages.go b/readers/messages.go new file mode 100644 index 00000000..5fdf0fd6 --- /dev/null +++ b/readers/messages.go @@ -0,0 +1,17 @@ +package readers + +import ( + "errors" + + "github.com/mainflux/mainflux" +) + +// ErrNotFound indicates that requested entity doesn't exist. +var ErrNotFound = errors.New("entity not found") + +// MessageRepository specifies message reader API. +type MessageRepository interface { + // ReadAll skips given number of messages for given channel and returns next + // limited number of messages. + ReadAll(uint64, uint64, uint64) []mainflux.Message +} diff --git a/readers/mocks/messages.go b/readers/mocks/messages.go new file mode 100644 index 00000000..ec9455d9 --- /dev/null +++ b/readers/mocks/messages.go @@ -0,0 +1,45 @@ +package mocks + +import ( + "sync" + + "github.com/mainflux/mainflux" + "github.com/mainflux/mainflux/readers" +) + +var _ readers.MessageRepository = (*messageRepositoryMock)(nil) + +type messageRepositoryMock struct { + mutex sync.Mutex + messages map[uint64][]mainflux.Message +} + +// NewMessageRepository returns mock implementation of message repository. +func NewMessageRepository(messages map[uint64][]mainflux.Message) readers.MessageRepository { + return &messageRepositoryMock{ + mutex: sync.Mutex{}, + messages: messages, + } +} + +func (repo *messageRepositoryMock) ReadAll(chanID, offset, limit uint64) []mainflux.Message { + repo.mutex.Lock() + defer repo.mutex.Unlock() + + end := offset + limit + + numOfMessages := uint64(len(repo.messages[chanID])) + if offset < 0 || offset >= numOfMessages { + return []mainflux.Message{} + } + + if limit < 1 { + return []mainflux.Message{} + } + + if offset+limit > numOfMessages { + end = numOfMessages + } + + return repo.messages[chanID][offset:end] +} diff --git a/readers/mocks/things.go b/readers/mocks/things.go new file mode 100644 index 00000000..f796cf1b --- /dev/null +++ b/readers/mocks/things.go @@ -0,0 +1,40 @@ +package mocks + +import ( + "context" + "strconv" + + "github.com/mainflux/mainflux" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +var errUnathorized = status.Error(codes.PermissionDenied, "missing or invalid credentials provided") + +var _ mainflux.ThingsServiceClient = (*thingsServiceMock)(nil) + +type thingsServiceMock struct{} + +// NewThingsService returns mock implementation of things service +func NewThingsService() mainflux.ThingsServiceClient { + return thingsServiceMock{} +} + +func (svc thingsServiceMock) CanAccess(ctx context.Context, in *mainflux.AccessReq, opts ...grpc.CallOption) (*mainflux.ThingID, error) { + token := in.GetToken() + if token == "invalid" { + return nil, errUnathorized + } + + id, err := strconv.ParseUint(token, 10, 64) + if err != nil { + return nil, errUnathorized + } + + return &mainflux.ThingID{Value: id}, nil +} + +func (svc thingsServiceMock) Identify(_ context.Context, _ *mainflux.Token, _ ...grpc.CallOption) (*mainflux.ThingID, error) { + return nil, nil +} diff --git a/readers/swagger.yml b/readers/swagger.yml new file mode 100644 index 00000000..299bc14b --- /dev/null +++ b/readers/swagger.yml @@ -0,0 +1,122 @@ +swagger: "2.0" +info: + title: Mainflux reader service + description: HTTP API for reading messages. + version: "1.0.0" +consumes: + - "application/json" +produces: + - "application/json" +paths: + /channels/{chanId}/messages: + get: + summary: Retrieves messages sent to single channel + description: | + Retrieves a list of messages sent to specific channel. Due to + performance concerns, data is retrieved in subsets. The API readers must + ensure that the entire dataset is consumed either by making subsequent + requests, or by increasing the subset size of the initial request. + tags: + - messages + parameters: + - $ref: "#/parameters/Authorization" + - $ref: "#/parameters/Limit" + - $ref: "#/parameters/Offset" + - $ref: "#/parameters/ChanId" + responses: + 200: + description: Data retrieved. + schema: + $ref: "#/definitions/MessageList" + 400: + description: Failed due to malformed query parameters. + 403: + description: Missing or invalid access token provided. + 500: + $ref: "#/responses/ServiceError" + +responses: + ServiceError: + description: Unexpected server-side error occured. + +definitions: + MessageList: + type: object + properties: + channels: + type: array + minItems: 0 + uniqueItems: true + items: + type: object + properties: + Channel: + type: integer + description: Unique channel id. + Publisher: + type: integer + description: Unique publisher id. + Protocol: + type: string + description: Protocol name. + Name: + type: string + description: Measured parameter name. + Unit: + type: string + description: Value unit. + Value: + type: number + description: Measured value in number. + StringValue: + type: string + description: Measured value in string format. + BoolValue: + type: boolean + description: Measured value in boolean format. + DataValue: + type: string + description: Measured value in binary format. + ValueSum: + type: number + description: Sum value. + Time: + type: number + description: Time of measurement. + UpdateTime: + type: number + description: Time of updating measurement. + Link: + type: string + +parameters: + Authorization: + name: Authorization + description: Thing access token. + in: header + type: string + required: true + ChanId: + name: chanId + description: Unique channel identifier. + in: path + type: integer + minimum: 1 + required: true + Limit: + name: limit + description: Size of the subset to retrieve. + in: query + type: integer + default: 10 + maximum: 100 + minimum: 1 + required: false + Offset: + name: offset + description: Number of items to skip during retrieval. + in: query + type: integer + default: 0 + minimum: 0 + required: false diff --git a/writers/cassandra/README.md b/writers/cassandra/README.md index e62af4fd..9b2c8e3d 100644 --- a/writers/cassandra/README.md +++ b/writers/cassandra/README.md @@ -20,7 +20,7 @@ default values. ```yaml version: "2" cassandra-writer: - image: mainflux/cassandra:[version] + image: mainflux/cassandra-writer:[version] container_name: [instance name] expose: - [Service HTTP port] @@ -43,26 +43,26 @@ go get github.com/mainflux/mainflux cd $GOPATH/src/github.com/mainflux/mainflux -# compile the cassandra -make cassandra +# compile the cassandra writer +make cassandra-writer # copy binary to bin make install # Set the environment variables and run the service -MF_NATS_URL=[NATS instance URL] MF_CASSANDRA_WRITER_PORT=[Service HTTP port] MF_CASSANDRA_WRITER_DB_CLUSTER=[Cassandra cluster comma separated addresses] MF_CASSANDRA_WRITER_DB_KEYSPACE=[Cassandra keyspace name] $GOBIN/mainflux-cassandra +MF_NATS_URL=[NATS instance URL] MF_CASSANDRA_WRITER_PORT=[Service HTTP port] MF_CASSANDRA_WRITER_DB_CLUSTER=[Cassandra cluster comma separated addresses] MF_CASSANDRA_WRITER_DB_KEYSPACE=[Cassandra keyspace name] $GOBIN/mainflux-cassandra-writer ``` ### Using docker-compose This service can be deployed using docker containers. Docker compose file is -available in `/docker/addons/cassandra/docker-compose.yml`. In -order to run all Mainflux core services, as well as mentioned optional ones, +available in `/docker/addons/cassandra-writer/docker-compose.yml`. +In order to run all Mainflux core services, as well as mentioned optional ones, execute following command: ```bash -./docker/addons/cassandra/init.sh +./docker/addons/cassandra-writer/init.sh ``` ## Usage diff --git a/writers/cassandra/init.go b/writers/cassandra/init.go index ba9688cb..a6b1c70b 100644 --- a/writers/cassandra/init.go +++ b/writers/cassandra/init.go @@ -16,7 +16,7 @@ const table = `CREATE TABLE IF NOT EXISTS messages ( value_sum double, time double, update_time double, - link text, + link text )` // Connect establishes connection to the Cassandra cluster. @@ -30,8 +30,7 @@ func Connect(hosts []string, keyspace string) (*gocql.Session, error) { return nil, err } - err = session.Query(table).Exec() - if err != nil { + if err := session.Query(table).Exec(); err != nil { return nil, err } diff --git a/writers/cassandra/messages_test.go b/writers/cassandra/messages_test.go index b49e33e8..771cb722 100644 --- a/writers/cassandra/messages_test.go +++ b/writers/cassandra/messages_test.go @@ -13,8 +13,12 @@ import ( const keyspace = "mainflux" var ( - msg = mainflux.Message{} addr = "localhost" + msg = mainflux.Message{ + Channel: 1, + Publisher: 1, + Protocol: "mqtt", + } ) func TestSave(t *testing.T) { diff --git a/writers/messages.go b/writers/messages.go index 3370c7eb..744de55c 100644 --- a/writers/messages.go +++ b/writers/messages.go @@ -1,10 +1,8 @@ package writers -import ( - "github.com/mainflux/mainflux" -) +import "github.com/mainflux/mainflux" -// MessageRepository specifies message reading API. +// MessageRepository specifies message writing API. type MessageRepository interface { // Save method is used to save published message. A non-nil