1
0
mirror of https://github.com/mainflux/mainflux.git synced 2025-04-26 13:48:53 +08:00

NOISSUE - Refactor MQTT subscriber (#1561)

* correct suscriber interface validator + refactore token error handling

Signed-off-by: tzzed <zerouali.t@gmail.com>

* apply review suggestion

Signed-off-by: tzzed <zerouali.t@gmail.com>

Co-authored-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com>
This commit is contained in:
__touk__ 2022-02-09 10:19:09 +01:00 committed by GitHub
parent 1f8a221c22
commit f8ce94e9bb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 17 additions and 19 deletions

View File

@ -8,13 +8,14 @@ import (
"time"
mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/mainflux/mainflux/pkg/messaging"
)
var _ messaging.Publisher = (*publisher)(nil)
var errPublishTimeout = errors.New("failed to publish due to timeout reached")
var _ messaging.Publisher = (*publisher)(nil)
type publisher struct {
client mqtt.Client
timeout time.Duration
@ -40,11 +41,9 @@ func (pub publisher) Publish(topic string, msg messaging.Message) error {
return token.Error()
}
ok := token.WaitTimeout(pub.timeout)
if ok && token.Error() != nil {
return token.Error()
}
if !ok {
return errPublishTimeout
}
return nil
return token.Error()
}

View File

@ -29,12 +29,13 @@ func newClient(address string, timeout time.Duration) (mqtt.Client, error) {
}
ok := token.WaitTimeout(timeout)
if ok && token.Error() != nil {
return nil, token.Error()
}
if !ok {
return nil, errConnect
}
if token.Error() != nil {
return nil, token.Error()
}
return client, nil
}

View File

@ -9,18 +9,19 @@ import (
mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/gogo/protobuf/proto"
log "github.com/mainflux/mainflux/logger"
"github.com/mainflux/mainflux/pkg/errors"
"github.com/mainflux/mainflux/pkg/messaging"
)
var _ messaging.Publisher = (*publisher)(nil)
var (
errSubscribeTimeout = errors.New("failed to subscribe due to timeout reached")
errUnsubscribeTimeout = errors.New("failed to unsubscribe due to timeout reached")
)
var _ messaging.Subscriber = (*subscriber)(nil)
type subscriber struct {
client mqtt.Client
timeout time.Duration
@ -48,13 +49,11 @@ func (sub subscriber) Subscribe(topic string, handler messaging.MessageHandler)
return token.Error()
}
ok := token.WaitTimeout(sub.timeout)
if ok && token.Error() != nil {
return token.Error()
}
if !ok {
return errSubscribeTimeout
}
return nil
return token.Error()
}
func (sub subscriber) Unsubscribe(topic string) error {
@ -62,14 +61,13 @@ func (sub subscriber) Unsubscribe(topic string) error {
if token.Error() != nil {
return token.Error()
}
ok := token.WaitTimeout(sub.timeout)
if ok && token.Error() != nil {
return token.Error()
}
if !ok {
return errUnsubscribeTimeout
}
return nil
return token.Error()
}
func (sub subscriber) mqttHandler(h messaging.MessageHandler) mqtt.MessageHandler {