mirror of
https://github.com/mainflux/mainflux.git
synced 2025-04-27 13:48:49 +08:00

* Add websocket adapter

  Add websocket adapter with basic logging and metrics middleware. Add publish and subscribe to NATS subjects using websocket. Add websocket handshake authorization over mainflux manager.

  Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com>

* Add test for websocket publish

  Add test for websocket adapter's publish method. Add dependency-injected logger to adapter. Remove unnecessary manager client dependency from adapter.

  Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com>

* Add websocket library in dependencies

  Add gorilla/websocket dependency in dep toml and lock file. Add dependency in vendor dir.

  Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com>

* Add direct websocket connection

  Add direct websocket connection. While messages are still being published over NATS, they are not received over src.ws subject in WebSocket adapter. Instead, messages are sent directly over websocket connection. Add swagger file for WebSocket handshake endpoint. Update ReadMe to reference new swagger file.

  Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com>

* Refactor websocket adapter code

  Extract listen part from handshake. Update WebSocket adapter code. Fix subscribe to NATS subject, so that it is independent from other adapters. Remove message base64 encoding from response.

  Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com>

* Remove connection management from websocket adapter

  Align notion of channel with NATS topic. Remove connection management from adapter logic. Add log messages to adapter's transport layer.

  Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com>

* Update NATS subjects and add subscriber interface

  Update NATS subject name to channel.<channel_id>. Add and implement subscriber interface. Implement subscriber interface in adapter. Update readme to use new make <service_name> command. Refactor code.

  Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com>

* Add basic tests for broadcast and subscribe

  Improve mock implementation of NATS pubsub. Add multiple basic test cases for broadcast and subscribe methods. Add logging for subscribe.

  Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com>

* Add listen method test

  Add listen method test and refactor existing tests. Refactor listen method in adapter.

  Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com>

* Move broadcast method to message broker interface

  Refactor broadcast method to receive send message callback and message that needs to be sent. Update tests accordingly.

  Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com>

* Update pubsub API

  Remove listen method from public API. Move listen call to subscribe implementation. Update domain pubsub API in project root.

  Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com>

* Add connection error handlers to adapter API

  Update publish and subscribe API to receive connection error handler. Update tests accordingly. Handle NATS connection error.

  Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com>

* Update logs

  Replace go-kit logger with custom mainflux logger. Update log messages where needed.

  Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com>

* Refactor web socket adapter

  Remove MessagePubSub interface. Remove unnecessary callbacks. Add channels to web socket adapter implementation.

  Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com>

* Add API layer tests

  Update existing tests and add API layer tests.

  Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com>

* Update docs with web socket related data

  Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com>

* Refactor web socket adapter

  Update subscription struct, and refactor listen and broadcast methods.

  Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com>

* Add version and metrics endpoint to ws adapter

  Update version to 0.2.0.

  Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com>

* Fix race condition bug in ws adapter test

  Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com>

* Rename listen to broadcast and broadcast to listen

  Switch names between listen and broadcast methods. Move channel structure to service.go.

  Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com>
292 lines
9.5 KiB
Go
// Copyright 2013 The Gorilla WebSocket Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package websocket

import (
	"bufio"
	"errors"
	"net"
	"net/http"
	"net/url"
	"strings"
	"time"
)

// HandshakeError describes an error with the handshake from the peer.
type HandshakeError struct {
	message string
}

func (e HandshakeError) Error() string { return e.message }

// Upgrader specifies parameters for upgrading an HTTP connection to a
// WebSocket connection.
type Upgrader struct {
	// HandshakeTimeout specifies the duration for the handshake to complete.
	HandshakeTimeout time.Duration

	// ReadBufferSize and WriteBufferSize specify I/O buffer sizes. If a buffer
	// size is zero, then buffers allocated by the HTTP server are used. The
	// I/O buffer sizes do not limit the size of the messages that can be sent
	// or received.
	ReadBufferSize, WriteBufferSize int

	// Subprotocols specifies the server's supported protocols in order of
	// preference. If this field is set, then the Upgrade method negotiates a
	// subprotocol by selecting the first match in this list with a protocol
	// requested by the client.
	Subprotocols []string

	// Error specifies the function for generating HTTP error responses. If Error
	// is nil, then http.Error is used to generate the HTTP response.
	Error func(w http.ResponseWriter, r *http.Request, status int, reason error)

	// CheckOrigin returns true if the request Origin header is acceptable. If
	// CheckOrigin is nil, the host in the Origin header must not be set or
	// must match the host of the request.
	CheckOrigin func(r *http.Request) bool

	// EnableCompression specifies if the server should attempt to negotiate per
	// message compression (RFC 7692). Setting this value to true does not
	// guarantee that compression will be supported. Currently only "no context
	// takeover" modes are supported.
	EnableCompression bool
}

