mirror of
https://github.com/mainflux/mainflux.git
synced 2025-04-29 13:49:28 +08:00

* send and receive span context Signed-off-by: SammyOina <sammyoina@gmail.com> * initilize tracer in cmd Signed-off-by: SammyOina <sammyoina@gmail.com> * add span context to mainflux message Signed-off-by: SammyOina <sammyoina@gmail.com> * propagate span context from adapters Signed-off-by: SammyOina <sammyoina@gmail.com> * fix failing build Signed-off-by: SammyOina <sammyoina@gmail.com> * fix failing build Signed-off-by: SammyOina <sammyoina@gmail.com> * fix linting error Signed-off-by: SammyOina <sammyoina@gmail.com> * add tracing middleware to coap service Signed-off-by: SammyOina <sammyoina@gmail.com> * add tracing middleware to http service Signed-off-by: SammyOina <sammyoina@gmail.com> * add tracing middleware to mqtt service Signed-off-by: SammyOina <sammyoina@gmail.com> * add tracing middleware to broker Signed-off-by: SammyOina <sammyoina@gmail.com> * add tracing middleware to ws service Signed-off-by: SammyOina <sammyoina@gmail.com> * add tracing to services Signed-off-by: SammyOina <sammyoina@gmail.com> * fix naming convention of functions Signed-off-by: SammyOina <sammyoina@gmail.com> * fix failing build Signed-off-by: SammyOina <sammyoina@gmail.com> * remove tracer from mqtt svc Signed-off-by: SammyOina <sammyoina@gmail.com> * format code and removed unused code Signed-off-by: SammyOina <sammyoina@gmail.com> * rename tracing files Signed-off-by: SammyOina <sammyoina@gmail.com> * - rename nats tracer - intialize tracing at the broker Signed-off-by: SammyOina <sammyoina@gmail.com> * check for nil spans Signed-off-by: SammyOina <sammyoina@gmail.com> * propagate spans to consumers writers Signed-off-by: SammyOina <sammyoina@gmail.com> * remove unused variable Signed-off-by: SammyOina <sammyoina@gmail.com> * propagate traces to notifiers Signed-off-by: SammyOina <sammyoina@gmail.com> * remove comments Signed-off-by: SammyOina <sammyoina@gmail.com> * remove span from message Signed-off-by: SammyOina <sammyoina@gmail.com> * remove span from message 
Signed-off-by: SammyOina <sammyoina@gmail.com> * trace over rabbitmq and mqtt Signed-off-by: SammyOina <sammyoina@gmail.com> * fix failing build Signed-off-by: SammyOina <sammyoina@gmail.com> * trace subscribe and unsuscribe ops Signed-off-by: SammyOina <sammyoina@gmail.com> * add create span method Signed-off-by: SammyOina <sammyoina@gmail.com> * pass context from main Signed-off-by: SammyOina <sammyoina@gmail.com> * add span tags Signed-off-by: SammyOina <sammyoina@gmail.com> * restore coap transport.go Signed-off-by: SammyOina <sammyoina@gmail.com> * add context to ops Signed-off-by: SammyOina <sammyoina@gmail.com> * add cancel to tracing Signed-off-by: SammyOina <sammyoina@gmail.com> * refactor handler Signed-off-by: SammyOina <sammyoina@gmail.com> * add comments to exported functions return interfaces Signed-off-by: SammyOina <sammyoina@gmail.com> * remove channel span tag Signed-off-by: SammyOina <sammyoina@gmail.com> * add pub sub tracer Signed-off-by: SammyOina <sammyoina@gmail.com> * add pub sub tracer Signed-off-by: SammyOina <sammyoina@gmail.com> * remove prefixes from operations Signed-off-by: SammyOina <sammyoina@gmail.com> * perpetuate context from root remove protocol from span tag Signed-off-by: SammyOina <sammyoina@gmail.com> * reorder functions Signed-off-by: SammyOina <sammyoina@gmail.com> * create span function Signed-off-by: SammyOina <sammyoina@gmail.com> * add suffixes add pubsub tags Signed-off-by: SammyOina <sammyoina@gmail.com> * move from if to switch Signed-off-by: SammyOina <sammyoina@gmail.com> * add jaeger url to docker compose Signed-off-by: SammyOina <sammyoina@gmail.com> * remove snake case variables add comments to exported functions Signed-off-by: SammyOina <sammyoina@gmail.com> * check for empty sub topics Signed-off-by: SammyOina <sammyoina@gmail.com> * remove tracing on consumers Signed-off-by: SammyOina <sammyoina@gmail.com> * remove duplicate tracers Signed-off-by: SammyOina <sammyoina@gmail.com> * clean up unused variables 
Signed-off-by: SammyOina <sammyoina@gmail.com> * remove empty lines Signed-off-by: SammyOina <sammyoina@gmail.com> * write better comments Signed-off-by: SammyOina <sammyoina@gmail.com> * update readme jaeger url Signed-off-by: SammyOina <sammyoina@gmail.com> * initialize tracing midlleware on main update readme jaeger urls Signed-off-by: SammyOina <sammyoina@gmail.com> * reorder arguments for tracing middleware Signed-off-by: SammyOina <sammyoina@gmail.com> * add context to subscribe Signed-off-by: SammyOina <sammyoina@gmail.com> --------- Signed-off-by: SammyOina <sammyoina@gmail.com>
154 lines
3.7 KiB
Go
154 lines
3.7 KiB
Go
// Copyright (c) Mainflux
|
|
// SPDX-License-Identifier: Apache-2.0
|
|
|
|
package consumers
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"os"
|
|
"strings"
|
|
|
|
"github.com/pelletier/go-toml"
|
|
|
|
"github.com/mainflux/mainflux/logger"
|
|
"github.com/mainflux/mainflux/pkg/errors"
|
|
"github.com/mainflux/mainflux/pkg/messaging"
|
|
"github.com/mainflux/mainflux/pkg/messaging/brokers"
|
|
"github.com/mainflux/mainflux/pkg/transformers"
|
|
"github.com/mainflux/mainflux/pkg/transformers/json"
|
|
"github.com/mainflux/mainflux/pkg/transformers/senml"
|
|
)
|
|
|
|
// Default transformer settings. loadConfig seeds the configuration with
// these values before the configuration file is read.
const (
	defContentType = "application/senml+json"
	defFormat      = "senml"
)
|
|
|
|
// Sentinel errors that loadConfig wraps around the underlying failure.
var (
	// errOpenConfFile is used when the configuration file cannot be read.
	errOpenConfFile = errors.New("unable to open configuration file")
	// errParseConfFile is used when the configuration file cannot be decoded.
	errParseConfFile = errors.New("unable to parse configuration file")
)
|
|
|
|
// Start method starts consuming messages received from Message broker.
|
|
// This method transforms messages to SenML format before
|
|
// using MessageRepository to store them.
|
|
func Start(ctx context.Context, id string, sub messaging.Subscriber, consumer interface{}, configPath string, logger logger.Logger) error {
|
|
cfg, err := loadConfig(configPath)
|
|
if err != nil {
|
|
logger.Warn(fmt.Sprintf("Failed to load consumer config: %s", err))
|
|
}
|
|
|
|
transformer := makeTransformer(cfg.TransformerCfg, logger)
|
|
|
|
for _, subject := range cfg.SubscriberCfg.Subjects {
|
|
switch c := consumer.(type) {
|
|
case AsyncConsumer:
|
|
if err := sub.Subscribe(ctx, id, subject, handleAsync(transformer, c)); err != nil {
|
|
return err
|
|
}
|
|
case BlockingConsumer:
|
|
if err := sub.Subscribe(ctx, id, subject, handleSync(transformer, c)); err != nil {
|
|
return err
|
|
}
|
|
default:
|
|
return errors.ErrInvalidQueryParams
|
|
}
|
|
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func handleSync(t transformers.Transformer, sc BlockingConsumer) handleFunc {
|
|
return func(msg *messaging.Message) error {
|
|
m := interface{}(msg)
|
|
var err error
|
|
if t != nil {
|
|
m, err = t.Transform(msg)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return sc.ConsumeBlocking(m)
|
|
}
|
|
}
|
|
|
|
func handleAsync(t transformers.Transformer, ac AsyncConsumer) handleFunc {
|
|
return func(msg *messaging.Message) error {
|
|
m := interface{}(msg)
|
|
var err error
|
|
if t != nil {
|
|
m, err = t.Transform(msg)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
ac.ConsumeAsync(m)
|
|
return nil
|
|
}
|
|
}
|
|
|
|
// handleFunc adapts a plain function to the handler value passed to
// Subscriber.Subscribe; its Handle and Cancel methods are declared below.
type handleFunc func(msg *messaging.Message) error
|
|
|
|
func (h handleFunc) Handle(msg *messaging.Message) error {
|
|
return h(msg)
|
|
}
|
|
|
|
func (h handleFunc) Cancel() error {
|
|
return nil
|
|
}
|
|
|
|
// subscriberConfig maps the "subscriber" table of the configuration file.
// Subjects holds the subjects that Start subscribes the consumer to.
type subscriberConfig struct {
	Subjects []string `toml:"subjects"`
}
|
|
|
|
type transformerConfig struct {
|
|
Format string `toml:"format"`
|
|
ContentType string `toml:"content_type"`
|
|
TimeFields []json.TimeField `toml:"time_fields"`
|
|
}
|
|
|
|
type config struct {
|
|
SubscriberCfg subscriberConfig `toml:"subscriber"`
|
|
TransformerCfg transformerConfig `toml:"transformer"`
|
|
}
|
|
|
|
func loadConfig(configPath string) (config, error) {
|
|
cfg := config{
|
|
SubscriberCfg: subscriberConfig{
|
|
Subjects: []string{brokers.SubjectAllChannels},
|
|
},
|
|
TransformerCfg: transformerConfig{
|
|
Format: defFormat,
|
|
ContentType: defContentType,
|
|
},
|
|
}
|
|
|
|
data, err := os.ReadFile(configPath)
|
|
if err != nil {
|
|
return cfg, errors.Wrap(errOpenConfFile, err)
|
|
}
|
|
|
|
if err := toml.Unmarshal(data, &cfg); err != nil {
|
|
return cfg, errors.Wrap(errParseConfFile, err)
|
|
}
|
|
|
|
return cfg, nil
|
|
}
|
|
|
|
func makeTransformer(cfg transformerConfig, logger logger.Logger) transformers.Transformer {
|
|
switch strings.ToUpper(cfg.Format) {
|
|
case "SENML":
|
|
logger.Info("Using SenML transformer")
|
|
return senml.New(cfg.ContentType)
|
|
case "JSON":
|
|
logger.Info("Using JSON transformer")
|
|
return json.New(cfg.TimeFields)
|
|
default:
|
|
logger.Error(fmt.Sprintf("Can't create transformer: unknown transformer type %s", cfg.Format))
|
|
os.Exit(1)
|
|
return nil
|
|
}
|
|
}
|