1
0
mirror of https://github.com/mainflux/mainflux.git synced 2025-04-28 13:48:49 +08:00
Manuel Imperiale e5278c463f
MF-1348 - Add transport errors logging (#1544)
* MF-1348 - Add go-kit transport level logging

Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com>

* Fix reviews

Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com>

* Fix reviews

Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com>

* Fix merge

Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com>

* Fix remark

Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com>

* Fix go test flags

Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com>

* Use httputil errors in things and http service

Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com>

* Fix SDK tests

Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com>

* Use httputil errors in certs and provision service

Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com>

* Use httputil errors in consumers service

Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com>

* General renaming and add ErrMissingToken

Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com>

* Rename httputil -> apiutil and use errors in users servive

Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com>

* Use apiutil errors in auth, bootstrap, readers, things and twins

Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com>

* Replace errors.Contain by comparison

Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com>

* Fix remarks

Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com>

* Simplify validateID

Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com>

* Simplify validateID

Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com>

* Simplify and rename ExtractAuthToken -> ExtractBearerToken

Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com>

* Fix readers

Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com>

* Fix auth key test and remarks

Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com>

* Improve comment

Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com>

* Simplify validateUUID check

Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com>

* Fix typo

Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com>

Co-authored-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>
2022-03-03 17:13:46 +01:00

284 lines
8.7 KiB
Go

package main
import (
"fmt"
"io"
"io/ioutil"
"log"
"net/http"
"os"
"os/signal"
"strconv"
"syscall"
"time"
kitprometheus "github.com/go-kit/kit/metrics/prometheus"
influxdata "github.com/influxdata/influxdb/client/v2"
"github.com/mainflux/mainflux"
authapi "github.com/mainflux/mainflux/auth/api/grpc"
"github.com/mainflux/mainflux/logger"
"github.com/mainflux/mainflux/readers"
"github.com/mainflux/mainflux/readers/api"
"github.com/mainflux/mainflux/readers/influxdb"
thingsapi "github.com/mainflux/mainflux/things/api/auth/grpc"
opentracing "github.com/opentracing/opentracing-go"
stdprometheus "github.com/prometheus/client_golang/prometheus"
jconfig "github.com/uber/jaeger-client-go/config"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
)
const (
svcName = "influxdb-reader"
defLogLevel = "error"
defPort = "8180"
defDB = "mainflux"
defDBHost = "localhost"
defDBPort = "8086"
defDBUser = "mainflux"
defDBPass = "mainflux"
defClientTLS = "false"
defCACerts = ""
defServerCert = ""
defServerKey = ""
defJaegerURL = ""
defThingsAuthURL = "localhost:8183"
defThingsAuthTimeout = "1s"
defUsersAuthURL = "localhost:8181"
defUsersAuthTimeout = "1s"
envLogLevel = "MF_INFLUX_READER_LOG_LEVEL"
envPort = "MF_INFLUX_READER_PORT"
envDB = "MF_INFLUXDB_DB"
envDBHost = "MF_INFLUX_READER_DB_HOST"
envDBPort = "MF_INFLUXDB_PORT"
envDBUser = "MF_INFLUXDB_ADMIN_USER"
envDBPass = "MF_INFLUXDB_ADMIN_PASSWORD"
envClientTLS = "MF_INFLUX_READER_CLIENT_TLS"
envCACerts = "MF_INFLUX_READER_CA_CERTS"
envServerCert = "MF_INFLUX_READER_SERVER_CERT"
envServerKey = "MF_INFLUX_READER_SERVER_KEY"
envJaegerURL = "MF_JAEGER_URL"
envThingsAuthURL = "MF_THINGS_AUTH_GRPC_URL"
envThingsAuthTimeout = "MF_THINGS_AUTH_GRPC_TIMEOUT"
envAuthURL = "MF_AUTH_GRPC_URL"
envUsersAuthTimeout = "MF_AUTH_GRPC_TIMEOUT"
)
type config struct {
logLevel string
port string
dbName string
dbHost string
dbPort string
dbUser string
dbPass string
clientTLS bool
caCerts string
serverCert string
serverKey string
jaegerURL string
thingsAuthURL string
usersAuthURL string
thingsAuthTimeout time.Duration
usersAuthTimeout time.Duration
}
func main() {
cfg, clientCfg := loadConfigs()
logger, err := logger.New(os.Stdout, cfg.logLevel)
if err != nil {
log.Fatalf(err.Error())
}
conn := connectToThings(cfg, logger)
defer conn.Close()
thingsTracer, thingsCloser := initJaeger("things", cfg.jaegerURL, logger)
defer thingsCloser.Close()
tc := thingsapi.NewClient(conn, thingsTracer, cfg.thingsAuthTimeout)
authTracer, authCloser := initJaeger("auth", cfg.jaegerURL, logger)
defer authCloser.Close()
authConn := connectToAuth(cfg, logger)
defer authConn.Close()
auth := authapi.NewClient(authTracer, authConn, cfg.usersAuthTimeout)
client, err := influxdata.NewHTTPClient(clientCfg)
if err != nil {
logger.Error(fmt.Sprintf("Failed to create InfluxDB client: %s", err))
os.Exit(1)
}
defer client.Close()
repo := newService(client, cfg.dbName, logger)
errs := make(chan error, 2)
go func() {
c := make(chan os.Signal)
signal.Notify(c, syscall.SIGINT)
errs <- fmt.Errorf("%s", <-c)
}()
go startHTTPServer(repo, tc, auth, cfg, logger, errs)
err = <-errs
logger.Error(fmt.Sprintf("InfluxDB writer service terminated: %s", err))
}
func connectToAuth(cfg config, logger logger.Logger) *grpc.ClientConn {
var opts []grpc.DialOption
logger.Info("Connecting to auth via gRPC")
if cfg.clientTLS {
if cfg.caCerts != "" {
tpc, err := credentials.NewClientTLSFromFile(cfg.caCerts, "")
if err != nil {
logger.Error(fmt.Sprintf("Failed to create tls credentials: %s", err))
os.Exit(1)
}
opts = append(opts, grpc.WithTransportCredentials(tpc))
}
} else {
opts = append(opts, grpc.WithInsecure())
logger.Info("gRPC communication is not encrypted")
}
conn, err := grpc.Dial(cfg.usersAuthURL, opts...)
if err != nil {
logger.Error(fmt.Sprintf("Failed to connect to auth service: %s", err))
os.Exit(1)
}
logger.Info("Established gRPC connection to auth via gRPC")
return conn
}
func loadConfigs() (config, influxdata.HTTPConfig) {
tls, err := strconv.ParseBool(mainflux.Env(envClientTLS, defClientTLS))
if err != nil {
log.Fatalf("Invalid value passed for %s\n", envClientTLS)
}
authTimeout, err := time.ParseDuration(mainflux.Env(envThingsAuthTimeout, defThingsAuthTimeout))
if err != nil {
log.Fatalf("Invalid %s value: %s", envThingsAuthTimeout, err.Error())
}
userAuthTimeout, err := time.ParseDuration(mainflux.Env(envUsersAuthTimeout, defUsersAuthTimeout))
if err != nil {
log.Fatalf("Invalid %s value: %s", envThingsAuthTimeout, err.Error())
}
cfg := config{
logLevel: mainflux.Env(envLogLevel, defLogLevel),
port: mainflux.Env(envPort, defPort),
dbName: mainflux.Env(envDB, defDB),
dbHost: mainflux.Env(envDBHost, defDBHost),
dbPort: mainflux.Env(envDBPort, defDBPort),
dbUser: mainflux.Env(envDBUser, defDBUser),
dbPass: mainflux.Env(envDBPass, defDBPass),
clientTLS: tls,
caCerts: mainflux.Env(envCACerts, defCACerts),
serverCert: mainflux.Env(envServerCert, defServerCert),
serverKey: mainflux.Env(envServerKey, defServerKey),
jaegerURL: mainflux.Env(envJaegerURL, defJaegerURL),
thingsAuthURL: mainflux.Env(envThingsAuthURL, defThingsAuthURL),
thingsAuthTimeout: authTimeout,
usersAuthURL: mainflux.Env(envAuthURL, defUsersAuthURL),
usersAuthTimeout: userAuthTimeout,
}
clientCfg := influxdata.HTTPConfig{
Addr: fmt.Sprintf("http://%s:%s", cfg.dbHost, cfg.dbPort),
Username: cfg.dbUser,
Password: cfg.dbPass,
}
return cfg, clientCfg
}
func connectToThings(cfg config, logger logger.Logger) *grpc.ClientConn {
var opts []grpc.DialOption
logger.Info("connecting to things via gRPC")
if cfg.clientTLS {
if cfg.caCerts != "" {
tpc, err := credentials.NewClientTLSFromFile(cfg.caCerts, "")
if err != nil {
logger.Error(fmt.Sprintf("Failed to load certs: %s", err))
os.Exit(1)
}
opts = append(opts, grpc.WithTransportCredentials(tpc))
}
} else {
logger.Info("gRPC communication is not encrypted")
opts = append(opts, grpc.WithInsecure())
}
conn, err := grpc.Dial(cfg.thingsAuthURL, opts...)
if err != nil {
logger.Error(fmt.Sprintf("Failed to connect to things service: %s", err))
os.Exit(1)
}
logger.Info(fmt.Sprintf("Established gRPC connection to things via gRPC: %s", cfg.thingsAuthURL))
return conn
}
func initJaeger(svcName, url string, logger logger.Logger) (opentracing.Tracer, io.Closer) {
if url == "" {
return opentracing.NoopTracer{}, ioutil.NopCloser(nil)
}
tracer, closer, err := jconfig.Configuration{
ServiceName: svcName,
Sampler: &jconfig.SamplerConfig{
Type: "const",
Param: 1,
},
Reporter: &jconfig.ReporterConfig{
LocalAgentHostPort: url,
LogSpans: true,
},
}.NewTracer()
if err != nil {
logger.Error(fmt.Sprintf("Failed to init Jaeger client: %s", err))
os.Exit(1)
}
return tracer, closer
}
func newService(client influxdata.Client, dbName string, logger logger.Logger) readers.MessageRepository {
repo := influxdb.New(client, dbName)
repo = api.LoggingMiddleware(repo, logger)
repo = api.MetricsMiddleware(
repo,
kitprometheus.NewCounterFrom(stdprometheus.CounterOpts{
Namespace: "influxdb",
Subsystem: "message_reader",
Name: "request_count",
Help: "Number of requests received.",
}, []string{"method"}),
kitprometheus.NewSummaryFrom(stdprometheus.SummaryOpts{
Namespace: "influxdb",
Subsystem: "message_reader",
Name: "request_latency_microseconds",
Help: "Total duration of requests in microseconds.",
}, []string{"method"}),
)
return repo
}
func startHTTPServer(repo readers.MessageRepository, tc mainflux.ThingsServiceClient, ac mainflux.AuthServiceClient, cfg config, logger logger.Logger, errs chan error) {
p := fmt.Sprintf(":%s", cfg.port)
if cfg.serverCert != "" || cfg.serverKey != "" {
logger.Info(fmt.Sprintf("InfluxDB reader service started using https on port %s with cert %s key %s",
cfg.port, cfg.serverCert, cfg.serverKey))
errs <- http.ListenAndServeTLS(p, cfg.serverCert, cfg.serverKey, api.MakeHandler(repo, tc, ac, svcName, logger))
return
}
logger.Info(fmt.Sprintf("InfluxDB reader service started, exposed port %s", cfg.port))
errs <- http.ListenAndServe(p, api.MakeHandler(repo, tc, ac, svcName, logger))
}