mirror of
https://github.com/mainflux/mainflux.git
synced 2025-04-24 13:48:49 +08:00
Initial Commit: Add Tracing To Certs Service (#1850)
Signed-off-by: rodneyosodo <blackd0t@protonmail.com>
This commit is contained in:
parent
979a4b74d3
commit
1192325a2e
@ -13,6 +13,7 @@ import (
|
||||
"github.com/jackc/pgx/v5/pgconn"
|
||||
"github.com/jmoiron/sqlx"
|
||||
"github.com/mainflux/mainflux/certs"
|
||||
"github.com/mainflux/mainflux/internal/postgres"
|
||||
"github.com/mainflux/mainflux/logger"
|
||||
"github.com/mainflux/mainflux/pkg/errors"
|
||||
)
|
||||
@ -27,19 +28,19 @@ type Cert struct {
|
||||
}
|
||||
|
||||
type certsRepository struct {
|
||||
db *sqlx.DB
|
||||
db postgres.Database
|
||||
log logger.Logger
|
||||
}
|
||||
|
||||
// NewRepository instantiates a PostgreSQL implementation of certs
|
||||
// repository.
|
||||
func NewRepository(db *sqlx.DB, log logger.Logger) certs.Repository {
|
||||
func NewRepository(db postgres.Database, log logger.Logger) certs.Repository {
|
||||
return &certsRepository{db: db, log: log}
|
||||
}
|
||||
|
||||
func (cr certsRepository) RetrieveAll(ctx context.Context, ownerID string, offset, limit uint64) (certs.Page, error) {
|
||||
q := `SELECT thing_id, owner_id, serial, expire FROM certs WHERE owner_id = $1 ORDER BY expire LIMIT $2 OFFSET $3;`
|
||||
rows, err := cr.db.Query(q, ownerID, limit, offset)
|
||||
rows, err := cr.db.QueryContext(ctx, q, ownerID, limit, offset)
|
||||
if err != nil {
|
||||
cr.log.Error(fmt.Sprintf("Failed to retrieve configs due to %s", err))
|
||||
return certs.Page{}, err
|
||||
@ -59,7 +60,7 @@ func (cr certsRepository) RetrieveAll(ctx context.Context, ownerID string, offse
|
||||
|
||||
q = `SELECT COUNT(*) FROM certs WHERE owner_id = $1`
|
||||
var total uint64
|
||||
if err := cr.db.QueryRow(q, ownerID).Scan(&total); err != nil {
|
||||
if err := cr.db.QueryRowxContext(ctx, q, ownerID).Scan(&total); err != nil {
|
||||
cr.log.Error(fmt.Sprintf("Failed to count certs due to %s", err))
|
||||
return certs.Page{}, err
|
||||
}
|
||||
@ -75,7 +76,7 @@ func (cr certsRepository) RetrieveAll(ctx context.Context, ownerID string, offse
|
||||
func (cr certsRepository) Save(ctx context.Context, cert certs.Cert) (string, error) {
|
||||
q := `INSERT INTO certs (thing_id, owner_id, serial, expire) VALUES (:thing_id, :owner_id, :serial, :expire)`
|
||||
|
||||
tx, err := cr.db.Beginx()
|
||||
tx, err := cr.db.BeginTxx(ctx, nil)
|
||||
if err != nil {
|
||||
return "", errors.Wrap(errors.ErrCreateEntity, err)
|
||||
}
|
||||
@ -116,7 +117,7 @@ func (cr certsRepository) Remove(ctx context.Context, ownerID, serial string) er
|
||||
|
||||
func (cr certsRepository) RetrieveByThing(ctx context.Context, ownerID, thingID string, offset, limit uint64) (certs.Page, error) {
|
||||
q := `SELECT thing_id, owner_id, serial, expire FROM certs WHERE owner_id = $1 AND thing_id = $2 ORDER BY expire LIMIT $3 OFFSET $4;`
|
||||
rows, err := cr.db.Query(q, ownerID, thingID, limit, offset)
|
||||
rows, err := cr.db.QueryContext(ctx, q, ownerID, thingID, limit, offset)
|
||||
if err != nil {
|
||||
cr.log.Error(fmt.Sprintf("Failed to retrieve configs due to %s", err))
|
||||
return certs.Page{}, err
|
||||
@ -136,7 +137,7 @@ func (cr certsRepository) RetrieveByThing(ctx context.Context, ownerID, thingID
|
||||
|
||||
q = `SELECT COUNT(*) FROM certs WHERE owner_id = $1 AND thing_id = $2`
|
||||
var total uint64
|
||||
if err := cr.db.QueryRow(q, ownerID, thingID).Scan(&total); err != nil {
|
||||
if err := cr.db.QueryRowxContext(ctx, q, ownerID, thingID).Scan(&total); err != nil {
|
||||
cr.log.Error(fmt.Sprintf("Failed to count certs due to %s", err))
|
||||
return certs.Page{}, err
|
||||
}
|
||||
|
12
certs/tracing/doc.go
Normal file
12
certs/tracing/doc.go
Normal file
@ -0,0 +1,12 @@
|
||||
// Copyright (c) Mainflux
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
// Package tracing provides tracing instrumentation for Mainflux Users Groups service.
|
||||
//
|
||||
// This package provides tracing middleware for Mainflux Users Groups service.
|
||||
// It can be used to trace incoming requests and add tracing capabilities to
|
||||
// Mainflux Users Groups service.
|
||||
//
|
||||
// For more details about tracing instrumentation for Mainflux messaging refer
|
||||
// to the documentation at https://docs.mainflux.io/tracing/.
|
||||
package tracing
|
79
certs/tracing/tracing.go
Normal file
79
certs/tracing/tracing.go
Normal file
@ -0,0 +1,79 @@
|
||||
// Copyright (c) Mainflux
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package tracing
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/mainflux/mainflux/certs"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
)
|
||||
|
||||
var _ certs.Service = (*tracingMiddleware)(nil)
|
||||
|
||||
type tracingMiddleware struct {
|
||||
tracer trace.Tracer
|
||||
svc certs.Service
|
||||
}
|
||||
|
||||
// New returns a new certs service with tracing capabilities.
|
||||
func New(svc certs.Service, tracer trace.Tracer) certs.Service {
|
||||
return &tracingMiddleware{tracer, svc}
|
||||
}
|
||||
|
||||
// IssueCert traces the "IssueCert" operation of the wrapped certs.Service.
|
||||
func (tm *tracingMiddleware) IssueCert(ctx context.Context, token, thingID, ttl string) (certs.Cert, error) {
|
||||
ctx, span := tm.tracer.Start(ctx, "svc_create_group", trace.WithAttributes(
|
||||
attribute.String("thing_id", thingID),
|
||||
attribute.String("ttl", ttl),
|
||||
))
|
||||
defer span.End()
|
||||
|
||||
return tm.svc.IssueCert(ctx, token, thingID, ttl)
|
||||
}
|
||||
|
||||
// ListCerts traces the "ListCerts" operation of the wrapped certs.Service.
|
||||
func (tm *tracingMiddleware) ListCerts(ctx context.Context, token, thingID string, offset, limit uint64) (certs.Page, error) {
|
||||
ctx, span := tm.tracer.Start(ctx, "svc_list_certs", trace.WithAttributes(
|
||||
attribute.String("thing_id", thingID),
|
||||
attribute.Int64("offset", int64(offset)),
|
||||
attribute.Int64("limit", int64(limit)),
|
||||
))
|
||||
defer span.End()
|
||||
|
||||
return tm.svc.ListCerts(ctx, token, thingID, offset, limit)
|
||||
}
|
||||
|
||||
// ListSerials traces the "ListSerials" operation of the wrapped certs.Service.
|
||||
func (tm *tracingMiddleware) ListSerials(ctx context.Context, token, thingID string, offset, limit uint64) (certs.Page, error) {
|
||||
ctx, span := tm.tracer.Start(ctx, "svc_list_serials", trace.WithAttributes(
|
||||
attribute.String("thing_id", thingID),
|
||||
attribute.Int64("offset", int64(offset)),
|
||||
attribute.Int64("limit", int64(limit)),
|
||||
))
|
||||
defer span.End()
|
||||
|
||||
return tm.svc.ListSerials(ctx, token, thingID, offset, limit)
|
||||
}
|
||||
|
||||
// ViewCert traces the "ViewCert" operation of the wrapped certs.Service.
|
||||
func (tm *tracingMiddleware) ViewCert(ctx context.Context, token, serialID string) (certs.Cert, error) {
|
||||
ctx, span := tm.tracer.Start(ctx, "svc_view_cert", trace.WithAttributes(
|
||||
attribute.String("serial_id", serialID),
|
||||
))
|
||||
defer span.End()
|
||||
|
||||
return tm.svc.ViewCert(ctx, token, serialID)
|
||||
}
|
||||
|
||||
// RevokeCert traces the "RevokeCert" operation of the wrapped certs.Service.
|
||||
func (tm *tracingMiddleware) RevokeCert(ctx context.Context, token, serialID string) (certs.Revoke, error) {
|
||||
ctx, span := tm.tracer.Start(ctx, "svc_revoke_cert", trace.WithAttributes(
|
||||
attribute.String("serial_id", serialID),
|
||||
))
|
||||
defer span.End()
|
||||
|
||||
return tm.svc.RevokeCert(ctx, token, serialID)
|
||||
}
|
@ -10,27 +10,28 @@ import (
|
||||
"log"
|
||||
"os"
|
||||
|
||||
"github.com/jmoiron/sqlx"
|
||||
chclient "github.com/mainflux/callhome/pkg/client"
|
||||
"github.com/mainflux/mainflux"
|
||||
|
||||
"github.com/mainflux/mainflux/certs"
|
||||
"github.com/mainflux/mainflux/certs/api"
|
||||
vault "github.com/mainflux/mainflux/certs/pki"
|
||||
certsPg "github.com/mainflux/mainflux/certs/postgres"
|
||||
"github.com/mainflux/mainflux/certs/tracing"
|
||||
"github.com/mainflux/mainflux/internal"
|
||||
authClient "github.com/mainflux/mainflux/internal/clients/grpc/auth"
|
||||
jaegerClient "github.com/mainflux/mainflux/internal/clients/jaeger"
|
||||
pgClient "github.com/mainflux/mainflux/internal/clients/postgres"
|
||||
"github.com/mainflux/mainflux/internal/env"
|
||||
"github.com/mainflux/mainflux/internal/postgres"
|
||||
"github.com/mainflux/mainflux/internal/server"
|
||||
httpserver "github.com/mainflux/mainflux/internal/server/http"
|
||||
mflog "github.com/mainflux/mainflux/logger"
|
||||
"github.com/mainflux/mainflux/users/policies"
|
||||
"golang.org/x/sync/errgroup"
|
||||
|
||||
"github.com/jmoiron/sqlx"
|
||||
authClient "github.com/mainflux/mainflux/internal/clients/grpc/auth"
|
||||
pgClient "github.com/mainflux/mainflux/internal/clients/postgres"
|
||||
mfsdk "github.com/mainflux/mainflux/pkg/sdk/go"
|
||||
"github.com/mainflux/mainflux/pkg/uuid"
|
||||
"github.com/mainflux/mainflux/users/policies"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
|
||||
const (
|
||||
@ -45,9 +46,9 @@ type config struct {
|
||||
LogLevel string `env:"MF_CERTS_LOG_LEVEL" envDefault:"info"`
|
||||
CertsURL string `env:"MF_SDK_CERTS_URL" envDefault:"http://localhost"`
|
||||
ThingsURL string `env:"MF_THINGS_URL" envDefault:"http://things:9000"`
|
||||
JaegerURL string `env:"MF_JAEGER_URL" envDefault:"http://jaeger:14268/api/traces"`
|
||||
SendTelemetry bool `env:"MF_SEND_TELEMETRY" envDefault:"true"`
|
||||
InstanceID string `env:"MF_CERTS_INSTANCE_ID" envDefault:""`
|
||||
JaegerURL string `env:"MF_JAEGER_URL" envDefault:"http://jaeger:14268/api/traces"`
|
||||
|
||||
// Sign and issue certificates without 3rd party PKI
|
||||
SignCAPath string `env:"MF_CERTS_SIGN_CA_PATH" envDefault:"ca.crt"`
|
||||
@ -108,12 +109,6 @@ func main() {
|
||||
defer authHandler.Close()
|
||||
logger.Info("Successfully connected to auth grpc server " + authHandler.Secure())
|
||||
|
||||
svc := newService(auth, db, logger, cfg, pkiClient)
|
||||
|
||||
httpServerConfig := server.Config{Port: defSvcHttpPort}
|
||||
if err := env.Parse(&httpServerConfig, env.Options{Prefix: envPrefixHttp, AltPrefix: envPrefix}); err != nil {
|
||||
logger.Fatal(fmt.Sprintf("failed to load %s HTTP server configuration : %s", svcName, err))
|
||||
}
|
||||
tp, err := jaegerClient.NewProvider(svcName, cfg.JaegerURL, instanceID)
|
||||
if err != nil {
|
||||
logger.Fatal(fmt.Sprintf("failed to init Jaeger: %s", err))
|
||||
@ -123,6 +118,14 @@ func main() {
|
||||
logger.Error(fmt.Sprintf("error shutting down tracer provider: %v", err))
|
||||
}
|
||||
}()
|
||||
tracer := tp.Tracer(svcName)
|
||||
|
||||
svc := newService(auth, db, tracer, logger, cfg, pkiClient)
|
||||
|
||||
httpServerConfig := server.Config{Port: defSvcHttpPort}
|
||||
if err := env.Parse(&httpServerConfig, env.Options{Prefix: envPrefixHttp, AltPrefix: envPrefix}); err != nil {
|
||||
logger.Fatal(fmt.Sprintf("failed to load %s HTTP server configuration : %s", svcName, err))
|
||||
}
|
||||
hs := httpserver.New(ctx, cancel, svcName, httpServerConfig, api.MakeHandler(svc, logger, instanceID), logger)
|
||||
|
||||
if cfg.SendTelemetry {
|
||||
@ -143,8 +146,9 @@ func main() {
|
||||
}
|
||||
}
|
||||
|
||||
func newService(auth policies.AuthServiceClient, db *sqlx.DB, logger mflog.Logger, cfg config, pkiAgent vault.Agent) certs.Service {
|
||||
certsRepo := certsPg.NewRepository(db, logger)
|
||||
func newService(auth policies.AuthServiceClient, db *sqlx.DB, tracer trace.Tracer, logger mflog.Logger, cfg config, pkiAgent vault.Agent) certs.Service {
|
||||
database := postgres.NewDatabase(db, tracer)
|
||||
certsRepo := certsPg.NewRepository(database, logger)
|
||||
config := mfsdk.Config{
|
||||
CertsURL: cfg.CertsURL,
|
||||
ThingsURL: cfg.ThingsURL,
|
||||
@ -154,5 +158,7 @@ func newService(auth policies.AuthServiceClient, db *sqlx.DB, logger mflog.Logge
|
||||
svc = api.LoggingMiddleware(svc, logger)
|
||||
counter, latency := internal.MakeMetrics(svcName, "api")
|
||||
svc = api.MetricsMiddleware(svc, counter, latency)
|
||||
svc = tracing.New(svc, tracer)
|
||||
|
||||
return svc
|
||||
}
|
||||
|
@ -69,6 +69,7 @@ services:
|
||||
MF_AUTH_GRPC_URL: ${MF_USERS_GRPC_URL}
|
||||
MF_AUTH_GRPC_TIMEOUT: ${MF_USERS_GRPC_TIMEOUT}
|
||||
MF_CERTS_VAULT_HOST: ${MF_CERTS_VAULT_HOST}
|
||||
MF_JAEGER_URL: ${MF_JAEGER_URL}
|
||||
MF_SEND_TELEMETRY: ${MF_SEND_TELEMETRY}
|
||||
MF_CERTS_INSTANCE_ID: ${MF_CERTS_INSTANCE_ID}
|
||||
volumes:
|
||||
|
Loading…
x
Reference in New Issue
Block a user