diff --git a/cmd/cassandra-reader/main.go b/cmd/cassandra-reader/main.go index e028f830..b26e109a 100644 --- a/cmd/cassandra-reader/main.go +++ b/cmd/cassandra-reader/main.go @@ -66,16 +66,25 @@ func main() { } } - // Create new auth grpc client - auth, authHandler, err := authclient.Setup(svcName) + ac, acHandler, err := authclient.Setup(svcName) if err != nil { logger.Error(err.Error()) exitCode = 1 return } - defer authHandler.Close() + defer acHandler.Close() - logger.Info("Successfully connected to auth grpc server " + authHandler.Secure()) + logger.Info("Successfully connected to auth grpc server " + acHandler.Secure()) + + tc, tcHandler, err := authclient.SetupAuthz(svcName) + if err != nil { + logger.Error(err.Error()) + exitCode = 1 + return + } + defer tcHandler.Close() + + logger.Info("Successfully connected to things grpc server " + tcHandler.Secure()) // Create new cassandra client csdSession, err := cassandraclient.Setup(envPrefixDB) @@ -96,7 +105,7 @@ func main() { exitCode = 1 return } - hs := httpserver.New(ctx, cancel, svcName, httpServerConfig, api.MakeHandler(repo, auth, svcName, cfg.InstanceID), logger) + hs := httpserver.New(ctx, cancel, svcName, httpServerConfig, api.MakeHandler(repo, ac, tc, svcName, cfg.InstanceID), logger) if cfg.SendTelemetry { chc := chclient.New(svcName, mainflux.Version, logger, cancel) diff --git a/cmd/influxdb-reader/main.go b/cmd/influxdb-reader/main.go index 27f10565..2c16ecb8 100644 --- a/cmd/influxdb-reader/main.go +++ b/cmd/influxdb-reader/main.go @@ -65,15 +65,25 @@ func main() { } } - auth, authHandler, err := authclient.Setup(svcName) + ac, acHandler, err := authclient.Setup(svcName) if err != nil { logger.Error(err.Error()) exitCode = 1 return } - defer authHandler.Close() + defer acHandler.Close() - logger.Info("Successfully connected to auth grpc server " + authHandler.Secure()) + logger.Info("Successfully connected to auth grpc server " + acHandler.Secure()) + + tc, tcHandler, err := authclient.SetupAuthz(svcName) + if err != nil { + logger.Error(err.Error()) + exitCode = 1 + return + } + defer tcHandler.Close() + + logger.Info("Successfully connected to things grpc server " + tcHandler.Secure()) influxDBConfig := influxdbclient.Config{} if err := env.Parse(&influxDBConfig, env.Options{Prefix: envPrefixDB}); err != nil { @@ -104,7 +114,7 @@ func main() { exitCode = 1 return } - hs := httpserver.New(ctx, cancel, svcName, httpServerConfig, api.MakeHandler(repo, auth, svcName, cfg.InstanceID), logger) + hs := httpserver.New(ctx, cancel, svcName, httpServerConfig, api.MakeHandler(repo, ac, tc, svcName, cfg.InstanceID), logger) if cfg.SendTelemetry { chc := chclient.New(svcName, mainflux.Version, logger, cancel) diff --git a/cmd/mongodb-reader/main.go b/cmd/mongodb-reader/main.go index 665b1345..91c8efef 100644 --- a/cmd/mongodb-reader/main.go +++ b/cmd/mongodb-reader/main.go @@ -74,15 +74,25 @@ func main() { repo := newService(db, logger) - auth, authHandler, err := authclient.Setup(svcName) + ac, acHandler, err := authclient.Setup(svcName) if err != nil { logger.Fatal(err.Error()) exitCode = 1 return } - defer authHandler.Close() + defer acHandler.Close() - logger.Info("Successfully connected to auth grpc server " + authHandler.Secure()) + logger.Info("Successfully connected to auth grpc server " + acHandler.Secure()) + + tc, tcHandler, err := authclient.SetupAuthz(svcName) + if err != nil { + logger.Error(err.Error()) + exitCode = 1 + return + } + defer tcHandler.Close() + + logger.Info("Successfully connected to things grpc server " + tcHandler.Secure()) httpServerConfig := server.Config{Port: defSvcHTTPPort} if err := env.Parse(&httpServerConfig, env.Options{Prefix: envPrefixHTTP}); err != nil { @@ -90,7 +100,7 @@ func main() { exitCode = 1 return } - hs := httpserver.New(ctx, cancel, svcName, httpServerConfig, api.MakeHandler(repo, auth, svcName, cfg.InstanceID), logger) + hs := httpserver.New(ctx, cancel, svcName, httpServerConfig, api.MakeHandler(repo, ac, tc, svcName, cfg.InstanceID), logger) if cfg.SendTelemetry { chc := chclient.New(svcName, mainflux.Version, logger, cancel) diff --git a/cmd/postgres-reader/main.go b/cmd/postgres-reader/main.go index 06bff73d..7f490644 100644 --- a/cmd/postgres-reader/main.go +++ b/cmd/postgres-reader/main.go @@ -80,15 +80,25 @@ func main() { } defer db.Close() - auth, authHandler, err := authclient.Setup(svcName) + ac, acHandler, err := authclient.Setup(svcName) if err != nil { logger.Error(err.Error()) exitCode = 1 return } - defer authHandler.Close() + defer acHandler.Close() - logger.Info("Successfully connected to auth grpc server " + authHandler.Secure()) + logger.Info("Successfully connected to auth grpc server " + acHandler.Secure()) + + tc, tcHandler, err := authclient.SetupAuthz(svcName) + if err != nil { + logger.Error(err.Error()) + exitCode = 1 + return + } + defer tcHandler.Close() + + logger.Info("Successfully connected to things grpc server " + tcHandler.Secure()) repo := newService(db, logger) @@ -98,7 +108,7 @@ func main() { exitCode = 1 return } - hs := httpserver.New(ctx, cancel, svcName, httpServerConfig, api.MakeHandler(repo, auth, svcName, cfg.InstanceID), logger) + hs := httpserver.New(ctx, cancel, svcName, httpServerConfig, api.MakeHandler(repo, ac, tc, svcName, cfg.InstanceID), logger) if cfg.SendTelemetry { chc := chclient.New(svcName, mainflux.Version, logger, cancel) diff --git a/cmd/timescale-reader/main.go b/cmd/timescale-reader/main.go index 5570f3a5..6f1c2292 100644 --- a/cmd/timescale-reader/main.go +++ b/cmd/timescale-reader/main.go @@ -80,15 +80,25 @@ func main() { repo := newService(db, logger) - auth, authHandler, err := authclient.Setup(svcName) + ac, acHandler, err := authclient.Setup(svcName) if err != nil { logger.Error(err.Error()) exitCode = 1 return } - defer authHandler.Close() + defer acHandler.Close() - logger.Info("Successfully connected to auth grpc server " + authHandler.Secure()) + logger.Info("Successfully connected to auth grpc server " + acHandler.Secure()) + + tc, tcHandler, err := authclient.SetupAuthz(svcName) + if err != nil { + logger.Error(err.Error()) + exitCode = 1 + return + } + defer tcHandler.Close() + + logger.Info("Successfully connected to things grpc server " + tcHandler.Secure()) httpServerConfig := server.Config{Port: defSvcHTTPPort} if err := env.Parse(&httpServerConfig, env.Options{Prefix: envPrefixHTTP}); err != nil { @@ -96,7 +106,7 @@ func main() { exitCode = 1 return } - hs := httpserver.New(ctx, cancel, svcName, httpServerConfig, api.MakeHandler(repo, auth, svcName, cfg.InstanceID), logger) + hs := httpserver.New(ctx, cancel, svcName, httpServerConfig, api.MakeHandler(repo, ac, tc, svcName, cfg.InstanceID), logger) if cfg.SendTelemetry { chc := chclient.New(svcName, mainflux.Version, logger, cancel) diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index 507b1770..a7609b53 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -376,11 +376,11 @@ services: MF_MQTT_ADAPTER_WS_TARGET_PATH: ${MF_MQTT_ADAPTER_WS_TARGET_PATH} MF_MQTT_ADAPTER_INSTANCE: ${MF_MQTT_ADAPTER_INSTANCE} MF_MQTT_ADAPTER_ES_URL: ${MF_ES_URL} - MF_AUTH_GRPC_URL: ${MF_THINGS_AUTH_GRPC_URL} - MF_AUTH_GRPC_TIMEOUT: ${MF_THINGS_AUTH_GRPC_TIMEOUT} - MF_AUTH_GRPC_CLIENT_CERT: ${MF_THINGS_AUTH_GRPC_CLIENT_CERT:+/client.crt} - MF_AUTH_GRPC_CLIENT_KEY: ${MF_THINGS_AUTH_GRPC_CLIENT_KEY:+/client.key} - MF_AUTH_GRPC_SERVER_CA_CERTS: ${MF_THINGS_AUTH_GRPC_SERVER_CA_CERTS:+/server_ca.crt} + MF_THINGS_AUTH_GRPC_URL: ${MF_THINGS_AUTH_GRPC_URL} + MF_THINGS_AUTH_GRPC_TIMEOUT: ${MF_THINGS_AUTH_GRPC_TIMEOUT} + MF_THINGS_AUTH_GRPC_CLIENT_CERT: ${MF_THINGS_AUTH_GRPC_CLIENT_CERT:+/things-grpc-client.crt} + MF_THINGS_AUTH_GRPC_CLIENT_KEY: ${MF_THINGS_AUTH_GRPC_CLIENT_KEY:+/things-grpc-client.key} + MF_THINGS_AUTH_GRPC_SERVER_CA_CERTS: ${MF_THINGS_AUTH_GRPC_SERVER_CA_CERTS:+/things-grpc-server-ca.crt} MF_JAEGER_URL: ${MF_JAEGER_URL} MF_MESSAGE_BROKER_URL: ${MF_MESSAGE_BROKER_URL} MF_SEND_TELEMETRY: ${MF_SEND_TELEMETRY} @@ -417,11 +417,11 @@ services: MF_HTTP_ADAPTER_PORT: ${MF_HTTP_ADAPTER_PORT} MF_HTTP_ADAPTER_SERVER_CERT: ${MF_HTTP_ADAPTER_SERVER_CERT} MF_HTTP_ADAPTER_SERVER_KEY: ${MF_HTTP_ADAPTER_SERVER_KEY} - MF_AUTH_GRPC_URL: ${MF_THINGS_AUTH_GRPC_URL} - MF_AUTH_GRPC_TIMEOUT: ${MF_THINGS_AUTH_GRPC_TIMEOUT} - MF_AUTH_GRPC_CLIENT_CERT: ${MF_THINGS_AUTH_GRPC_CLIENT_CERT:+/client.crt} - MF_AUTH_GRPC_CLIENT_KEY: ${MF_THINGS_AUTH_GRPC_CLIENT_KEY:+/client.key} - MF_AUTH_GRPC_SERVER_CA_CERTS: ${MF_THINGS_AUTH_GRPC_SERVER_CA_CERTS:+/server_ca.crt} + MF_THINGS_AUTH_GRPC_URL: ${MF_THINGS_AUTH_GRPC_URL} + MF_THINGS_AUTH_GRPC_TIMEOUT: ${MF_THINGS_AUTH_GRPC_TIMEOUT} + MF_THINGS_AUTH_GRPC_CLIENT_CERT: ${MF_THINGS_AUTH_GRPC_CLIENT_CERT:+/things-grpc-client.crt} + MF_THINGS_AUTH_GRPC_CLIENT_KEY: ${MF_THINGS_AUTH_GRPC_CLIENT_KEY:+/things-grpc-client.key} + MF_THINGS_AUTH_GRPC_SERVER_CA_CERTS: ${MF_THINGS_AUTH_GRPC_SERVER_CA_CERTS:+/things-grpc-server-ca.crt} MF_MESSAGE_BROKER_URL: ${MF_MESSAGE_BROKER_URL} MF_JAEGER_URL: ${MF_JAEGER_URL} MF_SEND_TELEMETRY: ${MF_SEND_TELEMETRY} @@ -474,11 +474,11 @@ services: MF_COAP_ADAPTER_HTTP_PORT: ${MF_COAP_ADAPTER_HTTP_PORT} MF_COAP_ADAPTER_HTTP_SERVER_CERT: ${MF_COAP_ADAPTER_HTTP_SERVER_CERT} MF_COAP_ADAPTER_HTTP_SERVER_KEY: ${MF_COAP_ADAPTER_HTTP_SERVER_KEY} - MF_AUTH_GRPC_URL: ${MF_THINGS_AUTH_GRPC_URL} - MF_AUTH_GRPC_TIMEOUT: ${MF_THINGS_AUTH_GRPC_TIMEOUT} - MF_AUTH_GRPC_CLIENT_CERT: ${MF_THINGS_AUTH_GRPC_CLIENT_CERT:+/client.crt} - MF_AUTH_GRPC_CLIENT_KEY: ${MF_THINGS_AUTH_GRPC_CLIENT_KEY:+/client.key} - MF_AUTH_GRPC_SERVER_CA_CERTS: ${MF_THINGS_AUTH_GRPC_SERVER_CA_CERTS:+/server_ca.crt} + MF_THINGS_AUTH_GRPC_URL: ${MF_THINGS_AUTH_GRPC_URL} + MF_THINGS_AUTH_GRPC_TIMEOUT: ${MF_THINGS_AUTH_GRPC_TIMEOUT} + MF_THINGS_AUTH_GRPC_CLIENT_CERT: ${MF_THINGS_AUTH_GRPC_CLIENT_CERT:+/things-grpc-client.crt} + MF_THINGS_AUTH_GRPC_CLIENT_KEY: ${MF_THINGS_AUTH_GRPC_CLIENT_KEY:+/things-grpc-client.key} + MF_THINGS_AUTH_GRPC_SERVER_CA_CERTS: ${MF_THINGS_AUTH_GRPC_SERVER_CA_CERTS:+/things-grpc-server-ca.crt} MF_MESSAGE_BROKER_URL: ${MF_MESSAGE_BROKER_URL} MF_JAEGER_URL: ${MF_JAEGER_URL} MF_SEND_TELEMETRY: ${MF_SEND_TELEMETRY} @@ -519,11 +519,11 @@ services: MF_WS_ADAPTER_HTTP_PORT: ${MF_WS_ADAPTER_HTTP_PORT} MF_WS_ADAPTER_HTTP_SERVER_CERT: ${MF_WS_ADAPTER_HTTP_SERVER_CERT} MF_WS_ADAPTER_HTTP_SERVER_KEY: ${MF_WS_ADAPTER_HTTP_SERVER_KEY} - MF_AUTH_GRPC_URL: ${MF_THINGS_AUTH_GRPC_URL} - MF_AUTH_GRPC_TIMEOUT: ${MF_THINGS_AUTH_GRPC_TIMEOUT} - MF_AUTH_GRPC_CLIENT_CERT: ${MF_THINGS_AUTH_GRPC_CLIENT_CERT:+/client.crt} - MF_AUTH_GRPC_CLIENT_KEY: ${MF_THINGS_AUTH_GRPC_CLIENT_KEY:+/client.key} - MF_AUTH_GRPC_SERVER_CA_CERTS: ${MF_THINGS_AUTH_GRPC_SERVER_CA_CERTS:+/server_ca.crt} + MF_THINGS_AUTH_GRPC_URL: ${MF_THINGS_AUTH_GRPC_URL} + MF_THINGS_AUTH_GRPC_TIMEOUT: ${MF_THINGS_AUTH_GRPC_TIMEOUT} + MF_THINGS_AUTH_GRPC_CLIENT_CERT: ${MF_THINGS_AUTH_GRPC_CLIENT_CERT:+/things-grpc-client.crt} + MF_THINGS_AUTH_GRPC_CLIENT_KEY: ${MF_THINGS_AUTH_GRPC_CLIENT_KEY:+/things-grpc-client.key} + MF_THINGS_AUTH_GRPC_SERVER_CA_CERTS: ${MF_THINGS_AUTH_GRPC_SERVER_CA_CERTS:+/things-grpc-server-ca.crt} MF_MESSAGE_BROKER_URL: ${MF_MESSAGE_BROKER_URL} MF_JAEGER_URL: ${MF_JAEGER_URL} MF_SEND_TELEMETRY: ${MF_SEND_TELEMETRY} diff --git a/internal/clients/grpc/auth/client.go b/internal/clients/grpc/auth/client.go index 1ddca8cc..bd9b232d 100644 --- a/internal/clients/grpc/auth/client.go +++ b/internal/clients/grpc/auth/client.go @@ -12,7 +12,10 @@ import ( thingsauth "github.com/mainflux/mainflux/things/api/grpc" ) -const envAuthGrpcPrefix = "MF_AUTH_GRPC_" +const ( + envAuthGrpcPrefix = "MF_AUTH_GRPC_" + envAuthzGrpcPrefix = "MF_THINGS_AUTH_GRPC_" +) var errGrpcConfig = errors.New("failed to load grpc configuration") @@ -33,7 +36,7 @@ func Setup(svcName string) (mainflux.AuthServiceClient, grpcclient.ClientHandler // Setup loads Auth gRPC configuration from environment variable and creates new Auth gRPC API. func SetupAuthz(svcName string) (mainflux.AuthzServiceClient, grpcclient.ClientHandler, error) { config := grpcclient.Config{} - if err := env.Parse(&config, env.Options{Prefix: envAuthGrpcPrefix}); err != nil { + if err := env.Parse(&config, env.Options{Prefix: envAuthzGrpcPrefix}); err != nil { return nil, nil, errors.Wrap(errGrpcConfig, err) } c, ch, err := grpcclient.Setup(config, svcName) diff --git a/readers/api/endpoint.go b/readers/api/endpoint.go index e9933cbd..708578bc 100644 --- a/readers/api/endpoint.go +++ b/readers/api/endpoint.go @@ -13,16 +13,17 @@ import ( "github.com/mainflux/mainflux/readers" ) -func listMessagesEndpoint(svc readers.MessageRepository, ac mainflux.AuthServiceClient) endpoint.Endpoint { +func listMessagesEndpoint(svc readers.MessageRepository, uauth mainflux.AuthServiceClient, taauth mainflux.AuthzServiceClient) endpoint.Endpoint { return func(ctx context.Context, request interface{}) (interface{}, error) { req := request.(listMessagesReq) - if err := req.validate(); err != nil { return nil, errors.Wrap(apiutil.ErrValidation, err) } - if err := authorize(ctx, req, ac); err != nil { - return nil, errors.Wrap(apiutil.ErrValidation, errors.Wrap(errors.ErrAuthorization, err)) + + if err := authorize(ctx, req, uauth, taauth); err != nil { + return nil, errors.Wrap(errors.ErrAuthorization, err) } + page, err := svc.ReadAll(req.chanID, req.pageMeta) if err != nil { return nil, err diff --git a/readers/api/endpoint_test.go b/readers/api/endpoint_test.go index 10252970..4a1f1faa 100644 --- a/readers/api/endpoint_test.go +++ b/readers/api/endpoint_test.go @@ -11,14 +11,17 @@ import ( "testing" "time" + "github.com/mainflux/mainflux" authmocks "github.com/mainflux/mainflux/auth/mocks" "github.com/mainflux/mainflux/internal/apiutil" + "github.com/mainflux/mainflux/internal/testsutil" "github.com/mainflux/mainflux/pkg/transformers/senml" - "github.com/mainflux/mainflux/pkg/uuid" "github.com/mainflux/mainflux/readers" "github.com/mainflux/mainflux/readers/api" "github.com/mainflux/mainflux/readers/mocks" + thmocks "github.com/mainflux/mainflux/things/mocks" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" ) const ( @@ -42,14 +45,10 @@ var ( vb = true vd = "dataValue" sum float64 = 42 - - idProvider = uuid.New() ) -func newServer(repo readers.MessageRepository) *httptest.Server { - auth := new(authmocks.Service) - - mux := api.MakeHandler(repo, auth, svcName, instanceID) +func newServer(repo readers.MessageRepository, ac *authmocks.Service, tc *thmocks.Service) *httptest.Server { + mux := api.MakeHandler(repo, ac, tc, svcName, instanceID) return httptest.NewServer(mux) } @@ -77,12 +76,9 @@ func (tr testRequest) make() (*http.Response, error) { } func TestReadAll(t *testing.T) { - chanID, err := idProvider.ID() - assert.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) - pubID, err := idProvider.ID() - assert.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) - pubID2, err := idProvider.ID() - assert.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) + chanID := testsutil.GenerateUUID(t) + pubID := testsutil.GenerateUUID(t) + pubID2 := testsutil.GenerateUUID(t) now := time.Now().Unix() @@ -130,7 +126,9 @@ func TestReadAll(t *testing.T) { } repo := mocks.NewMessageRepository(chanID, fromSenml(messages)) - ts := newServer(repo) + auth := new(authmocks.Service) + tauth := new(thmocks.Service) + ts := newServer(repo, auth, tauth) defer ts.Close() cases := []struct { @@ -145,7 +143,7 @@ func TestReadAll(t *testing.T) { { desc: "read page with valid offset and limit", url: fmt.Sprintf("%s/channels/%s/messages?offset=0&limit=10", ts.URL, chanID), - key: thingToken, + token: userToken, status: http.StatusOK, res: pageRes{ Total: uint64(len(messages)), @@ -201,7 +199,7 @@ func TestReadAll(t *testing.T) { { desc: "read page with invalid token as thing", url: fmt.Sprintf("%s/channels/%s/messages?offset=0&limit=10", ts.URL, chanID), - token: invalid, + token: authmocks.InvalidValue, status: http.StatusUnauthorized, }, { @@ -482,7 +480,7 @@ func TestReadAll(t *testing.T) { { desc: "read page with invalid token as user", url: fmt.Sprintf("%s/channels/%s/messages?offset=0&limit=10", ts.URL, chanID), - token: invalid, + token: authmocks.InvalidValue, status: http.StatusUnauthorized, }, { @@ -697,7 +695,6 @@ func TestReadAll(t *testing.T) { token: userToken, status: http.StatusBadRequest, }, - { desc: "read page with non-float to as user", url: fmt.Sprintf("%s/channels/%s/messages?to=ABCD", ts.URL, chanID), @@ -717,6 +714,11 @@ func TestReadAll(t *testing.T) { } for _, tc := range cases { + repoCall := auth.On("Identify", mock.Anything, mock.Anything).Return(&mainflux.IdentityRes{Id: testsutil.GenerateUUID(t)}, nil) + repoCall1 := auth.On("Authorize", mock.Anything, mock.Anything).Return(&mainflux.AuthorizeRes{Authorized: true, Id: testsutil.GenerateUUID(t)}, nil) + if tc.key != "" { + repoCall1 = tauth.On("Authorize", mock.Anything, mock.Anything).Return(&mainflux.AuthorizeRes{Authorized: true, Id: testsutil.GenerateUUID(t)}, nil) + } req := testRequest{ client: ts.Client(), method: http.MethodGet, @@ -735,6 +737,8 @@ func TestReadAll(t *testing.T) { assert.Equal(t, tc.status, res.StatusCode, fmt.Sprintf("%s: expected %d got %d", tc.desc, tc.status, res.StatusCode)) assert.Equal(t, tc.res.Total, page.Total, fmt.Sprintf("%s: expected %d got %d", tc.desc, tc.res.Total, page.Total)) assert.ElementsMatch(t, tc.res.Messages, page.Messages, fmt.Sprintf("%s: got incorrect body from response", tc.desc)) + repoCall.Unset() + repoCall1.Unset() } } diff --git a/readers/api/transport.go b/readers/api/transport.go index 6fa9db24..a83418a9 100644 --- a/readers/api/transport.go +++ b/readers/api/transport.go @@ -39,35 +39,24 @@ const ( defOffset = 0 defFormat = "messages" - thingsKind = "things" - channelsKind = "channels" - usersKind = "users" - tokenKind = "token" - thingType = "thing" - channelType = "channel" - userType = "user" - groupType = "group" - memberRelation = "member" - groupRelation = "group" - administratorRelation = "administrator" - parentGroupRelation = "parent_group" - viewerRelation = "viewer" - adminPermission = "admin" - editPermission = "edit" - viewPermission = "view" + tokenKind = "token" + thingType = "thing" + userType = "user" + subscribePermission = "subscribe" + groupType = "group" ) var errUserAccess = errors.New("user has no permission") // MakeHandler returns a HTTP handler for API endpoints. -func MakeHandler(svc readers.MessageRepository, ac mainflux.AuthServiceClient, svcName, instanceID string) http.Handler { +func MakeHandler(svc readers.MessageRepository, uauth mainflux.AuthServiceClient, taauth mainflux.AuthzServiceClient, svcName, instanceID string) http.Handler { opts := []kithttp.ServerOption{ kithttp.ServerErrorEncoder(encodeError), } mux := bone.New() mux.Get("/channels/:chanID/messages", kithttp.NewServer( - listMessagesEndpoint(svc, ac), + listMessagesEndpoint(svc, uauth, taauth), decodeList, encodeResponse, opts..., @@ -209,6 +198,7 @@ func encodeError(_ context.Context, err error, w http.ResponseWriter) { errors.Contains(err, apiutil.ErrInvalidComparator): w.WriteHeader(http.StatusBadRequest) case errors.Contains(err, errors.ErrAuthentication), + errors.Contains(err, errors.ErrAuthorization), errors.Contains(err, apiutil.ErrBearerToken): w.WriteHeader(http.StatusUnauthorized) case errors.Contains(err, readers.ErrReadMessages): @@ -228,16 +218,31 @@ func encodeError(_ context.Context, err error, w http.ResponseWriter) { } } -func authorize(ctx context.Context, req listMessagesReq, ac mainflux.AuthServiceClient) (err error) { +func authorize(ctx context.Context, req listMessagesReq, uauth mainflux.AuthServiceClient, taauth mainflux.AuthzServiceClient) (err error) { switch { case req.token != "": - if _, err = ac.Authorize(ctx, &mainflux.AuthorizeReq{ + if _, err = uauth.Authorize(ctx, &mainflux.AuthorizeReq{ SubjectType: userType, SubjectKind: tokenKind, Subject: req.token, - Permission: viewPermission, - Object: channelType, - ObjectType: req.chanID, + Permission: subscribePermission, + ObjectType: groupType, + Object: req.chanID, + }); err != nil { + e, ok := status.FromError(err) + if ok && e.Code() == codes.PermissionDenied { + return errors.Wrap(errUserAccess, err) + } + return err + } + return nil + case req.key != "": + if _, err = taauth.Authorize(ctx, &mainflux.AuthorizeReq{ + SubjectType: groupType, + Subject: req.key, + ObjectType: thingType, + Object: req.chanID, + Permission: subscribePermission, }); err != nil { e, ok := status.FromError(err) if ok && e.Code() == codes.PermissionDenied { diff --git a/readers/cassandra/messages_test.go b/readers/cassandra/messages_test.go index 2bc1791a..277a9a6f 100644 --- a/readers/cassandra/messages_test.go +++ b/readers/cassandra/messages_test.go @@ -11,9 +11,9 @@ import ( cwriter "github.com/mainflux/mainflux/consumers/writers/cassandra" casclient "github.com/mainflux/mainflux/internal/clients/cassandra" + "github.com/mainflux/mainflux/internal/testsutil" "github.com/mainflux/mainflux/pkg/transformers/json" "github.com/mainflux/mainflux/pkg/transformers/senml" - "github.com/mainflux/mainflux/pkg/uuid" "github.com/mainflux/mainflux/readers" creader "github.com/mainflux/mainflux/readers/cassandra" "github.com/stretchr/testify/assert" @@ -42,8 +42,6 @@ var ( vb = true vd = "dataValue" sum float64 = 42 - - idProvider = uuid.New() ) func TestReadSenml(t *testing.T) { @@ -58,14 +56,10 @@ func TestReadSenml(t *testing.T) { require.Nil(t, err, fmt.Sprintf("failed to initialize to Cassandra: %s", err)) writer := cwriter.New(session) - chanID, err := idProvider.ID() - assert.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) - pubID, err := idProvider.ID() - assert.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) - pubID2, err := idProvider.ID() - assert.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) - wrongID, err := idProvider.ID() - assert.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) + chanID := testsutil.GenerateUUID(t) + pubID := testsutil.GenerateUUID(t) + pubID2 := testsutil.GenerateUUID(t) + wrongID := testsutil.GenerateUUID(t) m := senml.Message{ Channel: chanID, @@ -422,8 +416,7 @@ func TestReadJSON(t *testing.T) { defer session.Close() writer := cwriter.New(session) - id1, err := idProvider.ID() - require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) + id1 := testsutil.GenerateUUID(t) m := json.Message{ Channel: id1, Publisher: id1, @@ -455,8 +448,7 @@ func TestReadJSON(t *testing.T) { err = writer.ConsumeBlocking(context.TODO(), messages1) require.Nil(t, err, fmt.Sprintf("expected no error got %s\n", err)) - id2, err := idProvider.ID() - require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) + id2 := testsutil.GenerateUUID(t) m = json.Message{ Channel: id2, Publisher: id2, diff --git a/readers/influxdb/messages_test.go b/readers/influxdb/messages_test.go index eb25318b..9584fca1 100644 --- a/readers/influxdb/messages_test.go +++ b/readers/influxdb/messages_test.go @@ -11,9 +11,9 @@ import ( influxdb2 "github.com/influxdata/influxdb-client-go/v2" iwriter "github.com/mainflux/mainflux/consumers/writers/influxdb" + "github.com/mainflux/mainflux/internal/testsutil" "github.com/mainflux/mainflux/pkg/transformers/json" "github.com/mainflux/mainflux/pkg/transformers/senml" - "github.com/mainflux/mainflux/pkg/uuid" "github.com/mainflux/mainflux/readers" ireader "github.com/mainflux/mainflux/readers/influxdb" "github.com/stretchr/testify/assert" @@ -49,22 +49,15 @@ var ( Bucket: dbBucket, Org: dbOrg, } - idProvider = uuid.New() ) func TestReadSenml(t *testing.T) { asyncWriter := iwriter.NewAsync(client, repoCfg) - chanID, err := idProvider.ID() - assert.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) - assert.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) - pubID, err := idProvider.ID() - assert.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) - assert.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) - pubID2, err := idProvider.ID() - assert.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) - wrongID, err := idProvider.ID() - assert.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) + chanID := testsutil.GenerateUUID(t) + pubID := testsutil.GenerateUUID(t) + pubID2 := testsutil.GenerateUUID(t) + wrongID := testsutil.GenerateUUID(t) m := senml.Message{ Channel: chanID, @@ -115,7 +108,7 @@ func TestReadSenml(t *testing.T) { errs := asyncWriter.Errors() asyncWriter.ConsumeAsync(context.TODO(), messages) - err = <-errs + err := <-errs assert.Nil(t, err, fmt.Sprintf("Save operation expected to succeed: %s.\n", err)) reader := ireader.New(client, repoCfg) @@ -566,8 +559,7 @@ func TestReadSenml(t *testing.T) { func TestReadJSON(t *testing.T) { asyncWriter := iwriter.NewAsync(client, repoCfg) - id1, err := idProvider.ID() - assert.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) + id1 := testsutil.GenerateUUID(t) m := json.Message{ Channel: id1, Publisher: id1, @@ -592,11 +584,10 @@ func TestReadJSON(t *testing.T) { errs := asyncWriter.Errors() asyncWriter.ConsumeAsync(context.TODO(), messages1) - err = <-errs + err := <-errs require.Nil(t, err, fmt.Sprintf("Save operation expected to succeed: %s.\n", err)) - id2, err := idProvider.ID() - assert.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) + id2 := testsutil.GenerateUUID(t) m = json.Message{ Channel: id2, Publisher: id2, diff --git a/readers/mongodb/messages_test.go b/readers/mongodb/messages_test.go index edd6da60..f102ee63 100644 --- a/readers/mongodb/messages_test.go +++ b/readers/mongodb/messages_test.go @@ -10,9 +10,9 @@ import ( "time" mwriter "github.com/mainflux/mainflux/consumers/writers/mongodb" + "github.com/mainflux/mainflux/internal/testsutil" "github.com/mainflux/mainflux/pkg/transformers/json" "github.com/mainflux/mainflux/pkg/transformers/senml" - "github.com/mainflux/mainflux/pkg/uuid" "github.com/mainflux/mainflux/readers" mreader "github.com/mainflux/mainflux/readers/mongodb" "github.com/stretchr/testify/assert" @@ -45,8 +45,6 @@ var ( vb = true vd = "dataValue" sum float64 = 42 - - idProvider = uuid.New() ) func TestReadSenml(t *testing.T) { @@ -56,14 +54,10 @@ func TestReadSenml(t *testing.T) { db := client.Database(testDB) writer := mwriter.New(db) - chanID, err := idProvider.ID() - assert.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) - pubID, err := idProvider.ID() - assert.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) - pubID2, err := idProvider.ID() - assert.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) - wrongID, err := idProvider.ID() - assert.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) + chanID := testsutil.GenerateUUID(t) + pubID := testsutil.GenerateUUID(t) + pubID2 := testsutil.GenerateUUID(t) + wrongID := testsutil.GenerateUUID(t) m := senml.Message{ Channel: chanID, @@ -387,8 +381,7 @@ func TestReadJSON(t *testing.T) { db := client.Database(testDB) writer := mwriter.New(db) - id1, err := idProvider.ID() - require.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) + id1 := testsutil.GenerateUUID(t) m := json.Message{ Channel: id1, Publisher: id1, @@ -419,8 +412,7 @@ func TestReadJSON(t *testing.T) { err = writer.ConsumeBlocking(context.TODO(), messages1) require.Nil(t, err, fmt.Sprintf("expected no error got %s\n", err)) - id2, err := idProvider.ID() - assert.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) + id2 := testsutil.GenerateUUID(t) m = json.Message{ Channel: id2, Publisher: id2, diff --git a/readers/postgres/messages_test.go b/readers/postgres/messages_test.go index ece6f785..c3dd54a3 100644 --- a/readers/postgres/messages_test.go +++ b/readers/postgres/messages_test.go @@ -10,9 +10,9 @@ import ( "time" pwriter "github.com/mainflux/mainflux/consumers/writers/postgres" + "github.com/mainflux/mainflux/internal/testsutil" "github.com/mainflux/mainflux/pkg/transformers/json" "github.com/mainflux/mainflux/pkg/transformers/senml" - "github.com/mainflux/mainflux/pkg/uuid" "github.com/mainflux/mainflux/readers" preader "github.com/mainflux/mainflux/readers/postgres" "github.com/stretchr/testify/assert" @@ -38,21 +38,15 @@ var ( vb = true vd = "dataValue" sum float64 = 42 - - idProvider = uuid.New() ) func TestReadSenml(t *testing.T) { writer := pwriter.New(db) - chanID, err := idProvider.ID() - assert.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) - pubID, err := idProvider.ID() - assert.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) - pubID2, err := idProvider.ID() - assert.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) - wrongID, err := idProvider.ID() - assert.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) + chanID := testsutil.GenerateUUID(t) + pubID := testsutil.GenerateUUID(t) + pubID2 := testsutil.GenerateUUID(t) + wrongID := testsutil.GenerateUUID(t) m := senml.Message{ Channel: chanID, @@ -99,7 +93,7 @@ func TestReadSenml(t *testing.T) { messages = append(messages, msg) } - err = writer.ConsumeBlocking(context.TODO(), messages) + err := writer.ConsumeBlocking(context.TODO(), messages) require.Nil(t, err, fmt.Sprintf("expected no error got %s\n", err)) reader := preader.New(db) @@ -527,8 +521,7 @@ func TestReadSenml(t *testing.T) { func TestReadJSON(t *testing.T) { writer := pwriter.New(db) - id1, err := idProvider.ID() - assert.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) + id1 := testsutil.GenerateUUID(t) m := json.Message{ Channel: id1, Publisher: id1, @@ -557,11 +550,10 @@ func TestReadJSON(t *testing.T) { msgs1 = append(msgs1, m) } - err = writer.ConsumeBlocking(context.TODO(), messages1) + err := writer.ConsumeBlocking(context.TODO(), messages1) require.Nil(t, err, fmt.Sprintf("expected no error got %s\n", err)) - id2, err := idProvider.ID() - assert.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) + id2 := testsutil.GenerateUUID(t) m = json.Message{ Channel: id2, Publisher: id2, diff --git a/readers/timescale/messages_test.go b/readers/timescale/messages_test.go index e38fe0d4..7ef2f550 100644 --- a/readers/timescale/messages_test.go +++ b/readers/timescale/messages_test.go @@ -10,9 +10,9 @@ import ( "time" twriter "github.com/mainflux/mainflux/consumers/writers/timescale" + "github.com/mainflux/mainflux/internal/testsutil" "github.com/mainflux/mainflux/pkg/transformers/json" "github.com/mainflux/mainflux/pkg/transformers/senml" - "github.com/mainflux/mainflux/pkg/uuid" "github.com/mainflux/mainflux/readers" treader "github.com/mainflux/mainflux/readers/timescale" "github.com/stretchr/testify/assert" @@ -38,21 +38,15 @@ var ( vb = true vd = "dataValue" sum float64 = 42 - - idProvider = uuid.New() ) func TestReadSenml(t *testing.T) { writer := twriter.New(db) - chanID, err := idProvider.ID() - assert.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) - pubID, err := idProvider.ID() - assert.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) - pubID2, err := idProvider.ID() - assert.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) - wrongID, err := idProvider.ID() - assert.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) + chanID := testsutil.GenerateUUID(t) + pubID := testsutil.GenerateUUID(t) + pubID2 := testsutil.GenerateUUID(t) + wrongID := testsutil.GenerateUUID(t) m := senml.Message{ Channel: chanID, @@ -99,7 +93,7 @@ func TestReadSenml(t *testing.T) { messages = append(messages, msg) } - err = writer.ConsumeBlocking(context.TODO(), messages) + err := writer.ConsumeBlocking(context.TODO(), messages) require.Nil(t, err, fmt.Sprintf("expected no error got %s\n", err)) reader := treader.New(db) @@ -527,8 +521,7 @@ func TestReadSenml(t *testing.T) { func TestReadJSON(t *testing.T) { writer := twriter.New(db) - id1, err := idProvider.ID() - assert.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) + id1 := testsutil.GenerateUUID(t) messages1 := json.Messages{ Format: format1, } @@ -559,11 +552,10 @@ func TestReadJSON(t *testing.T) { msgs1 = append(msgs1, mapped) } - err = writer.ConsumeBlocking(context.TODO(), messages1) + err := writer.ConsumeBlocking(context.TODO(), messages1) require.Nil(t, err, fmt.Sprintf("expected no error got %s\n", err)) - id2, err := idProvider.ID() - assert.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) + id2 := testsutil.GenerateUUID(t) messages2 := json.Messages{ Format: format2, } diff --git a/things/mocks/auth.go b/things/mocks/auth.go new file mode 100644 index 00000000..9959c4e1 --- /dev/null +++ b/things/mocks/auth.go @@ -0,0 +1,31 @@ +// Copyright (c) Mainflux +// SPDX-License-Identifier: Apache-2.0 + +package mocks + +import ( + "context" + + "github.com/mainflux/mainflux" + "github.com/mainflux/mainflux/pkg/errors" + "github.com/stretchr/testify/mock" + "google.golang.org/grpc" +) + +var _ mainflux.AuthzServiceClient = (*Service)(nil) + +type Service struct { + mock.Mock +} + +func (m *Service) Authorize(ctx context.Context, in *mainflux.AuthorizeReq, opts ...grpc.CallOption) (*mainflux.AuthorizeRes, error) { + ret := m.Called(ctx, in) + if in.GetSubject() == WrongID || in.GetSubject() == "" { + return &mainflux.AuthorizeRes{}, errors.ErrAuthorization + } + if in.GetObject() == WrongID || in.GetObject() == "" { + return &mainflux.AuthorizeRes{}, errors.ErrAuthorization + } + + return ret.Get(0).(*mainflux.AuthorizeRes), ret.Error(1) +}