1
0
mirror of https://github.com/mainflux/mainflux.git synced 2025-04-27 13:48:49 +08:00
Mirko Teodorovic 530f925c4d
MF-1346 - Create Groups API - add grouping of entities (#1334)
* remove owner id

Signed-off-by: Mirko Teodorovic <mirko.teodorovic@gmail.com>

* add users endpoint for retrieving users from group

Signed-off-by: Mirko Teodorovic <mirko.teodorovic@gmail.com>

* remove  groups from things and users

Signed-off-by: Mirko Teodorovic <mirko.teodorovic@gmail.com>

* move groups into auth

Signed-off-by: Mirko Teodorovic <mirko.teodorovic@gmail.com>

* separate endpoints for users and things

Signed-off-by: Mirko Teodorovic <mirko.teodorovic@gmail.com>

* fix problems with retrieving members

Signed-off-by: Mirko Teodorovic <mirko.teodorovic@gmail.com>

* add groups test

Signed-off-by: Mirko Teodorovic <mirko.teodorovic@gmail.com>

* remove groups from users

Signed-off-by: Mirko Teodorovic <mirko.teodorovic@gmail.com>

* remove groups from things

Signed-off-by: Mirko Teodorovic <mirko.teodorovic@gmail.com>

* rename constant

Signed-off-by: Mirko Teodorovic <mirko.teodorovic@gmail.com>

* add new errors

Signed-off-by: Mirko Teodorovic <mirko.teodorovic@gmail.com>

* remove unnecessary constants

Signed-off-by: Mirko Teodorovic <mirko.teodorovic@gmail.com>

* fix validation

Signed-off-by: Mirko Teodorovic <mirko.teodorovic@gmail.com>

* create groups db mock

Signed-off-by: Mirko Teodorovic <mirko.teodorovic@gmail.com>

* adding tests

Signed-off-by: Mirko Teodorovic <mirko.teodorovic@gmail.com>

* revert changes to docker related files

Signed-off-by: Mirko Teodorovic <mirko.teodorovic@gmail.com>

* remove groups endpoints from users openapi

Signed-off-by: Mirko Teodorovic <mirko.teodorovic@gmail.com>

* remove groups endpoints from users openapi

Signed-off-by: Mirko Teodorovic <mirko.teodorovic@gmail.com>

* move constant from postgres to groups

Signed-off-by: Mirko Teodorovic <mirko.teodorovic@gmail.com>

* move constant from postgres to groups

Signed-off-by: Mirko Teodorovic <mirko.teodorovic@gmail.com>

* move constant from postgres to groups

Signed-off-by: Mirko Teodorovic <mirko.teodorovic@gmail.com>

* remove testing group

Signed-off-by: Mirko Teodorovic <mirko.teodorovic@gmail.com>

* rename typ to groupType

Signed-off-by: Mirko Teodorovic <mirko.teodorovic@gmail.com>

* add error for max level

Signed-off-by: Mirko Teodorovic <mirko.teodorovic@gmail.com>

* remove print

Signed-off-by: Mirko Teodorovic <mirko.teodorovic@gmail.com>

* remove groups.Member interface

Signed-off-by: Mirko Teodorovic <mirko.teodorovic@gmail.com>

* fix query building and add test cases

Signed-off-by: Mirko Teodorovic <mirko.teodorovic@gmail.com>

* uncomment tests

Signed-off-by: Mirko Teodorovic <mirko.teodorovic@gmail.com>

* move groups package

Signed-off-by: Mirko Teodorovic <mirko.teodorovic@gmail.com>

* remove group type, add bulk assign and unassign

Signed-off-by: Mirko Teodorovic <mirko.teodorovic@gmail.com>

* update openapi, remove parentID from create request, reorder endpoints

Signed-off-by: Mirko Teodorovic <mirko.teodorovic@gmail.com>

* update openapi

Signed-off-by: Mirko Teodorovic <mirko.teodorovic@gmail.com>

* update openapi for users and things

Signed-off-by: Mirko Teodorovic <mirko.teodorovic@gmail.com>

* fix groups test

Signed-off-by: Mirko Teodorovic <mirko.teodorovic@gmail.com>

* fix linter errors

Signed-off-by: Mirko Teodorovic <mirko.teodorovic@gmail.com>

* resolve comments

Signed-off-by: Mirko Teodorovic <mirko.teodorovic@gmail.com>

* rename assignReq structure

Signed-off-by: Mirko Teodorovic <mirko.teodorovic@gmail.com>

* refactor mocks, response, remove type from endpoint

Signed-off-by: Mirko Teodorovic <mirko.teodorovic@gmail.com>

* some refactor, renaming, errors

Signed-off-by: Mirko Teodorovic <mirko.teodorovic@gmail.com>

* simplify check

Signed-off-by: Mirko Teodorovic <mirko.teodorovic@gmail.com>

* remove package alias

Signed-off-by: Mirko Teodorovic <mirko.teodorovic@gmail.com>

* fix naming and comment

Signed-off-by: Mirko Teodorovic <mirko.teodorovic@gmail.com>

* additional comments

Signed-off-by: Mirko Teodorovic <mirko.teodorovic@gmail.com>

* add members grpc endpoint test

Signed-off-by: Mirko Teodorovic <mirko.teodorovic@gmail.com>

* fix retrieving members for different types

Signed-off-by: Mirko Teodorovic <mirko.teodorovic@gmail.com>

* fix retrieving members for different types

Signed-off-by: Mirko Teodorovic <mirko.teodorovic@gmail.com>

* remove unnecessary structure

Signed-off-by: Mirko Teodorovic <mirko.teodorovic@gmail.com>

* fix api grpc

Signed-off-by: Mirko Teodorovic <mirko.teodorovic@gmail.com>

* rename const

Signed-off-by: Mirko Teodorovic <mirko.teodorovic@gmail.com>

* refactor retrieve parents and children with common function

Signed-off-by: Mirko Teodorovic <mirko.teodorovic@gmail.com>

* small changes for errors

Signed-off-by: Mirko Teodorovic <mirko.teodorovic@gmail.com>

* fix compile error

Signed-off-by: Mirko Teodorovic <mirko.teodorovic@gmail.com>

* fix sorting in mock

Signed-off-by: Mirko Teodorovic <mirko.teodorovic@gmail.com>

* remove regexp for groups

Signed-off-by: Mirko Teodorovic <mirko.teodorovic@gmail.com>

* revert as change is made by mistake

Signed-off-by: Mirko Teodorovic <mirko.teodorovic@gmail.com>

* revert as change is made by mistake

Signed-off-by: Mirko Teodorovic <mirko.teodorovic@gmail.com>

* refactor groups and keys package

Signed-off-by: Mirko Teodorovic <mirko.teodorovic@gmail.com>

* fix naming

Signed-off-by: Mirko Teodorovic <mirko.teodorovic@gmail.com>

* fix naming

Signed-off-by: Mirko Teodorovic <mirko.teodorovic@gmail.com>

* fix test for timestamp compare

Signed-off-by: Mirko Teodorovic <mirko.teodorovic@gmail.com>

* fix error handling

Signed-off-by: Mirko Teodorovic <mirko.teodorovic@gmail.com>

* remove errors not being used

Signed-off-by: Mirko Teodorovic <mirko.teodorovic@gmail.com>

* var renaming

Signed-off-by: Mirko Teodorovic <mirko.teodorovic@gmail.com>

* resolve comments

Signed-off-by: Mirko Teodorovic <mirko.teodorovic@gmail.com>

* minor changes

Signed-off-by: Mirko Teodorovic <mirko.teodorovic@gmail.com>

* fix test

Signed-off-by: Mirko Teodorovic <mirko.teodorovic@gmail.com>

* add endpoints for groups into nginx

Signed-off-by: Mirko Teodorovic <mirko.teodorovic@gmail.com>

* reorganize endpoints, remove some errors

Signed-off-by: Mirko Teodorovic <mirko.teodorovic@gmail.com>

* reorganize endpoints, remove some errors

Signed-off-by: Mirko Teodorovic <mirko.teodorovic@gmail.com>

* small fix

Signed-off-by: Mirko Teodorovic <mirko.teodorovic@gmail.com>

* fix linter errors

Signed-off-by: Mirko Teodorovic <mirko.teodorovic@gmail.com>

* minor changes

Signed-off-by: Mirko Teodorovic <mirko.teodorovic@gmail.com>

* resolve comments

Signed-off-by: Mirko Teodorovic <mirko.teodorovic@gmail.com>

* fix group save path problem

Signed-off-by: Mirko Teodorovic <mirko.teodorovic@gmail.com>

* description constant

Signed-off-by: Mirko Teodorovic <mirko.teodorovic@gmail.com>

* rename variables

Signed-off-by: Mirko Teodorovic <mirko.teodorovic@gmail.com>

* fix validation

Signed-off-by: Mirko Teodorovic <mirko.teodorovic@gmail.com>

* get back return

Signed-off-by: Mirko Teodorovic <mirko.teodorovic@gmail.com>

* fix compile

Signed-off-by: Mirko Teodorovic <mirko.teodorovic@gmail.com>
2021-03-04 10:29:03 +01:00

379 lines
12 KiB
Go

// Copyright (c) Mainflux
// SPDX-License-Identifier: Apache-2.0
package main
import (
"fmt"
"io"
"io/ioutil"
"log"
"net"
"net/http"
"os"
"os/signal"
"strconv"
"syscall"
"time"
"github.com/mainflux/mainflux/things/tracing"
"github.com/jmoiron/sqlx"
opentracing "github.com/opentracing/opentracing-go"
"google.golang.org/grpc/credentials"
kitprometheus "github.com/go-kit/kit/metrics/prometheus"
"github.com/go-redis/redis"
"github.com/mainflux/mainflux"
authapi "github.com/mainflux/mainflux/auth/api/grpc"
"github.com/mainflux/mainflux/logger"
"github.com/mainflux/mainflux/pkg/uuid"
"github.com/mainflux/mainflux/things"
"github.com/mainflux/mainflux/things/api"
authgrpcapi "github.com/mainflux/mainflux/things/api/auth/grpc"
authhttpapi "github.com/mainflux/mainflux/things/api/auth/http"
thhttpapi "github.com/mainflux/mainflux/things/api/things/http"
"github.com/mainflux/mainflux/things/postgres"
rediscache "github.com/mainflux/mainflux/things/redis"
localusers "github.com/mainflux/mainflux/things/users"
stdprometheus "github.com/prometheus/client_golang/prometheus"
jconfig "github.com/uber/jaeger-client-go/config"
"google.golang.org/grpc"
)
// Built-in configuration values. Each def* constant is paired in loadConfig
// with the env* constant of the same suffix.
const (
	// Logging.
	defLogLevel = "error"

	// Postgres connection.
	defDBHost        = "localhost"
	defDBPort        = "5432"
	defDBUser        = "mainflux"
	defDBPass        = "mainflux"
	defDB            = "things"
	defDBSSLMode     = "disable"
	defDBSSLCert     = ""
	defDBSSLKey      = ""
	defDBSSLRootCert = ""

	// TLS settings for the outgoing gRPC connection.
	defClientTLS = "false"
	defCACerts   = ""

	// Redis cache and Redis event store.
	defCacheURL  = "localhost:6379"
	defCachePass = ""
	defCacheDB   = "0"
	defESURL     = "localhost:6379"
	defESPass    = ""
	defESDB      = "0"

	// Ports and certificates of the servers exposed by this service.
	defHTTPPort     = "8182"
	defAuthHTTPPort = "8180"
	defAuthGRPCPort = "8181"
	defServerCert   = ""
	defServerKey    = ""

	// Single-user mode credentials.
	defSingleUserEmail = ""
	defSingleUserToken = ""

	// Jaeger agent and auth service.
	defJaegerURL   = ""
	defAuthURL     = "localhost:8181"
	defAuthTimeout = "1s"
)

// Names of the environment variables read by loadConfig.
const (
	// Logging.
	envLogLevel = "MF_THINGS_LOG_LEVEL"

	// Postgres connection.
	envDBHost        = "MF_THINGS_DB_HOST"
	envDBPort        = "MF_THINGS_DB_PORT"
	envDBUser        = "MF_THINGS_DB_USER"
	envDBPass        = "MF_THINGS_DB_PASS"
	envDB            = "MF_THINGS_DB"
	envDBSSLMode     = "MF_THINGS_DB_SSL_MODE"
	envDBSSLCert     = "MF_THINGS_DB_SSL_CERT"
	envDBSSLKey      = "MF_THINGS_DB_SSL_KEY"
	envDBSSLRootCert = "MF_THINGS_DB_SSL_ROOT_CERT"

	// TLS settings for the outgoing gRPC connection.
	envClientTLS = "MF_THINGS_CLIENT_TLS"
	envCACerts   = "MF_THINGS_CA_CERTS"

	// Redis cache and Redis event store.
	envCacheURL  = "MF_THINGS_CACHE_URL"
	envCachePass = "MF_THINGS_CACHE_PASS"
	envCacheDB   = "MF_THINGS_CACHE_DB"
	envESURL     = "MF_THINGS_ES_URL"
	envESPass    = "MF_THINGS_ES_PASS"
	envESDB      = "MF_THINGS_ES_DB"

	// Ports and certificates of the servers exposed by this service.
	envHTTPPort     = "MF_THINGS_HTTP_PORT"
	envAuthHTTPPort = "MF_THINGS_AUTH_HTTP_PORT"
	envAuthGRPCPort = "MF_THINGS_AUTH_GRPC_PORT"
	envServerCert   = "MF_THINGS_SERVER_CERT"
	envServerKey    = "MF_THINGS_SERVER_KEY"

	// Single-user mode credentials.
	envSingleUserEmail = "MF_THINGS_SINGLE_USER_EMAIL"
	envSingleUserToken = "MF_THINGS_SINGLE_USER_TOKEN"

	// Jaeger agent and auth service.
	envJaegerURL   = "MF_JAEGER_URL"
	envAuthURL     = "MF_AUTH_GRPC_URL"
	envAuthTimeout = "MF_AUTH_GRPC_TIMEOUT"
)
// config holds every runtime setting of the things service. It is populated
// once by loadConfig and then passed by value to the start-up helpers.
type config struct {
	// Logging and database.
	logLevel string
	dbConfig postgres.Config

	// TLS settings used by connectToAuth when dialing the auth service.
	clientTLS bool
	caCerts   string

	// Redis cache connection; cacheDB is kept as a string and converted
	// to an int in connectToRedis.
	cacheURL  string
	cachePass string
	cacheDB   string

	// Redis event store connection, handled the same way as the cache.
	esURL  string
	esPass string
	esDB   string

	// Listening ports and the certificate/key pair shared by the HTTP and
	// gRPC servers.
	httpPort     string
	authHTTPPort string
	authGRPCPort string
	serverCert   string
	serverKey    string

	// When both are non-empty, createAuthClient uses a local single-user
	// service instead of dialing the auth service.
	singleUserEmail string
	singleUserToken string

	// Jaeger agent address; an empty value makes initJaeger return a
	// no-op tracer.
	jaegerURL string

	// Auth service address and the timeout handed to its client.
	authURL     string
	authTimeout time.Duration
}
// main loads the configuration, connects to Redis, Postgres and the auth
// service, builds the things service and starts its two HTTP servers and its
// gRPC server. It then blocks until one of the servers returns an error or
// the process receives SIGINT, logs the reason and returns, which runs the
// deferred closers.
func main() {
	cfg := loadConfig()

	logger, err := logger.New(os.Stdout, cfg.logLevel)
	if err != nil {
		// The error text is data, not a format string, so it must not be
		// passed as the format argument of Fatalf.
		log.Fatal(err)
	}

	thingsTracer, thingsCloser := initJaeger("things", cfg.jaegerURL, logger)
	defer thingsCloser.Close()

	cacheClient := connectToRedis(cfg.cacheURL, cfg.cachePass, cfg.cacheDB, logger)
	esClient := connectToRedis(cfg.esURL, cfg.esPass, cfg.esDB, logger)

	db := connectToDB(cfg.dbConfig, logger)
	defer db.Close()

	authTracer, authCloser := initJaeger("auth", cfg.jaegerURL, logger)
	defer authCloser.Close()

	// closeAuth is nil when the single-user service is used, because there
	// is no gRPC connection to close in that case.
	auth, closeAuth := createAuthClient(cfg, authTracer, logger)
	if closeAuth != nil {
		defer closeAuth()
	}

	dbTracer, dbCloser := initJaeger("things_db", cfg.jaegerURL, logger)
	defer dbCloser.Close()

	cacheTracer, cacheCloser := initJaeger("things_cache", cfg.jaegerURL, logger)
	defer cacheCloser.Close()

	svc := newService(auth, dbTracer, cacheTracer, db, cacheClient, esClient, logger)
	errs := make(chan error, 2)

	go startHTTPServer(thhttpapi.MakeHandler(thingsTracer, svc), cfg.httpPort, cfg, logger, errs)
	go startHTTPServer(authhttpapi.MakeHandler(thingsTracer, svc), cfg.authHTTPPort, cfg, logger, errs)
	go startGRPCServer(svc, thingsTracer, cfg, logger, errs)

	go func() {
		// signal.Notify does not block when sending, so the channel needs
		// a buffer or a signal that arrives before the receive is lost.
		c := make(chan os.Signal, 1)
		signal.Notify(c, syscall.SIGINT)
		errs <- fmt.Errorf("%s", <-c)
	}()

	// Only the first error matters: it ends the process.
	err = <-errs
	logger.Error(fmt.Sprintf("Things service terminated: %s", err))
}
// loadConfig builds the service configuration by looking up every env*
// variable through mainflux.Env together with its def* counterpart. The
// client TLS flag and the auth timeout are parsed into typed values; if
// either cannot be parsed the process is terminated via log.Fatalf.
func loadConfig() config {
	env := mainflux.Env

	clientTLS, err := strconv.ParseBool(env(envClientTLS, defClientTLS))
	if err != nil {
		log.Fatalf("Invalid value passed for %s\n", envClientTLS)
	}

	timeout, err := time.ParseDuration(env(envAuthTimeout, defAuthTimeout))
	if err != nil {
		log.Fatalf("Invalid %s value: %s", envAuthTimeout, err.Error())
	}

	var cfg config

	cfg.dbConfig = postgres.Config{
		Host:        env(envDBHost, defDBHost),
		Port:        env(envDBPort, defDBPort),
		User:        env(envDBUser, defDBUser),
		Pass:        env(envDBPass, defDBPass),
		Name:        env(envDB, defDB),
		SSLMode:     env(envDBSSLMode, defDBSSLMode),
		SSLCert:     env(envDBSSLCert, defDBSSLCert),
		SSLKey:      env(envDBSSLKey, defDBSSLKey),
		SSLRootCert: env(envDBSSLRootCert, defDBSSLRootCert),
	}

	cfg.logLevel = env(envLogLevel, defLogLevel)
	cfg.clientTLS = clientTLS
	cfg.caCerts = env(envCACerts, defCACerts)
	cfg.cacheURL = env(envCacheURL, defCacheURL)
	cfg.cachePass = env(envCachePass, defCachePass)
	cfg.cacheDB = env(envCacheDB, defCacheDB)
	cfg.esURL = env(envESURL, defESURL)
	cfg.esPass = env(envESPass, defESPass)
	cfg.esDB = env(envESDB, defESDB)
	cfg.httpPort = env(envHTTPPort, defHTTPPort)
	cfg.authHTTPPort = env(envAuthHTTPPort, defAuthHTTPPort)
	cfg.authGRPCPort = env(envAuthGRPCPort, defAuthGRPCPort)
	cfg.serverCert = env(envServerCert, defServerCert)
	cfg.serverKey = env(envServerKey, defServerKey)
	cfg.singleUserEmail = env(envSingleUserEmail, defSingleUserEmail)
	cfg.singleUserToken = env(envSingleUserToken, defSingleUserToken)
	cfg.jaegerURL = env(envJaegerURL, defJaegerURL)
	cfg.authURL = env(envAuthURL, defAuthURL)
	cfg.authTimeout = timeout

	return cfg
}
// initJaeger returns a tracer for svcName together with the closer that
// releases it. With an empty url it returns a no-op tracer and a no-op
// closer. Otherwise it configures a Jaeger tracer with a "const" sampler
// (param 1) reporting to the agent at url; if that fails the error is logged
// and the process exits with status 1.
func initJaeger(svcName, url string, logger logger.Logger) (opentracing.Tracer, io.Closer) {
	if url == "" {
		return opentracing.NoopTracer{}, ioutil.NopCloser(nil)
	}

	sampler := jconfig.SamplerConfig{Type: "const", Param: 1}
	reporter := jconfig.ReporterConfig{LocalAgentHostPort: url, LogSpans: true}
	jcfg := jconfig.Configuration{
		ServiceName: svcName,
		Sampler:     &sampler,
		Reporter:    &reporter,
	}

	tracer, closer, err := jcfg.NewTracer()
	if err != nil {
		msg := fmt.Sprintf("Failed to init Jaeger client: %s", err)
		logger.Error(msg)
		os.Exit(1)
	}

	return tracer, closer
}
// connectToRedis creates a Redis client for the given address, password and
// database index. cacheDB arrives as a string and is converted to an int
// first; if the conversion fails the error is logged and the process exits
// with status 1.
func connectToRedis(cacheURL, cachePass string, cacheDB string, logger logger.Logger) *redis.Client {
	dbIndex, err := strconv.Atoi(cacheDB)
	if err != nil {
		msg := fmt.Sprintf("Failed to connect to cache: %s", err)
		logger.Error(msg)
		os.Exit(1)
	}

	opts := redis.Options{
		Addr:     cacheURL,
		Password: cachePass,
		DB:       dbIndex,
	}
	return redis.NewClient(&opts)
}
// connectToDB opens the Postgres connection described by dbConfig. If
// postgres.Connect returns an error it is logged and the process exits with
// status 1.
func connectToDB(dbConfig postgres.Config, logger logger.Logger) *sqlx.DB {
	conn, err := postgres.Connect(dbConfig)
	if err != nil {
		msg := fmt.Sprintf("Failed to connect to postgres: %s", err)
		logger.Error(msg)
		os.Exit(1)
	}

	return conn
}
// createAuthClient returns the auth client used by the service and a
// function that closes its connection. When both the single-user email and
// token are configured, a local single-user service is returned with a nil
// close function; otherwise a gRPC client is created over a connection to
// the auth service and that connection's Close method is returned.
func createAuthClient(cfg config, tracer opentracing.Tracer, logger logger.Logger) (mainflux.AuthServiceClient, func() error) {
	if cfg.singleUserEmail == "" || cfg.singleUserToken == "" {
		conn := connectToAuth(cfg, logger)
		client := authapi.NewClient(tracer, conn, cfg.authTimeout)
		return client, conn.Close
	}

	return localusers.NewSingleUserService(cfg.singleUserEmail, cfg.singleUserToken), nil
}
// connectToAuth dials the auth service at cfg.authURL and returns the
// connection. Without client TLS it dials insecurely and logs that fact;
// with client TLS and a CA certificate file it uses credentials loaded from
// that file. Failing to load the credentials or to dial logs the error and
// exits with status 1.
func connectToAuth(cfg config, logger logger.Logger) *grpc.ClientConn {
	var dialOpts []grpc.DialOption

	switch {
	case !cfg.clientTLS:
		dialOpts = append(dialOpts, grpc.WithInsecure())
		logger.Info("gRPC communication is not encrypted")
	case cfg.caCerts != "":
		creds, err := credentials.NewClientTLSFromFile(cfg.caCerts, "")
		if err != nil {
			logger.Error(fmt.Sprintf("Failed to create tls credentials: %s", err))
			os.Exit(1)
		}
		dialOpts = append(dialOpts, grpc.WithTransportCredentials(creds))
	}
	// NOTE(review): with client TLS enabled but no CA certs configured, no
	// transport option is added at all; confirm that grpc.Dial rejecting
	// such a call is the intended outcome.

	conn, err := grpc.Dial(cfg.authURL, dialOpts...)
	if err != nil {
		logger.Error(fmt.Sprintf("Failed to connect to auth service: %s", err))
		os.Exit(1)
	}

	return conn
}
// newService assembles the things service. Postgres repositories and Redis
// caches are each wrapped in tracing middleware, the core service is created
// from them with a UUID provider, and the result is wrapped, in order, by the
// event store, logging and metrics middleware.
func newService(auth mainflux.AuthServiceClient, dbTracer opentracing.Tracer, cacheTracer opentracing.Tracer, db *sqlx.DB, cacheClient *redis.Client, esClient *redis.Client, logger logger.Logger) things.Service {
	database := postgres.NewDatabase(db)

	// Repositories traced with the database tracer.
	thingsRepo := tracing.ThingRepositoryMiddleware(dbTracer, postgres.NewThingRepository(database))
	channelsRepo := tracing.ChannelRepositoryMiddleware(dbTracer, postgres.NewChannelRepository(database))

	// Caches traced with the cache tracer.
	chanCache := tracing.ChannelCacheMiddleware(cacheTracer, rediscache.NewChannelCache(cacheClient))
	thingCache := tracing.ThingCacheMiddleware(cacheTracer, rediscache.NewThingCache(cacheClient))

	svc := things.New(auth, thingsRepo, channelsRepo, chanCache, thingCache, uuid.New())
	svc = rediscache.NewEventStoreMiddleware(svc, esClient)
	svc = api.LoggingMiddleware(svc, logger)

	// Both metrics are labelled by method name.
	labels := []string{"method"}
	requestCount := kitprometheus.NewCounterFrom(stdprometheus.CounterOpts{
		Namespace: "things",
		Subsystem: "api",
		Name:      "request_count",
		Help:      "Number of requests received.",
	}, labels)
	requestLatency := kitprometheus.NewSummaryFrom(stdprometheus.SummaryOpts{
		Namespace: "things",
		Subsystem: "api",
		Name:      "request_latency_microseconds",
		Help:      "Total duration of requests in microseconds.",
	}, labels)

	return api.MetricsMiddleware(svc, requestCount, requestLatency)
}
// startHTTPServer serves handler on the given port and sends the error
// returned by the listener to errs. It blocks for as long as the server
// runs, so callers start it in its own goroutine.
//
// If either cfg.serverCert or cfg.serverKey is set the server is started
// with TLS using that pair; otherwise it is started as plain HTTP. The
// start-up log line always reports the port argument, so each server started
// through this function logs the port it actually listens on.
func startHTTPServer(handler http.Handler, port string, cfg config, logger logger.Logger, errs chan error) {
	p := fmt.Sprintf(":%s", port)

	if cfg.serverCert != "" || cfg.serverKey != "" {
		logger.Info(fmt.Sprintf("Things service started using https on port %s with cert %s key %s",
			port, cfg.serverCert, cfg.serverKey))
		errs <- http.ListenAndServeTLS(p, cfg.serverCert, cfg.serverKey, handler)
		return
	}

	// Log the port argument, not cfg.httpPort: this function is also used
	// for the auth HTTP server, which listens on a different port.
	logger.Info(fmt.Sprintf("Things service started using http on port %s", port))
	errs <- http.ListenAndServe(p, handler)
}
// startGRPCServer listens on cfg.authGRPCPort, registers the things gRPC
// server built from svc and tracer, and sends the error returned by Serve to
// errs. If either cfg.serverCert or cfg.serverKey is set the server uses TLS
// credentials loaded from that pair. Failing to listen or to load the
// certificates logs the error and exits with status 1.
func startGRPCServer(svc things.Service, tracer opentracing.Tracer, cfg config, logger logger.Logger, errs chan error) {
	addr := fmt.Sprintf(":%s", cfg.authGRPCPort)

	listener, err := net.Listen("tcp", addr)
	if err != nil {
		logger.Error(fmt.Sprintf("Failed to listen on port %s: %s", cfg.authGRPCPort, err))
		os.Exit(1)
	}

	// Collect server options first so the server is constructed in one place.
	var serverOpts []grpc.ServerOption
	useTLS := cfg.serverCert != "" || cfg.serverKey != ""
	if useTLS {
		creds, err := credentials.NewServerTLSFromFile(cfg.serverCert, cfg.serverKey)
		if err != nil {
			logger.Error(fmt.Sprintf("Failed to load things certificates: %s", err))
			os.Exit(1)
		}
		logger.Info(fmt.Sprintf("Things gRPC service started using https on port %s with cert %s key %s",
			cfg.authGRPCPort, cfg.serverCert, cfg.serverKey))
		serverOpts = append(serverOpts, grpc.Creds(creds))
	} else {
		logger.Info(fmt.Sprintf("Things gRPC service started using http on port %s", cfg.authGRPCPort))
	}

	server := grpc.NewServer(serverOpts...)
	mainflux.RegisterThingsServiceServer(server, authgrpcapi.NewServer(tracer, svc))
	errs <- server.Serve(listener)
}