mirror of
https://github.com/mainflux/mainflux.git
synced 2025-04-26 13:48:53 +08:00

* fix error package errors Signed-off-by: aryan <aryangodara03@gmail.com> * fix bootstap and bootstrap api Signed-off-by: aryan <aryangodara03@gmail.com> * fix certs Signed-off-by: aryan <aryangodara03@gmail.com> * fix consumers Signed-off-by: aryan <aryangodara03@gmail.com> * fix http Signed-off-by: aryan <aryangodara03@gmail.com> * fix provision Signed-off-by: aryan <aryangodara03@gmail.com> * fix readers Signed-off-by: aryan <aryangodara03@gmail.com> * fix twins Signed-off-by: aryan <aryangodara03@gmail.com> * fix things Signed-off-by: aryan <aryangodara03@gmail.com> * fix users Signed-off-by: aryan <aryangodara03@gmail.com> * fix sdk excpet channel policies users things Signed-off-by: aryan <aryangodara03@gmail.com> * tests passing, but logging not working for things and users Signed-off-by: aryan <aryangodara03@gmail.com> * fix sdk tests, and other failing tests Signed-off-by: aryan <aryangodara03@gmail.com> * fix comment Signed-off-by: aryan <aryangodara03@gmail.com> * fix errors acc to pr review Signed-off-by: aryan <aryangodara03@gmail.com> * fix errror wrapping in consumers api Signed-off-by: aryan <aryangodara03@gmail.com> * all tests running Signed-off-by: aryan <aryangodara03@gmail.com> * fix encodeError Signed-off-by: aryan <aryangodara03@gmail.com> * fix minor issues Signed-off-by: aryan <aryangodara03@gmail.com> * fix failing sdk policy tests Signed-off-by: aryan <aryangodara03@gmail.com> * fix errors in things test sdk Signed-off-by: aryan <aryangodara03@gmail.com> * update things service Signed-off-by: aryan <aryangodara03@gmail.com> * update usrs service Signed-off-by: aryan <aryangodara03@gmail.com> * fix things and users sdk Signed-off-by: aryan <aryangodara03@gmail.com> * fix sdk for channels groups policies things users Signed-off-by: aryan <aryangodara03@gmail.com> * fix remaining services and sdk Signed-off-by: aryan <aryangodara03@gmail.com> * fix bootstrap twins Signed-off-by: aryan <aryangodara03@gmail.com> * resolve conflicts Signed-off-by: aryan <aryangodara03@gmail.com> * Shift errmalformedentity to pkg/errors Signed-off-by: aryan <aryangodara03@gmail.com> * Fix bootstrap service Signed-off-by: aryan <aryangodara03@gmail.com> * Add errors.Unwrap and use in encodeError Signed-off-by: aryan <aryangodara03@gmail.com> * Fix type in print statement for policies_test Signed-off-by: aryan <aryangodara03@gmail.com> * Fix ordering of errvalidation wrapping and encodeError Signed-off-by: aryan <aryangodara03@gmail.com> * Fix failing tests Signed-off-by: aryan <aryangodara03@gmail.com> --------- Signed-off-by: aryan <aryangodara03@gmail.com>
155 lines
3.8 KiB
Go
155 lines
3.8 KiB
Go
// Copyright (c) Mainflux
|
|
// SPDX-License-Identifier: Apache-2.0
|
|
|
|
package consumers
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"os"
|
|
"strings"
|
|
|
|
"github.com/pelletier/go-toml"
|
|
|
|
"github.com/mainflux/mainflux/internal/apiutil"
|
|
"github.com/mainflux/mainflux/logger"
|
|
"github.com/mainflux/mainflux/pkg/errors"
|
|
"github.com/mainflux/mainflux/pkg/messaging"
|
|
"github.com/mainflux/mainflux/pkg/messaging/brokers"
|
|
"github.com/mainflux/mainflux/pkg/transformers"
|
|
"github.com/mainflux/mainflux/pkg/transformers/json"
|
|
"github.com/mainflux/mainflux/pkg/transformers/senml"
|
|
)
|
|
|
|
const (
|
|
defContentType = "application/senml+json"
|
|
defFormat = "senml"
|
|
)
|
|
|
|
var (
|
|
errOpenConfFile = errors.New("unable to open configuration file")
|
|
errParseConfFile = errors.New("unable to parse configuration file")
|
|
)
|
|
|
|
// Start method starts consuming messages received from Message broker.
|
|
// This method transforms messages to SenML format before
|
|
// using MessageRepository to store them.
|
|
func Start(ctx context.Context, id string, sub messaging.Subscriber, consumer interface{}, configPath string, logger logger.Logger) error {
|
|
cfg, err := loadConfig(configPath)
|
|
if err != nil {
|
|
logger.Warn(fmt.Sprintf("Failed to load consumer config: %s", err))
|
|
}
|
|
|
|
transformer := makeTransformer(cfg.TransformerCfg, logger)
|
|
|
|
for _, subject := range cfg.SubscriberCfg.Subjects {
|
|
switch c := consumer.(type) {
|
|
case AsyncConsumer:
|
|
if err := sub.Subscribe(ctx, id, subject, handleAsync(ctx, transformer, c)); err != nil {
|
|
return err
|
|
}
|
|
case BlockingConsumer:
|
|
if err := sub.Subscribe(ctx, id, subject, handleSync(ctx, transformer, c)); err != nil {
|
|
return err
|
|
}
|
|
default:
|
|
return apiutil.ErrInvalidQueryParams
|
|
}
|
|
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func handleSync(ctx context.Context, t transformers.Transformer, sc BlockingConsumer) handleFunc {
|
|
return func(msg *messaging.Message) error {
|
|
m := interface{}(msg)
|
|
var err error
|
|
if t != nil {
|
|
m, err = t.Transform(msg)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return sc.ConsumeBlocking(ctx, m)
|
|
}
|
|
}
|
|
|
|
func handleAsync(ctx context.Context, t transformers.Transformer, ac AsyncConsumer) handleFunc {
|
|
return func(msg *messaging.Message) error {
|
|
m := interface{}(msg)
|
|
var err error
|
|
if t != nil {
|
|
m, err = t.Transform(msg)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
ac.ConsumeAsync(ctx, m)
|
|
return nil
|
|
}
|
|
}
|
|
|
|
type handleFunc func(msg *messaging.Message) error
|
|
|
|
func (h handleFunc) Handle(msg *messaging.Message) error {
|
|
return h(msg)
|
|
}
|
|
|
|
func (h handleFunc) Cancel() error {
|
|
return nil
|
|
}
|
|
|
|
type subscriberConfig struct {
|
|
Subjects []string `toml:"subjects"`
|
|
}
|
|
|
|
type transformerConfig struct {
|
|
Format string `toml:"format"`
|
|
ContentType string `toml:"content_type"`
|
|
TimeFields []json.TimeField `toml:"time_fields"`
|
|
}
|
|
|
|
type config struct {
|
|
SubscriberCfg subscriberConfig `toml:"subscriber"`
|
|
TransformerCfg transformerConfig `toml:"transformer"`
|
|
}
|
|
|
|
func loadConfig(configPath string) (config, error) {
|
|
cfg := config{
|
|
SubscriberCfg: subscriberConfig{
|
|
Subjects: []string{brokers.SubjectAllChannels},
|
|
},
|
|
TransformerCfg: transformerConfig{
|
|
Format: defFormat,
|
|
ContentType: defContentType,
|
|
},
|
|
}
|
|
|
|
data, err := os.ReadFile(configPath)
|
|
if err != nil {
|
|
return cfg, errors.Wrap(errOpenConfFile, err)
|
|
}
|
|
|
|
if err := toml.Unmarshal(data, &cfg); err != nil {
|
|
return cfg, errors.Wrap(errParseConfFile, err)
|
|
}
|
|
|
|
return cfg, nil
|
|
}
|
|
|
|
func makeTransformer(cfg transformerConfig, logger logger.Logger) transformers.Transformer {
|
|
switch strings.ToUpper(cfg.Format) {
|
|
case "SENML":
|
|
logger.Info("Using SenML transformer")
|
|
return senml.New(cfg.ContentType)
|
|
case "JSON":
|
|
logger.Info("Using JSON transformer")
|
|
return json.New(cfg.TimeFields)
|
|
default:
|
|
logger.Error(fmt.Sprintf("Can't create transformer: unknown transformer type %s", cfg.Format))
|
|
os.Exit(1)
|
|
return nil
|
|
}
|
|
}
|