1
0
mirror of https://github.com/mainflux/mainflux.git synced 2025-04-24 13:48:49 +08:00

MF-1630 - Replace old subscriptions with a new one instead of throwing an error (#1633)

* updated pubsub.subscribe() to delete and renew already subscribed objects

Signed-off-by: aryan <aryangodara03@gmail.com>

* fixing sync.mutex error

Signed-off-by: aryan <aryangodara03@gmail.com>

* made changes to pubsub_test.go to fix failing CI

Signed-off-by: aryan <aryangodara03@gmail.com>

* made some changes to pubsub.go

Signed-off-by: aryan <aryangodara03@gmail.com>

* made some changes to pubsub.go added flag var

Signed-off-by: aryan <aryangodara03@gmail.com>

* little code cleanup

Signed-off-by: aryan <aryangodara03@gmail.com>

* minor change, to make code more intuitive

Signed-off-by: aryan <aryangodara03@gmail.com>

* added comment, removed the ws_client file

Signed-off-by: aryan <aryangodara03@gmail.com>

* removed ErrAlreadySubscribed

Signed-off-by: aryan <aryangodara03@gmail.com>

* fixed some failing tests, will check again after semaphore result

Signed-off-by: aryan <aryangodara03@gmail.com>

* made similar changes for MQTT and rabbitMQ

Signed-off-by: aryan <aryangodara03@gmail.com>

* removed extra comment

Signed-off-by: aryan <aryangodara03@gmail.com>

* removed extra comment

Signed-off-by: aryan <aryangodara03@gmail.com>

* removed extra comment

Signed-off-by: aryan <aryangodara03@gmail.com>

* to fix failing CI

Signed-off-by: aryan <aryangodara03@gmail.com>

* checking code after adding conditions

Signed-off-by: aryan <aryangodara03@gmail.com>

* added ps.mu.Lock(), and changed len(s)

Signed-off-by: aryan <aryangodara03@gmail.com>

* added tests for failing unsubscribe

Signed-off-by: aryan <aryangodara03@gmail.com>

* shifted defer lock

Signed-off-by: aryan <aryangodara03@gmail.com>

* fixed mqtt/pubsub

Signed-off-by: aryan <aryangodara03@gmail.com>

* fix mqtt/pubsub typo

Signed-off-by: aryan <aryangodara03@gmail.com>

* add comment to improve readability

Signed-off-by: aryan <aryangodara03@gmail.com>

* add comment to better improve readability

Signed-off-by: aryan <aryangodara03@gmail.com>

* add comments for docs

Signed-off-by: aryan <aryangodara03@gmail.com>

* improve doc comment

Signed-off-by: aryan <aryangodara03@gmail.com>

* change to mqtt/pubsub

Signed-off-by: aryan <aryangodara03@gmail.com>

* remove unnecessary mutex lock

Signed-off-by: aryan <aryangodara03@gmail.com>

* change to pubsub

Signed-off-by: aryan <aryangodara03@gmail.com>

* add comments to improve readability

Signed-off-by: aryan <aryangodara03@gmail.com>

* fix typo in comment

Signed-off-by: aryan <aryangodara03@gmail.com>

Signed-off-by: aryan <aryangodara03@gmail.com>
This commit is contained in:
Aryan Godara 2022-08-12 21:38:20 +05:30 committed by GitHub
parent 721ee545f9
commit d9c47045cd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 168 additions and 22 deletions

View File

@ -25,7 +25,6 @@ var (
errSubscribeTimeout = errors.New("failed to subscribe due to timeout reached")
errUnsubscribeTimeout = errors.New("failed to unsubscribe due to timeout reached")
errUnsubscribeDeleteTopic = errors.New("failed to unsubscribe due to deletion of topic")
errAlreadySubscribed = errors.New("already subscribed to topic")
errNotSubscribed = errors.New("not subscribed")
errEmptyTopic = errors.New("empty topic")
errEmptyID = errors.New("empty ID")
@ -47,6 +46,7 @@ type pubsub struct {
subscriptions map[string]subscription
}
// NewPubSub returns MQTT message publisher/subscriber.
func NewPubSub(url, queue string, timeout time.Duration, logger log.Logger) (messaging.PubSub, error) {
client, err := newClient(url, "mqtt-publisher", timeout)
if err != nil {
@ -80,7 +80,23 @@ func (ps pubsub) Subscribe(id, topic string, handler messaging.MessageHandler) e
case true:
// Check topic
if ok = s.contains(topic); ok {
return errAlreadySubscribed
// Unlocking, so that Unsubscribe() can access ps.subscriptions
ps.mu.Unlock()
err := ps.Unsubscribe(id, topic)
ps.mu.Lock() // Lock so that deferred unlock handle it
if err != nil {
return err
}
if len(ps.subscriptions) == 0 {
client, err := newClient(ps.address, id, ps.timeout)
if err != nil {
return err
}
s = subscription{
client: client,
topics: []string{topic},
}
}
}
s.topics = append(s.topics, topic)
default:
@ -93,6 +109,7 @@ func (ps pubsub) Subscribe(id, topic string, handler messaging.MessageHandler) e
topics: []string{topic},
}
}
token := s.client.Subscribe(topic, qos, ps.mqttHandler(handler))
if token.Error() != nil {
return token.Error()

View File

@ -22,6 +22,7 @@ var (
ErrNotSubscribed = errors.New("not subscribed")
ErrEmptyTopic = errors.New("empty topic")
ErrEmptyID = errors.New("empty id")
ErrFailed = errors.New("failed")
)
var _ messaging.PubSub = (*pubsub)(nil)
@ -69,20 +70,30 @@ func (ps *pubsub) Subscribe(id, topic string, handler messaging.MessageHandler)
if topic == "" {
return ErrEmptyTopic
}
ps.mu.Lock()
defer ps.mu.Unlock()
// Check topic
s, ok := ps.subscriptions[topic]
switch ok {
case true:
// Check topic ID
if ok {
// Check client ID
if _, ok := s[id]; ok {
return ErrAlreadySubscribed
// Unlocking, so that Unsubscribe() can access ps.subscriptions
ps.mu.Unlock()
if err := ps.Unsubscribe(id, topic); err != nil {
return err
}
ps.mu.Lock()
// value of s can be changed while ps.mu is unlocked
s = ps.subscriptions[topic]
}
default:
}
defer ps.mu.Unlock()
if s == nil {
s = make(map[string]subscription)
ps.subscriptions[topic] = s
}
nh := ps.natsHandler(handler)
if ps.queue != "" {
@ -104,6 +115,7 @@ func (ps *pubsub) Subscribe(id, topic string, handler messaging.MessageHandler)
Subscription: sub,
cancel: handler.Cancel,
}
return nil
}

View File

@ -88,6 +88,7 @@ func TestPubsub(t *testing.T) {
clientID string
errorMessage error
pubsub bool //true for subscribe and false for unsubscribe
handler messaging.MessageHandler
}{
{
desc: "Subscribe to a topic with an ID",
@ -95,6 +96,7 @@ func TestPubsub(t *testing.T) {
clientID: "clientid1",
errorMessage: nil,
pubsub: true,
handler: handler{false},
},
{
desc: "Subscribe to the same topic with a different ID",
@ -102,13 +104,15 @@ func TestPubsub(t *testing.T) {
clientID: "clientid2",
errorMessage: nil,
pubsub: true,
handler: handler{false},
},
{
desc: "Subscribe to an already subscribed topic with an ID",
topic: fmt.Sprintf("%s.%s", chansPrefix, topic),
clientID: "clientid1",
errorMessage: nats.ErrAlreadySubscribed,
errorMessage: nil,
pubsub: true,
handler: handler{false},
},
{
desc: "Unsubscribe from a topic with an ID",
@ -116,6 +120,7 @@ func TestPubsub(t *testing.T) {
clientID: "clientid1",
errorMessage: nil,
pubsub: false,
handler: handler{false},
},
{
desc: "Unsubscribe from a non-existent topic with an ID",
@ -123,6 +128,7 @@ func TestPubsub(t *testing.T) {
clientID: "clientid1",
errorMessage: nats.ErrNotSubscribed,
pubsub: false,
handler: handler{false},
},
{
desc: "Unsubscribe from the same topic with a different ID",
@ -130,6 +136,7 @@ func TestPubsub(t *testing.T) {
clientID: "clientidd2",
errorMessage: nats.ErrNotSubscribed,
pubsub: false,
handler: handler{false},
},
{
desc: "Unsubscribe from the same topic with a different ID not subscribed",
@ -137,6 +144,7 @@ func TestPubsub(t *testing.T) {
clientID: "clientidd3",
errorMessage: nats.ErrNotSubscribed,
pubsub: false,
handler: handler{false},
},
{
desc: "Unsubscribe from an already unsubscribed topic with an ID",
@ -144,6 +152,7 @@ func TestPubsub(t *testing.T) {
clientID: "clientid1",
errorMessage: nats.ErrNotSubscribed,
pubsub: false,
handler: handler{false},
},
{
desc: "Subscribe to a topic with a subtopic with an ID",
@ -151,13 +160,15 @@ func TestPubsub(t *testing.T) {
clientID: "clientidd1",
errorMessage: nil,
pubsub: true,
handler: handler{false},
},
{
desc: "Subscribe to an already subscribed topic with a subtopic with an ID",
topic: fmt.Sprintf("%s.%s.%s", chansPrefix, topic, subtopic),
clientID: "clientidd1",
errorMessage: nats.ErrAlreadySubscribed,
errorMessage: nil,
pubsub: true,
handler: handler{false},
},
{
desc: "Unsubscribe from a topic with a subtopic with an ID",
@ -165,6 +176,7 @@ func TestPubsub(t *testing.T) {
clientID: "clientidd1",
errorMessage: nil,
pubsub: false,
handler: handler{false},
},
{
desc: "Unsubscribe from an already unsubscribed topic with a subtopic with an ID",
@ -172,6 +184,7 @@ func TestPubsub(t *testing.T) {
clientID: "clientid1",
errorMessage: nats.ErrNotSubscribed,
pubsub: false,
handler: handler{false},
},
{
desc: "Subscribe to an empty topic with an ID",
@ -179,6 +192,7 @@ func TestPubsub(t *testing.T) {
clientID: "clientid1",
errorMessage: nats.ErrEmptyTopic,
pubsub: true,
handler: handler{false},
},
{
desc: "Unsubscribe from an empty topic with an ID",
@ -186,6 +200,7 @@ func TestPubsub(t *testing.T) {
clientID: "clientid1",
errorMessage: nats.ErrEmptyTopic,
pubsub: false,
handler: handler{false},
},
{
desc: "Subscribe to a topic with empty id",
@ -193,6 +208,7 @@ func TestPubsub(t *testing.T) {
clientID: "",
errorMessage: nats.ErrEmptyID,
pubsub: true,
handler: handler{false},
},
{
desc: "Unsubscribe from a topic with empty id",
@ -200,12 +216,45 @@ func TestPubsub(t *testing.T) {
clientID: "",
errorMessage: nats.ErrEmptyID,
pubsub: false,
handler: handler{false},
},
{
desc: "Subscribe to another topic with an ID",
topic: fmt.Sprintf("%s.%s", chansPrefix, topic+"1"),
clientID: "clientid3",
errorMessage: nil,
pubsub: true,
handler: handler{true},
},
{
desc: "Subscribe to another already subscribed topic with an ID with Unsubscribe failing",
topic: fmt.Sprintf("%s.%s", chansPrefix, topic+"1"),
clientID: "clientid3",
errorMessage: nats.ErrFailed,
pubsub: true,
handler: handler{true},
},
{
desc: "Subscribe to a new topic with an ID",
topic: fmt.Sprintf("%s.%s", chansPrefix, topic+"2"),
clientID: "clientid4",
errorMessage: nil,
pubsub: true,
handler: handler{true},
},
{
desc: "Unsubscribe from a topic with an ID with failing handler",
topic: fmt.Sprintf("%s.%s", chansPrefix, topic+"2"),
clientID: "clientid4",
errorMessage: nats.ErrFailed,
pubsub: false,
handler: handler{true},
},
}
for _, pc := range subcases {
if pc.pubsub == true {
err := pubsub.Subscribe(pc.clientID, pc.topic, handler{})
err := pubsub.Subscribe(pc.clientID, pc.topic, pc.handler)
if pc.errorMessage == nil {
require.Nil(t, err, fmt.Sprintf("%s got unexpected error: %s", pc.desc, err))
} else {
@ -222,7 +271,9 @@ func TestPubsub(t *testing.T) {
}
}
type handler struct{}
type handler struct {
fail bool
}
func (h handler) Handle(msg messaging.Message) error {
msgChan <- msg
@ -230,5 +281,8 @@ func (h handler) Handle(msg messaging.Message) error {
}
func (h handler) Cancel() error {
if h.fail {
return nats.ErrFailed
}
return nil
}

View File

@ -26,6 +26,7 @@ var (
ErrNotSubscribed = errors.New("not subscribed")
ErrEmptyTopic = errors.New("empty topic")
ErrEmptyID = errors.New("empty id")
ErrFailed = errors.New("failed")
)
var _ messaging.PubSub = (*pubsub)(nil)
@ -72,16 +73,24 @@ func (ps *pubsub) Subscribe(id, topic string, handler messaging.MessageHandler)
return ErrEmptyTopic
}
ps.mu.Lock()
defer ps.mu.Unlock()
// Check topic
s, ok := ps.subscriptions[topic]
switch ok {
case true:
// Check topic ID
if ok {
// Check client ID
if _, ok := s[id]; ok {
return ErrAlreadySubscribed
// Unlocking, so that Unsubscribe() can access ps.subscriptions
ps.mu.Unlock()
if err := ps.Unsubscribe(id, topic); err != nil {
return err
}
ps.mu.Lock()
// value of s can be changed while ps.mu is unlocked
s = ps.subscriptions[topic]
}
default:
}
defer ps.mu.Unlock()
if s == nil {
s = make(map[string]subscription)
ps.subscriptions[topic] = s
}

View File

@ -85,6 +85,7 @@ func TestPubsub(t *testing.T) {
clientID string
errorMessage error
pubsub bool //true for subscribe and false for unsubscribe
handler messaging.MessageHandler
}{
{
desc: "Subscribe to a topic with an ID",
@ -92,6 +93,7 @@ func TestPubsub(t *testing.T) {
clientID: "clientid1",
errorMessage: nil,
pubsub: true,
handler: handler{false},
},
{
desc: "Subscribe to the same topic with a different ID",
@ -99,13 +101,15 @@ func TestPubsub(t *testing.T) {
clientID: "clientid2",
errorMessage: nil,
pubsub: true,
handler: handler{false},
},
{
desc: "Subscribe to an already subscribed topic with an ID",
topic: fmt.Sprintf("%s.%s", chansPrefix, topic),
clientID: "clientid1",
errorMessage: rabbitmq.ErrAlreadySubscribed,
errorMessage: nil,
pubsub: true,
handler: handler{false},
},
{
desc: "Unsubscribe from a topic with an ID",
@ -113,6 +117,7 @@ func TestPubsub(t *testing.T) {
clientID: "clientid1",
errorMessage: nil,
pubsub: false,
handler: handler{false},
},
{
desc: "Unsubscribe from a non-existent topic with an ID",
@ -120,6 +125,7 @@ func TestPubsub(t *testing.T) {
clientID: "clientid1",
errorMessage: rabbitmq.ErrNotSubscribed,
pubsub: false,
handler: handler{false},
},
{
desc: "Unsubscribe from the same topic with a different ID",
@ -127,6 +133,7 @@ func TestPubsub(t *testing.T) {
clientID: "clientidd2",
errorMessage: rabbitmq.ErrNotSubscribed,
pubsub: false,
handler: handler{false},
},
{
desc: "Unsubscribe from the same topic with a different ID not subscribed",
@ -134,6 +141,7 @@ func TestPubsub(t *testing.T) {
clientID: "clientidd3",
errorMessage: rabbitmq.ErrNotSubscribed,
pubsub: false,
handler: handler{false},
},
{
desc: "Unsubscribe from an already unsubscribed topic with an ID",
@ -141,6 +149,7 @@ func TestPubsub(t *testing.T) {
clientID: "clientid1",
errorMessage: rabbitmq.ErrNotSubscribed,
pubsub: false,
handler: handler{false},
},
{
desc: "Subscribe to a topic with a subtopic with an ID",
@ -148,13 +157,15 @@ func TestPubsub(t *testing.T) {
clientID: "clientidd1",
errorMessage: nil,
pubsub: true,
handler: handler{false},
},
{
desc: "Subscribe to an already subscribed topic with a subtopic with an ID",
topic: fmt.Sprintf("%s.%s.%s", chansPrefix, topic, subtopic),
clientID: "clientidd1",
errorMessage: rabbitmq.ErrAlreadySubscribed,
errorMessage: nil,
pubsub: true,
handler: handler{false},
},
{
desc: "Unsubscribe from a topic with a subtopic with an ID",
@ -162,6 +173,7 @@ func TestPubsub(t *testing.T) {
clientID: "clientidd1",
errorMessage: nil,
pubsub: false,
handler: handler{false},
},
{
desc: "Unsubscribe from an already unsubscribed topic with a subtopic with an ID",
@ -169,6 +181,7 @@ func TestPubsub(t *testing.T) {
clientID: "clientid1",
errorMessage: rabbitmq.ErrNotSubscribed,
pubsub: false,
handler: handler{false},
},
{
desc: "Subscribe to an empty topic with an ID",
@ -176,6 +189,7 @@ func TestPubsub(t *testing.T) {
clientID: "clientid1",
errorMessage: rabbitmq.ErrEmptyTopic,
pubsub: true,
handler: handler{false},
},
{
desc: "Unsubscribe from an empty topic with an ID",
@ -183,6 +197,7 @@ func TestPubsub(t *testing.T) {
clientID: "clientid1",
errorMessage: rabbitmq.ErrEmptyTopic,
pubsub: false,
handler: handler{false},
},
{
desc: "Subscribe to a topic with empty id",
@ -190,6 +205,7 @@ func TestPubsub(t *testing.T) {
clientID: "",
errorMessage: rabbitmq.ErrEmptyID,
pubsub: true,
handler: handler{false},
},
{
desc: "Unsubscribe from a topic with empty id",
@ -197,12 +213,45 @@ func TestPubsub(t *testing.T) {
clientID: "",
errorMessage: rabbitmq.ErrEmptyID,
pubsub: false,
handler: handler{false},
},
{
desc: "Subscribe to another topic with an ID",
topic: fmt.Sprintf("%s.%s", chansPrefix, topic+"1"),
clientID: "clientid3",
errorMessage: nil,
pubsub: true,
handler: handler{true},
},
{
desc: "Subscribe to another already subscribed topic with an ID with Unsubscribe failing",
topic: fmt.Sprintf("%s.%s", chansPrefix, topic+"1"),
clientID: "clientid3",
errorMessage: rabbitmq.ErrFailed,
pubsub: true,
handler: handler{true},
},
{
desc: "Subscribe to a new topic with an ID",
topic: fmt.Sprintf("%s.%s", chansPrefix, topic+"2"),
clientID: "clientid4",
errorMessage: nil,
pubsub: true,
handler: handler{true},
},
{
desc: "Unsubscribe from a topic with an ID with failing handler",
topic: fmt.Sprintf("%s.%s", chansPrefix, topic+"2"),
clientID: "clientid4",
errorMessage: rabbitmq.ErrFailed,
pubsub: false,
handler: handler{true},
},
}
for _, pc := range subcases {
if pc.pubsub == true {
err := pubsub.Subscribe(pc.clientID, pc.topic, handler{})
err := pubsub.Subscribe(pc.clientID, pc.topic, pc.handler)
if pc.errorMessage == nil {
require.Nil(t, err, fmt.Sprintf("%s got unexpected error: %s", pc.desc, err))
} else {
@ -219,7 +268,9 @@ func TestPubsub(t *testing.T) {
}
}
type handler struct{}
type handler struct {
fail bool
}
func (h handler) Handle(msg messaging.Message) error {
msgChan <- msg
@ -227,5 +278,8 @@ func (h handler) Handle(msg messaging.Message) error {
}
func (h handler) Cancel() error {
if h.fail {
return rabbitmq.ErrFailed
}
return nil
}