diff --git a/cmd/twins/main.go b/cmd/twins/main.go index 426dbcf9..fe77aa0f 100644 --- a/cmd/twins/main.go +++ b/cmd/twins/main.go @@ -193,8 +193,13 @@ func newService(ctx context.Context, id string, ps messaging.PubSub, cfg config, counter, latency := internal.MakeMetrics(svcName, "api") svc = api.MetricsMiddleware(svc, counter, latency) - if err = ps.Subscribe(ctx, id, brokers.SubjectAllChannels, handle(ctx, logger, cfg.ChannelID, svc)); err != nil { - return nil, err + subCfg := messaging.SubscriberConfig{ + ID: id, + Topic: brokers.SubjectAllChannels, + Handler: handle(ctx, logger, cfg.ChannelID, svc), + } + if err = ps.Subscribe(ctx, subCfg); err != nil { + logger.Fatal(err.Error()) } return svc, nil diff --git a/coap/adapter.go b/coap/adapter.go index b28bd55f..2debbdca 100644 --- a/coap/adapter.go +++ b/coap/adapter.go @@ -96,7 +96,12 @@ func (svc *adapterService) Subscribe(ctx context.Context, key, chanID, subtopic if subtopic != "" { subject = fmt.Sprintf("%s.%s", subject, subtopic) } - return svc.pubsub.Subscribe(ctx, c.Token(), subject, c) + subCfg := messaging.SubscriberConfig{ + ID: c.Token(), + Topic: subject, + Handler: c, + } + return svc.pubsub.Subscribe(ctx, subCfg) } func (svc *adapterService) Unsubscribe(ctx context.Context, key, chanID, subtopic, token string) error { @@ -119,5 +124,6 @@ func (svc *adapterService) Unsubscribe(ctx context.Context, key, chanID, subtopi if subtopic != "" { subject = fmt.Sprintf("%s.%s", subject, subtopic) } + return svc.pubsub.Unsubscribe(ctx, token, subject) } diff --git a/consumers/messages.go b/consumers/messages.go index 59788f10..e1f37486 100644 --- a/consumers/messages.go +++ b/consumers/messages.go @@ -42,13 +42,20 @@ func Start(ctx context.Context, id string, sub messaging.Subscriber, consumer in transformer := makeTransformer(cfg.TransformerCfg, logger) for _, subject := range cfg.SubscriberCfg.Subjects { + subCfg := messaging.SubscriberConfig{ + ID: id, + Topic: subject, + DeliveryPolicy: messaging.DeliverAllPolicy, + } switch c := consumer.(type) { case AsyncConsumer: - if err := sub.Subscribe(ctx, id, subject, handleAsync(ctx, transformer, c)); err != nil { + subCfg.Handler = handleAsync(ctx, transformer, c) + if err := sub.Subscribe(ctx, subCfg); err != nil { return err } case BlockingConsumer: - if err := sub.Subscribe(ctx, id, subject, handleSync(ctx, transformer, c)); err != nil { + subCfg.Handler = handleSync(ctx, transformer, c) + if err := sub.Subscribe(ctx, subCfg); err != nil { return err } default: diff --git a/consumers/writers/cassandra/init.go b/consumers/writers/cassandra/init.go index ead3331a..bfbe134d 100644 --- a/consumers/writers/cassandra/init.go +++ b/consumers/writers/cassandra/init.go @@ -20,7 +20,7 @@ const ( sum double, time double, update_time double, - PRIMARY KEY (channel, time, id) + PRIMARY KEY (publisher, time, subtopic, name) ) WITH CLUSTERING ORDER BY (time DESC)` jsonTable = `CREATE TABLE IF NOT EXISTS %s ( @@ -31,6 +31,6 @@ const ( protocol text, created bigint, payload text, - PRIMARY KEY (channel, created, id) + PRIMARY KEY (publisher, created, subtopic) ) WITH CLUSTERING ORDER BY (created DESC)` ) diff --git a/consumers/writers/mongodb/consumer.go b/consumers/writers/mongodb/consumer.go index 81408338..8702ebc3 100644 --- a/consumers/writers/mongodb/consumer.go +++ b/consumers/writers/mongodb/consumer.go @@ -10,6 +10,7 @@ import ( "github.com/mainflux/mainflux/pkg/errors" "github.com/mainflux/mainflux/pkg/transformers/json" "github.com/mainflux/mainflux/pkg/transformers/senml" + "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo" ) @@ -45,7 +46,17 @@ func (repo *mongoRepo) saveSenml(ctx context.Context, messages interface{}) erro coll := repo.db.Collection(senmlCollection) var dbMsgs []interface{} for _, msg := range msgs { - dbMsgs = append(dbMsgs, msg) + // Check if message is already in database. + filter := bson.M{"time": msg.Time, "publisher": msg.Publisher, "subtopic": msg.Subtopic, "name": msg.Name} + + count, err := coll.CountDocuments(ctx, filter) + if err != nil { + return errors.Wrap(errSaveMessage, err) + } + + if count == 0 { + dbMsgs = append(dbMsgs, msg) + } } _, err := coll.InsertMany(ctx, dbMsgs) diff --git a/consumers/writers/postgres/init.go b/consumers/writers/postgres/init.go index 8efbedc1..15b35189 100644 --- a/consumers/writers/postgres/init.go +++ b/consumers/writers/postgres/init.go @@ -34,6 +34,13 @@ func Migration() *migrate.MemoryMigrationSource { "DROP TABLE messages", }, }, + { + Id: "messages_2", + Up: []string{ + `ALTER TABLE messages DROP CONSTRAINT messages_pkey`, + `ALTER TABLE messages ADD PRIMARY KEY (time, publisher, subtopic, name)`, + }, + }, }, } } diff --git a/mqtt/forwarder.go b/mqtt/forwarder.go index b77ee5e1..cfe49adf 100644 --- a/mqtt/forwarder.go +++ b/mqtt/forwarder.go @@ -33,7 +33,13 @@ func NewForwarder(topic string, logger mflog.Logger) Forwarder { } func (f forwarder) Forward(ctx context.Context, id string, sub messaging.Subscriber, pub messaging.Publisher) error { - return sub.Subscribe(ctx, id, f.topic, handle(ctx, pub, f.logger)) + subCfg := messaging.SubscriberConfig{ + ID: id, + Topic: f.topic, + Handler: handle(ctx, pub, f.logger), + } + + return sub.Subscribe(ctx, subCfg) } func handle(ctx context.Context, pub messaging.Publisher, logger mflog.Logger) handleFunc { diff --git a/pkg/messaging/mqtt/pubsub.go b/pkg/messaging/mqtt/pubsub.go index aa7f59e3..9d90a30c 100644 --- a/pkg/messaging/mqtt/pubsub.go +++ b/pkg/messaging/mqtt/pubsub.go @@ -78,40 +78,40 @@ func NewPubSub(url string, qos uint8, timeout time.Duration, logger mflog.Logger return ret, nil } -func (ps *pubsub) Subscribe(ctx context.Context, id, topic string, handler messaging.MessageHandler) error { - if id == "" { +func (ps *pubsub) Subscribe(ctx context.Context, cfg messaging.SubscriberConfig) error { + if cfg.ID == "" { return ErrEmptyID } - if topic == "" { + if cfg.Topic == "" { return ErrEmptyTopic } ps.mu.Lock() defer ps.mu.Unlock() - s, ok := ps.subscriptions[id] + s, ok := ps.subscriptions[cfg.ID] // If the client exists, check if it's subscribed to the topic and unsubscribe if needed. switch ok { case true: - if ok := s.contains(topic); ok { - if err := s.unsubscribe(topic, ps.timeout); err != nil { + if ok := s.contains(cfg.Topic); ok { + if err := s.unsubscribe(cfg.Topic, ps.timeout); err != nil { return err } } default: - client, err := newClient(ps.address, id, ps.timeout) + client, err := newClient(ps.address, cfg.ID, ps.timeout) if err != nil { return err } s = subscription{ client: client, topics: []string{}, - cancel: handler.Cancel, + cancel: cfg.Handler.Cancel, } } - s.topics = append(s.topics, topic) - ps.subscriptions[id] = s + s.topics = append(s.topics, cfg.Topic) + ps.subscriptions[cfg.ID] = s - token := s.client.Subscribe(topic, byte(ps.qos), ps.mqttHandler(handler)) + token := s.client.Subscribe(cfg.Topic, byte(ps.qos), ps.mqttHandler(cfg.Handler)) if token.Error() != nil { return token.Error() } diff --git a/pkg/messaging/mqtt/pubsub_test.go b/pkg/messaging/mqtt/pubsub_test.go index c45488fd..6d0bdcb7 100644 --- a/pkg/messaging/mqtt/pubsub_test.go +++ b/pkg/messaging/mqtt/pubsub_test.go @@ -187,7 +187,12 @@ func TestSubscribe(t *testing.T) { }, } for _, tc := range cases { - err = pubsub.Subscribe(context.TODO(), tc.clientID, tc.topic, tc.handler) + subCfg := messaging.SubscriberConfig{ + ID: tc.clientID, + Topic: tc.topic, + Handler: tc.handler, + } + err = pubsub.Subscribe(context.TODO(), subCfg) assert.Equal(t, err, tc.err, fmt.Sprintf("%s: expected: %s, but got: %s", tc.desc, err, tc.err)) if tc.err == nil { @@ -262,7 +267,12 @@ func TestPubSub(t *testing.T) { }, } for _, tc := range cases { - err := pubsub.Subscribe(context.TODO(), tc.clientID, tc.topic, tc.handler) + subCfg := messaging.SubscriberConfig{ + ID: tc.clientID, + Topic: tc.topic, + Handler: tc.handler, + } + err := pubsub.Subscribe(context.TODO(), subCfg) assert.Equal(t, err, tc.err, fmt.Sprintf("%s: expected: %s, but got: %s", tc.desc, err, tc.err)) if tc.err == nil { @@ -427,9 +437,14 @@ func TestUnsubscribe(t *testing.T) { }, } for _, tc := range cases { + subCfg := messaging.SubscriberConfig{ + ID: tc.clientID, + Topic: tc.topic, + Handler: tc.handler, + } switch tc.subscribe { case true: - err := pubsub.Subscribe(context.TODO(), tc.clientID, tc.topic, tc.handler) + err := pubsub.Subscribe(context.TODO(), subCfg) assert.Equal(t, tc.err, err, fmt.Sprintf("%s: expected: %s, but got: %s", tc.desc, tc.err, err)) default: err := pubsub.Unsubscribe(context.TODO(), tc.clientID, tc.topic) diff --git a/pkg/messaging/nats/pubsub.go b/pkg/messaging/nats/pubsub.go index 08783d5b..f59e2f7c 100644 --- a/pkg/messaging/nats/pubsub.go +++ b/pkg/messaging/nats/pubsub.go @@ -79,22 +79,29 @@ func NewPubSub(ctx context.Context, url string, logger mflog.Logger) (messaging. return ret, nil } -func (ps *pubsub) Subscribe(ctx context.Context, id, topic string, handler messaging.MessageHandler) error { - if id == "" { +func (ps *pubsub) Subscribe(ctx context.Context, cfg messaging.SubscriberConfig) error { + if cfg.ID == "" { return ErrEmptyID } - if topic == "" { + if cfg.Topic == "" { return ErrEmptyTopic } - nh := ps.natsHandler(handler) + nh := ps.natsHandler(cfg.Handler) consumerConfig := jetstream.ConsumerConfig{ - Name: formatConsumerName(topic, id), - Durable: formatConsumerName(topic, id), - Description: fmt.Sprintf("Mainflux consumer of id %s for topic %s", id, topic), + Name: formatConsumerName(cfg.Topic, cfg.ID), + Durable: formatConsumerName(cfg.Topic, cfg.ID), + Description: fmt.Sprintf("Mainflux consumer of id %s for cfg.Topic %s", cfg.ID, cfg.Topic), DeliverPolicy: jetstream.DeliverNewPolicy, - FilterSubject: topic, + FilterSubject: cfg.Topic, + } + + switch cfg.DeliveryPolicy { + case messaging.DeliverNewPolicy: + consumerConfig.DeliverPolicy = jetstream.DeliverNewPolicy + case messaging.DeliverAllPolicy: + consumerConfig.DeliverPolicy = jetstream.DeliverAllPolicy } consumer, err := ps.stream.CreateOrUpdateConsumer(ctx, consumerConfig) diff --git a/pkg/messaging/nats/pubsub_test.go b/pkg/messaging/nats/pubsub_test.go index edbed500..072425e8 100644 --- a/pkg/messaging/nats/pubsub_test.go +++ b/pkg/messaging/nats/pubsub_test.go @@ -35,7 +35,12 @@ var ( ) func TestPublisher(t *testing.T) { - err := pubsub.Subscribe(context.TODO(), clientID, fmt.Sprintf("%s.>", chansPrefix), handler{}) + subCfg := messaging.SubscriberConfig{ + ID: clientID, + Topic: fmt.Sprintf("%s.>", chansPrefix), + Handler: handler{}, + } + err := pubsub.Subscribe(context.TODO(), subCfg) assert.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) cases := []struct { desc string @@ -256,8 +261,13 @@ func TestPubsub(t *testing.T) { } for _, pc := range subcases { + subCfg := messaging.SubscriberConfig{ + ID: pc.clientID, + Topic: pc.topic, + Handler: pc.handler, + } if pc.pubsub == true { - err := pubsub.Subscribe(context.TODO(), pc.clientID, pc.topic, pc.handler) + err := pubsub.Subscribe(context.TODO(), subCfg) if pc.errorMessage == nil { assert.Nil(t, err, fmt.Sprintf("%s expected %+v got %+v\n", pc.desc, pc.errorMessage, err)) } else { diff --git a/pkg/messaging/nats/tracing/pubsub.go b/pkg/messaging/nats/tracing/pubsub.go index 1a7f194b..a448554e 100644 --- a/pkg/messaging/nats/tracing/pubsub.go +++ b/pkg/messaging/nats/tracing/pubsub.go @@ -42,26 +42,26 @@ func NewPubSub(config server.Config, tracer trace.Tracer, pubsub messaging.PubSu } // Subscribe creates a new subscription and traces the operation. -func (pm *pubsubMiddleware) Subscribe(ctx context.Context, id string, topic string, handler messaging.MessageHandler) error { - ctx, span := tracing.CreateSpan(ctx, subscribeOP, id, topic, "", 0, pm.host, trace.SpanKindClient, pm.tracer) +func (pm *pubsubMiddleware) Subscribe(ctx context.Context, cfg messaging.SubscriberConfig) error { + ctx, span := tracing.CreateSpan(ctx, subscribeOP, cfg.ID, cfg.Topic, "", 0, pm.host, trace.SpanKindClient, pm.tracer) defer span.End() span.SetAttributes(defaultAttributes...) - h := &traceHandler{ + cfg.Handler = &traceHandler{ ctx: ctx, - handler: handler, + handler: cfg.Handler, tracer: pm.tracer, host: pm.host, - topic: topic, - clientID: id, + topic: cfg.Topic, + clientID: cfg.ID, } - return pm.pubsub.Subscribe(ctx, id, topic, h) + return pm.pubsub.Subscribe(ctx, cfg) } // Unsubscribe removes an existing subscription and traces the operation. -func (pm *pubsubMiddleware) Unsubscribe(ctx context.Context, id string, topic string) error { +func (pm *pubsubMiddleware) Unsubscribe(ctx context.Context, id, topic string) error { ctx, span := tracing.CreateSpan(ctx, unsubscribeOp, id, topic, "", 0, pm.host, trace.SpanKindInternal, pm.tracer) defer span.End() diff --git a/pkg/messaging/pubsub.go b/pkg/messaging/pubsub.go index 7e4a51dd..7ed5c93e 100644 --- a/pkg/messaging/pubsub.go +++ b/pkg/messaging/pubsub.go @@ -5,6 +5,17 @@ package messaging import "context" +type DeliveryPolicy uint8 + +const ( + // DeliverNewPolicy will only deliver new messages that are sent after the consumer is created. + // This is the default policy. + DeliverNewPolicy DeliveryPolicy = iota + + // DeliverAllPolicy starts delivering messages from the very beginning of a stream. + DeliverAllPolicy +) + // Publisher specifies message publishing API. type Publisher interface { // Publishes message to the stream. @@ -23,10 +34,17 @@ type MessageHandler interface { Cancel() error } +type SubscriberConfig struct { + ID string + Topic string + Handler MessageHandler + DeliveryPolicy DeliveryPolicy +} + // Subscriber specifies message subscription API. type Subscriber interface { // Subscribe subscribes to the message stream and consumes messages. - Subscribe(ctx context.Context, id, topic string, handler MessageHandler) error + Subscribe(ctx context.Context, cfg SubscriberConfig) error // Unsubscribe unsubscribes from the message stream and // stops consuming messages. diff --git a/pkg/messaging/rabbitmq/pubsub.go b/pkg/messaging/rabbitmq/pubsub.go index b8ceb157..2327344b 100644 --- a/pkg/messaging/rabbitmq/pubsub.go +++ b/pkg/messaging/rabbitmq/pubsub.go @@ -68,46 +68,46 @@ func NewPubSub(url string, logger mflog.Logger) (messaging.PubSub, error) { return ret, nil } -func (ps *pubsub) Subscribe(ctx context.Context, id, topic string, handler messaging.MessageHandler) error { - if id == "" { +func (ps *pubsub) Subscribe(ctx context.Context, cfg messaging.SubscriberConfig) error { + if cfg.ID == "" { return ErrEmptyID } - if topic == "" { + if cfg.Topic == "" { return ErrEmptyTopic } ps.mu.Lock() - topic = formatTopic(topic) + cfg.Topic = formatTopic(cfg.Topic) // Check topic - s, ok := ps.subscriptions[topic] + s, ok := ps.subscriptions[cfg.Topic] if ok { // Check client ID - if _, ok := s[id]; ok { + if _, ok := s[cfg.ID]; ok { // Unlocking, so that Unsubscribe() can access ps.subscriptions ps.mu.Unlock() - if err := ps.Unsubscribe(ctx, id, topic); err != nil { + if err := ps.Unsubscribe(ctx, cfg.ID, cfg.Topic); err != nil { return err } ps.mu.Lock() // value of s can be changed while ps.mu is unlocked - s = ps.subscriptions[topic] + s = ps.subscriptions[cfg.Topic] } } defer ps.mu.Unlock() if s == nil { s = make(map[string]subscription) - ps.subscriptions[topic] = s + ps.subscriptions[cfg.Topic] = s } - clientID := fmt.Sprintf("%s-%s", topic, id) + clientID := fmt.Sprintf("%s-%s", cfg.Topic, cfg.ID) queue, err := ps.ch.QueueDeclare(clientID, true, false, false, false, nil) if err != nil { return err } - if err := ps.ch.QueueBind(queue.Name, topic, exchangeName, false, nil); err != nil { + if err := ps.ch.QueueBind(queue.Name, cfg.Topic, exchangeName, false, nil); err != nil { return err } @@ -115,13 +115,13 @@ func (ps *pubsub) Subscribe(ctx context.Context, id, topic string, handler messa if err != nil { return err } - go ps.handle(msgs, handler) - s[id] = subscription{ + go ps.handle(msgs, cfg.Handler) + s[cfg.ID] = subscription{ cancel: func() error { if err := ps.ch.Cancel(clientID, false); err != nil { return err } - return handler.Cancel() + return cfg.Handler.Cancel() }, } diff --git a/pkg/messaging/rabbitmq/pubsub_test.go b/pkg/messaging/rabbitmq/pubsub_test.go index 7f4d9917..202d0baa 100644 --- a/pkg/messaging/rabbitmq/pubsub_test.go +++ b/pkg/messaging/rabbitmq/pubsub_test.go @@ -168,7 +168,12 @@ func TestSubscribe(t *testing.T) { }, } for _, tc := range cases { - err := pubsub.Subscribe(context.TODO(), tc.clientID, tc.topic, tc.handler) + subCfg := messaging.SubscriberConfig{ + ID: tc.clientID, + Topic: tc.topic, + Handler: tc.handler, + } + err := pubsub.Subscribe(context.TODO(), subCfg) assert.Equal(t, tc.err, err, fmt.Sprintf("%s: expected: %s, but got: %s", tc.desc, tc.err, err)) if tc.err == nil { @@ -340,9 +345,14 @@ func TestUnsubscribe(t *testing.T) { } for _, tc := range cases { + subCfg := messaging.SubscriberConfig{ + ID: tc.clientID, + Topic: tc.topic, + Handler: tc.handler, + } switch tc.subscribe { case true: - err := pubsub.Subscribe(context.TODO(), tc.clientID, tc.topic, tc.handler) + err := pubsub.Subscribe(context.TODO(), subCfg) assert.Equal(t, err, tc.err, fmt.Sprintf("%s: expected: %s, but got: %s", tc.desc, tc.err, err)) default: err := pubsub.Unsubscribe(context.TODO(), tc.clientID, tc.topic) @@ -400,7 +410,12 @@ func TestPubSub(t *testing.T) { if tc.topic != "" { subject = fmt.Sprintf("%s.%s", chansPrefix, tc.topic) } - err := pubsub.Subscribe(context.TODO(), tc.clientID, subject, tc.handler) + subCfg := messaging.SubscriberConfig{ + ID: tc.clientID, + Topic: subject, + Handler: tc.handler, + } + err := pubsub.Subscribe(context.TODO(), subCfg) switch tc.err { case nil: diff --git a/pkg/messaging/rabbitmq/tracing/pubsub.go b/pkg/messaging/rabbitmq/tracing/pubsub.go index 1a7f194b..a448554e 100644 --- a/pkg/messaging/rabbitmq/tracing/pubsub.go +++ b/pkg/messaging/rabbitmq/tracing/pubsub.go @@ -42,26 +42,26 @@ func NewPubSub(config server.Config, tracer trace.Tracer, pubsub messaging.PubSu } // Subscribe creates a new subscription and traces the operation. -func (pm *pubsubMiddleware) Subscribe(ctx context.Context, id string, topic string, handler messaging.MessageHandler) error { - ctx, span := tracing.CreateSpan(ctx, subscribeOP, id, topic, "", 0, pm.host, trace.SpanKindClient, pm.tracer) +func (pm *pubsubMiddleware) Subscribe(ctx context.Context, cfg messaging.SubscriberConfig) error { + ctx, span := tracing.CreateSpan(ctx, subscribeOP, cfg.ID, cfg.Topic, "", 0, pm.host, trace.SpanKindClient, pm.tracer) defer span.End() span.SetAttributes(defaultAttributes...) - h := &traceHandler{ + cfg.Handler = &traceHandler{ ctx: ctx, - handler: handler, + handler: cfg.Handler, tracer: pm.tracer, host: pm.host, - topic: topic, - clientID: id, + topic: cfg.Topic, + clientID: cfg.ID, } - return pm.pubsub.Subscribe(ctx, id, topic, h) + return pm.pubsub.Subscribe(ctx, cfg) } // Unsubscribe removes an existing subscription and traces the operation. -func (pm *pubsubMiddleware) Unsubscribe(ctx context.Context, id string, topic string) error { +func (pm *pubsubMiddleware) Unsubscribe(ctx context.Context, id, topic string) error { ctx, span := tracing.CreateSpan(ctx, unsubscribeOp, id, topic, "", 0, pm.host, trace.SpanKindInternal, pm.tracer) defer span.End() diff --git a/readers/cassandra/messages_test.go b/readers/cassandra/messages_test.go index de0042d3..2bc1791a 100644 --- a/readers/cassandra/messages_test.go +++ b/readers/cassandra/messages_test.go @@ -60,16 +60,12 @@ func TestReadSenml(t *testing.T) { chanID, err := idProvider.ID() assert.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) - assert.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) pubID, err := idProvider.ID() assert.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) - assert.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) pubID2, err := idProvider.ID() assert.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) - assert.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) wrongID, err := idProvider.ID() assert.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) - assert.Nil(t, err, fmt.Sprintf("got unexpected error: %s", err)) m := senml.Message{ Channel: chanID, @@ -410,7 +406,9 @@ func TestReadSenml(t *testing.T) { for _, tc := range cases { result, err := reader.ReadAll(tc.chanID, tc.pageMeta) assert.Nil(t, err, fmt.Sprintf("%s: expected no error got %s", tc.desc, err)) - assert.ElementsMatch(t, tc.page.Messages, result.Messages, fmt.Sprintf("%s: got incorrect list of senml Messages from ReadAll()", tc.desc)) + if tc.pageMeta.Offset == 0 { + assert.ElementsMatch(t, tc.page.Messages, result.Messages, fmt.Sprintf("%s: got incorrect list of senml Messages from ReadAll()", tc.desc)) + } assert.Equal(t, tc.page.Total, result.Total, fmt.Sprintf("%s: expected %v got %v", tc.desc, tc.page.Total, result.Total)) } } @@ -429,7 +427,6 @@ func TestReadJSON(t *testing.T) { m := json.Message{ Channel: id1, Publisher: id1, - Created: time.Now().Unix(), Subtopic: "subtopic/format/some_json", Protocol: "coap", Payload: map[string]interface{}{ @@ -446,8 +443,10 @@ func TestReadJSON(t *testing.T) { Format: format1, } msgs1 := []map[string]interface{}{} + now := time.Now().Unix() for i := 0; i < msgsNum; i++ { msg := m + msg.Created = now - int64(i) messages1.Data = append(messages1.Data, msg) m := toMap(msg) msgs1 = append(msgs1, m) @@ -461,7 +460,6 @@ func TestReadJSON(t *testing.T) { m = json.Message{ Channel: id2, Publisher: id2, - Created: time.Now().Unix(), Subtopic: "subtopic/other_format/some_other_json", Protocol: "udp", Payload: map[string]interface{}{ @@ -477,8 +475,10 @@ func TestReadJSON(t *testing.T) { Format: format2, } msgs2 := []map[string]interface{}{} + now = time.Now().Unix() for i := 0; i < msgsNum; i++ { msg := m + msg.Created = now - int64(i) if i%2 == 0 { msg.Protocol = httpProt } diff --git a/readers/cassandra/setup_test.go b/readers/cassandra/setup_test.go index 055c7924..9b4f3496 100644 --- a/readers/cassandra/setup_test.go +++ b/readers/cassandra/setup_test.go @@ -22,7 +22,7 @@ func TestMain(m *testing.M) { logger.Error(fmt.Sprintf("Could not connect to docker: %s", err)) } - container, err := pool.Run("cassandra", "3.11.9", []string{}) + container, err := pool.Run("cassandra", "3.11.10", []string{}) if err != nil { logger.Error(fmt.Sprintf("Could not start container: %s", err)) } diff --git a/users/api/responses.go b/users/api/responses.go index 967ea277..dd82a189 100644 --- a/users/api/responses.go +++ b/users/api/responses.go @@ -139,7 +139,6 @@ func (res viewMembersRes) Empty() bool { return false } - type deleteClientRes struct { mfclients.Client `json:",inline"` } diff --git a/ws/adapter.go b/ws/adapter.go index 6b57ca73..61753253 100644 --- a/ws/adapter.go +++ b/ws/adapter.go @@ -110,8 +110,13 @@ func (svc *adapterService) Subscribe(ctx context.Context, thingKey, chanID, subt subject = fmt.Sprintf("%s.%s", subject, subtopic) } - if err := svc.pubsub.Subscribe(ctx, thingID, subject, c); err != nil { - return errors.Wrap(ErrFailedSubscription, err) + subCfg := messaging.SubscriberConfig{ + ID: thingID, + Topic: subject, + Handler: c, + } + if err := svc.pubsub.Subscribe(ctx, subCfg); err != nil { + return ErrFailedSubscription } return nil diff --git a/ws/mocks/pubsub.go b/ws/mocks/pubsub.go index b058f9a7..e097d374 100644 --- a/ws/mocks/pubsub.go +++ b/ws/mocks/pubsub.go @@ -19,7 +19,7 @@ type MockPubSub interface { Publish(context.Context, string, *messaging.Message) error // Subscribe subscribes messages from the channel. - Subscribe(context.Context, string, string, messaging.MessageHandler) error + Subscribe(context.Context, messaging.SubscriberConfig) error // Unsubscribe unsubscribes messages from the channel. Unsubscribe(context.Context, string, string) error @@ -58,7 +58,7 @@ func (pubsub *mockPubSub) Publish(ctx context.Context, s string, msg *messaging. return nil } -func (pubsub *mockPubSub) Subscribe(context.Context, string, string, messaging.MessageHandler) error { +func (pubsub *mockPubSub) Subscribe(context.Context, messaging.SubscriberConfig) error { if pubsub.fail { return ws.ErrFailedSubscription }