1
0
mirror of https://github.com/mainflux/mainflux.git synced 2025-04-24 13:48:49 +08:00
b1ackd0t 38992085bd
NOISSUE - Enrich Existing OpenTelemetry Tags (#1840)
* Initial Commit: Sync Env Veriables With Docker Deployment

Signed-off-by: rodneyosodo <blackd0t@protonmail.com>

* Sync Env Vars With Master

Signed-off-by: rodneyosodo <blackd0t@protonmail.com>

* Initial Commit: Add Tags to Database and Message Bus

Signed-off-by: rodneyosodo <blackd0t@protonmail.com>

* Format Address Well

Signed-off-by: rodneyosodo <blackd0t@protonmail.com>

* Propagate Context

Signed-off-by: rodneyosodo <blackd0t@protonmail.com>

* Update PostgresSQL spans

Signed-off-by: rodneyosodo <blackd0t@protonmail.com>

* Update Message Bus Spans

Signed-off-by: rodneyosodo <blackd0t@protonmail.com>

* Add Tracing To MQTT Adapter

Signed-off-by: rodneyosodo <blackd0t@protonmail.com>

* Add Span Tags to HTTP

Signed-off-by: rodneyosodo <blackd0t@protonmail.com>

* Combine Tracing and PubSub

Signed-off-by: rodneyosodo <blackd0t@protonmail.com>

* Fix Error After Rebase

Signed-off-by: rodneyosodo <blackd0t@protonmail.com>

* Reorder Server Config

Signed-off-by: rodneyosodo <blackd0t@protonmail.com>

* Seperate Tracing

Signed-off-by: rodneyosodo <blackd0t@protonmail.com>

* shorten span names

Signed-off-by: rodneyosodo <blackd0t@protonmail.com>

* Fix Issue After Rebase

Signed-off-by: rodneyosodo <blackd0t@protonmail.com>

---------

Signed-off-by: rodneyosodo <blackd0t@protonmail.com>
2023-07-31 19:20:04 +02:00

132 lines
4.0 KiB
Go

// Copyright (c) Mainflux
// SPDX-License-Identifier: Apache-2.0
package postgres
import (
"context"
"database/sql"
"fmt"
"strings"
"github.com/jmoiron/sqlx"
"github.com/mainflux/mainflux/internal/clients/postgres"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)
var _ Database = (*database)(nil)
type database struct {
postgres.Config
db *sqlx.DB
tracer trace.Tracer
}
// Database provides a database interface.
type Database interface {
// NamedQueryContext executes a named query against the database and returns
NamedQueryContext(context.Context, string, interface{}) (*sqlx.Rows, error)
// NamedExecContext executes a named query against the database and returns
NamedExecContext(context.Context, string, interface{}) (sql.Result, error)
// QueryRowxContext queries the database and returns an *sqlx.Row.
QueryRowxContext(context.Context, string, ...interface{}) *sqlx.Row
// QueryxContext queries the database and returns an *sqlx.Rows and an error.
QueryxContext(context.Context, string, ...interface{}) (*sqlx.Rows, error)
// QueryContext queries the database and returns an *sql.Rows and an error.
QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error)
// ExecContext executes a query without returning any rows.
ExecContext(context.Context, string, ...interface{}) (sql.Result, error)
// BeginTxx begins a transaction and returns an *sqlx.Tx.
BeginTxx(ctx context.Context, opts *sql.TxOptions) (*sqlx.Tx, error)
}
// NewDatabase creates a Clients'Database instance.
func NewDatabase(db *sqlx.DB, config postgres.Config, tracer trace.Tracer) Database {
database := &database{
Config: config,
db: db,
tracer: tracer,
}
return database
}
func (d *database) NamedQueryContext(ctx context.Context, query string, args interface{}) (*sqlx.Rows, error) {
ctx, span := d.addSpanTags(ctx, query)
defer span.End()
return d.db.NamedQueryContext(ctx, query, args)
}
func (d *database) NamedExecContext(ctx context.Context, query string, args interface{}) (sql.Result, error) {
ctx, span := d.addSpanTags(ctx, query)
defer span.End()
return d.db.NamedExecContext(ctx, query, args)
}
func (d *database) ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error) {
ctx, span := d.addSpanTags(ctx, query)
defer span.End()
return d.db.ExecContext(ctx, query, args...)
}
func (d *database) QueryRowxContext(ctx context.Context, query string, args ...interface{}) *sqlx.Row {
ctx, span := d.addSpanTags(ctx, query)
defer span.End()
return d.db.QueryRowxContext(ctx, query, args...)
}
func (d *database) QueryxContext(ctx context.Context, query string, args ...interface{}) (*sqlx.Rows, error) {
ctx, span := d.addSpanTags(ctx, query)
defer span.End()
return d.db.QueryxContext(ctx, query, args...)
}
func (d database) QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error) {
ctx, span := d.addSpanTags(ctx, query)
defer span.End()
return d.db.QueryContext(ctx, query, args...)
}
func (d database) BeginTxx(ctx context.Context, opts *sql.TxOptions) (*sqlx.Tx, error) {
ctx, span := d.addSpanTags(ctx, "BeginTxx")
defer span.End()
return d.db.BeginTxx(ctx, opts)
}
func (d *database) addSpanTags(ctx context.Context, query string) (context.Context, trace.Span) {
operation := strings.Replace(strings.Split(query, " ")[0], "(", "", 1)
ctx, span := d.tracer.Start(ctx,
fmt.Sprintf("%s %s", operation, d.Name),
trace.WithAttributes(
// Related to the database instance (informational)
attribute.String("db.system", "postgresql"),
attribute.String("db.user", d.User),
attribute.String("network.transport", "tcp"),
attribute.String("network.type", "ipv4"),
attribute.String("server.address", d.Host),
attribute.String("server.port", d.Port),
attribute.String("db.name", d.Name),
attribute.String("db.statement", query),
// General Span tags
attribute.String("span.kind", "client"),
),
)
return ctx, span
}