1
0
mirror of https://github.com/mainflux/mainflux.git synced 2025-04-26 13:48:53 +08:00

324 lines
8.9 KiB
Go
Raw Normal View History

//
// Copyright (c) 2018
// Mainflux
//
// SPDX-License-Identifier: Apache-2.0
//
package things
2018-05-11 01:00:10 +02:00
import (
"context"
"errors"
"time"
"github.com/mainflux/mainflux"
)
// Sentinel errors exposed by the things package.
var (
	// ErrConflict indicates usage of the existing email during account
	// registration.
	ErrConflict = errors.New("email already taken")

	// ErrMalformedEntity indicates malformed entity specification (e.g.
	// invalid username or password).
	ErrMalformedEntity = errors.New("malformed entity specification")

	// ErrUnauthorizedAccess indicates missing or invalid credentials provided
	// when accessing a protected resource.
	ErrUnauthorizedAccess = errors.New("missing or invalid credentials provided")

	// ErrNotFound indicates a non-existent entity request.
	ErrNotFound = errors.New("non-existent entity")
)
// Service specifies an API that must be fullfiled by the domain service
// implementation, and all of its decorators (e.g. logging & metrics).
type Service interface {
// AddThing adds new thing to the user identified by the provided key.
AddThing(string, Thing) (Thing, error)
// UpdateThing updates the thing identified by the provided ID, that
// belongs to the user identified by the provided key.
UpdateThing(string, Thing) error
// ViewThing retrieves data about the thing identified with the provided
// ID, that belongs to the user identified by the provided key.
ViewThing(string, uint64) (Thing, error)
// ListThings retrieves data about subset of things that belongs to the
MF-200 - Enable pagination of result sets (#227) * Add pagination to clients and channels endpoints Signed-off-by: Dušan Borovčanin <borovcanindusan1@gmail.com> * Refactor code Change method signature and rename Bulk methods back to All. Signed-off-by: Dušan Borovčanin <borovcanindusan1@gmail.com> * Rename transport_test.go to endpoint_test.go Signed-off-by: Dušan Borovčanin <borovcanindusan1@gmail.com> * Fix manager tests to support pagination Signed-off-by: Dušan Borovčanin <borovcanindusan1@gmail.com> * Add default offset and limit support Signed-off-by: Dušan Borovčanin <borovcanindusan1@gmail.com> * Update docs Signed-off-by: Dušan Borovčanin <borovcanindusan1@gmail.com> * Update tests to support pagination - Move maxLimitSize checking to request validation. - Add tests to support pagination. Signed-off-by: Dušan Borovčanin <borovcanindusan1@gmail.com> * Fix handling query params for pagination Signed-off-by: Dušan Borovčanin <borovcanindusan1@gmail.com> * Fix empty result set Return empty results if invalid offset and limit is passed to channel and client repository. Update tests accordingly. Signed-off-by: Dušan Borovčanin <borovcanindusan1@gmail.com> * Update manager API docs Signed-off-by: Dušan Borovčanin <borovcanindusan1@gmail.com> * Fix response to invalid limit query param Signed-off-by: Dušan Borovčanin <borovcanindusan1@gmail.com> * Remove offset and limmit checks in repository methods Signed-off-by: Dušan Borovčanin <borovcanindusan1@gmail.com>
2018-04-18 22:36:24 +02:00
// user identified by the provided key.
ListThings(string, int, int) ([]Thing, error)
// RemoveThing removes the thing identified with the provided ID, that
// belongs to the user identified by the provided key.
RemoveThing(string, uint64) error
// CreateChannel adds new channel to the user identified by the provided key.
CreateChannel(string, Channel) (Channel, error)
// UpdateChannel updates the channel identified by the provided ID, that
// belongs to the user identified by the provided key.
UpdateChannel(string, Channel) error
// ViewChannel retrieves data about the channel identified by the provided
// ID, that belongs to the user identified by the provided key.
ViewChannel(string, uint64) (Channel, error)
MF-200 - Enable pagination of result sets (#227) * Add pagination to clients and channels endpoints Signed-off-by: Dušan Borovčanin <borovcanindusan1@gmail.com> * Refactor code Change method signature and rename Bulk methods back to All. Signed-off-by: Dušan Borovčanin <borovcanindusan1@gmail.com> * Rename transport_test.go to endpoint_test.go Signed-off-by: Dušan Borovčanin <borovcanindusan1@gmail.com> * Fix manager tests to support pagination Signed-off-by: Dušan Borovčanin <borovcanindusan1@gmail.com> * Add default offset and limit support Signed-off-by: Dušan Borovčanin <borovcanindusan1@gmail.com> * Update docs Signed-off-by: Dušan Borovčanin <borovcanindusan1@gmail.com> * Update tests to support pagination - Move maxLimitSize checking to request validation. - Add tests to support pagination. Signed-off-by: Dušan Borovčanin <borovcanindusan1@gmail.com> * Fix handling query params for pagination Signed-off-by: Dušan Borovčanin <borovcanindusan1@gmail.com> * Fix empty result set Return empty results if invalid offset and limit is passed to channel and client repository. Update tests accordingly. Signed-off-by: Dušan Borovčanin <borovcanindusan1@gmail.com> * Update manager API docs Signed-off-by: Dušan Borovčanin <borovcanindusan1@gmail.com> * Fix response to invalid limit query param Signed-off-by: Dušan Borovčanin <borovcanindusan1@gmail.com> * Remove offset and limmit checks in repository methods Signed-off-by: Dušan Borovčanin <borovcanindusan1@gmail.com>
2018-04-18 22:36:24 +02:00
// ListChannels retrieves data about subset of channels that belongs to the
// user identified by the provided key.
ListChannels(string, int, int) ([]Channel, error)
// RemoveChannel removes the thing identified by the provided ID, that
// belongs to the user identified by the provided key.
RemoveChannel(string, uint64) error
// Connect adds thing to the channel's list of connected things.
Connect(string, uint64, uint64) error
Use PostgreSQL as primary persistence solution (#175) * Use normalizer as stream source Renamed 'writer' service to 'normalizer' and dropped Cassandra facilities from it. Extracted the common dependencies to 'mainflux' package for easier sharing. Fixed the API docs and unified environment variables. Signed-off-by: Dejan Mijic <dejan@mainflux.com> * Use docker build arguments to specify build Signed-off-by: Dejan Mijic <dejan@mainflux.com> * Remove cassandra libraries Signed-off-by: Dejan Mijic <dejan@mainflux.com> * Update go-kit version to 0.6.0 Signed-off-by: Dejan Mijic <dejan@mainflux.com> * Fix manager configuration Signed-off-by: Dejan Mijic <dejan@mainflux.com> * Refactor docker-compose Merged individual compose files and dropped external links. Remove CoAP container since it is not referenced from NginX config at the moment. Update port mapping in compose and nginx.conf. Dropped bin scripts. Updated service documentation. Signed-off-by: Dejan Mijic <dejan@mainflux.com> * Drop content-type check Signed-off-by: Dejan Mijic <dejan@mainflux.com> * Implement users data access layer in PostgreSQL Signed-off-by: Dejan Mijic <dejan@mainflux.com> * Bump version to 0.1.0 Signed-off-by: Dejan Mijic <dejan@mainflux.com> * Use go-kit logger everywhere (except CoAP) Signed-off-by: Dejan Mijic <dejan@mainflux.com> * Improve factory methods naming Signed-off-by: Dejan Mijic <dejan@mainflux.com> * Implement clients data access layer on PostgreSQL Signed-off-by: Dejan Mijic <dejan@mainflux.com> * Make tests stateless All tests are refactored to use map-based table-driven tests. No cross-tests dependencies is present anymore. 
Signed-off-by: Dejan Mijic <dejan@mainflux.com> * Remove gitignore Signed-off-by: Dejan Mijic <dejan@mainflux.com> * Fix nginx proxying Signed-off-by: Dejan Mijic <dejan@mainflux.com> * Mark client-user FK explicit Signed-off-by: Dejan Mijic <dejan@mainflux.com> * Update API documentation Signed-off-by: Dejan Mijic <dejan@mainflux.com> * Update channel model Signed-off-by: Dejan Mijic <dejan@mainflux.com> * Add channel PostgreSQL repository tests Signed-off-by: Dejan Mijic <dejan@mainflux.com> * Implement PostgreSQL channels DAO Replaced update queries with raw SQL. Explicitly defined M2M table due to difficulties of ensuring the referential integrity through GORM. Signed-off-by: Dejan Mijic <dejan@mainflux.com> * Expose connection endpoints Signed-off-by: Dejan Mijic <dejan@mainflux.com> * Fix swagger docs and remove DB logging Signed-off-by: Dejan Mijic <dejan@mainflux.com> * Fix nested query remarks Signed-off-by: Dejan Mijic <dejan@mainflux.com> * Add unique indices Signed-off-by: Dejan Mijic <dejan@mainflux.com>
2018-03-11 18:06:01 +01:00
// Disconnect removes thing from the channel's list of connected
// things.
Disconnect(string, uint64, uint64) error
Use PostgreSQL as primary persistence solution (#175) * Use normalizer as stream source Renamed 'writer' service to 'normalizer' and dropped Cassandra facilities from it. Extracted the common dependencies to 'mainflux' package for easier sharing. Fixed the API docs and unified environment variables. Signed-off-by: Dejan Mijic <dejan@mainflux.com> * Use docker build arguments to specify build Signed-off-by: Dejan Mijic <dejan@mainflux.com> * Remove cassandra libraries Signed-off-by: Dejan Mijic <dejan@mainflux.com> * Update go-kit version to 0.6.0 Signed-off-by: Dejan Mijic <dejan@mainflux.com> * Fix manager configuration Signed-off-by: Dejan Mijic <dejan@mainflux.com> * Refactor docker-compose Merged individual compose files and dropped external links. Remove CoAP container since it is not referenced from NginX config at the moment. Update port mapping in compose and nginx.conf. Dropped bin scripts. Updated service documentation. Signed-off-by: Dejan Mijic <dejan@mainflux.com> * Drop content-type check Signed-off-by: Dejan Mijic <dejan@mainflux.com> * Implement users data access layer in PostgreSQL Signed-off-by: Dejan Mijic <dejan@mainflux.com> * Bump version to 0.1.0 Signed-off-by: Dejan Mijic <dejan@mainflux.com> * Use go-kit logger everywhere (except CoAP) Signed-off-by: Dejan Mijic <dejan@mainflux.com> * Improve factory methods naming Signed-off-by: Dejan Mijic <dejan@mainflux.com> * Implement clients data access layer on PostgreSQL Signed-off-by: Dejan Mijic <dejan@mainflux.com> * Make tests stateless All tests are refactored to use map-based table-driven tests. No cross-tests dependencies is present anymore. 
Signed-off-by: Dejan Mijic <dejan@mainflux.com> * Remove gitignore Signed-off-by: Dejan Mijic <dejan@mainflux.com> * Fix nginx proxying Signed-off-by: Dejan Mijic <dejan@mainflux.com> * Mark client-user FK explicit Signed-off-by: Dejan Mijic <dejan@mainflux.com> * Update API documentation Signed-off-by: Dejan Mijic <dejan@mainflux.com> * Update channel model Signed-off-by: Dejan Mijic <dejan@mainflux.com> * Add channel PostgreSQL repository tests Signed-off-by: Dejan Mijic <dejan@mainflux.com> * Implement PostgreSQL channels DAO Replaced update queries with raw SQL. Explicitly defined M2M table due to difficulties of ensuring the referential integrity through GORM. Signed-off-by: Dejan Mijic <dejan@mainflux.com> * Expose connection endpoints Signed-off-by: Dejan Mijic <dejan@mainflux.com> * Fix swagger docs and remove DB logging Signed-off-by: Dejan Mijic <dejan@mainflux.com> * Fix nested query remarks Signed-off-by: Dejan Mijic <dejan@mainflux.com> * Add unique indices Signed-off-by: Dejan Mijic <dejan@mainflux.com>
2018-03-11 18:06:01 +01:00
// CanAccess determines whether the channel can be accessed using the
// provided key and returns thing's id if access is allowed.
CanAccess(uint64, string) (uint64, error)
// Identify returns thing ID for given thing key.
Identify(string) (uint64, error)
}
2018-05-11 01:00:10 +02:00
var _ Service = (*thingsService)(nil)
2018-05-11 01:00:10 +02:00
type thingsService struct {
users mainflux.UsersServiceClient
things ThingRepository
channels ChannelRepository
channelCache ChannelCache
thingCache ThingCache
idp IdentityProvider
2018-05-11 01:00:10 +02:00
}
// New instantiates the things service implementation.
func New(users mainflux.UsersServiceClient, things ThingRepository, channels ChannelRepository, ccache ChannelCache, tcache ThingCache, idp IdentityProvider) Service {
return &thingsService{
users: users,
things: things,
channels: channels,
channelCache: ccache,
thingCache: tcache,
idp: idp,
2018-05-11 01:00:10 +02:00
}
}
func (ts *thingsService) AddThing(key string, thing Thing) (Thing, error) {
2018-05-11 01:00:10 +02:00
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
res, err := ts.users.Identify(ctx, &mainflux.Token{Value: key})
2018-05-11 01:00:10 +02:00
if err != nil {
return Thing{}, ErrUnauthorizedAccess
2018-05-11 01:00:10 +02:00
}
thing.Owner = res.GetValue()
thing.Key = ts.idp.ID()
id, err := ts.things.Save(thing)
if err != nil {
return Thing{}, err
}
2018-05-11 01:00:10 +02:00
thing.ID = id
return thing, nil
2018-05-11 01:00:10 +02:00
}
func (ts *thingsService) UpdateThing(key string, thing Thing) error {
2018-05-11 01:00:10 +02:00
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
res, err := ts.users.Identify(ctx, &mainflux.Token{Value: key})
2018-05-11 01:00:10 +02:00
if err != nil {
return ErrUnauthorizedAccess
}
thing.Owner = res.GetValue()
2018-05-11 01:00:10 +02:00
return ts.things.Update(thing)
2018-05-11 01:00:10 +02:00
}
func (ts *thingsService) ViewThing(key string, id uint64) (Thing, error) {
2018-05-11 01:00:10 +02:00
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
res, err := ts.users.Identify(ctx, &mainflux.Token{Value: key})
2018-05-11 01:00:10 +02:00
if err != nil {
return Thing{}, ErrUnauthorizedAccess
2018-05-11 01:00:10 +02:00
}
return ts.things.RetrieveByID(res.GetValue(), id)
2018-05-11 01:00:10 +02:00
}
func (ts *thingsService) ListThings(key string, offset, limit int) ([]Thing, error) {
2018-05-11 01:00:10 +02:00
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
res, err := ts.users.Identify(ctx, &mainflux.Token{Value: key})
2018-05-11 01:00:10 +02:00
if err != nil {
return nil, ErrUnauthorizedAccess
}
return ts.things.RetrieveAll(res.GetValue(), offset, limit), nil
2018-05-11 01:00:10 +02:00
}
func (ts *thingsService) RemoveThing(key string, id uint64) error {
2018-05-11 01:00:10 +02:00
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
res, err := ts.users.Identify(ctx, &mainflux.Token{Value: key})
2018-05-11 01:00:10 +02:00
if err != nil {
return ErrUnauthorizedAccess
}
ts.thingCache.Remove(id)
return ts.things.Remove(res.GetValue(), id)
2018-05-11 01:00:10 +02:00
}
func (ts *thingsService) CreateChannel(key string, channel Channel) (Channel, error) {
2018-05-11 01:00:10 +02:00
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
res, err := ts.users.Identify(ctx, &mainflux.Token{Value: key})
2018-05-11 01:00:10 +02:00
if err != nil {
return Channel{}, ErrUnauthorizedAccess
2018-05-11 01:00:10 +02:00
}
channel.Owner = res.GetValue()
id, err := ts.channels.Save(channel)
if err != nil {
return Channel{}, err
}
channel.ID = id
return channel, nil
2018-05-11 01:00:10 +02:00
}
func (ts *thingsService) UpdateChannel(key string, channel Channel) error {
2018-05-11 01:00:10 +02:00
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
res, err := ts.users.Identify(ctx, &mainflux.Token{Value: key})
2018-05-11 01:00:10 +02:00
if err != nil {
return ErrUnauthorizedAccess
}
channel.Owner = res.GetValue()
return ts.channels.Update(channel)
2018-05-11 01:00:10 +02:00
}
func (ts *thingsService) ViewChannel(key string, id uint64) (Channel, error) {
2018-05-11 01:00:10 +02:00
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
res, err := ts.users.Identify(ctx, &mainflux.Token{Value: key})
2018-05-11 01:00:10 +02:00
if err != nil {
return Channel{}, ErrUnauthorizedAccess
}
return ts.channels.RetrieveByID(res.GetValue(), id)
2018-05-11 01:00:10 +02:00
}
func (ts *thingsService) ListChannels(key string, offset, limit int) ([]Channel, error) {
2018-05-11 01:00:10 +02:00
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
res, err := ts.users.Identify(ctx, &mainflux.Token{Value: key})
2018-05-11 01:00:10 +02:00
if err != nil {
return nil, ErrUnauthorizedAccess
}
return ts.channels.RetrieveAll(res.GetValue(), offset, limit), nil
2018-05-11 01:00:10 +02:00
}
func (ts *thingsService) RemoveChannel(key string, id uint64) error {
2018-05-11 01:00:10 +02:00
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
res, err := ts.users.Identify(ctx, &mainflux.Token{Value: key})
2018-05-11 01:00:10 +02:00
if err != nil {
return ErrUnauthorizedAccess
}
ts.channelCache.Remove(id)
return ts.channels.Remove(res.GetValue(), id)
2018-05-11 01:00:10 +02:00
}
func (ts *thingsService) Connect(key string, chanID, thingID uint64) error {
2018-05-11 01:00:10 +02:00
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
res, err := ts.users.Identify(ctx, &mainflux.Token{Value: key})
2018-05-11 01:00:10 +02:00
if err != nil {
return ErrUnauthorizedAccess
}
return ts.channels.Connect(res.GetValue(), chanID, thingID)
2018-05-11 01:00:10 +02:00
}
func (ts *thingsService) Disconnect(key string, chanID, thingID uint64) error {
2018-05-11 01:00:10 +02:00
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
res, err := ts.users.Identify(ctx, &mainflux.Token{Value: key})
2018-05-11 01:00:10 +02:00
if err != nil {
return ErrUnauthorizedAccess
}
ts.channelCache.Disconnect(chanID, thingID)
return ts.channels.Disconnect(res.GetValue(), chanID, thingID)
2018-05-11 01:00:10 +02:00
}
func (ts *thingsService) CanAccess(chanID uint64, key string) (uint64, error) {
thingID, err := ts.hasThing(chanID, key)
if err == nil {
return thingID, nil
}
thingID, err = ts.channels.HasThing(chanID, key)
2018-05-11 01:00:10 +02:00
if err != nil {
return 0, ErrUnauthorizedAccess
2018-05-11 01:00:10 +02:00
}
ts.thingCache.Save(key, thingID)
ts.channelCache.Connect(chanID, thingID)
return thingID, nil
2018-05-11 01:00:10 +02:00
}
// Identify returns the ID of the thing that owns the given key.
//
// The thing cache is consulted first. On a cache miss the thing repository is
// queried; a repository error is reported as ErrUnauthorizedAccess with a zero
// ID. An ID found in the repository is written to the cache before it is
// returned; the result of that write is not checked.
func (ts *thingsService) Identify(key string) (uint64, error) {
	if cached, cacheErr := ts.thingCache.ID(key); cacheErr == nil {
		return cached, nil
	}

	thingID, err := ts.things.RetrieveByKey(key)
	if err != nil {
		return 0, ErrUnauthorizedAccess
	}

	ts.thingCache.Save(key, thingID)
	return thingID, nil
}
// hasThing answers the access question from the caches alone.
//
// It looks up the thing ID for key in the thing cache and returns that
// lookup's error unchanged if it fails. If the channel cache does not report
// the thing as connected to chanID, it returns ErrUnauthorizedAccess.
// Otherwise it returns the cached thing ID.
func (ts *thingsService) hasThing(chanID uint64, key string) (uint64, error) {
	id, err := ts.thingCache.ID(key)
	switch {
	case err != nil:
		return 0, err
	case !ts.channelCache.HasThing(chanID, id):
		return 0, ErrUnauthorizedAccess
	}
	return id, nil
}