func (u *Upgrader) returnError(w http.ResponseWriter, r *http.Request, status int, reason string) (*Conn, error) {
	err := HandshakeError{reason}
	if u.Error != nil {
		u.Error(w, r, status, err)
	} else {
		w.Header().Set("Sec-Websocket-Version", "13")
		http.Error(w, http.StatusText(status), status)
	}
	return nil, err
}

// checkSameOrigin returns true if the origin is not set or is equal to the request host.
func checkSameOrigin(r *http.Request) bool {
	origin := r.Header["Origin"]
	if len(origin) == 0 {
		return true
	}
	u, err := url.Parse(origin[0])
	if err != nil {
		return false
	}
	return u.Host == r.Host
}

func (u *Upgrader) selectSubprotocol(r *http.Request, responseHeader http.Header) string {
	if u.Subprotocols != nil {
		clientProtocols := Subprotocols(r)
		for _, serverProtocol := range u.Subprotocols {
			for _, clientProtocol := range clientProtocols {
				if clientProtocol == serverProtocol {
					return clientProtocol
				}
			}
		}
	} else if responseHeader != nil {
		return responseHeader.Get("Sec-Websocket-Protocol")
	}
	return ""
}

// Upgrade upgrades the HTTP server connection to the WebSocket protocol.
//
// The responseHeader is included in the response to the client's upgrade
// request. Use the responseHeader to specify cookies (Set-Cookie) and the
// application negotiated subprotocol (Sec-Websocket-Protocol).
//
// If the upgrade fails, then Upgrade replies to the client with an HTTP error
// response.
func (u *Upgrader) Upgrade(w http.ResponseWriter, r *http.Request, responseHeader http.Header) (*Conn, error) {
	if r.Method != "GET" {
		return u.returnError(w, r, http.StatusMethodNotAllowed, "websocket: not a websocket handshake: request method is not GET")
	}

	if _, ok := responseHeader["Sec-Websocket-Extensions"]; ok {
		return u.returnError(w, r, http.StatusInternalServerError, "websocket: application specific 'Sec-Websocket-Extensions' headers are unsupported")
	}

	if !tokenListContainsValue(r.Header, "Connection", "upgrade") {
		return u.returnError(w, r, http.StatusBadRequest, "websocket: not a websocket handshake: 'upgrade' token not found in 'Connection' header")
	}

	if !tokenListContainsValue(r.Header, "Upgrade", "websocket") {
		return u.returnError(w, r, http.StatusBadRequest, "websocket: not a websocket handshake: 'websocket' token not found in 'Upgrade' header")
	}

	if !tokenListContainsValue(r.Header, "Sec-Websocket-Version", "13") {
		return u.returnError(w, r, http.StatusBadRequest, "websocket: unsupported version: 13 not found in 'Sec-Websocket-Version' header")
	}

	checkOrigin := u.CheckOrigin
	if checkOrigin == nil {
		checkOrigin = checkSameOrigin
	}
	if !checkOrigin(r) {
		return u.returnError(w, r, http.StatusForbidden, "websocket: 'Origin' header value not allowed")
	}

	challengeKey := r.Header.Get("Sec-Websocket-Key")
	if challengeKey == "" {
		return u.returnError(w, r, http.StatusBadRequest, "websocket: not a websocket handshake: `Sec-Websocket-Key' header is missing or blank")
	}

	subprotocol := u.selectSubprotocol(r, responseHeader)

	// Negotiate PMCE
	var compress bool
	if u.EnableCompression {
		for _, ext := range parseExtensions(r.Header) {
			if ext[""] != "permessage-deflate" {
				continue
			}
			compress = true
			break
		}
	}

	var (
		netConn net.Conn
		err     error
	)

	h, ok := w.(http.Hijacker)
	if !ok {
		return u.returnError(w, r, http.StatusInternalServerError, "websocket: response does not implement http.Hijacker")
	}
	var brw *bufio.ReadWriter
	netConn, brw, err = h.Hijack()
	if err != nil {
		return u.returnError(w, r, http.StatusInternalServerError, err.Error())
	}

	if brw.Reader.Buffered() > 0 {
		netConn.Close()
		return nil, errors.New("websocket: client sent data before handshake is complete")
	}

	c := newConnBRW(netConn, true, u.ReadBufferSize, u.WriteBufferSize, brw)
	c.subprotocol = subprotocol

	if compress {
		c.newCompressionWriter = compressNoContextTakeover
		c.newDecompressionReader = decompressNoContextTakeover
	}

	p := c.writeBuf[:0]
	p = append(p, "HTTP/1.1 101 Switching Protocols\r\nUpgrade: websocket\r\nConnection: Upgrade\r\nSec-WebSocket-Accept: "...)
	p = append(p, computeAcceptKey(challengeKey)...)
	p = append(p, "\r\n"...)
	if c.subprotocol != "" {
		p = append(p, "Sec-Websocket-Protocol: "...)
		p = append(p, c.subprotocol...)
		p = append(p, "\r\n"...)
	}
	if compress {
		p = append(p, "Sec-Websocket-Extensions: permessage-deflate; server_no_context_takeover; client_no_context_takeover\r\n"...)
	}
	for k, vs := range responseHeader {
		if k == "Sec-Websocket-Protocol" {
			continue
		}
		for _, v := range vs {
			p = append(p, k...)
			p = append(p, ": "...)
			for i := 0; i < len(v); i++ {
				b := v[i]
				if b <= 31 {
					// prevent response splitting.
					b = ' '
				}
				p = append(p, b)
			}
			p = append(p, "\r\n"...)
		}
	}
	p = append(p, "\r\n"...)

	// Clear deadlines set by HTTP server.
	netConn.SetDeadline(time.Time{})

	if u.HandshakeTimeout > 0 {
		netConn.SetWriteDeadline(time.Now().Add(u.HandshakeTimeout))
	}
	if _, err = netConn.Write(p); err != nil {
		netConn.Close()
		return nil, err
	}
	if u.HandshakeTimeout > 0 {
		netConn.SetWriteDeadline(time.Time{})
	}

	return c, nil
}

