From 8dbe2f40665997a27486ab1baf7fe5105fd9b78b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleksandar=20Novakovi=C4=87?= Date: Tue, 8 Jan 2019 22:08:24 +0100 Subject: [PATCH] Add dedicated env vars for event sourcing (#536) Signed-off-by: Aleksandar Novakovic --- cmd/things/main.go | 36 +++++++++++++++++++++++++----------- docker/docker-compose.yml | 1 + things/README.md | 8 +++++++- 3 files changed, 33 insertions(+), 12 deletions(-) diff --git a/cmd/things/main.go b/cmd/things/main.go index d9e68edd..5aa0de08 100644 --- a/cmd/things/main.go +++ b/cmd/things/main.go @@ -52,6 +52,9 @@ const ( defCacheURL = "localhost:6379" defCachePass = "" defCacheDB = "0" + defESURL = "localhost:6379" + defESPass = "" + defESDB = "0" defHTTPPort = "8180" defGRPCPort = "8181" defServerCert = "" @@ -72,6 +75,9 @@ const ( envCacheURL = "MF_THINGS_CACHE_URL" envCachePass = "MF_THINGS_CACHE_PASS" envCacheDB = "MF_THINGS_CACHE_DB" + envESURL = "MF_THINGS_ES_URL" + envESPass = "MF_THINGS_ES_PASS" + envESDB = "MF_THINGS_ES_DB" envHTTPPort = "MF_THINGS_HTTP_PORT" envGRPCPort = "MF_THINGS_GRPC_PORT" envUsersURL = "MF_USERS_URL" @@ -87,6 +93,9 @@ type config struct { cacheURL string cachePass string cacheDB string + esURL string + esPass string + esDB string httpPort string grpcPort string usersURL string @@ -101,7 +110,9 @@ func main() { if err != nil { log.Fatalf(err.Error()) } - cache := connectToCache(cfg.cacheURL, cfg.cachePass, cfg.cacheDB, logger) + cacheClient := connectToRedis(cfg.cacheURL, cfg.cachePass, cfg.cacheDB, logger) + + esClient := connectToRedis(cfg.esURL, cfg.esPass, cfg.esDB, logger) db := connectToDB(cfg.dbConfig, logger) defer db.Close() @@ -109,7 +120,7 @@ func main() { conn := connectToUsers(cfg, logger) defer conn.Close() - svc := newService(conn, db, cache, logger) + svc := newService(conn, db, cacheClient, esClient, logger) errs := make(chan error, 2) go startHTTPServer(svc, cfg, logger, errs) @@ -151,6 +162,9 @@ func loadConfig() config { cacheURL: mainflux.Env(envCacheURL, defCacheURL), cachePass: mainflux.Env(envCachePass, defCachePass), cacheDB: mainflux.Env(envCacheDB, defCacheDB), + esURL: mainflux.Env(envESURL, defESURL), + esPass: mainflux.Env(envESPass, defESPass), + esDB: mainflux.Env(envESDB, defESDB), httpPort: mainflux.Env(envHTTPPort, defHTTPPort), grpcPort: mainflux.Env(envGRPCPort, defGRPCPort), usersURL: mainflux.Env(envUsersURL, defUsersURL), @@ -159,8 +173,7 @@ func loadConfig() config { } } -func connectToCache(cacheURL, cachePass string, cacheDB string, logger logger.Logger) *redis.Client { - +func connectToRedis(cacheURL, cachePass string, cacheDB string, logger logger.Logger) *redis.Client { db, err := strconv.Atoi(cacheDB) if err != nil { logger.Error(fmt.Sprintf("Failed to connect to cache: %s", err)) @@ -208,16 +221,16 @@ func connectToUsers(cfg config, logger logger.Logger) *grpc.ClientConn { return conn } -func newService(conn *grpc.ClientConn, db *sql.DB, client *redis.Client, logger logger.Logger) things.Service { +func newService(conn *grpc.ClientConn, db *sql.DB, cacheClient *redis.Client, esClient *redis.Client, logger logger.Logger) things.Service { users := usersapi.NewClient(conn) thingsRepo := postgres.NewThingRepository(db, logger) channelsRepo := postgres.NewChannelRepository(db, logger) - chanCache := rediscache.NewChannelCache(client) - thingCache := rediscache.NewThingCache(client) + chanCache := rediscache.NewChannelCache(cacheClient) + thingCache := rediscache.NewThingCache(cacheClient) idp := uuid.New() svc := things.New(users, thingsRepo, channelsRepo, chanCache, thingCache, idp) - svc = rediscache.NewEventStoreMiddleware(svc, client) + svc = rediscache.NewEventStoreMiddleware(svc, esClient) svc = api.LoggingMiddleware(svc, logger) svc = api.MetricsMiddleware( svc, @@ -243,10 +256,10 @@ func startHTTPServer(svc things.Service, cfg config, logger logger.Logger, errs logger.Info(fmt.Sprintf("Things service started using https on port %s with cert %s key %s", cfg.httpPort, cfg.serverCert, cfg.serverKey)) errs <- http.ListenAndServeTLS(p, cfg.serverCert, cfg.serverKey, httpapi.MakeHandler(svc)) - } else { - logger.Info(fmt.Sprintf("Things service started using http on port %s", cfg.httpPort)) - errs <- http.ListenAndServe(p, httpapi.MakeHandler(svc)) + return } + logger.Info(fmt.Sprintf("Things service started using http on port %s", cfg.httpPort)) + errs <- http.ListenAndServe(p, httpapi.MakeHandler(svc)) } func startGRPCServer(svc things.Service, cfg config, logger logger.Logger, errs chan error) { @@ -254,6 +267,7 @@ func startGRPCServer(svc things.Service, cfg config, logger logger.Logger, errs listener, err := net.Listen("tcp", p) if err != nil { logger.Error(fmt.Sprintf("Failed to listen on port %s: %s", cfg.grpcPort, err)) + os.Exit(1) } var server *grpc.Server diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index c5ca811d..78d24be3 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -103,6 +103,7 @@ services: MF_THINGS_DB_PASS: mainflux MF_THINGS_DB: things MF_THINGS_CACHE_URL: things-redis:6379 + MF_THINGS_ES_URL: things-redis:6379 MF_THINGS_HTTP_PORT: 8182 MF_THINGS_GRPC_PORT: 8183 MF_USERS_URL: users:8181 diff --git a/things/README.md b/things/README.md index d583b65b..7ce477ea 100644 --- a/things/README.md +++ b/things/README.md @@ -34,6 +34,9 @@ default values. | MF_THINGS_CACHE_URL | Cache database URL | localhost:6379 | | MF_THINGS_CACHE_PASS | Cache database password | | | MF_THINGS_CACHE_DB | Cache instance that should be used | 0 | +| MF_THINGS_ES_URL | Event store URL | localhost:6379 | +| MF_THINGS_ES_PASS | Event store password | | +| MF_THINGS_ES_DB | Event store instance that should be used | 0 | | MF_THINGS_HTTP_PORT | Things service HTTP port | 8180 | | MF_THINGS_GRPC_PORT | Things service gRPC port | 8181 | | MF_THINGS_SERVER_CERT | Path to server certificate in pem format | 8181 | @@ -69,6 +72,9 @@ services: MF_THINGS_CACHE_URL: [Cache database URL] MF_THINGS_CACHE_PASS: [Cache database password] MF_THINGS_CACHE_DB: [Cache instance that should be used] + MF_THINGS_ES_URL: [Event store URL] + MF_THINGS_ES_PASS: [Event store password] + MF_THINGS_ES_DB: [Event store instance that should be used] MF_THINGS_HTTP_PORT: [Service HTTP port] MF_THINGS_GRPC_PORT: [Service gRPC port] MF_THINGS_SERVER_CERT: [String path to server cert in pem format] @@ -92,7 +98,7 @@ make things make install # set the environment variables and run the service -MF_THINGS_LOG_LEVEL=[Things log level] MF_THINGS_DB_HOST=[Database host address] MF_THINGS_DB_PORT=[Database host port] MF_THINGS_DB_USER=[Database user] MF_THINGS_DB_PASS=[Database password] MF_THINGS_DB=[Name of the database used by the service] MF_THINGS_DB_SSL_MODE=[SSL mode to connect to the database with] MF_THINGS_DB_SSL_CERT=[Path to the PEM encoded certificate file] MF_THINGS_DB_SSL_KEY=[Path to the PEM encoded key file] MF_THINGS_DB_SSL_ROOT_CERT=[Path to the PEM encoded root certificate file] MF_HTTP_ADAPTER_CA_CERTS=[Path to trusted CAs in PEM format] MF_THINGS_CACHE_URL=[Cache database URL] MF_THINGS_CACHE_PASS=[Cache database password] MF_THINGS_CACHE_DB=[Cache instance that should be used] MF_THINGS_HTTP_PORT=[Service HTTP port] MF_THINGS_GRPC_PORT=[Service gRPC port] MF_USERS_URL=[Users service URL] MF_THINGS_SERVER_CERT=[Path to server certificate] MF_THINGS_SERVER_KEY=[Path to server key] $GOBIN/mainflux-things +MF_THINGS_LOG_LEVEL=[Things log level] MF_THINGS_DB_HOST=[Database host address] MF_THINGS_DB_PORT=[Database host port] MF_THINGS_DB_USER=[Database user] MF_THINGS_DB_PASS=[Database password] MF_THINGS_DB=[Name of the database used by the service] MF_THINGS_DB_SSL_MODE=[SSL mode to connect to the database with] MF_THINGS_DB_SSL_CERT=[Path to the PEM encoded certificate file] MF_THINGS_DB_SSL_KEY=[Path to the PEM encoded key file] MF_THINGS_DB_SSL_ROOT_CERT=[Path to the PEM encoded root certificate file] MF_HTTP_ADAPTER_CA_CERTS=[Path to trusted CAs in PEM format] MF_THINGS_CACHE_URL=[Cache database URL] MF_THINGS_CACHE_PASS=[Cache database password] MF_THINGS_CACHE_DB=[Cache instance that should be used] MF_THINGS_ES_URL=[Event store URL] MF_THINGS_ES_PASS=[Event store password] MF_THINGS_ES_DB=[Event store instance that should be used] MF_THINGS_HTTP_PORT=[Service HTTP port] MF_THINGS_GRPC_PORT=[Service gRPC port] MF_USERS_URL=[Users service URL] MF_THINGS_SERVER_CERT=[Path to server certificate] MF_THINGS_SERVER_KEY=[Path to server key] $GOBIN/mainflux-things ``` Setting `MF_THINGS_CA_CERTS` expects a file in PEM format of trusted CAs. This will enable TLS against the Users gRPC endpoint trusting only those CAs that are provided.