1
0
mirror of https://github.com/mainflux/mainflux.git synced 2025-04-27 13:48:49 +08:00

Add dedicated env vars for event sourcing (#536)

Signed-off-by: Aleksandar Novakovic <aleksandar.novakovic@mainflux.com>
This commit is contained in:
Aleksandar Novaković 2019-01-08 22:08:24 +01:00 committed by Drasko DRASKOVIC
parent 72fa9db939
commit 8dbe2f4066
3 changed files with 33 additions and 12 deletions

View File

@ -52,6 +52,9 @@ const (
defCacheURL = "localhost:6379"
defCachePass = ""
defCacheDB = "0"
defESURL = "localhost:6379"
defESPass = ""
defESDB = "0"
defHTTPPort = "8180"
defGRPCPort = "8181"
defServerCert = ""
@ -72,6 +75,9 @@ const (
envCacheURL = "MF_THINGS_CACHE_URL"
envCachePass = "MF_THINGS_CACHE_PASS"
envCacheDB = "MF_THINGS_CACHE_DB"
envESURL = "MF_THINGS_ES_URL"
envESPass = "MF_THINGS_ES_PASS"
envESDB = "MF_THINGS_ES_DB"
envHTTPPort = "MF_THINGS_HTTP_PORT"
envGRPCPort = "MF_THINGS_GRPC_PORT"
envUsersURL = "MF_USERS_URL"
@ -87,6 +93,9 @@ type config struct {
cacheURL string
cachePass string
cacheDB string
esURL string
esPass string
esDB string
httpPort string
grpcPort string
usersURL string
@ -101,7 +110,9 @@ func main() {
if err != nil {
log.Fatalf(err.Error())
}
cache := connectToCache(cfg.cacheURL, cfg.cachePass, cfg.cacheDB, logger)
cacheClient := connectToRedis(cfg.cacheURL, cfg.cachePass, cfg.cacheDB, logger)
esClient := connectToRedis(cfg.esURL, cfg.esPass, cfg.esDB, logger)
db := connectToDB(cfg.dbConfig, logger)
defer db.Close()
@ -109,7 +120,7 @@ func main() {
conn := connectToUsers(cfg, logger)
defer conn.Close()
svc := newService(conn, db, cache, logger)
svc := newService(conn, db, cacheClient, esClient, logger)
errs := make(chan error, 2)
go startHTTPServer(svc, cfg, logger, errs)
@ -151,6 +162,9 @@ func loadConfig() config {
cacheURL: mainflux.Env(envCacheURL, defCacheURL),
cachePass: mainflux.Env(envCachePass, defCachePass),
cacheDB: mainflux.Env(envCacheDB, defCacheDB),
esURL: mainflux.Env(envESURL, defESURL),
esPass: mainflux.Env(envESPass, defESPass),
esDB: mainflux.Env(envESDB, defESDB),
httpPort: mainflux.Env(envHTTPPort, defHTTPPort),
grpcPort: mainflux.Env(envGRPCPort, defGRPCPort),
usersURL: mainflux.Env(envUsersURL, defUsersURL),
@ -159,8 +173,7 @@ func loadConfig() config {
}
}
func connectToCache(cacheURL, cachePass string, cacheDB string, logger logger.Logger) *redis.Client {
func connectToRedis(cacheURL, cachePass string, cacheDB string, logger logger.Logger) *redis.Client {
db, err := strconv.Atoi(cacheDB)
if err != nil {
logger.Error(fmt.Sprintf("Failed to connect to cache: %s", err))
@ -208,16 +221,16 @@ func connectToUsers(cfg config, logger logger.Logger) *grpc.ClientConn {
return conn
}
func newService(conn *grpc.ClientConn, db *sql.DB, client *redis.Client, logger logger.Logger) things.Service {
func newService(conn *grpc.ClientConn, db *sql.DB, cacheClient *redis.Client, esClient *redis.Client, logger logger.Logger) things.Service {
users := usersapi.NewClient(conn)
thingsRepo := postgres.NewThingRepository(db, logger)
channelsRepo := postgres.NewChannelRepository(db, logger)
chanCache := rediscache.NewChannelCache(client)
thingCache := rediscache.NewThingCache(client)
chanCache := rediscache.NewChannelCache(cacheClient)
thingCache := rediscache.NewThingCache(cacheClient)
idp := uuid.New()
svc := things.New(users, thingsRepo, channelsRepo, chanCache, thingCache, idp)
svc = rediscache.NewEventStoreMiddleware(svc, client)
svc = rediscache.NewEventStoreMiddleware(svc, esClient)
svc = api.LoggingMiddleware(svc, logger)
svc = api.MetricsMiddleware(
svc,
@ -243,10 +256,10 @@ func startHTTPServer(svc things.Service, cfg config, logger logger.Logger, errs
logger.Info(fmt.Sprintf("Things service started using https on port %s with cert %s key %s",
cfg.httpPort, cfg.serverCert, cfg.serverKey))
errs <- http.ListenAndServeTLS(p, cfg.serverCert, cfg.serverKey, httpapi.MakeHandler(svc))
} else {
logger.Info(fmt.Sprintf("Things service started using http on port %s", cfg.httpPort))
errs <- http.ListenAndServe(p, httpapi.MakeHandler(svc))
return
}
logger.Info(fmt.Sprintf("Things service started using http on port %s", cfg.httpPort))
errs <- http.ListenAndServe(p, httpapi.MakeHandler(svc))
}
func startGRPCServer(svc things.Service, cfg config, logger logger.Logger, errs chan error) {
@ -254,6 +267,7 @@ func startGRPCServer(svc things.Service, cfg config, logger logger.Logger, errs
listener, err := net.Listen("tcp", p)
if err != nil {
logger.Error(fmt.Sprintf("Failed to listen on port %s: %s", cfg.grpcPort, err))
os.Exit(1)
}
var server *grpc.Server

View File

@ -103,6 +103,7 @@ services:
MF_THINGS_DB_PASS: mainflux
MF_THINGS_DB: things
MF_THINGS_CACHE_URL: things-redis:6379
MF_THINGS_ES_URL: things-redis:6379
MF_THINGS_HTTP_PORT: 8182
MF_THINGS_GRPC_PORT: 8183
MF_USERS_URL: users:8181

View File

@ -34,6 +34,9 @@ default values.
| MF_THINGS_CACHE_URL | Cache database URL | localhost:6379 |
| MF_THINGS_CACHE_PASS | Cache database password | |
| MF_THINGS_CACHE_DB | Cache instance that should be used | 0 |
| MF_THINGS_ES_URL | Event store URL | localhost:6379 |
| MF_THINGS_ES_PASS | Event store password | |
| MF_THINGS_ES_DB | Event store instance that should be used | 0 |
| MF_THINGS_HTTP_PORT | Things service HTTP port | 8180 |
| MF_THINGS_GRPC_PORT | Things service gRPC port | 8181 |
| MF_THINGS_SERVER_CERT | Path to server certificate in pem format | 8181 |
@ -69,6 +72,9 @@ services:
MF_THINGS_CACHE_URL: [Cache database URL]
MF_THINGS_CACHE_PASS: [Cache database password]
MF_THINGS_CACHE_DB: [Cache instance that should be used]
MF_THINGS_ES_URL: [Event store URL]
MF_THINGS_ES_PASS: [Event store password]
MF_THINGS_ES_DB: [Event store instance that should be used]
MF_THINGS_HTTP_PORT: [Service HTTP port]
MF_THINGS_GRPC_PORT: [Service gRPC port]
MF_THINGS_SERVER_CERT: [String path to server cert in pem format]
@ -92,7 +98,7 @@ make things
make install
# set the environment variables and run the service
MF_THINGS_LOG_LEVEL=[Things log level] MF_THINGS_DB_HOST=[Database host address] MF_THINGS_DB_PORT=[Database host port] MF_THINGS_DB_USER=[Database user] MF_THINGS_DB_PASS=[Database password] MF_THINGS_DB=[Name of the database used by the service] MF_THINGS_DB_SSL_MODE=[SSL mode to connect to the database with] MF_THINGS_DB_SSL_CERT=[Path to the PEM encoded certificate file] MF_THINGS_DB_SSL_KEY=[Path to the PEM encoded key file] MF_THINGS_DB_SSL_ROOT_CERT=[Path to the PEM encoded root certificate file] MF_HTTP_ADAPTER_CA_CERTS=[Path to trusted CAs in PEM format] MF_THINGS_CACHE_URL=[Cache database URL] MF_THINGS_CACHE_PASS=[Cache database password] MF_THINGS_CACHE_DB=[Cache instance that should be used] MF_THINGS_HTTP_PORT=[Service HTTP port] MF_THINGS_GRPC_PORT=[Service gRPC port] MF_USERS_URL=[Users service URL] MF_THINGS_SERVER_CERT=[Path to server certificate] MF_THINGS_SERVER_KEY=[Path to server key] $GOBIN/mainflux-things
MF_THINGS_LOG_LEVEL=[Things log level] MF_THINGS_DB_HOST=[Database host address] MF_THINGS_DB_PORT=[Database host port] MF_THINGS_DB_USER=[Database user] MF_THINGS_DB_PASS=[Database password] MF_THINGS_DB=[Name of the database used by the service] MF_THINGS_DB_SSL_MODE=[SSL mode to connect to the database with] MF_THINGS_DB_SSL_CERT=[Path to the PEM encoded certificate file] MF_THINGS_DB_SSL_KEY=[Path to the PEM encoded key file] MF_THINGS_DB_SSL_ROOT_CERT=[Path to the PEM encoded root certificate file] MF_HTTP_ADAPTER_CA_CERTS=[Path to trusted CAs in PEM format] MF_THINGS_CACHE_URL=[Cache database URL] MF_THINGS_CACHE_PASS=[Cache database password] MF_THINGS_CACHE_DB=[Cache instance that should be used] MF_THINGS_ES_URL=[Event store URL] MF_THINGS_ES_PASS=[Event store password] MF_THINGS_ES_DB=[Event store instance that should be used] MF_THINGS_HTTP_PORT=[Service HTTP port] MF_THINGS_GRPC_PORT=[Service gRPC port] MF_USERS_URL=[Users service URL] MF_THINGS_SERVER_CERT=[Path to server certificate] MF_THINGS_SERVER_KEY=[Path to server key] $GOBIN/mainflux-things
```
Setting `MF_THINGS_CA_CERTS` expects a file in PEM format of trusted CAs. This will enable TLS against the Users gRPC endpoint trusting only those CAs that are provided.