diff --git a/auth/tracing/keys.go b/auth/tracing/keys.go index 36755a06..d37cea66 100644 --- a/auth/tracing/keys.go +++ b/auth/tracing/keys.go @@ -29,14 +29,13 @@ type keyRepositoryMiddleware struct { // New tracks request and their latency, and adds spans // to context. -func New(repo auth.KeyRepository, tracer opentracing.Tracer) auth.KeyRepository { +func New(tracer opentracing.Tracer, repo auth.KeyRepository) auth.KeyRepository { return keyRepositoryMiddleware{ tracer: tracer, repo: repo, } } - func (krm keyRepositoryMiddleware) Save(ctx context.Context, key auth.Key) (string, error) { span := createSpan(ctx, krm.tracer, saveOp) defer span.Finish() diff --git a/cmd/auth/main.go b/cmd/auth/main.go index b4443445..0f38fde1 100644 --- a/cmd/auth/main.go +++ b/cmd/auth/main.go @@ -99,7 +99,7 @@ func main() { svc := newService(db, dbTracer, cfg.Secret, logger, readerConn, writerConn, cfg.LoginDuration) // Create new HTTP Server - tracer, closer, err := jaegerClient.NewTracer("auth", cfg.JaegerURL) + tracer, closer, err := jaegerClient.NewTracer(svcName, cfg.JaegerURL) if err != nil { logger.Fatal(fmt.Sprintf("failed to init Jaeger: %s", err)) } @@ -143,7 +143,7 @@ func main() { func newService(db *sqlx.DB, tracer opentracing.Tracer, secret string, logger mflog.Logger, readerConn, writerConn *grpc.ClientConn, duration time.Duration) auth.Service { database := authPg.NewDatabase(db) - keysRepo := tracing.New(authPg.New(database), tracer) + keysRepo := tracing.New(tracer, authPg.New(database)) groupsRepo := authPg.NewGroupRepo(database) groupsRepo = tracing.GroupRepositoryMiddleware(tracer, groupsRepo) diff --git a/cmd/cassandra-writer/main.go b/cmd/cassandra-writer/main.go index d2d4c9ef..df5a3280 100644 --- a/cmd/cassandra-writer/main.go +++ b/cmd/cassandra-writer/main.go @@ -15,11 +15,13 @@ import ( "github.com/mainflux/mainflux/consumers/writers/cassandra" "github.com/mainflux/mainflux/internal" cassandraClient "github.com/mainflux/mainflux/internal/clients/cassandra" + jaegerClient "github.com/mainflux/mainflux/internal/clients/jaeger" "github.com/mainflux/mainflux/internal/env" "github.com/mainflux/mainflux/internal/server" httpserver "github.com/mainflux/mainflux/internal/server/http" mflog "github.com/mainflux/mainflux/logger" "github.com/mainflux/mainflux/pkg/messaging/brokers" + "github.com/mainflux/mainflux/pkg/messaging/tracing" "golang.org/x/sync/errgroup" ) @@ -34,6 +36,7 @@ type config struct { LogLevel string `env:"MF_CASSANDRA_WRITER_LOG_LEVEL" envDefault:"info"` ConfigPath string `env:"MF_CASSANDRA_WRITER_CONFIG_PATH" envDefault:"/config.toml"` BrokerURL string `env:"MF_BROKER_URL" envDefault:"nats://localhost:4222"` + JaegerURL string `env:"MF_JAEGER_URL" envDefault:"localhost:6831"` } func main() { @@ -58,6 +61,12 @@ func main() { } defer csdSession.Close() + tracer, traceCloser, err := jaegerClient.NewTracer(svcName, cfg.JaegerURL) + if err != nil { + logger.Fatal(fmt.Sprintf("failed to init Jaeger: %s", err)) + } + defer traceCloser.Close() + // Create new cassandra-writer repo repo := newService(csdSession, logger) @@ -66,10 +75,11 @@ func main() { if err != nil { logger.Fatal(fmt.Sprintf("failed to connect to message broker: %s", err)) } + pubSub = tracing.NewPubSub(tracer, pubSub) defer pubSub.Close() // Start new consumer - if err := consumers.Start(svcName, pubSub, repo, cfg.ConfigPath, logger); err != nil { + if err := consumers.Start(ctx, svcName, pubSub, repo, cfg.ConfigPath, logger); err != nil { logger.Error(fmt.Sprintf("Failed to create Cassandra writer: %s", err)) } diff --git a/cmd/coap/main.go b/cmd/coap/main.go index 7b43df40..13c6a880 100644 --- a/cmd/coap/main.go +++ b/cmd/coap/main.go @@ -11,14 +11,17 @@ import ( "github.com/mainflux/mainflux/coap" "github.com/mainflux/mainflux/coap/api" + "github.com/mainflux/mainflux/coap/tracing" "github.com/mainflux/mainflux/internal" thingsClient "github.com/mainflux/mainflux/internal/clients/grpc/things" + jaegerClient "github.com/mainflux/mainflux/internal/clients/jaeger" "github.com/mainflux/mainflux/internal/env" "github.com/mainflux/mainflux/internal/server" coapserver "github.com/mainflux/mainflux/internal/server/coap" httpserver "github.com/mainflux/mainflux/internal/server/http" mflog "github.com/mainflux/mainflux/logger" "github.com/mainflux/mainflux/pkg/messaging/brokers" + pstracing "github.com/mainflux/mainflux/pkg/messaging/tracing" "golang.org/x/sync/errgroup" ) @@ -58,14 +61,23 @@ func main() { defer tcHandler.Close() logger.Info("Successfully connected to things grpc server " + tcHandler.Secure()) + tracer, traceCloser, err := jaegerClient.NewTracer(svcName, cfg.JaegerURL) + if err != nil { + logger.Fatal(fmt.Sprintf("failed to init Jaeger: %s", err)) + } + defer traceCloser.Close() + nps, err := brokers.NewPubSub(cfg.BrokerURL, "", logger) if err != nil { logger.Fatal(fmt.Sprintf("failed to connect to message broker: %s", err)) } + nps = pstracing.NewPubSub(tracer, nps) defer nps.Close() svc := coap.New(tc, nps) + svc = tracing.New(tracer, svc) + svc = api.LoggingMiddleware(svc, logger) counter, latency := internal.MakeMetrics(svcName, "api") diff --git a/cmd/http/main.go b/cmd/http/main.go index 75e1be89..2cb126c5 100644 --- a/cmd/http/main.go +++ b/cmd/http/main.go @@ -12,6 +12,7 @@ import ( "github.com/mainflux/mainflux" adapter "github.com/mainflux/mainflux/http" "github.com/mainflux/mainflux/http/api" + "github.com/mainflux/mainflux/http/tracing" "github.com/mainflux/mainflux/internal" thingsClient "github.com/mainflux/mainflux/internal/clients/grpc/things" jaegerClient "github.com/mainflux/mainflux/internal/clients/jaeger" @@ -21,6 +22,8 @@ import ( mflog "github.com/mainflux/mainflux/logger" "github.com/mainflux/mainflux/pkg/messaging" "github.com/mainflux/mainflux/pkg/messaging/brokers" + pstracing "github.com/mainflux/mainflux/pkg/messaging/tracing" + "github.com/opentracing/opentracing-go" "golang.org/x/sync/errgroup" ) @@ -58,20 +61,21 @@ func main() { defer tcHandler.Close() logger.Info("Successfully connected to things grpc server " + tcHandler.Secure()) - pub, err := brokers.NewPublisher(cfg.BrokerURL) - if err != nil { - logger.Fatal(fmt.Sprintf("failed to connect to message broker: %s", err)) - } - defer pub.Close() - - svc := newService(pub, tc, logger) - - tracer, closer, err := jaegerClient.NewTracer("http_adapter", cfg.JaegerURL) + tracer, closer, err := jaegerClient.NewTracer(svcName, cfg.JaegerURL) if err != nil { logger.Fatal(fmt.Sprintf("failed to init Jaeger: %s", err)) } defer closer.Close() + pub, err := brokers.NewPublisher(cfg.BrokerURL) + if err != nil { + logger.Fatal(fmt.Sprintf("failed to connect to message broker: %s", err)) + } + pub = pstracing.New(tracer, pub) + defer pub.Close() + + svc := newService(pub, tc, logger, tracer) + httpServerConfig := server.Config{Port: defSvcHttpPort} if err := env.Parse(&httpServerConfig, env.Options{Prefix: envPrefixHttp, AltPrefix: envPrefix}); err != nil { logger.Fatal(fmt.Sprintf("failed to load %s HTTP server configuration : %s", svcName, err)) @@ -91,8 +95,9 @@ func main() { } } -func newService(pub messaging.Publisher, tc mainflux.ThingsServiceClient, logger mflog.Logger) adapter.Service { +func newService(pub messaging.Publisher, tc mainflux.ThingsServiceClient, logger mflog.Logger, tracer opentracing.Tracer) adapter.Service { svc := adapter.New(pub, tc) + svc = tracing.New(tracer, svc) svc = api.LoggingMiddleware(svc, logger) counter, latency := internal.MakeMetrics(svcName, "api") svc = api.MetricsMiddleware(svc, counter, latency) diff --git a/cmd/influxdb-writer/main.go b/cmd/influxdb-writer/main.go index 6e559c2c..6d8865bc 100644 --- a/cmd/influxdb-writer/main.go +++ b/cmd/influxdb-writer/main.go @@ -13,11 +13,13 @@ import ( "github.com/mainflux/mainflux/consumers/writers/api" "github.com/mainflux/mainflux/consumers/writers/influxdb" influxDBClient "github.com/mainflux/mainflux/internal/clients/influxdb" + jaegerClient "github.com/mainflux/mainflux/internal/clients/jaeger" "github.com/mainflux/mainflux/internal/env" "github.com/mainflux/mainflux/internal/server" httpserver "github.com/mainflux/mainflux/internal/server/http" mflog "github.com/mainflux/mainflux/logger" "github.com/mainflux/mainflux/pkg/messaging/brokers" + "github.com/mainflux/mainflux/pkg/messaging/tracing" "golang.org/x/sync/errgroup" ) @@ -33,6 +35,7 @@ type config struct { LogLevel string `env:"MF_INFLUX_WRITER_LOG_LEVEL" envDefault:"info"` ConfigPath string `env:"MF_INFLUX_WRITER_CONFIG_PATH" envDefault:"/config.toml"` BrokerURL string `env:"MF_BROKER_URL" envDefault:"nats://localhost:4222"` + JaegerURL string `env:"MF_JAEGER_URL" envDefault:"localhost:6831"` } func main() { @@ -49,10 +52,17 @@ func main() { log.Fatalf("failed to init logger: %s", err) } + tracer, traceCloser, err := jaegerClient.NewTracer(svcName, cfg.JaegerURL) + if err != nil { + logger.Fatal(fmt.Sprintf("failed to init Jaeger: %s", err)) + } + defer traceCloser.Close() + pubSub, err := brokers.NewPubSub(cfg.BrokerURL, "", logger) if err != nil { logger.Fatal(fmt.Sprintf("failed to connect to message broker: %s", err)) } + pubSub = tracing.NewPubSub(tracer, pubSub) defer pubSub.Close() influxDBConfig := influxDBClient.Config{} @@ -83,7 +93,7 @@ func main() { } }(logger) - if err := consumers.Start(svcName, pubSub, repo, cfg.ConfigPath, logger); err != nil { + if err := consumers.Start(ctx, svcName, pubSub, repo, cfg.ConfigPath, logger); err != nil { logger.Fatal(fmt.Sprintf("failed to start InfluxDB writer: %s", err)) } diff --git a/cmd/lora/main.go b/cmd/lora/main.go index 3fab3f1e..bec01f68 100644 --- a/cmd/lora/main.go +++ b/cmd/lora/main.go @@ -22,8 +22,10 @@ import ( "github.com/mainflux/mainflux/lora/mqtt" "github.com/mainflux/mainflux/pkg/messaging" "github.com/mainflux/mainflux/pkg/messaging/brokers" + "github.com/mainflux/mainflux/pkg/messaging/tracing" "golang.org/x/sync/errgroup" + jaegerClient "github.com/mainflux/mainflux/internal/clients/jaeger" redisClient "github.com/mainflux/mainflux/internal/clients/redis" "github.com/mainflux/mainflux/lora/redis" ) @@ -50,6 +52,7 @@ type config struct { LoraMsgTimeout time.Duration `env:"MF_LORA_ADAPTER_MESSAGES_TIMEOUT" envDefault:"30s"` ESConsumerName string `env:"MF_LORA_ADAPTER_EVENT_CONSUMER" envDefault:"lora"` BrokerURL string `env:"MF_BROKER_URL" envDefault:"nats://localhost:4222"` + JaegerURL string `env:"MF_JAEGER_URL" envDefault:"localhost:6831"` } func main() { @@ -72,10 +75,17 @@ func main() { } defer rmConn.Close() + tracer, traceCloser, err := jaegerClient.NewTracer(svcName, cfg.JaegerURL) + if err != nil { + logger.Fatal(fmt.Sprintf("failed to init Jaeger: %s", err)) + } + defer traceCloser.Close() + pub, err := brokers.NewPublisher(cfg.BrokerURL) if err != nil { logger.Fatal(fmt.Sprintf("failed to connect to message broker: %s", err)) } + pub = tracing.New(tracer, pub) defer pub.Close() svc := newService(pub, rmConn, thingsRMPrefix, channelsRMPrefix, connsRMPrefix, logger) diff --git a/cmd/mongodb-writer/main.go b/cmd/mongodb-writer/main.go index 64319984..9d6b853d 100644 --- a/cmd/mongodb-writer/main.go +++ b/cmd/mongodb-writer/main.go @@ -13,12 +13,14 @@ import ( "github.com/mainflux/mainflux/consumers/writers/api" "github.com/mainflux/mainflux/consumers/writers/mongodb" "github.com/mainflux/mainflux/internal" + jaegerClient "github.com/mainflux/mainflux/internal/clients/jaeger" mongoClient "github.com/mainflux/mainflux/internal/clients/mongo" "github.com/mainflux/mainflux/internal/env" "github.com/mainflux/mainflux/internal/server" httpserver "github.com/mainflux/mainflux/internal/server/http" mflog "github.com/mainflux/mainflux/logger" "github.com/mainflux/mainflux/pkg/messaging/brokers" + "github.com/mainflux/mainflux/pkg/messaging/tracing" "go.mongodb.org/mongo-driver/mongo" "golang.org/x/sync/errgroup" ) @@ -35,6 +37,7 @@ type config struct { LogLevel string `env:"MF_MONGO_WRITER_LOG_LEVEL" envDefault:"info"` ConfigPath string `env:"MF_MONGO_WRITER_CONFIG_PATH" envDefault:"/config.toml"` BrokerURL string `env:"MF_BROKER_URL" envDefault:"nats://localhost:4222"` + JaegerURL string `env:"MF_JAEGER_URL" envDefault:"localhost:6831"` } func main() { @@ -51,10 +54,17 @@ func main() { log.Fatalf("failed to init logger: %s", err) } + tracer, traceCloser, err := jaegerClient.NewTracer(svcName, cfg.JaegerURL) + if err != nil { + logger.Fatal(fmt.Sprintf("failed to init Jaeger: %s", err)) + } + defer traceCloser.Close() + pubSub, err := brokers.NewPubSub(cfg.BrokerURL, "", logger) if err != nil { logger.Fatal(fmt.Sprintf("failed to connect to message broker: %s", err)) } + pubSub = tracing.NewPubSub(tracer, pubSub) defer pubSub.Close() db, err := mongoClient.Setup(envPrefixDB) @@ -64,7 +74,7 @@ func main() { repo := newService(db, logger) - if err := consumers.Start(svcName, pubSub, repo, cfg.ConfigPath, logger); err != nil { + if err := consumers.Start(ctx, svcName, pubSub, repo, cfg.ConfigPath, logger); err != nil { logger.Fatal(fmt.Sprintf("failed to start MongoDB writer: %s", err)) } diff --git a/cmd/mqtt/main.go b/cmd/mqtt/main.go index b7564037..49ba4205 100644 --- a/cmd/mqtt/main.go +++ b/cmd/mqtt/main.go @@ -11,6 +11,7 @@ import ( "github.com/cenkalti/backoff/v4" thingsClient "github.com/mainflux/mainflux/internal/clients/grpc/things" + jaegerClient "github.com/mainflux/mainflux/internal/clients/jaeger" redisClient "github.com/mainflux/mainflux/internal/clients/redis" "github.com/mainflux/mainflux/internal/env" mflog "github.com/mainflux/mainflux/logger" @@ -21,6 +22,7 @@ import ( "github.com/mainflux/mainflux/pkg/messaging" "github.com/mainflux/mainflux/pkg/messaging/brokers" mqttpub "github.com/mainflux/mainflux/pkg/messaging/mqtt" + "github.com/mainflux/mainflux/pkg/messaging/tracing" mp "github.com/mainflux/mproxy/pkg/mqtt" "github.com/mainflux/mproxy/pkg/session" ws "github.com/mainflux/mproxy/pkg/websocket" @@ -75,19 +77,27 @@ func main() { } } + tracer, traceCloser, err := jaegerClient.NewTracer(svcName, cfg.JaegerURL) + if err != nil { + logger.Fatal(fmt.Sprintf("failed to init Jaeger: %s", err)) + } + defer traceCloser.Close() + nps, err := brokers.NewPubSub(cfg.BrokerURL, "mqtt", logger) if err != nil { logger.Fatal(fmt.Sprintf("failed to connect to message broker: %s", err)) } + nps = tracing.NewPubSub(tracer, nps) defer nps.Close() mpub, err := mqttpub.NewPublisher(fmt.Sprintf("%s:%s", cfg.MqttTargetHost, cfg.MqttTargetPort), cfg.MqttForwarderTimeout) if err != nil { logger.Fatal(fmt.Sprintf("failed to create MQTT publisher: %s", err)) } + mpub = tracing.New(tracer, mpub) fwd := mqtt.NewForwarder(brokers.SubjectAllChannels, logger) - if err := fwd.Forward(svcName, nps, mpub); err != nil { + if err := fwd.Forward(ctx, svcName, nps, mpub); err != nil { logger.Fatal(fmt.Sprintf("failed to forward message broker messages: %s", err)) } @@ -95,6 +105,7 @@ func main() { if err != nil { logger.Fatal(fmt.Sprintf("failed to connect to message broker: %s", err)) } + np = tracing.New(tracer, np) defer np.Close() ec, err := redisClient.Setup(envPrefixES) diff --git a/cmd/opcua/main.go b/cmd/opcua/main.go index 304ebf2f..731e277b 100644 --- a/cmd/opcua/main.go +++ b/cmd/opcua/main.go @@ -11,6 +11,7 @@ import ( r "github.com/go-redis/redis/v8" "github.com/mainflux/mainflux/internal" + jaegerClient "github.com/mainflux/mainflux/internal/clients/jaeger" redisClient "github.com/mainflux/mainflux/internal/clients/redis" "github.com/mainflux/mainflux/internal/env" "github.com/mainflux/mainflux/internal/server" @@ -22,6 +23,7 @@ import ( "github.com/mainflux/mainflux/opcua/gopcua" "github.com/mainflux/mainflux/opcua/redis" "github.com/mainflux/mainflux/pkg/messaging/brokers" + "github.com/mainflux/mainflux/pkg/messaging/tracing" "golang.org/x/sync/errgroup" ) @@ -42,6 +44,7 @@ type config struct { LogLevel string `env:"MF_OPCUA_ADAPTER_LOG_LEVEL" envDefault:"info"` ESConsumerName string `env:"MF_OPCUA_ADAPTER_EVENT_CONSUMER" envDefault:""` BrokerURL string `env:"MF_BROKER_URL" envDefault:"nats://localhost:4222"` + JaegerURL string `env:"MF_JAEGER_URL" envDefault:"localhost:6831"` } func main() { @@ -79,10 +82,17 @@ func main() { } defer esConn.Close() + tracer, traceCloser, err := jaegerClient.NewTracer(svcName, cfg.JaegerURL) + if err != nil { + logger.Fatal(fmt.Sprintf("failed to init Jaeger: %s", err)) + } + defer traceCloser.Close() + pubSub, err := brokers.NewPubSub(cfg.BrokerURL, "", logger) if err != nil { logger.Fatal(fmt.Sprintf("failed to connect to message broker: %s", err)) } + pubSub = tracing.NewPubSub(tracer, pubSub) defer pubSub.Close() ctx := context.Background() diff --git a/cmd/postgres-writer/main.go b/cmd/postgres-writer/main.go index 0b6f436a..abf5f0c6 100644 --- a/cmd/postgres-writer/main.go +++ b/cmd/postgres-writer/main.go @@ -14,12 +14,14 @@ import ( "github.com/mainflux/mainflux/consumers/writers/api" writerPg "github.com/mainflux/mainflux/consumers/writers/postgres" "github.com/mainflux/mainflux/internal" + jaegerClient "github.com/mainflux/mainflux/internal/clients/jaeger" pgClient "github.com/mainflux/mainflux/internal/clients/postgres" "github.com/mainflux/mainflux/internal/env" "github.com/mainflux/mainflux/internal/server" httpserver "github.com/mainflux/mainflux/internal/server/http" mflog "github.com/mainflux/mainflux/logger" "github.com/mainflux/mainflux/pkg/messaging/brokers" + "github.com/mainflux/mainflux/pkg/messaging/tracing" "golang.org/x/sync/errgroup" ) @@ -35,6 +37,7 @@ type config struct { LogLevel string `env:"MF_POSTGRES_WRITER_LOG_LEVEL" envDefault:"info"` ConfigPath string `env:"MF_POSTGRES_WRITER_CONFIG_PATH" envDefault:"/config.toml"` BrokerURL string `env:"MF_BROKER_URL" envDefault:"nats://localhost:4222"` + JaegerURL string `env:"MF_JAEGER_URL" envDefault:"localhost:6831"` } func main() { @@ -51,10 +54,17 @@ func main() { log.Fatalf("failed to init logger: %s", err) } + tracer, traceCloser, err := jaegerClient.NewTracer(svcName, cfg.JaegerURL) + if err != nil { + logger.Fatal(fmt.Sprintf("failed to init Jaeger: %s", err)) + } + defer traceCloser.Close() + pubSub, err := brokers.NewPubSub(cfg.BrokerURL, "", logger) if err != nil { logger.Fatal(fmt.Sprintf("failed to connect to message broker: %s", err)) } + pubSub = tracing.NewPubSub(tracer, pubSub) defer pubSub.Close() dbConfig := pgClient.Config{Name: defDB} @@ -66,7 +76,7 @@ func main() { repo := newService(db, logger) - if err = consumers.Start(svcName, pubSub, repo, cfg.ConfigPath, logger); err != nil { + if err = consumers.Start(ctx, svcName, pubSub, repo, cfg.ConfigPath, logger); err != nil { logger.Fatal(fmt.Sprintf("failed to create Postgres writer: %s", err)) } diff --git a/cmd/smpp-notifier/main.go b/cmd/smpp-notifier/main.go index 42d9bdd2..ffa7818a 100644 --- a/cmd/smpp-notifier/main.go +++ b/cmd/smpp-notifier/main.go @@ -28,6 +28,7 @@ import ( pgClient "github.com/mainflux/mainflux/internal/clients/postgres" mflog "github.com/mainflux/mainflux/logger" "github.com/mainflux/mainflux/pkg/messaging/brokers" + pstracing "github.com/mainflux/mainflux/pkg/messaging/tracing" "github.com/mainflux/mainflux/pkg/ulid" opentracing "github.com/opentracing/opentracing-go" ) @@ -74,10 +75,17 @@ func main() { logger.Fatal(fmt.Sprintf("failed to load SMPP configuration from environment : %s", err)) } + tracer, traceCloser, err := jaegerClient.NewTracer(svcName, cfg.JaegerURL) + if err != nil { + logger.Fatal(fmt.Sprintf("failed to init Jaeger: %s", err)) + } + defer traceCloser.Close() + pubSub, err := brokers.NewPubSub(cfg.BrokerURL, "", logger) if err != nil { logger.Fatal(fmt.Sprintf("failed to connect to message broker: %s", err)) } + pubSub = pstracing.NewPubSub(tracer, pubSub) defer pubSub.Close() auth, authHandler, err := authClient.Setup(envPrefix, cfg.JaegerURL) @@ -87,21 +95,15 @@ func main() { defer authHandler.Close() logger.Info("Successfully connected to auth grpc server " + authHandler.Secure()) - tracer, closer, err := jaegerClient.NewTracer("smpp-notifier", cfg.JaegerURL) - if err != nil { - logger.Fatal(fmt.Sprintf("failed to init Jaeger: %s", err)) - } - defer closer.Close() - dbTracer, dbCloser, err := jaegerClient.NewTracer("smpp-notifier_db", cfg.JaegerURL) if err != nil { logger.Fatal(fmt.Sprintf("failed to init Jaeger: %s", err)) } defer dbCloser.Close() - svc := newService(db, dbTracer, auth, cfg, smppConfig, logger) + svc := newService(db, dbTracer, auth, cfg, smppConfig, logger, tracer) - if err = consumers.Start(svcName, pubSub, svc, cfg.ConfigPath, logger); err != nil { + if err = consumers.Start(ctx, svcName, pubSub, svc, cfg.ConfigPath, logger); err != nil { logger.Fatal(fmt.Sprintf("failed to create Postgres writer: %s", err)) } @@ -125,11 +127,12 @@ func main() { } -func newService(db *sqlx.DB, tracer opentracing.Tracer, auth mainflux.AuthServiceClient, c config, sc mfsmpp.Config, logger mflog.Logger) notifiers.Service { +func newService(db *sqlx.DB, tracer opentracing.Tracer, auth mainflux.AuthServiceClient, c config, sc mfsmpp.Config, logger mflog.Logger, svcTracer opentracing.Tracer) notifiers.Service { database := notifierPg.NewDatabase(db) - repo := tracing.New(notifierPg.New(database), tracer) + repo := tracing.New(tracer, notifierPg.New(database)) idp := ulid.New() notifier := mfsmpp.New(sc) + notifier = tracing.NewNotifier(svcTracer, notifier) svc := notifiers.New(auth, repo, idp, notifier, c.From) svc = api.LoggingMiddleware(svc, logger) counter, latency := internal.MakeMetrics("notifier", "smpp") diff --git a/cmd/smtp-notifier/main.go b/cmd/smtp-notifier/main.go index 77ac29e3..400ec3db 100644 --- a/cmd/smtp-notifier/main.go +++ b/cmd/smtp-notifier/main.go @@ -19,7 +19,7 @@ import ( "github.com/mainflux/mainflux/consumers/notifiers/tracing" "github.com/mainflux/mainflux/internal" authClient "github.com/mainflux/mainflux/internal/clients/grpc/auth" - jagerClient "github.com/mainflux/mainflux/internal/clients/jaeger" + jaegerClient "github.com/mainflux/mainflux/internal/clients/jaeger" pgClient "github.com/mainflux/mainflux/internal/clients/postgres" "github.com/mainflux/mainflux/internal/email" "github.com/mainflux/mainflux/internal/env" @@ -27,6 +27,7 @@ import ( httpserver "github.com/mainflux/mainflux/internal/server/http" mflog "github.com/mainflux/mainflux/logger" "github.com/mainflux/mainflux/pkg/messaging/brokers" + pstracing "github.com/mainflux/mainflux/pkg/messaging/tracing" "github.com/mainflux/mainflux/pkg/ulid" opentracing "github.com/opentracing/opentracing-go" "golang.org/x/sync/errgroup" @@ -74,10 +75,17 @@ func main() { logger.Fatal(fmt.Sprintf("failed to load email configuration : %s", err)) } + tracer, traceCloser, err := jaegerClient.NewTracer(svcName, cfg.JaegerURL) + if err != nil { + logger.Fatal(fmt.Sprintf("failed to init Jaeger: %s", err)) + } + defer traceCloser.Close() + pubSub, err := brokers.NewPubSub(cfg.BrokerURL, "", logger) if err != nil { logger.Fatal(fmt.Sprintf("failed to connect to message broker: %s", err)) } + pubSub = pstracing.NewPubSub(tracer, pubSub) defer pubSub.Close() auth, authHandler, err := authClient.Setup(envPrefix, cfg.JaegerURL) @@ -87,21 +95,15 @@ func main() { defer authHandler.Close() logger.Info("Successfully connected to auth grpc server " + authHandler.Secure()) - tracer, closer, err := jagerClient.NewTracer("smtp-notifier", cfg.JaegerURL) - if err != nil { - logger.Fatal(fmt.Sprintf("failed to init Jaeger: %s", err)) - } - defer closer.Close() - - dbTracer, dbCloser, err := jagerClient.NewTracer("smtp-notifier_db", cfg.JaegerURL) + dbTracer, dbCloser, err := jaegerClient.NewTracer("smtp-notifier_db", cfg.JaegerURL) if err != nil { logger.Fatal(fmt.Sprintf("failed to init Jaeger: %s", err)) } defer dbCloser.Close() - svc := newService(db, dbTracer, auth, cfg, ec, logger) + svc := newService(db, dbTracer, auth, cfg, ec, logger, tracer) - if err = consumers.Start(svcName, pubSub, svc, cfg.ConfigPath, logger); err != nil { + if err = consumers.Start(ctx, svcName, pubSub, svc, cfg.ConfigPath, logger); err != nil { logger.Fatal(fmt.Sprintf("failed to create Postgres writer: %s", err)) } @@ -125,9 +127,9 @@ func main() { } -func newService(db *sqlx.DB, tracer opentracing.Tracer, auth mainflux.AuthServiceClient, c config, ec email.Config, logger mflog.Logger) notifiers.Service { +func newService(db *sqlx.DB, tracer opentracing.Tracer, auth mainflux.AuthServiceClient, c config, ec email.Config, logger mflog.Logger, svcTracer opentracing.Tracer) notifiers.Service { database := notifierPg.NewDatabase(db) - repo := tracing.New(notifierPg.New(database), tracer) + repo := tracing.New(tracer, notifierPg.New(database)) idp := ulid.New() agent, err := email.New(&ec) @@ -136,6 +138,7 @@ func newService(db *sqlx.DB, tracer opentracing.Tracer, auth mainflux.AuthServic } notifier := smtp.New(agent) + notifier = tracing.NewNotifier(tracer, notifier) svc := notifiers.New(auth, repo, idp, notifier, c.From) svc = api.LoggingMiddleware(svc, logger) counter, latency := internal.MakeMetrics("notifier", "smtp") diff --git a/cmd/timescale-writer/main.go b/cmd/timescale-writer/main.go index 9a6296bf..ad211a20 100644 --- a/cmd/timescale-writer/main.go +++ b/cmd/timescale-writer/main.go @@ -14,12 +14,14 @@ import ( "github.com/mainflux/mainflux/consumers/writers/api" "github.com/mainflux/mainflux/consumers/writers/timescale" "github.com/mainflux/mainflux/internal" + jaegerClient "github.com/mainflux/mainflux/internal/clients/jaeger" pgClient "github.com/mainflux/mainflux/internal/clients/postgres" "github.com/mainflux/mainflux/internal/env" "github.com/mainflux/mainflux/internal/server" httpserver "github.com/mainflux/mainflux/internal/server/http" mflog "github.com/mainflux/mainflux/logger" "github.com/mainflux/mainflux/pkg/messaging/brokers" + "github.com/mainflux/mainflux/pkg/messaging/tracing" "golang.org/x/sync/errgroup" ) @@ -35,6 +37,7 @@ type config struct { LogLevel string `env:"MF_TIMESCALE_WRITER_LOG_LEVEL" envDefault:"info"` ConfigPath string `env:"MF_TIMESCALE_WRITER_CONFIG_PATH" envDefault:"/config.toml"` BrokerURL string `env:"MF_BROKER_URL" envDefault:"nats://localhost:4222"` + JaegerURL string `env:"MF_JAEGER_URL" envDefault:"localhost:6831"` } func main() { @@ -58,15 +61,22 @@ func main() { } defer db.Close() + tracer, traceCloser, err := jaegerClient.NewTracer(svcName, cfg.JaegerURL) + if err != nil { + logger.Fatal(fmt.Sprintf("failed to init Jaeger: %s", err)) + } + defer traceCloser.Close() + repo := newService(db, logger) pubSub, err := brokers.NewPubSub(cfg.BrokerURL, "", logger) if err != nil { logger.Fatal(fmt.Sprintf("failed to connect to message broker: %s", err)) } + pubSub = tracing.NewPubSub(tracer, pubSub) defer pubSub.Close() - if err = consumers.Start(svcName, pubSub, repo, cfg.ConfigPath, logger); err != nil { + if err = consumers.Start(ctx, svcName, pubSub, repo, cfg.ConfigPath, logger); err != nil { logger.Fatal(fmt.Sprintf("failed to create Timescale writer: %s", err)) } diff --git a/cmd/twins/main.go b/cmd/twins/main.go index 6c6c425c..c225ea8a 100644 --- a/cmd/twins/main.go +++ b/cmd/twins/main.go @@ -22,6 +22,7 @@ import ( mflog "github.com/mainflux/mainflux/logger" "github.com/mainflux/mainflux/pkg/messaging" "github.com/mainflux/mainflux/pkg/messaging/brokers" + pstracing "github.com/mainflux/mainflux/pkg/messaging/tracing" "github.com/mainflux/mainflux/pkg/uuid" localusers "github.com/mainflux/mainflux/things/standalone" "github.com/mainflux/mainflux/twins" @@ -104,19 +105,20 @@ func main() { logger.Info("Successfully connected to auth grpc server " + authHandler.Secure()) } + tracer, traceCloser, err := jaegerClient.NewTracer(svcName, cfg.JaegerURL) + if err != nil { + logger.Fatal(fmt.Sprintf("failed to init Jaeger: %s", err)) + } + defer traceCloser.Close() + pubSub, err := brokers.NewPubSub(cfg.BrokerURL, queue, logger) if err != nil { logger.Fatal(fmt.Sprintf("failed to connect to message broker: %s", err)) } + pubSub = pstracing.NewPubSub(tracer, pubSub) defer pubSub.Close() - svc := newService(svcName, pubSub, cfg.ChannelID, auth, dbTracer, db, cacheTracer, cacheClient, logger) - - tracer, closer, err := jaegerClient.NewTracer("twins", cfg.JaegerURL) - if err != nil { - logger.Fatal(fmt.Sprintf("failed to init Jaeger: %s", err)) - } - defer closer.Close() + svc := newService(ctx, svcName, pubSub, cfg.ChannelID, auth, dbTracer, db, cacheTracer, cacheClient, logger) httpServerConfig := server.Config{Port: defSvcHttpPort} if err := env.Parse(&httpServerConfig, env.Options{Prefix: envPrefixHttp, AltPrefix: envPrefix}); err != nil { @@ -137,7 +139,7 @@ func main() { } } -func newService(id string, ps messaging.PubSub, chanID string, users mainflux.AuthServiceClient, dbTracer opentracing.Tracer, db *mongo.Database, cacheTracer opentracing.Tracer, cacheClient *redis.Client, logger mflog.Logger) twins.Service { +func newService(ctx context.Context, id string, ps messaging.PubSub, chanID string, users mainflux.AuthServiceClient, dbTracer opentracing.Tracer, db *mongo.Database, cacheTracer opentracing.Tracer, cacheClient *redis.Client, logger mflog.Logger) twins.Service { twinRepo := twmongodb.NewTwinRepository(db) twinRepo = tracing.TwinRepositoryMiddleware(dbTracer, twinRepo) @@ -152,7 +154,7 @@ func newService(id string, ps messaging.PubSub, chanID string, users mainflux.Au svc = api.LoggingMiddleware(svc, logger) counter, latency := internal.MakeMetrics(svcName, "api") svc = api.MetricsMiddleware(svc, counter, latency) - err := ps.Subscribe(id, brokers.SubjectAllChannels, handle(logger, chanID, svc)) + err := ps.Subscribe(ctx, id, brokers.SubjectAllChannels, handle(logger, chanID, svc)) if err != nil { logger.Fatal(err.Error()) } diff --git a/cmd/users/main.go b/cmd/users/main.go index 0bc465e0..fa513a1d 100644 --- a/cmd/users/main.go +++ b/cmd/users/main.go @@ -127,7 +127,7 @@ func main() { func newService(db *sqlx.DB, tracer opentracing.Tracer, auth mainflux.AuthServiceClient, c config, ec email.Config, logger mflog.Logger) users.Service { database := usersPg.NewDatabase(db) hasher := bcrypt.New() - userRepo := tracing.UserRepositoryMiddleware(usersPg.NewUserRepo(database), tracer) + userRepo := tracing.UserRepositoryMiddleware(tracer, usersPg.NewUserRepo(database)) emailer, err := emailer.New(c.ResetURL, &ec) if err != nil { diff --git a/cmd/ws/main.go b/cmd/ws/main.go index e695a61b..f7127439 100644 --- a/cmd/ws/main.go +++ b/cmd/ws/main.go @@ -10,18 +10,22 @@ import ( "os" "github.com/mainflux/mainflux" + "github.com/opentracing/opentracing-go" "golang.org/x/sync/errgroup" "github.com/mainflux/mainflux/internal" thingsClient "github.com/mainflux/mainflux/internal/clients/grpc/things" + jaegerClient "github.com/mainflux/mainflux/internal/clients/jaeger" "github.com/mainflux/mainflux/internal/env" "github.com/mainflux/mainflux/internal/server" httpserver "github.com/mainflux/mainflux/internal/server/http" mflog "github.com/mainflux/mainflux/logger" "github.com/mainflux/mainflux/pkg/messaging" "github.com/mainflux/mainflux/pkg/messaging/brokers" + pstracing "github.com/mainflux/mainflux/pkg/messaging/tracing" adapter "github.com/mainflux/mainflux/ws" "github.com/mainflux/mainflux/ws/api" + "github.com/mainflux/mainflux/ws/tracing" ) const ( @@ -58,14 +62,20 @@ func main() { defer internal.Close(logger, tcHandler) logger.Info("Successfully connected to things grpc server " + tcHandler.Secure()) + tracer, traceCloser, err := jaegerClient.NewTracer(svcName, cfg.JaegerURL) + if err != nil { + logger.Fatal(fmt.Sprintf("failed to init Jaeger: %s", err)) + } + defer traceCloser.Close() + nps, err := brokers.NewPubSub(cfg.BrokerURL, "", logger) if err != nil { logger.Fatal(fmt.Sprintf("Failed to connect to message broker: %s", err)) - } + nps = pstracing.NewPubSub(tracer, nps) defer nps.Close() - svc := newService(tc, nps, logger) + svc := newService(tc, nps, logger, tracer) httpServerConfig := server.Config{Port: defSvcHttpPort} if err := env.Parse(&httpServerConfig, env.Options{Prefix: envPrefixHttp, AltPrefix: envPrefix}); err != nil { @@ -86,8 +96,9 @@ func main() { } } -func newService(tc mainflux.ThingsServiceClient, nps messaging.PubSub, logger mflog.Logger) adapter.Service { +func newService(tc mainflux.ThingsServiceClient, nps messaging.PubSub, logger mflog.Logger, tracer opentracing.Tracer) adapter.Service { svc := adapter.New(tc, nps) + svc = tracing.New(tracer, svc) svc = api.LoggingMiddleware(svc, logger) counter, latency := internal.MakeMetrics("ws_adapter", "api") svc = api.MetricsMiddleware(svc, counter, latency) diff --git a/coap/adapter.go b/coap/adapter.go index 57df97b2..cb4fa0b1 100644 --- a/coap/adapter.go +++ b/coap/adapter.go @@ -66,7 +66,7 @@ func (svc *adapterService) Publish(ctx context.Context, key string, msg *messagi } msg.Publisher = thid.GetValue() - return svc.pubsub.Publish(msg.Channel, msg) + return svc.pubsub.Publish(ctx, msg.Channel, msg) } func (svc *adapterService) Subscribe(ctx context.Context, key, chanID, subtopic string, c Client) error { @@ -81,7 +81,7 @@ func (svc *adapterService) Subscribe(ctx context.Context, key, chanID, subtopic if subtopic != "" { subject = fmt.Sprintf("%s.%s", subject, subtopic) } - return svc.pubsub.Subscribe(c.Token(), subject, c) + return svc.pubsub.Subscribe(ctx, c.Token(), subject, c) } func (svc *adapterService) Unsubscribe(ctx context.Context, key, chanID, subtopic, token string) error { @@ -96,5 +96,5 @@ func (svc *adapterService) Unsubscribe(ctx context.Context, key, chanID, subtopi if subtopic != "" { subject = fmt.Sprintf("%s.%s", subject, subtopic) } - return svc.pubsub.Unsubscribe(token, subject) + return svc.pubsub.Unsubscribe(ctx, token, subject) } diff --git a/coap/tracing/adapter.go b/coap/tracing/adapter.go new file mode 100644 index 00000000..bea97a2b --- /dev/null +++ b/coap/tracing/adapter.go @@ -0,0 +1,67 @@ +package tracing + +import ( + "context" + + "github.com/mainflux/mainflux/coap" + "github.com/mainflux/mainflux/pkg/messaging" + "github.com/opentracing/opentracing-go" +) + +var _ coap.Service = (*tracingServiceMiddleware)(nil) + +// Operation names for tracing CoAP operations. +const ( + publishOP = "publish_op" + subscribeOP = "subscirbe_op" + unsubscribeOP = "unsubscribe_op" +) + +// tracingServiceMiddleware is a middleware implementation for tracing CoAP service operations using OpenTracing. +type tracingServiceMiddleware struct { + tracer opentracing.Tracer + svc coap.Service +} + +// New creates a new instance of TracingServiceMiddleware that wraps an existing CoAP service with tracing capabilities. +func New(tracer opentracing.Tracer, svc coap.Service) coap.Service { + return &tracingServiceMiddleware{ + tracer: tracer, + svc: svc, + } +} + +// Publish traces a CoAP publish operation. +func (tm *tracingServiceMiddleware) Publish(ctx context.Context, key string, msg *messaging.Message) error { + span := tm.createSpan(ctx, publishOP) + defer span.Finish() + ctx = opentracing.ContextWithSpan(ctx, span) + return tm.svc.Publish(ctx, key, msg) +} + +// Subscribe traces a CoAP subscribe operation. +func (tm *tracingServiceMiddleware) Subscribe(ctx context.Context, key string, chanID string, subtopic string, c coap.Client) error { + span := tm.createSpan(ctx, subscribeOP) + defer span.Finish() + ctx = opentracing.ContextWithSpan(ctx, span) + return tm.svc.Subscribe(ctx, key, chanID, subtopic, c) +} + +// Unsubscribe traces a CoAP unsubscribe operation. +func (tm *tracingServiceMiddleware) Unsubscribe(ctx context.Context, key string, chanID string, subptopic string, token string) error { + span := tm.createSpan(ctx, unsubscribeOP) + defer span.Finish() + ctx = opentracing.ContextWithSpan(ctx, span) + return tm.svc.Unsubscribe(ctx, key, chanID, subptopic, token) +} + +// createSpan creates an OpenTracing span with an operation name and an optional parent span. +func (tm *tracingServiceMiddleware) createSpan(ctx context.Context, opName string) opentracing.Span { + if parentSpan := opentracing.SpanFromContext(ctx); parentSpan != nil { + return tm.tracer.StartSpan( + opName, + opentracing.ChildOf(parentSpan.Context()), + ) + } + return tm.tracer.StartSpan(opName) +} diff --git a/consumers/messages.go b/consumers/messages.go index 277973ac..ae29d391 100644 --- a/consumers/messages.go +++ b/consumers/messages.go @@ -4,6 +4,7 @@ package consumers import ( + "context" "fmt" "os" "strings" @@ -32,7 +33,7 @@ var ( // Start method starts consuming messages received from Message broker. // This method transforms messages to SenML format before // using MessageRepository to store them. -func Start(id string, sub messaging.Subscriber, consumer interface{}, configPath string, logger logger.Logger) error { +func Start(ctx context.Context, id string, sub messaging.Subscriber, consumer interface{}, configPath string, logger logger.Logger) error { cfg, err := loadConfig(configPath) if err != nil { logger.Warn(fmt.Sprintf("Failed to load consumer config: %s", err)) @@ -43,11 +44,11 @@ func Start(id string, sub messaging.Subscriber, consumer interface{}, configPath for _, subject := range cfg.SubscriberCfg.Subjects { switch c := consumer.(type) { case AsyncConsumer: - if err := sub.Subscribe(id, subject, handleAsync(transformer, c)); err != nil { + if err := sub.Subscribe(ctx, id, subject, handleAsync(transformer, c)); err != nil { return err } case BlockingConsumer: - if err := sub.Subscribe(id, subject, handleSync(transformer, c)); err != nil { + if err := sub.Subscribe(ctx, id, subject, handleSync(transformer, c)); err != nil { return err } default: @@ -92,7 +93,6 @@ type handleFunc func(msg *messaging.Message) error func (h handleFunc) Handle(msg *messaging.Message) error { return h(msg) - } func (h handleFunc) Cancel() error { diff --git a/consumers/notifiers/tracing/notifier.go b/consumers/notifiers/tracing/notifier.go new file mode 100644 index 00000000..49fc7493 --- /dev/null +++ b/consumers/notifiers/tracing/notifier.go @@ -0,0 +1,33 @@ +package tracing + +import ( + notifiers "github.com/mainflux/mainflux/consumers/notifiers" + "github.com/mainflux/mainflux/pkg/messaging" + "github.com/opentracing/opentracing-go" + "github.com/opentracing/opentracing-go/ext" +) + +const notifierOP = "notifier_op" + +var _ notifiers.Notifier = (*serviceMiddleware)(nil) + +type serviceMiddleware struct { + svc notifiers.Notifier + tracer opentracing.Tracer +} + +// NewNotifier creates a new notifier tracing middleware service. +func NewNotifier(tracer opentracing.Tracer, svc notifiers.Notifier) notifiers.Notifier { + return &serviceMiddleware{ + svc: svc, + tracer: tracer, + } +} + +// Notify traces notify operations. +func (sm *serviceMiddleware) Notify(from string, to []string, msg *messaging.Message) error { + span := sm.tracer.StartSpan(notifierOP, ext.SpanKindConsumer) + ext.MessageBusDestination.Set(span, msg.Subtopic) + defer span.Finish() + return sm.svc.Notify(from, to, msg) +} diff --git a/consumers/notifiers/tracing/subscriptions.go b/consumers/notifiers/tracing/subscriptions.go index bcaf00b5..f165b8e6 100644 --- a/consumers/notifiers/tracing/subscriptions.go +++ b/consumers/notifiers/tracing/subscriptions.go @@ -28,7 +28,7 @@ type subRepositoryMiddleware struct { // New instantiates a new Subscriptions repository that // tracks request and their latency, and adds spans to context. -func New(repo notifiers.SubscriptionsRepository, tracer opentracing.Tracer) notifiers.SubscriptionsRepository { +func New(tracer opentracing.Tracer, repo notifiers.SubscriptionsRepository) notifiers.SubscriptionsRepository { return subRepositoryMiddleware{ tracer: tracer, repo: repo, diff --git a/consumers/writers/cassandra/README.md b/consumers/writers/cassandra/README.md index 1cefa58d..ee0e3286 100644 --- a/consumers/writers/cassandra/README.md +++ b/consumers/writers/cassandra/README.md @@ -18,7 +18,8 @@ default values. | MF_CASSANDRA_WRITER_DB_USER | Cassandra DB username | | | MF_CASSANDRA_WRITER_DB_PASS | Cassandra DB password | | | MF_CASSANDRA_WRITER_DB_PORT | Cassandra DB port | 9042 | -| MF_CASSANDRA_WRITER_CONFIG_PATH | Config file path with NATS subjects list, payload type and content-type | /config.toml | +| MF_CASSANDRA_WRITER_CONFIG_PATH | Config file path with NATS subjects list, payload type and content-type | /config.toml || MF_JAEGER_URL | Jaeger server URL | localhost:6831 | +| MF_JAEGER_URL | Jaeger server URL | localhost:6831 | ## Deployment The service itself is distributed as Docker container. Check the [`cassandra-writer`](https://github.com/mainflux/mainflux/blob/master/docker/addons/cassandra-writer/docker-compose.yml#L30-L49) service section in docker-compose to see how service is deployed. diff --git a/consumers/writers/influxdb/README.md b/consumers/writers/influxdb/README.md index c178fcce..9091fcc6 100644 --- a/consumers/writers/influxdb/README.md +++ b/consumers/writers/influxdb/README.md @@ -27,6 +27,7 @@ default values. | MF_INFLUXDB_HTTP_ENABLED | InfluxDB http enabled status | true | | MF_INFLUXDB_INIT_MODE | InfluxDB initialization mode | setup | | MF_INFLUX_WRITER_CONFIG_PATH | Config file path with message broker subjects list, payload type and content-type | /configs.toml | +| MF_JAEGER_URL | Jaeger server URL | localhost:6831 | ## Deployment diff --git a/consumers/writers/mongodb/README.md b/consumers/writers/mongodb/README.md index da75dea0..6d13266c 100644 --- a/consumers/writers/mongodb/README.md +++ b/consumers/writers/mongodb/README.md @@ -17,6 +17,7 @@ default values. | MF_MONGO_WRITER_DB_HOST | Default MongoDB database host | localhost | | MF_MONGO_WRITER_DB_PORT | Default MongoDB database port | 27017 | | MF_MONGO_WRITER_CONFIG_PATH | Config file path with Message broker subjects list, payload type and content-type | /config.toml | +| MF_JAEGER_URL | Jaeger server URL | localhost:6831 | ## Deployment diff --git a/consumers/writers/postgres/README.md b/consumers/writers/postgres/README.md index 5b7f7801..8df0ef35 100644 --- a/consumers/writers/postgres/README.md +++ b/consumers/writers/postgres/README.md @@ -23,6 +23,7 @@ default values. | MF_POSTGRES_WRITER_DB_SSL_KEY | Postgres SSL key | "" | | MF_POSTGRES_WRITER_DB_SSL_ROOT_CERT | Postgres SSL root certificate path | "" | | MF_POSTGRES_WRITER_CONFIG_PATH | Config file path with Message broker subjects list, payload type and content-type | /config.toml | +| MF_JAEGER_URL | Jaeger server URL | localhost:6831 | ## Deployment diff --git a/consumers/writers/timescale/README.md b/consumers/writers/timescale/README.md index bae33652..2d4ebb4a 100644 --- a/consumers/writers/timescale/README.md +++ b/consumers/writers/timescale/README.md @@ -23,6 +23,7 @@ default values. | MF_TIMESCALE_WRITER_DB_SSL_KEY | Timescale SSL key | "" | | MF_TIMESCALE_WRITER_DB_SSL_ROOT_CERT | Timescale SSL root certificate path | "" | | MF_TIMESCALE_WRITER_CONFIG_PATH | Configuration file path with Message broker subjects list | /config.toml | +| MF_JAEGER_URL | Jaeger server URL | localhost:6831 | ## Deployment diff --git a/docker/addons/cassandra-writer/docker-compose.yml b/docker/addons/cassandra-writer/docker-compose.yml index bb3831fb..48e9e4e1 100644 --- a/docker/addons/cassandra-writer/docker-compose.yml +++ b/docker/addons/cassandra-writer/docker-compose.yml @@ -40,6 +40,7 @@ services: MF_CASSANDRA_WRITER_DB_PORT: ${MF_CASSANDRA_WRITER_DB_PORT} MF_CASSANDRA_WRITER_DB_CLUSTER: ${MF_CASSANDRA_WRITER_DB_CLUSTER} MF_CASSANDRA_WRITER_DB_KEYSPACE: ${MF_CASSANDRA_WRITER_DB_KEYSPACE} + MF_JAEGER_URL: ${MF_JAEGER_URL} ports: - ${MF_CASSANDRA_WRITER_PORT}:${MF_CASSANDRA_WRITER_PORT} networks: diff --git a/docker/addons/influxdb-writer/docker-compose.yml b/docker/addons/influxdb-writer/docker-compose.yml index 6c16c5a1..ccbee09e 100644 --- a/docker/addons/influxdb-writer/docker-compose.yml +++ b/docker/addons/influxdb-writer/docker-compose.yml @@ -54,6 +54,7 @@ services: MF_INFLUXDB_PROTOCOL: ${MF_INFLUXDB_PROTOCOL} MF_INFLUXDB_ADMIN_USER: ${MF_INFLUXDB_ADMIN_USER} MF_INFLUXDB_ADMIN_PASSWORD: ${MF_INFLUXDB_ADMIN_PASSWORD} + MF_JAEGER_URL: ${MF_JAEGER_URL} ports: - ${MF_INFLUX_WRITER_PORT}:${MF_INFLUX_WRITER_PORT} networks: diff --git a/docker/addons/mongodb-writer/docker-compose.yml b/docker/addons/mongodb-writer/docker-compose.yml index 70da6dc3..4426d0c8 100644 --- a/docker/addons/mongodb-writer/docker-compose.yml +++ b/docker/addons/mongodb-writer/docker-compose.yml @@ -46,6 +46,7 @@ services: MF_MONGO_WRITER_DB: ${MF_MONGO_WRITER_DB} MF_MONGO_WRITER_DB_HOST: mongodb MF_MONGO_WRITER_DB_PORT: ${MF_MONGO_WRITER_DB_PORT} + MF_JAEGER_URL: ${MF_JAEGER_URL} ports: - ${MF_MONGO_WRITER_PORT}:${MF_MONGO_WRITER_PORT} networks: diff --git a/docker/addons/postgres-writer/docker-compose.yml b/docker/addons/postgres-writer/docker-compose.yml index 4c2cdad1..07593d58 100644 --- a/docker/addons/postgres-writer/docker-compose.yml +++ b/docker/addons/postgres-writer/docker-compose.yml @@ -50,6 +50,7 @@ services: MF_POSTGRES_WRITER_DB_SSL_CERT: ${MF_POSTGRES_WRITER_DB_SSL_CERT} MF_POSTGRES_WRITER_DB_SSL_KEY: ${MF_POSTGRES_WRITER_DB_SSL_KEY} MF_POSTGRES_WRITER_DB_SSL_ROOT_CERT: ${MF_POSTGRES_WRITER_DB_SSL_ROOT_CERT} + MF_JAEGER_URL: ${MF_JAEGER_URL} ports: - ${MF_POSTGRES_WRITER_PORT}:${MF_POSTGRES_WRITER_PORT} networks: diff --git a/docker/addons/timescale-writer/docker-compose.yml b/docker/addons/timescale-writer/docker-compose.yml index 7c8a21a3..37eeba4e 100644 --- a/docker/addons/timescale-writer/docker-compose.yml +++ b/docker/addons/timescale-writer/docker-compose.yml @@ -50,6 +50,7 @@ services: MF_TIMESCALE_WRITER_DB_SSL_CERT: ${MF_TIMESCALE_WRITER_DB_SSL_CERT} MF_TIMESCALE_WRITER_DB_SSL_KEY: ${MF_TIMESCALE_WRITER_DB_SSL_KEY} MF_TIMESCALE_WRITER_DB_SSL_ROOT_CERT: ${MF_TIMESCALE_WRITER_DB_SSL_ROOT_CERT} + MF_JAEGER_URL: ${MF_JAEGER_URL} ports: - ${MF_TIMESCALE_WRITER_PORT}:${MF_TIMESCALE_WRITER_PORT} networks: diff --git a/http/adapter.go b/http/adapter.go index 22e65e17..b0e13699 100644 --- a/http/adapter.go +++ b/http/adapter.go @@ -44,5 +44,5 @@ func (as *adapterService) Publish(ctx context.Context, token string, msg *messag } msg.Publisher = thid.GetValue() - return as.publisher.Publish(msg.Channel, msg) + return as.publisher.Publish(ctx, msg.Channel, msg) } diff --git a/http/mocks/publisher.go b/http/mocks/publisher.go index 4da4443e..6d07a633 100644 --- a/http/mocks/publisher.go +++ b/http/mocks/publisher.go @@ -4,6 +4,8 @@ package mocks import ( + "context" + "github.com/mainflux/mainflux/pkg/messaging" ) @@ -14,7 +16,7 @@ func NewPublisher() messaging.Publisher { return mockPublisher{} } -func (pub mockPublisher) Publish(topic string, msg *messaging.Message) error { +func (pub mockPublisher) Publish(ctx context.Context, topic string, msg *messaging.Message) error { return nil } diff --git a/http/tracing/adapter.go b/http/tracing/adapter.go new file mode 100644 index 00000000..9aa5d5da --- /dev/null +++ b/http/tracing/adapter.go @@ -0,0 +1,41 @@ +package tracing + +import ( + "context" + + "github.com/mainflux/mainflux/http" + "github.com/mainflux/mainflux/pkg/messaging" + "github.com/opentracing/opentracing-go" +) + +var _ http.Service = (*serviceMiddleware)(nil) + +const publishOP = "publishOP" + +// serviceMiddleware implements the http.Service interface, providing a middleware layer for tracing HTTP requests. +// It creates a new span for each request and sets it as the active span in the OpenTracing context. +type serviceMiddleware struct { + tracer opentracing.Tracer + svc http.Service +} + +// New creates a new instance of the http.Service interface with tracing middleware. +func New(tracer opentracing.Tracer, svc http.Service) http.Service { + return &serviceMiddleware{ + tracer: tracer, + svc: svc, + } +} + +// Publish traces HTTP publish operations. +// It starts a new span as a child of the incoming span (if there is one) and sets it as the active span in the context. +func (sm *serviceMiddleware) Publish(ctx context.Context, token string, msg *messaging.Message) error { + var spanCtx opentracing.SpanContext = nil + if httpSpan := opentracing.SpanFromContext(ctx); httpSpan != nil { + spanCtx = httpSpan.Context() + } + span := sm.tracer.StartSpan(publishOP, opentracing.ChildOf(spanCtx)) + defer span.Finish() + ctx = opentracing.ContextWithSpan(ctx, span) + return sm.svc.Publish(ctx, token, msg) +} diff --git a/lora/README.md b/lora/README.md index f7273984..760a81ee 100644 --- a/lora/README.md +++ b/lora/README.md @@ -28,6 +28,7 @@ default values. | MF_THINGS_ES_PASS | Things service event source password | | | MF_THINGS_ES_DB | Things service event source DB | 0 | | MF_LORA_ADAPTER_EVENT_CONSUMER | Service event consumer name | lora | +| MF_JAEGER_URL | Jaeger server URL | localhost:6831 | ## Deployment diff --git a/lora/adapter.go b/lora/adapter.go index 61ab4009..02a50aad 100644 --- a/lora/adapter.go +++ b/lora/adapter.go @@ -122,7 +122,7 @@ func (as *adapterService) Publish(ctx context.Context, m *Message) error { Created: time.Now().UnixNano(), } - return as.publisher.Publish(msg.Channel, &msg) + return as.publisher.Publish(ctx, msg.Channel, &msg) } func (as *adapterService) CreateThing(ctx context.Context, thingID string, devEUI string) error { diff --git a/lora/mocks/publisher.go b/lora/mocks/publisher.go index 4da4443e..6d07a633 100644 --- a/lora/mocks/publisher.go +++ b/lora/mocks/publisher.go @@ -4,6 +4,8 @@ package mocks import ( + "context" + "github.com/mainflux/mainflux/pkg/messaging" ) @@ -14,7 +16,7 @@ func NewPublisher() messaging.Publisher { return mockPublisher{} } -func (pub mockPublisher) Publish(topic string, msg *messaging.Message) error { +func (pub mockPublisher) Publish(ctx context.Context, topic string, msg *messaging.Message) error { return nil } diff --git a/mqtt/README.md b/mqtt/README.md index 96d9c408..a56e9089 100644 --- a/mqtt/README.md +++ b/mqtt/README.md @@ -25,7 +25,7 @@ default values. | MF_BROKER_URL | Message broker broker URL | nats://127.0.0.1:4222 | | MF_THINGS_AUTH_GRPC_URL | Things gRPC endpoint URL | localhost:7000 | | MF_THINGS_AUTH_GRPC_TIMEOUT | Timeout in seconds for Things service gRPC calls | 1s | -| MF_JAEGER_URL | URL of Jaeger tracing service | "" | +| MF_JAEGER_URL | URL of Jaeger tracing service | "localhost:6831" | | MF_MQTT_ADAPTER_CLIENT_TLS | gRPC client TLS | false | | MF_MQTT_ADAPTER_CA_CERTS | CA certs for gRPC client TLS | "" | | MF_MQTT_ADAPTER_INSTANCE | Instance name for event sourcing | "" | diff --git a/mqtt/forwarder.go b/mqtt/forwarder.go index 0f5ab766..6ad39061 100644 --- a/mqtt/forwarder.go +++ b/mqtt/forwarder.go @@ -4,6 +4,7 @@ package mqtt import ( + "context" "fmt" "strings" @@ -20,7 +21,7 @@ const ( type Forwarder interface { // Forward subscribes to the Subscriber and // publishes messages using provided Publisher. - Forward(id string, sub messaging.Subscriber, pub messaging.Publisher) error + Forward(ctx context.Context, id string, sub messaging.Subscriber, pub messaging.Publisher) error } type forwarder struct { @@ -36,11 +37,11 @@ func NewForwarder(topic string, logger log.Logger) Forwarder { } } -func (f forwarder) Forward(id string, sub messaging.Subscriber, pub messaging.Publisher) error { - return sub.Subscribe(id, f.topic, handle(pub, f.logger)) +func (f forwarder) Forward(ctx context.Context, id string, sub messaging.Subscriber, pub messaging.Publisher) error { + return sub.Subscribe(ctx, id, f.topic, handle(ctx, pub, f.logger)) } -func handle(pub messaging.Publisher, logger log.Logger) handleFunc { +func handle(ctx context.Context, pub messaging.Publisher, logger log.Logger) handleFunc { return func(msg *messaging.Message) error { if msg.Protocol == protocol { return nil @@ -52,7 +53,7 @@ func handle(pub messaging.Publisher, logger log.Logger) handleFunc { topic += "/" + strings.ReplaceAll(msg.Subtopic, ".", "/") } go func() { - if err := pub.Publish(topic, msg); err != nil { + if err := pub.Publish(ctx, topic, msg); err != nil { logger.Warn(fmt.Sprintf("Failed to forward message: %s", err)) } }() diff --git a/mqtt/handler.go b/mqtt/handler.go index 18b8615b..864d5f8e 100644 --- a/mqtt/handler.go +++ b/mqtt/handler.go @@ -174,7 +174,7 @@ func (h *handler) Publish(c *session.Client, topic *string, payload *[]byte) { } for _, pub := range h.publishers { - if err := pub.Publish(msg.Channel, &msg); err != nil { + if err := pub.Publish(context.Background(), msg.Channel, &msg); err != nil { h.logger.Error(LogErrFailedPublishToMsgBroker + err.Error()) } } diff --git a/mqtt/mocks/publisher.go b/mqtt/mocks/publisher.go index 07812289..ec337a45 100644 --- a/mqtt/mocks/publisher.go +++ b/mqtt/mocks/publisher.go @@ -1,6 +1,10 @@ package mocks -import "github.com/mainflux/mainflux/pkg/messaging" +import ( + "context" + + "github.com/mainflux/mainflux/pkg/messaging" +) type MockPublisher struct{} @@ -9,7 +13,7 @@ func NewPublisher() messaging.Publisher { return MockPublisher{} } -func (pub MockPublisher) Publish(topic string, msg *messaging.Message) error { +func (pub MockPublisher) Publish(ctx context.Context, topic string, msg *messaging.Message) error { return nil } diff --git a/opcua/README.md b/opcua/README.md index f0c6d641..a51a14bc 100644 --- a/opcua/README.md +++ b/opcua/README.md @@ -28,6 +28,7 @@ default values. | MF_THINGS_ES_PASS | Things service event source password | | | MF_THINGS_ES_DB | Things service event source DB | 0 | | MF_OPCUA_ADAPTER_EVENT_CONSUMER | Service event consumer name | opcua | +| MF_JAEGER_URL | Jaeger server URL | localhost:6831 | ## Deployment diff --git a/opcua/gopcua/subscribe.go b/opcua/gopcua/subscribe.go index 1897c4f9..0e7b6000 100644 --- a/opcua/gopcua/subscribe.go +++ b/opcua/gopcua/subscribe.go @@ -236,7 +236,7 @@ func (c client) publish(ctx context.Context, token string, m message) error { Created: time.Now().UnixNano(), } - if err := c.publisher.Publish(msg.Channel, &msg); err != nil { + if err := c.publisher.Publish(ctx, msg.Channel, &msg); err != nil { return err } diff --git a/pkg/messaging/mqtt/publisher.go b/pkg/messaging/mqtt/publisher.go index 562e37ca..12445c41 100644 --- a/pkg/messaging/mqtt/publisher.go +++ b/pkg/messaging/mqtt/publisher.go @@ -4,6 +4,7 @@ package mqtt import ( + "context" "errors" "time" @@ -35,7 +36,7 @@ func NewPublisher(address string, timeout time.Duration) (messaging.Publisher, e return ret, nil } -func (pub publisher) Publish(topic string, msg *messaging.Message) error { +func (pub publisher) Publish(ctx context.Context, topic string, msg *messaging.Message) error { if topic == "" { return ErrEmptyTopic } diff --git a/pkg/messaging/mqtt/pubsub.go b/pkg/messaging/mqtt/pubsub.go index 642a6201..8b4df41c 100644 --- a/pkg/messaging/mqtt/pubsub.go +++ b/pkg/messaging/mqtt/pubsub.go @@ -4,6 +4,7 @@ package mqtt import ( + "context" "errors" "fmt" "sync" @@ -79,7 +80,7 @@ func NewPubSub(url, queue string, timeout time.Duration, logger log.Logger) (mes return ret, nil } -func (ps *pubsub) Subscribe(id, topic string, handler messaging.MessageHandler) error { +func (ps *pubsub) Subscribe(ctx context.Context, id, topic string, handler messaging.MessageHandler) error { if id == "" { return ErrEmptyID } @@ -122,7 +123,7 @@ func (ps *pubsub) Subscribe(id, topic string, handler messaging.MessageHandler) return token.Error() } -func (ps *pubsub) Unsubscribe(id, topic string) error { +func (ps *pubsub) Unsubscribe(ctx context.Context, id, topic string) error { if id == "" { return ErrEmptyID } diff --git a/pkg/messaging/mqtt/pubsub_test.go b/pkg/messaging/mqtt/pubsub_test.go index 07c86dfb..5c9c7bda 100644 --- a/pkg/messaging/mqtt/pubsub_test.go +++ b/pkg/messaging/mqtt/pubsub_test.go @@ -4,6 +4,7 @@ package mqtt_test import ( + "context" "errors" "fmt" "testing" @@ -63,7 +64,7 @@ func TestPublisher(t *testing.T) { }) // Test publish with an empty topic. - err = pubsub.Publish("", &messaging.Message{Payload: data}) + err = pubsub.Publish(context.TODO(), "", &messaging.Message{Payload: data}) assert.Equal(t, err, mqtt_pubsub.ErrEmptyTopic, fmt.Sprintf("Publish with empty topic: expected: %s, got: %s", mqtt_pubsub.ErrEmptyTopic, err)) cases := []struct { @@ -104,7 +105,8 @@ func TestPublisher(t *testing.T) { Subtopic: tc.subtopic, Payload: tc.payload, } - err := pubsub.Publish(topic, &expectedMsg) + + err := pubsub.Publish(context.TODO(), topic, &expectedMsg) assert.Nil(t, err, fmt.Sprintf("%s: got unexpected error: %s\n", tc.desc, err)) data, err := proto.Marshal(&expectedMsg) @@ -185,7 +187,7 @@ func TestSubscribe(t *testing.T) { }, } for _, tc := range cases { - err = pubsub.Subscribe(tc.clientID, tc.topic, tc.handler) + err = pubsub.Subscribe(context.TODO(), tc.clientID, tc.topic, tc.handler) assert.Equal(t, err, tc.err, fmt.Sprintf("%s: expected: %s, but got: %s", tc.desc, err, tc.err)) if tc.err == nil { @@ -260,7 +262,7 @@ func TestPubSub(t *testing.T) { }, } for _, tc := range cases { - err := pubsub.Subscribe(tc.clientID, tc.topic, tc.handler) + err := pubsub.Subscribe(context.TODO(), tc.clientID, tc.topic, tc.handler) assert.Equal(t, err, tc.err, fmt.Sprintf("%s: expected: %s, but got: %s", tc.desc, err, tc.err)) if tc.err == nil { @@ -273,7 +275,7 @@ func TestPubSub(t *testing.T) { } // Publish message, and then receive it on message channel. - err := pubsub.Publish(topic, &expectedMsg) + err := pubsub.Publish(context.TODO(), topic, &expectedMsg) assert.Nil(t, err, fmt.Sprintf("%s: got unexpected error: %s\n", tc.desc, err)) receivedMsg := <-msgChan @@ -422,10 +424,10 @@ func TestUnsubscribe(t *testing.T) { for _, tc := range cases { switch tc.subscribe { case true: - err := pubsub.Subscribe(tc.clientID, tc.topic, tc.handler) + err := pubsub.Subscribe(context.TODO(), tc.clientID, tc.topic, tc.handler) assert.Equal(t, tc.err, err, fmt.Sprintf("%s: expected: %s, but got: %s", tc.desc, tc.err, err)) default: - err := pubsub.Unsubscribe(tc.clientID, tc.topic) + err := pubsub.Unsubscribe(context.TODO(), tc.clientID, tc.topic) assert.Equal(t, tc.err, err, fmt.Sprintf("%s: expected: %s, but got: %s", tc.desc, tc.err, err)) } } diff --git a/pkg/messaging/nats/publisher.go b/pkg/messaging/nats/publisher.go index bb3b6fd3..b07eb8c5 100644 --- a/pkg/messaging/nats/publisher.go +++ b/pkg/messaging/nats/publisher.go @@ -4,6 +4,7 @@ package nats import ( + "context" "fmt" "github.com/mainflux/mainflux/pkg/messaging" @@ -37,10 +38,11 @@ func NewPublisher(url string) (messaging.Publisher, error) { return ret, nil } -func (pub *publisher) Publish(topic string, msg *messaging.Message) error { +func (pub *publisher) Publish(ctx context.Context, topic string, msg *messaging.Message) error { if topic == "" { return ErrEmptyTopic } + data, err := proto.Marshal(msg) if err != nil { return err @@ -50,6 +52,7 @@ func (pub *publisher) Publish(topic string, msg *messaging.Message) error { if msg.Subtopic != "" { subject = fmt.Sprintf("%s.%s", subject, msg.Subtopic) } + if err := pub.conn.Publish(subject, data); err != nil { return err } diff --git a/pkg/messaging/nats/pubsub.go b/pkg/messaging/nats/pubsub.go index e04d1243..ebfce5a7 100644 --- a/pkg/messaging/nats/pubsub.go +++ b/pkg/messaging/nats/pubsub.go @@ -4,6 +4,7 @@ package nats import ( + "context" "errors" "fmt" "sync" @@ -63,7 +64,7 @@ func NewPubSub(url, queue string, logger log.Logger) (messaging.PubSub, error) { return ret, nil } -func (ps *pubsub) Subscribe(id, topic string, handler messaging.MessageHandler) error { +func (ps *pubsub) Subscribe(ctx context.Context, id, topic string, handler messaging.MessageHandler) error { if id == "" { return ErrEmptyID } @@ -79,7 +80,7 @@ func (ps *pubsub) Subscribe(id, topic string, handler messaging.MessageHandler) if _, ok := s[id]; ok { // Unlocking, so that Unsubscribe() can access ps.subscriptions ps.mu.Unlock() - if err := ps.Unsubscribe(id, topic); err != nil { + if err := ps.Unsubscribe(ctx, id, topic); err != nil { return err } @@ -119,7 +120,7 @@ func (ps *pubsub) Subscribe(id, topic string, handler messaging.MessageHandler) return nil } -func (ps *pubsub) Unsubscribe(id, topic string) error { +func (ps *pubsub) Unsubscribe(ctx context.Context, id, topic string) error { if id == "" { return ErrEmptyID } diff --git a/pkg/messaging/nats/pubsub_test.go b/pkg/messaging/nats/pubsub_test.go index 8e8c0b41..abc4ece5 100644 --- a/pkg/messaging/nats/pubsub_test.go +++ b/pkg/messaging/nats/pubsub_test.go @@ -4,6 +4,7 @@ package nats_test import ( + "context" "errors" "fmt" "testing" @@ -28,9 +29,9 @@ var ( ) func TestPublisher(t *testing.T) { - err := pubsub.Subscribe(clientID, fmt.Sprintf("%s.%s", chansPrefix, topic), handler{}) + err := pubsub.Subscribe(context.TODO(), clientID, fmt.Sprintf("%s.%s", chansPrefix, topic), handler{}) assert.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) - err = pubsub.Subscribe(clientID, fmt.Sprintf("%s.%s.%s", chansPrefix, topic, subtopic), handler{}) + err = pubsub.Subscribe(context.TODO(), clientID, fmt.Sprintf("%s.%s.%s", chansPrefix, topic, subtopic), handler{}) assert.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) cases := []struct { @@ -73,7 +74,7 @@ func TestPublisher(t *testing.T) { } assert.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) - err = pubsub.Publish(topic, &expectedMsg) + err = pubsub.Publish(context.TODO(), topic, &expectedMsg) assert.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) receivedMsg := <-msgChan @@ -260,14 +261,14 @@ func TestPubsub(t *testing.T) { for _, pc := range subcases { if pc.pubsub == true { - err := pubsub.Subscribe(pc.clientID, pc.topic, pc.handler) + err := pubsub.Subscribe(context.TODO(), pc.clientID, pc.topic, pc.handler) if pc.errorMessage == nil { assert.Nil(t, err, fmt.Sprintf("%s got unexpected error: %s", pc.desc, err)) } else { assert.Equal(t, err, pc.errorMessage) } } else { - err := pubsub.Unsubscribe(pc.clientID, pc.topic) + err := pubsub.Unsubscribe(context.TODO(), pc.clientID, pc.topic) if pc.errorMessage == nil { assert.Nil(t, err, fmt.Sprintf("%s got unexpected error: %s", pc.desc, err)) } else { diff --git a/pkg/messaging/pubsub.go b/pkg/messaging/pubsub.go index 4cc569ac..7e4a51dd 100644 --- a/pkg/messaging/pubsub.go +++ b/pkg/messaging/pubsub.go @@ -3,10 +3,12 @@ package messaging +import "context" + // Publisher specifies message publishing API. type Publisher interface { // Publishes message to the stream. - Publish(topic string, msg *Message) error + Publish(ctx context.Context, topic string, msg *Message) error // Close gracefully closes message publisher's connection. Close() error @@ -24,11 +26,11 @@ type MessageHandler interface { // Subscriber specifies message subscription API. type Subscriber interface { // Subscribe subscribes to the message stream and consumes messages. - Subscribe(id, topic string, handler MessageHandler) error + Subscribe(ctx context.Context, id, topic string, handler MessageHandler) error // Unsubscribe unsubscribes from the message stream and // stops consuming messages. - Unsubscribe(id, topic string) error + Unsubscribe(ctx context.Context, id, topic string) error // Close gracefully closes message subscriber's connection. Close() error diff --git a/pkg/messaging/rabbitmq/publisher.go b/pkg/messaging/rabbitmq/publisher.go index d49d41ee..cab8ca50 100644 --- a/pkg/messaging/rabbitmq/publisher.go +++ b/pkg/messaging/rabbitmq/publisher.go @@ -41,7 +41,7 @@ func NewPublisher(url string) (messaging.Publisher, error) { return ret, nil } -func (pub *publisher) Publish(topic string, msg *messaging.Message) error { +func (pub *publisher) Publish(ctx context.Context, topic string, msg *messaging.Message) error { if topic == "" { return ErrEmptyTopic } diff --git a/pkg/messaging/rabbitmq/pubsub.go b/pkg/messaging/rabbitmq/pubsub.go index 87c007c3..69c8d5e7 100644 --- a/pkg/messaging/rabbitmq/pubsub.go +++ b/pkg/messaging/rabbitmq/pubsub.go @@ -4,6 +4,7 @@ package rabbitmq import ( + "context" "errors" "fmt" "sync" @@ -67,7 +68,7 @@ func NewPubSub(url, queue string, logger log.Logger) (messaging.PubSub, error) { return ret, nil } -func (ps *pubsub) Subscribe(id, topic string, handler messaging.MessageHandler) error { +func (ps *pubsub) Subscribe(ctx context.Context, id, topic string, handler messaging.MessageHandler) error { if id == "" { return ErrEmptyID } @@ -84,7 +85,7 @@ func (ps *pubsub) Subscribe(id, topic string, handler messaging.MessageHandler) if _, ok := s[id]; ok { // Unlocking, so that Unsubscribe() can access ps.subscriptions ps.mu.Unlock() - if err := ps.Unsubscribe(id, topic); err != nil { + if err := ps.Unsubscribe(ctx, id, topic); err != nil { return err } @@ -123,7 +124,7 @@ func (ps *pubsub) Subscribe(id, topic string, handler messaging.MessageHandler) return nil } -func (ps *pubsub) Unsubscribe(id, topic string) error { +func (ps *pubsub) Unsubscribe(ctx context.Context, id, topic string) error { if id == "" { return ErrEmptyID } diff --git a/pkg/messaging/rabbitmq/pubsub_test.go b/pkg/messaging/rabbitmq/pubsub_test.go index 3c5df23c..c7788d78 100644 --- a/pkg/messaging/rabbitmq/pubsub_test.go +++ b/pkg/messaging/rabbitmq/pubsub_test.go @@ -87,7 +87,7 @@ func TestPublisher(t *testing.T) { Subtopic: tc.subtopic, Payload: tc.payload, } - err = pubsub.Publish(topic, &expectedMsg) + err = pubsub.Publish(context.TODO(), topic, &expectedMsg) assert.Nil(t, err, fmt.Sprintf("%s: got unexpected error: %s", tc.desc, err)) receivedMsg := <-msgChan @@ -168,7 +168,7 @@ func TestSubscribe(t *testing.T) { }, } for _, tc := range cases { - err := pubsub.Subscribe(tc.clientID, tc.topic, tc.handler) + err := pubsub.Subscribe(context.TODO(), tc.clientID, tc.topic, tc.handler) assert.Equal(t, tc.err, err, fmt.Sprintf("%s: expected: %s, but got: %s", tc.desc, tc.err, err)) if tc.err == nil { @@ -342,10 +342,10 @@ func TestUnsubscribe(t *testing.T) { for _, tc := range cases { switch tc.subscribe { case true: - err := pubsub.Subscribe(tc.clientID, tc.topic, tc.handler) + err := pubsub.Subscribe(context.TODO(), tc.clientID, tc.topic, tc.handler) assert.Equal(t, err, tc.err, fmt.Sprintf("%s: expected: %s, but got: %s", tc.desc, tc.err, err)) default: - err := pubsub.Unsubscribe(tc.clientID, tc.topic) + err := pubsub.Unsubscribe(context.TODO(), tc.clientID, tc.topic) assert.Equal(t, err, tc.err, fmt.Sprintf("%s: expected: %s, but got: %s", tc.desc, tc.err, err)) } } @@ -400,7 +400,7 @@ func TestPubSub(t *testing.T) { if tc.topic != "" { subject = fmt.Sprintf("%s.%s", chansPrefix, tc.topic) } - err := pubsub.Subscribe(tc.clientID, subject, tc.handler) + err := pubsub.Subscribe(context.TODO(), tc.clientID, subject, tc.handler) switch tc.err { case nil: @@ -412,7 +412,7 @@ func TestPubSub(t *testing.T) { Payload: data, } - err = pubsub.Publish(tc.topic, &expectedMsg) + err = pubsub.Publish(context.TODO(), tc.topic, &expectedMsg) assert.Nil(t, err, fmt.Sprintf("%s got unexpected error: %s", tc.desc, err)) receivedMsg := <-msgChan @@ -423,7 +423,7 @@ func TestPubSub(t *testing.T) { assert.Equal(t, expectedMsg.Subtopic, receivedMsg.Subtopic, fmt.Sprintf("%s: expected %+v got %+v\n", tc.desc, &expectedMsg, receivedMsg)) assert.Equal(t, expectedMsg.Payload, receivedMsg.Payload, fmt.Sprintf("%s: expected %+v got %+v\n", tc.desc, &expectedMsg, receivedMsg)) - err = pubsub.Unsubscribe(tc.clientID, fmt.Sprintf("%s.%s", chansPrefix, tc.topic)) + err = pubsub.Unsubscribe(context.TODO(), tc.clientID, fmt.Sprintf("%s.%s", chansPrefix, tc.topic)) assert.Nil(t, err, fmt.Sprintf("%s got unexpected error: %s", tc.desc, err)) default: assert.Equal(t, tc.err, err, fmt.Sprintf("%s: expected: %s, but got: %s", tc.desc, err, tc.err)) diff --git a/pkg/messaging/tracing/publisher.go b/pkg/messaging/tracing/publisher.go new file mode 100644 index 00000000..23c58882 --- /dev/null +++ b/pkg/messaging/tracing/publisher.go @@ -0,0 +1,54 @@ +package tracing + +import ( + "context" + + "github.com/mainflux/mainflux/pkg/messaging" + "github.com/opentracing/opentracing-go" +) + +// traced ops. +const publishOP = "publish_op" + +var _ messaging.Publisher = (*publisherMiddleware)(nil) + +type publisherMiddleware struct { + publisher messaging.Publisher + tracer opentracing.Tracer +} + +// New creates new messaging publisher tracing middleware. +func New(tracer opentracing.Tracer, publisher messaging.Publisher) messaging.Publisher { + return &publisherMiddleware{ + publisher: publisher, + tracer: tracer, + } +} + +// Publish traces nats publish operations. +func (pm *publisherMiddleware) Publish(ctx context.Context, topic string, msg *messaging.Message) error { + span := createSpan(ctx, publishOP, topic, msg.Subtopic, msg.Publisher, pm.tracer) + defer span.Finish() + ctx = opentracing.ContextWithSpan(ctx, span) + return pm.publisher.Publish(ctx, topic, msg) +} + +// Close nats trace publisher middleware +func (pm *publisherMiddleware) Close() error { + return pm.publisher.Close() +} + +func createSpan(ctx context.Context, operation, topic, subTopic, thingID string, tracer opentracing.Tracer) opentracing.Span { + span, _ := opentracing.StartSpanFromContextWithTracer(ctx, tracer, operation) + switch operation { + case publishOP: + span.SetTag("publisher", thingID) + default: + span.SetTag("subscriber", thingID) + } + span.SetTag("topic", topic) + if subTopic != "" { + span.SetTag("sub-topic", subTopic) + } + return span +} diff --git a/pkg/messaging/tracing/pubsub.go b/pkg/messaging/tracing/pubsub.go new file mode 100644 index 00000000..c501b685 --- /dev/null +++ b/pkg/messaging/tracing/pubsub.go @@ -0,0 +1,76 @@ +package tracing + +import ( + "context" + + "github.com/mainflux/mainflux/pkg/messaging" + "github.com/opentracing/opentracing-go" +) + +// Constants to define different operations to be traced. +const ( + subscribeOP = "subscribe_op" + unsubscribeOp = "unsubscribe_op" + handleOp = "handle_op" +) + +var _ messaging.PubSub = (*pubsubMiddleware)(nil) + +type pubsubMiddleware struct { + publisherMiddleware + pubsub messaging.PubSub + tracer opentracing.Tracer +} + +// NewPubSub creates a new pubsub middleware that traces pubsub operations. +func NewPubSub(tracer opentracing.Tracer, pubsub messaging.PubSub) messaging.PubSub { + return &pubsubMiddleware{ + publisherMiddleware: publisherMiddleware{ + publisher: pubsub, + tracer: tracer, + }, + pubsub: pubsub, + tracer: tracer, + } +} + +// Subscribe creates a new subscription and traces the operation. +func (pm *pubsubMiddleware) Subscribe(ctx context.Context, id string, topic string, handler messaging.MessageHandler) error { + span := createSpan(ctx, subscribeOP, topic, "", id, pm.tracer) + defer span.Finish() + ctx = opentracing.ContextWithSpan(ctx, span) + h := &traceHandler{ + handler: handler, + tracer: pm.tracer, + ctx: ctx, + } + return pm.pubsub.Subscribe(ctx, id, topic, h) +} + +// Unsubscribe removes an existing subscription and traces the operation. +func (pm *pubsubMiddleware) Unsubscribe(ctx context.Context, id string, topic string) error { + span := createSpan(ctx, unsubscribeOp, topic, "", id, pm.tracer) + defer span.Finish() + ctx = opentracing.ContextWithSpan(ctx, span) + return pm.pubsub.Unsubscribe(ctx, id, topic) +} + +// traceHandler is used to trace the message handling operation +type traceHandler struct { + handler messaging.MessageHandler + tracer opentracing.Tracer + ctx context.Context + topic string +} + +// Handle instruments the message handling operation +func (h *traceHandler) Handle(msg *messaging.Message) error { + span := createSpan(h.ctx, handleOp, h.topic, msg.Subtopic, msg.Publisher, h.tracer) + defer span.Finish() + return h.handler.Handle(msg) +} + +// Cancel cancels the message handling operation +func (h *traceHandler) Cancel() error { + return h.handler.Cancel() +} diff --git a/twins/README.md b/twins/README.md index a67891d9..9cf4a38a 100644 --- a/twins/README.md +++ b/twins/README.md @@ -18,7 +18,7 @@ default values. | MF_TWINS_HTTP_PORT | Twins service HTTP port | 9018 | | MF_TWINS_SERVER_CERT | Path to server certificate in PEM format | | | MF_TWINS_SERVER_KEY | Path to server key in PEM format | | -| MF_JAEGER_URL | Jaeger server URL | | +| MF_JAEGER_URL | Jaeger server URL | localhost:6831 | | MF_TWINS_DB | Database name | mainflux | | MF_TWINS_DB_HOST | Database host address | localhost | | MF_TWINS_DB_PORT | Database host port | 27017 | diff --git a/twins/mocks/messages.go b/twins/mocks/messages.go index e7baf7f3..01e9ec90 100644 --- a/twins/mocks/messages.go +++ b/twins/mocks/messages.go @@ -4,6 +4,8 @@ package mocks import ( + "context" + "github.com/mainflux/mainflux/pkg/errors" "github.com/mainflux/mainflux/pkg/messaging" ) @@ -21,7 +23,7 @@ func NewBroker(sub map[string]string) messaging.Publisher { } } -func (mb mockBroker) Publish(topic string, msg *messaging.Message) error { +func (mb mockBroker) Publish(ctx context.Context, topic string, msg *messaging.Message) error { if len(msg.Payload) == 0 { return errors.New("failed to publish") } diff --git a/twins/service.go b/twins/service.go index ce04a314..ff374d57 100644 --- a/twins/service.go +++ b/twins/service.go @@ -102,7 +102,7 @@ func New(publisher messaging.Publisher, auth mainflux.AuthServiceClient, twins T func (ts *twinsService) AddTwin(ctx context.Context, token string, twin Twin, def Definition) (tw Twin, err error) { var id string var b []byte - defer ts.publish(&id, &err, crudOp["createSucc"], crudOp["createFail"], &b) + defer ts.publish(ctx, &id, &err, crudOp["createSucc"], crudOp["createFail"], &b) res, err := ts.auth.Identify(ctx, &mainflux.Token{Value: token}) if err != nil { @@ -145,7 +145,7 @@ func (ts *twinsService) AddTwin(ctx context.Context, token string, twin Twin, de func (ts *twinsService) UpdateTwin(ctx context.Context, token string, twin Twin, def Definition) (err error) { var b []byte var id string - defer ts.publish(&id, &err, crudOp["updateSucc"], crudOp["updateFail"], &b) + defer ts.publish(ctx, &id, &err, crudOp["updateSucc"], crudOp["updateFail"], &b) _, err = ts.auth.Identify(ctx, &mainflux.Token{Value: token}) if err != nil { @@ -195,7 +195,7 @@ func (ts *twinsService) UpdateTwin(ctx context.Context, token string, twin Twin, func (ts *twinsService) ViewTwin(ctx context.Context, token, twinID string) (tw Twin, err error) { var b []byte - defer ts.publish(&twinID, &err, crudOp["getSucc"], crudOp["getFail"], &b) + defer ts.publish(ctx, &twinID, &err, crudOp["getSucc"], crudOp["getFail"], &b) _, err = ts.auth.Identify(ctx, &mainflux.Token{Value: token}) if err != nil { @@ -214,7 +214,7 @@ func (ts *twinsService) ViewTwin(ctx context.Context, token, twinID string) (tw func (ts *twinsService) RemoveTwin(ctx context.Context, token, twinID string) (err error) { var b []byte - defer ts.publish(&twinID, &err, crudOp["removeSucc"], crudOp["removeFail"], &b) + defer ts.publish(ctx, &twinID, &err, crudOp["removeSucc"], crudOp["removeFail"], &b) _, err = ts.auth.Identify(ctx, &mainflux.Token{Value: token}) if err != nil { @@ -280,9 +280,9 @@ func (ts *twinsService) SaveStates(msg *messaging.Message) error { func (ts *twinsService) saveState(msg *messaging.Message, twinID string) error { var b []byte var err error - defer ts.publish(&twinID, &err, crudOp["stateSucc"], crudOp["stateFail"], &b) - ctx := context.TODO() + defer ts.publish(ctx, &twinID, &err, crudOp["stateSucc"], crudOp["stateFail"], &b) + tw, err := ts.twins.RetrieveByID(ctx, twinID) if err != nil { return fmt.Errorf("Retrieving twin for %s failed: %s", msg.Publisher, err) @@ -396,7 +396,7 @@ func findAttribute(name string, attrs []Attribute) (idx int) { return -1 } -func (ts *twinsService) publish(twinID *string, err *error, succOp, failOp string, payload *[]byte) { +func (ts *twinsService) publish(ctx context.Context, twinID *string, err *error, succOp, failOp string, payload *[]byte) { if ts.channelID == "" { return } @@ -421,7 +421,7 @@ func (ts *twinsService) publish(twinID *string, err *error, succOp, failOp strin Created: time.Now().UnixNano(), } - if err := ts.publisher.Publish(msg.Channel, &msg); err != nil { + if err := ts.publisher.Publish(ctx, msg.Channel, &msg); err != nil { ts.logger.Warn(fmt.Sprintf("Failed to publish notification on Message Broker: %s", err)) } } diff --git a/users/tracing/users.go b/users/tracing/users.go index a33e9977..2036b6a5 100644 --- a/users/tracing/users.go +++ b/users/tracing/users.go @@ -28,7 +28,7 @@ type userRepositoryMiddleware struct { // UserRepositoryMiddleware tracks request and their latency, and adds spans // to context. -func UserRepositoryMiddleware(repo users.UserRepository, tracer opentracing.Tracer) users.UserRepository { +func UserRepositoryMiddleware(tracer opentracing.Tracer, repo users.UserRepository) users.UserRepository { return userRepositoryMiddleware{ tracer: tracer, repo: repo, diff --git a/ws/adapter.go b/ws/adapter.go index 32589bc0..fe706f1c 100644 --- a/ws/adapter.go +++ b/ws/adapter.go @@ -85,7 +85,7 @@ func (svc *adapterService) Publish(ctx context.Context, thingKey string, msg *me msg.Publisher = thid.GetValue() - if err := svc.pubsub.Publish(msg.GetChannel(), msg); err != nil { + if err := svc.pubsub.Publish(ctx, msg.GetChannel(), msg); err != nil { return ErrFailedMessagePublish } @@ -110,7 +110,7 @@ func (svc *adapterService) Subscribe(ctx context.Context, thingKey, chanID, subt subject = fmt.Sprintf("%s.%s", subject, subtopic) } - if err := svc.pubsub.Subscribe(thid.GetValue(), subject, c); err != nil { + if err := svc.pubsub.Subscribe(ctx, thid.GetValue(), subject, c); err != nil { return ErrFailedSubscription } @@ -133,7 +133,7 @@ func (svc *adapterService) Unsubscribe(ctx context.Context, thingKey, chanID, su subject = fmt.Sprintf("%s.%s", subject, subtopic) } - return svc.pubsub.Unsubscribe(thid.GetValue(), subject) + return svc.pubsub.Unsubscribe(ctx, thid.GetValue(), subject) } func (svc *adapterService) authorize(ctx context.Context, thingKey, chanID string) (*mainflux.ThingID, error) { diff --git a/ws/mocks/pubsub.go b/ws/mocks/pubsub.go index 45d4dbc9..bc9d60d4 100644 --- a/ws/mocks/pubsub.go +++ b/ws/mocks/pubsub.go @@ -4,6 +4,7 @@ package mocks import ( + "context" "encoding/json" "fmt" @@ -15,9 +16,9 @@ import ( var _ messaging.PubSub = (*mockPubSub)(nil) type MockPubSub interface { - Publish(string, *messaging.Message) error - Subscribe(string, string, messaging.MessageHandler) error - Unsubscribe(string, string) error + Publish(context.Context, string, *messaging.Message) error + Subscribe(context.Context, string, string, messaging.MessageHandler) error + Unsubscribe(context.Context, string, string) error SetFail(bool) SetConn(*websocket.Conn) Close() error @@ -32,7 +33,7 @@ type mockPubSub struct { func NewPubSub() MockPubSub { return &mockPubSub{false, nil} } -func (pubsub *mockPubSub) Publish(s string, msg *messaging.Message) error { +func (pubsub *mockPubSub) Publish(ctx context.Context, s string, msg *messaging.Message) error { if pubsub.conn != nil { data, err := json.Marshal(msg) if err != nil { @@ -47,14 +48,14 @@ func (pubsub *mockPubSub) Publish(s string, msg *messaging.Message) error { return nil } -func (pubsub *mockPubSub) Subscribe(string, string, messaging.MessageHandler) error { +func (pubsub *mockPubSub) Subscribe(context.Context, string, string, messaging.MessageHandler) error { if pubsub.fail { return ws.ErrFailedSubscription } return nil } -func (pubsub *mockPubSub) Unsubscribe(string, string) error { +func (pubsub *mockPubSub) Unsubscribe(context.Context, string, string) error { if pubsub.fail { return ws.ErrFailedUnsubscribe } diff --git a/ws/tracing/tracing.go b/ws/tracing/tracing.go new file mode 100644 index 00000000..929f5bf4 --- /dev/null +++ b/ws/tracing/tracing.go @@ -0,0 +1,65 @@ +package tracing + +import ( + "context" + + "github.com/mainflux/mainflux/pkg/messaging" + "github.com/mainflux/mainflux/ws" + "github.com/opentracing/opentracing-go" +) + +var _ ws.Service = (*tracingMiddleware)(nil) + +const ( + publishOP = "publish_op" + subscribeOP = "subscribe_op" + unsubscribeOP = "unsubscribe_op" +) + +type tracingMiddleware struct { + tracer opentracing.Tracer + svc ws.Service +} + +// New returns a new ws.Service that traces incoming requests using the given tracer. +func New(tracer opentracing.Tracer, svc ws.Service) ws.Service { + return &tracingMiddleware{ + tracer: tracer, + svc: svc, + } +} + +// Publish traces the "Publish" operation of the wrapped ws.Service. +func (tm *tracingMiddleware) Publish(ctx context.Context, thingKey string, msg *messaging.Message) error { + span := tm.createSpan(ctx, publishOP) + defer span.Finish() + ctx = opentracing.ContextWithSpan(ctx, span) + return tm.svc.Publish(ctx, thingKey, msg) +} + +// Subscribe traces the "Subscribe" operation of the wrapped ws.Service. +func (tm *tracingMiddleware) Subscribe(ctx context.Context, thingKey string, chanID string, subtopic string, client *ws.Client) error { + span := tm.createSpan(ctx, subscribeOP) + defer span.Finish() + ctx = opentracing.ContextWithSpan(ctx, span) + return tm.svc.Subscribe(ctx, thingKey, chanID, subtopic, client) +} + +// Unsubscribe traces the "Unsubscribe" operation of the wrapped ws.Service. +func (tm *tracingMiddleware) Unsubscribe(ctx context.Context, thingKey string, chanID string, subtopic string) error { + span := tm.createSpan(ctx, unsubscribeOP) + defer span.Finish() + ctx = opentracing.ContextWithSpan(ctx, span) + return tm.svc.Unsubscribe(ctx, thingKey, chanID, subtopic) +} + +// createSpan creates a new tracing span using the given context and operation name. +func (tm *tracingMiddleware) createSpan(ctx context.Context, opName string) opentracing.Span { + if parentSpan := opentracing.SpanFromContext(ctx); parentSpan != nil { + return tm.tracer.StartSpan( + opName, + opentracing.ChildOf(parentSpan.Context()), + ) + } + return tm.tracer.StartSpan(opName) +}