1
0
mirror of https://github.com/mainflux/mainflux.git synced 2025-04-27 13:48:49 +08:00
Mainflux.mainflux/ws/adapter.go

72 lines
1.8 KiB
Go
Raw Normal View History

2018-05-11 01:00:10 +02:00
// Package ws contains the domain concept definitions needed to support
// Mainflux ws adapter service functionality.
MF-171 - Extract websocket adapter as separate service (#188) * Add websocket adapter Add websocket adapter with basic logging and metrics middleware. Add publish and subscribe to NATS subjects using websocket. Add websocket handshake authorization over mainflux manager. Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com> * Add test for websocket publish Add test for websocket adapter's publish method. Add dependecy injected logger to adapter. Remove unnecessary manager client dependency from adapter. Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com> * Add websocket library in dependencies Add gorilla/websocket dependency in dep toml and lock file. Add dependency in vendor dir. Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com> * Add direct websocket connection Add direct websocket connection. While messages are still beeing published over NATS, they are not received over src.ws subject in WebSocket adapter. Instead messages are sent directly over websocket connection. Add swagger file for WebSocket handshake endpoint. Update ReadMe to reference new swagger file. Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com> * Refactore websocket adapter code Extract listen part from handshake. Update WebSocket adapter code. Fix subscribe to NATS subject, so that it is independent from other adapters. Remove message base64 encoding from response.. Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com> * Remove connection management from websocket adapter Align notion of channel with NATS topic. Remove connection management from adapter logic. Add log messages to adapter's transport layer. Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com> * Update NATS subjects and add subscriber interface Update NATS subject name to channel.<channel_id>. Add and implement subscriber interface. Implement subscriber interface in adapter. Update readme to use new make <service_name> command. Refactor code. Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com> * Add basic tests for broadcast and subscribe Improve mock implementation of NATS pubsub. Add multiple basic test cases for broadcast and subscribe methods. Add logging for subscribe. Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com> * Add listen method test Add listen method test and refactor existing tests. Refactor listen method in adapter. Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com> * Move broadcast method to message broker interface Refactor broadcast method to receive send message callback and message that needs to be sent. Update tests accordingly. Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com> * Update pubsub API Remove listen method from public API. Move listen call to subscribe implementation. Update domain pubsub API in project root. Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com> * Add connection error handlers to adapter API Update publish and subscribe API to receive connection error handler. Update tests accordingly. Handle NATS connection error. Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com> * Update logs Replace go-kit logger with custom mainflux logger. Update log messages where needed.. Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com> * Refactor web socket adapter Remove MessagePubSub interface. Remove unnecessary callbacks. Add channels to web socket adapter implementation. Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com> * Add API layer tests Update existing tests and add API layer tests. Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com> * Update docs with web socket related data Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com> * Refactor web socket adapter Update subscription struct, and refactor listen and broadcast methods. Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com> * Add version and metrics endpoint to ws adapter Update version to 0.2.0. Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com> * Fix race condition bug in ws adapter test Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com> * Rename listen to broadcast and broadcast to listen Switch names between listen and broadcast methods. Move channel structure to service.go. Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com>
2018-04-18 13:09:01 +02:00
package ws
import (
2018-05-11 01:00:10 +02:00
"errors"
MF-171 - Extract websocket adapter as separate service (#188) * Add websocket adapter Add websocket adapter with basic logging and metrics middleware. Add publish and subscribe to NATS subjects using websocket. Add websocket handshake authorization over mainflux manager. Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com> * Add test for websocket publish Add test for websocket adapter's publish method. Add dependecy injected logger to adapter. Remove unnecessary manager client dependency from adapter. Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com> * Add websocket library in dependencies Add gorilla/websocket dependency in dep toml and lock file. Add dependency in vendor dir. Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com> * Add direct websocket connection Add direct websocket connection. While messages are still beeing published over NATS, they are not received over src.ws subject in WebSocket adapter. Instead messages are sent directly over websocket connection. Add swagger file for WebSocket handshake endpoint. Update ReadMe to reference new swagger file. Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com> * Refactore websocket adapter code Extract listen part from handshake. Update WebSocket adapter code. Fix subscribe to NATS subject, so that it is independent from other adapters. Remove message base64 encoding from response.. Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com> * Remove connection management from websocket adapter Align notion of channel with NATS topic. Remove connection management from adapter logic. Add log messages to adapter's transport layer. Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com> * Update NATS subjects and add subscriber interface Update NATS subject name to channel.<channel_id>. Add and implement subscriber interface. Implement subscriber interface in adapter. Update readme to use new make <service_name> command. Refactor code. Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com> * Add basic tests for broadcast and subscribe Improve mock implementation of NATS pubsub. Add multiple basic test cases for broadcast and subscribe methods. Add logging for subscribe. Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com> * Add listen method test Add listen method test and refactor existing tests. Refactor listen method in adapter. Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com> * Move broadcast method to message broker interface Refactor broadcast method to receive send message callback and message that needs to be sent. Update tests accordingly. Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com> * Update pubsub API Remove listen method from public API. Move listen call to subscribe implementation. Update domain pubsub API in project root. Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com> * Add connection error handlers to adapter API Update publish and subscribe API to receive connection error handler. Update tests accordingly. Handle NATS connection error. Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com> * Update logs Replace go-kit logger with custom mainflux logger. Update log messages where needed.. Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com> * Refactor web socket adapter Remove MessagePubSub interface. Remove unnecessary callbacks. Add channels to web socket adapter implementation. Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com> * Add API layer tests Update existing tests and add API layer tests. Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com> * Update docs with web socket related data Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com> * Refactor web socket adapter Update subscription struct, and refactor listen and broadcast methods. Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com> * Add version and metrics endpoint to ws adapter Update version to 0.2.0. Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com> * Fix race condition bug in ws adapter test Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com> * Rename listen to broadcast and broadcast to listen Switch names between listen and broadcast methods. Move channel structure to service.go. Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com>
2018-04-18 13:09:01 +02:00
"github.com/mainflux/mainflux"
broker "github.com/nats-io/go-nats"
)
2018-05-11 01:00:10 +02:00
var (
// ErrFailedMessagePublish indicates that message publishing failed.
ErrFailedMessagePublish = errors.New("failed to publish message")
// ErrFailedSubscription indicates that client couldn't subscribe to specified channel.
ErrFailedSubscription = errors.New("failed to subscribe to a channel")
// ErrFailedConnection indicates that service couldn't connect to message broker.
ErrFailedConnection = errors.New("failed to connect to message broker")
)
// Service specifies web socket service API.
type Service interface {
mainflux.MessagePublisher
// Subscribes to channel with specified id.
Subscribe(uint64, Channel) error
2018-05-11 01:00:10 +02:00
}
// Channel is used for receiving and sending messages.
type Channel struct {
Messages chan mainflux.RawMessage
Closed chan bool
}
// Close channel and stop message transfer.
func (channel Channel) Close() {
close(channel.Messages)
close(channel.Closed)
}
MF-171 - Extract websocket adapter as separate service (#188) * Add websocket adapter Add websocket adapter with basic logging and metrics middleware. Add publish and subscribe to NATS subjects using websocket. Add websocket handshake authorization over mainflux manager. Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com> * Add test for websocket publish Add test for websocket adapter's publish method. Add dependecy injected logger to adapter. Remove unnecessary manager client dependency from adapter. Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com> * Add websocket library in dependencies Add gorilla/websocket dependency in dep toml and lock file. Add dependency in vendor dir. Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com> * Add direct websocket connection Add direct websocket connection. While messages are still beeing published over NATS, they are not received over src.ws subject in WebSocket adapter. Instead messages are sent directly over websocket connection. Add swagger file for WebSocket handshake endpoint. Update ReadMe to reference new swagger file. Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com> * Refactore websocket adapter code Extract listen part from handshake. Update WebSocket adapter code. Fix subscribe to NATS subject, so that it is independent from other adapters. Remove message base64 encoding from response.. Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com> * Remove connection management from websocket adapter Align notion of channel with NATS topic. Remove connection management from adapter logic. Add log messages to adapter's transport layer. Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com> * Update NATS subjects and add subscriber interface Update NATS subject name to channel.<channel_id>. Add and implement subscriber interface. Implement subscriber interface in adapter. Update readme to use new make <service_name> command. Refactor code. Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com> * Add basic tests for broadcast and subscribe Improve mock implementation of NATS pubsub. Add multiple basic test cases for broadcast and subscribe methods. Add logging for subscribe. Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com> * Add listen method test Add listen method test and refactor existing tests. Refactor listen method in adapter. Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com> * Move broadcast method to message broker interface Refactor broadcast method to receive send message callback and message that needs to be sent. Update tests accordingly. Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com> * Update pubsub API Remove listen method from public API. Move listen call to subscribe implementation. Update domain pubsub API in project root. Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com> * Add connection error handlers to adapter API Update publish and subscribe API to receive connection error handler. Update tests accordingly. Handle NATS connection error. Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com> * Update logs Replace go-kit logger with custom mainflux logger. Update log messages where needed.. Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com> * Refactor web socket adapter Remove MessagePubSub interface. Remove unnecessary callbacks. Add channels to web socket adapter implementation. Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com> * Add API layer tests Update existing tests and add API layer tests. Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com> * Update docs with web socket related data Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com> * Refactor web socket adapter Update subscription struct, and refactor listen and broadcast methods. Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com> * Add version and metrics endpoint to ws adapter Update version to 0.2.0. Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com> * Fix race condition bug in ws adapter test Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com> * Rename listen to broadcast and broadcast to listen Switch names between listen and broadcast methods. Move channel structure to service.go. Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com>
2018-04-18 13:09:01 +02:00
var _ Service = (*adapterService)(nil)
type adapterService struct {
pubsub Service
}
2018-05-11 01:00:10 +02:00
// New instantiates the WS adapter implementation.
MF-171 - Extract websocket adapter as separate service (#188) * Add websocket adapter Add websocket adapter with basic logging and metrics middleware. Add publish and subscribe to NATS subjects using websocket. Add websocket handshake authorization over mainflux manager. Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com> * Add test for websocket publish Add test for websocket adapter's publish method. Add dependecy injected logger to adapter. Remove unnecessary manager client dependency from adapter. Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com> * Add websocket library in dependencies Add gorilla/websocket dependency in dep toml and lock file. Add dependency in vendor dir. Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com> * Add direct websocket connection Add direct websocket connection. While messages are still beeing published over NATS, they are not received over src.ws subject in WebSocket adapter. Instead messages are sent directly over websocket connection. Add swagger file for WebSocket handshake endpoint. Update ReadMe to reference new swagger file. Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com> * Refactore websocket adapter code Extract listen part from handshake. Update WebSocket adapter code. Fix subscribe to NATS subject, so that it is independent from other adapters. Remove message base64 encoding from response.. Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com> * Remove connection management from websocket adapter Align notion of channel with NATS topic. Remove connection management from adapter logic. Add log messages to adapter's transport layer. Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com> * Update NATS subjects and add subscriber interface Update NATS subject name to channel.<channel_id>. Add and implement subscriber interface. Implement subscriber interface in adapter. Update readme to use new make <service_name> command. Refactor code. Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com> * Add basic tests for broadcast and subscribe Improve mock implementation of NATS pubsub. Add multiple basic test cases for broadcast and subscribe methods. Add logging for subscribe. Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com> * Add listen method test Add listen method test and refactor existing tests. Refactor listen method in adapter. Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com> * Move broadcast method to message broker interface Refactor broadcast method to receive send message callback and message that needs to be sent. Update tests accordingly. Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com> * Update pubsub API Remove listen method from public API. Move listen call to subscribe implementation. Update domain pubsub API in project root. Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com> * Add connection error handlers to adapter API Update publish and subscribe API to receive connection error handler. Update tests accordingly. Handle NATS connection error. Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com> * Update logs Replace go-kit logger with custom mainflux logger. Update log messages where needed.. Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com> * Refactor web socket adapter Remove MessagePubSub interface. Remove unnecessary callbacks. Add channels to web socket adapter implementation. Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com> * Add API layer tests Update existing tests and add API layer tests. Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com> * Update docs with web socket related data Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com> * Refactor web socket adapter Update subscription struct, and refactor listen and broadcast methods. Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com> * Add version and metrics endpoint to ws adapter Update version to 0.2.0. Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com> * Fix race condition bug in ws adapter test Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com> * Rename listen to broadcast and broadcast to listen Switch names between listen and broadcast methods. Move channel structure to service.go. Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com>
2018-04-18 13:09:01 +02:00
func New(pubsub Service) Service {
return &adapterService{pubsub}
}
func (as *adapterService) Publish(msg mainflux.RawMessage) error {
if err := as.pubsub.Publish(msg); err != nil {
switch err {
case broker.ErrConnectionClosed, broker.ErrInvalidConnection:
return ErrFailedConnection
default:
return ErrFailedMessagePublish
}
}
return nil
}
func (as *adapterService) Subscribe(chanID uint64, channel Channel) error {
MF-171 - Extract websocket adapter as separate service (#188) * Add websocket adapter Add websocket adapter with basic logging and metrics middleware. Add publish and subscribe to NATS subjects using websocket. Add websocket handshake authorization over mainflux manager. Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com> * Add test for websocket publish Add test for websocket adapter's publish method. Add dependecy injected logger to adapter. Remove unnecessary manager client dependency from adapter. Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com> * Add websocket library in dependencies Add gorilla/websocket dependency in dep toml and lock file. Add dependency in vendor dir. Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com> * Add direct websocket connection Add direct websocket connection. While messages are still beeing published over NATS, they are not received over src.ws subject in WebSocket adapter. Instead messages are sent directly over websocket connection. Add swagger file for WebSocket handshake endpoint. Update ReadMe to reference new swagger file. Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com> * Refactore websocket adapter code Extract listen part from handshake. Update WebSocket adapter code. Fix subscribe to NATS subject, so that it is independent from other adapters. Remove message base64 encoding from response.. Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com> * Remove connection management from websocket adapter Align notion of channel with NATS topic. Remove connection management from adapter logic. Add log messages to adapter's transport layer. Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com> * Update NATS subjects and add subscriber interface Update NATS subject name to channel.<channel_id>. Add and implement subscriber interface. Implement subscriber interface in adapter. Update readme to use new make <service_name> command. Refactor code. Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com> * Add basic tests for broadcast and subscribe Improve mock implementation of NATS pubsub. Add multiple basic test cases for broadcast and subscribe methods. Add logging for subscribe. Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com> * Add listen method test Add listen method test and refactor existing tests. Refactor listen method in adapter. Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com> * Move broadcast method to message broker interface Refactor broadcast method to receive send message callback and message that needs to be sent. Update tests accordingly. Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com> * Update pubsub API Remove listen method from public API. Move listen call to subscribe implementation. Update domain pubsub API in project root. Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com> * Add connection error handlers to adapter API Update publish and subscribe API to receive connection error handler. Update tests accordingly. Handle NATS connection error. Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com> * Update logs Replace go-kit logger with custom mainflux logger. Update log messages where needed.. Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com> * Refactor web socket adapter Remove MessagePubSub interface. Remove unnecessary callbacks. Add channels to web socket adapter implementation. Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com> * Add API layer tests Update existing tests and add API layer tests. Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com> * Update docs with web socket related data Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com> * Refactor web socket adapter Update subscription struct, and refactor listen and broadcast methods. Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com> * Add version and metrics endpoint to ws adapter Update version to 0.2.0. Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com> * Fix race condition bug in ws adapter test Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com> * Rename listen to broadcast and broadcast to listen Switch names between listen and broadcast methods. Move channel structure to service.go. Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com>
2018-04-18 13:09:01 +02:00
if err := as.pubsub.Subscribe(chanID, channel); err != nil {
return ErrFailedSubscription
}
return nil
}