mirror of
https://github.com/mainflux/mainflux.git
synced 2025-04-26 13:48:53 +08:00
NOISSUE - Raise code coverage in ws adapter (#242)
* Fix failed subscription handling in ws adapter Fix unsubscribing bug in ws adapter. Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com> * Add subscription fail and publish fail test cases Update mock implementation to receive publish error in order to support these test cases. Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com> * Update mainflux version to 0.2.3 Update project version and load tests version to 0.2.3. Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com> * Update version endpoint response format Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com>
This commit is contained in:
parent
58f3c735a0
commit
aea7db14b7
@ -1,7 +1,7 @@
|
||||
enablePlugins(GatlingPlugin)
|
||||
|
||||
name := "load-test"
|
||||
version := "0.2.2"
|
||||
version := "0.2.3"
|
||||
|
||||
scalaVersion := "2.12.4"
|
||||
|
||||
|
@ -5,17 +5,17 @@ import (
|
||||
"net/http"
|
||||
)
|
||||
|
||||
const version string = "0.2.2"
|
||||
const version string = "0.2.3"
|
||||
|
||||
type response struct {
|
||||
Version string
|
||||
Service string
|
||||
Service string `json:"service"`
|
||||
Version string `json:"version"`
|
||||
}
|
||||
|
||||
// Version exposes an HTTP handler for retrieving service version.
|
||||
func Version(service string) http.HandlerFunc {
|
||||
return http.HandlerFunc(func(rw http.ResponseWriter, _ *http.Request) {
|
||||
res := response{Version: version, Service: service}
|
||||
res := response{service, version}
|
||||
|
||||
data, _ := json.Marshal(res)
|
||||
|
||||
|
@ -6,6 +6,7 @@ import (
|
||||
|
||||
"github.com/mainflux/mainflux/ws"
|
||||
"github.com/mainflux/mainflux/ws/mocks"
|
||||
broker "github.com/nats-io/go-nats"
|
||||
"github.com/stretchr/testify/assert"
|
||||
|
||||
"github.com/mainflux/mainflux"
|
||||
@ -29,7 +30,7 @@ var (
|
||||
|
||||
func newService() ws.Service {
|
||||
subs := map[string]ws.Channel{chanID: channel}
|
||||
pubsub := mocks.NewService(subs)
|
||||
pubsub := mocks.NewService(subs, broker.ErrInvalidMsg)
|
||||
return ws.New(pubsub)
|
||||
}
|
||||
|
||||
|
@ -69,9 +69,9 @@ func handshake(svc ws.Service) http.HandlerFunc {
|
||||
// Subscribe to channel
|
||||
channel := ws.Channel{make(chan mainflux.RawMessage), make(chan bool)}
|
||||
sub.channel = channel
|
||||
if err = svc.Subscribe(sub.chanID, sub.channel); err != nil {
|
||||
if err := svc.Subscribe(sub.chanID, sub.channel); err != nil {
|
||||
logger.Warn(fmt.Sprintf("Failed to subscribe to NATS subject: %s", err))
|
||||
w.WriteHeader(http.StatusExpectationFailed)
|
||||
conn.Close()
|
||||
return
|
||||
}
|
||||
go sub.listen()
|
||||
@ -137,6 +137,7 @@ func (sub subscription) broadcast(svc ws.Service) {
|
||||
if err := svc.Publish(msg); err != nil {
|
||||
logger.Warn(fmt.Sprintf("Failed to publish message to NATS: %s", err))
|
||||
if err == ws.ErrFailedConnection {
|
||||
sub.conn.Close()
|
||||
sub.channel.Closed <- true
|
||||
return
|
||||
}
|
||||
|
@ -15,6 +15,7 @@ import (
|
||||
"github.com/mainflux/mainflux/ws"
|
||||
"github.com/mainflux/mainflux/ws/api"
|
||||
"github.com/mainflux/mainflux/ws/mocks"
|
||||
broker "github.com/nats-io/go-nats"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
@ -31,7 +32,7 @@ var (
|
||||
|
||||
func newService() ws.Service {
|
||||
subs := map[string]ws.Channel{chanID: channel}
|
||||
pubsub := mocks.NewService(subs)
|
||||
pubsub := mocks.NewService(subs, broker.ErrConnectionClosed)
|
||||
return ws.New(pubsub)
|
||||
}
|
||||
|
||||
@ -88,11 +89,14 @@ func TestHandshake(t *testing.T) {
|
||||
header bool
|
||||
token string
|
||||
status int
|
||||
msg []byte
|
||||
}{
|
||||
{"connect and send message", chanID, true, token, http.StatusSwitchingProtocols},
|
||||
{"connect with invalid token", chanID, true, "", http.StatusForbidden},
|
||||
{"connect with invalid channel id", "1", true, token, http.StatusNotFound},
|
||||
{"connect and send message with token as query parameter", chanID, false, token, http.StatusSwitchingProtocols},
|
||||
{"connect and send message", chanID, true, token, http.StatusSwitchingProtocols, msg},
|
||||
{"connect to non-existent channel", "123e4567-e89b-12d3-a456-000000000042", true, token, http.StatusSwitchingProtocols, []byte{}},
|
||||
{"connect with invalid token", chanID, true, "", http.StatusForbidden, []byte{}},
|
||||
{"connect with invalid channel id", "1", true, token, http.StatusNotFound, []byte{}},
|
||||
{"connect and send message with token as query parameter", chanID, false, token, http.StatusSwitchingProtocols, msg},
|
||||
{"connect and send message that cannot be published", chanID, true, token, http.StatusSwitchingProtocols, []byte{}},
|
||||
}
|
||||
|
||||
for _, tc := range cases {
|
||||
@ -101,7 +105,7 @@ func TestHandshake(t *testing.T) {
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
err = conn.WriteMessage(websocket.TextMessage, msg)
|
||||
err = conn.WriteMessage(websocket.TextMessage, tc.msg)
|
||||
assert.Nil(t, err, fmt.Sprintf("%s: unexpected error %s\n", tc.desc, err))
|
||||
}
|
||||
}
|
||||
|
18
ws/channel_test.go
Normal file
18
ws/channel_test.go
Normal file
@ -0,0 +1,18 @@
|
||||
package ws_test
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/mainflux/mainflux"
|
||||
"github.com/mainflux/mainflux/ws"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestClose(t *testing.T) {
|
||||
channel := ws.Channel{make(chan mainflux.RawMessage), make(chan bool)}
|
||||
channel.Close()
|
||||
_, closed := <-channel.Closed
|
||||
_, messagesClosed := <-channel.Messages
|
||||
assert.False(t, closed, "channel closed stayed open")
|
||||
assert.False(t, messagesClosed, "channel messages stayed open")
|
||||
}
|
@ -3,23 +3,23 @@ package mocks
|
||||
import (
|
||||
"github.com/mainflux/mainflux"
|
||||
"github.com/mainflux/mainflux/ws"
|
||||
broker "github.com/nats-io/go-nats"
|
||||
)
|
||||
|
||||
var _ ws.Service = (*mockService)(nil)
|
||||
|
||||
type mockService struct {
|
||||
subscriptions map[string]ws.Channel
|
||||
pubError error
|
||||
}
|
||||
|
||||
// NewService returns mock message publisher.
|
||||
func NewService(subs map[string]ws.Channel) ws.Service {
|
||||
return mockService{subs}
|
||||
func NewService(subs map[string]ws.Channel, pubError error) ws.Service {
|
||||
return mockService{subs, pubError}
|
||||
}
|
||||
|
||||
func (svc mockService) Publish(msg mainflux.RawMessage) error {
|
||||
if len(msg.Payload) == 0 {
|
||||
return broker.ErrInvalidMsg
|
||||
return svc.pubError
|
||||
}
|
||||
svc.subscriptions[msg.Channel].Messages <- msg
|
||||
return nil
|
||||
|
@ -59,13 +59,20 @@ func (pubsub *natsPubSub) Subscribe(chanID string, channel ws.Channel) error {
|
||||
return
|
||||
}
|
||||
|
||||
// Prevents sending message to closed channel
|
||||
select {
|
||||
case channel.Messages <- rawMsg:
|
||||
case <-channel.Closed:
|
||||
sub.Unsubscribe()
|
||||
channel.Close()
|
||||
}
|
||||
})
|
||||
|
||||
// Check if subscription should be closed
|
||||
go func() {
|
||||
<-channel.Closed
|
||||
sub.Unsubscribe()
|
||||
channel.Close()
|
||||
}()
|
||||
|
||||
return err
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user