// Upgrade upgrades the HTTP server connection to the WebSocket protocol.
//
// This function is deprecated, use websocket.Upgrader instead.
//
// The application is responsible for checking the request origin before
// calling Upgrade. An example implementation of the same origin policy is:
//
//	if req.Header.Get("Origin") != "http://"+req.Host {
//		http.Error(w, "Origin not allowed", 403)
//		return
//	}
//
// If the endpoint supports subprotocols, then the application is responsible
// for negotiating the protocol used on the connection. Use the Subprotocols()
// function to get the subprotocols requested by the client. Use the
// Sec-Websocket-Protocol response header to specify the subprotocol selected
// by the application.
//
// The responseHeader is included in the response to the client's upgrade
// request. Use the responseHeader to specify cookies (Set-Cookie) and the
// negotiated subprotocol (Sec-Websocket-Protocol).
//
// The connection buffers IO to the underlying network connection. The
// readBufSize and writeBufSize parameters specify the size of the buffers to
// use. Messages can be larger than the buffers.
//
// If the request is not a valid WebSocket handshake, then Upgrade returns an
// error of type HandshakeError. Applications should handle this error by
// replying to the client with an HTTP error response.
func Upgrade(w http.ResponseWriter, r *http.Request, responseHeader http.Header, readBufSize, writeBufSize int) (*Conn, error) {
	u := Upgrader{ReadBufferSize: readBufSize, WriteBufferSize: writeBufSize}
	u.Error = func(w http.ResponseWriter, r *http.Request, status int, reason error) {
		// don't return errors to maintain backwards compatibility
	}
	u.CheckOrigin = func(r *http.Request) bool {
		// allow all connections by default
		return true
	}
	return u.Upgrade(w, r, responseHeader)
}

// Subprotocols returns the subprotocols requested by the client in the
// Sec-Websocket-Protocol header.
func Subprotocols(r *http.Request) []string {
	h := strings.TrimSpace(r.Header.Get("Sec-Websocket-Protocol"))
	if h == "" {
		return nil
	}
	protocols := strings.Split(h, ",")
	for i := range protocols {
		protocols[i] = strings.TrimSpace(protocols[i])
	}
	return protocols
}

// IsWebSocketUpgrade returns true if the client requested upgrade to the
// WebSocket protocol.
func IsWebSocketUpgrade(r *http.Request) bool {
	return tokenListContainsValue(r.Header, "Connection", "upgrade") &&
		tokenListContainsValue(r.Header, "Upgrade", "websocket")
}