From bc41c32cf58c8448370b566c1949e9715ee41d39 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ivan=20Milo=C5=A1evi=C4=87?= Date: Fri, 9 Oct 2020 01:55:09 +0200 Subject: [PATCH] update mproxy version (#1251) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Ivan Milošević --- cmd/mqtt/main.go | 5 +- go.mod | 2 +- go.sum | 4 +- .../mainflux/mproxy/pkg/mqtt/mqtt.go | 34 ++++++++++- .../mainflux/mproxy/pkg/session/client.go | 3 + .../mainflux/mproxy/pkg/session/session.go | 18 +++--- .../github.com/mainflux/mproxy/pkg/tls/tls.go | 56 +++++++++++++++++++ .../mproxy/pkg/websocket/websocket.go | 35 ++++++++++-- vendor/modules.txt | 3 +- 9 files changed, 137 insertions(+), 23 deletions(-) create mode 100644 vendor/github.com/mainflux/mproxy/pkg/tls/tls.go diff --git a/cmd/mqtt/main.go b/cmd/mqtt/main.go index dd5f2483..ba9d499f 100644 --- a/cmd/mqtt/main.go +++ b/cmd/mqtt/main.go @@ -296,13 +296,12 @@ func proxyMQTT(cfg config, logger mflog.Logger, handler session.Handler, errs ch target := fmt.Sprintf("%s:%s", cfg.mqttTargetHost, cfg.mqttTargetPort) mp := mp.New(address, target, handler, logger) - errs <- mp.Proxy() + errs <- mp.Listen() } func proxyWS(cfg config, logger mflog.Logger, handler session.Handler, errs chan error) { target := fmt.Sprintf("%s:%s", cfg.httpTargetHost, cfg.httpTargetPort) wp := ws.New(target, cfg.httpTargetPath, "ws", handler, logger) http.Handle("/mqtt", wp.Handler()) - p := fmt.Sprintf(":%s", cfg.httpPort) - errs <- http.ListenAndServe(p, nil) + errs <- wp.Listen(cfg.httpPort) } diff --git a/go.mod b/go.mod index 68c4abad..7c6bbf79 100644 --- a/go.mod +++ b/go.mod @@ -22,7 +22,7 @@ require ( github.com/jmoiron/sqlx v1.2.1-0.20190826204134-d7d95172beb5 github.com/kr/text v0.2.0 // indirect github.com/lib/pq v1.7.0 - github.com/mainflux/mproxy v0.2.1 + github.com/mainflux/mproxy v0.2.2 github.com/mainflux/senml v1.5.0 github.com/mitchellh/mapstructure v1.1.2 github.com/nats-io/nats.go v1.10.0 diff --git a/go.sum b/go.sum index 51672e63..cd53119a 100644 --- a/go.sum +++ b/go.sum @@ -497,8 +497,8 @@ github.com/mainflux/mproxy v0.1.8 h1:vFpSbSTu327runxYnXoJOg2MrJ2etcMPUF1rPTrZbTk github.com/mainflux/mproxy v0.1.8/go.mod h1:NnhrUDytvV4pCI5LDuet86/WrymrUaX0/x1tlUHTKhU= github.com/mainflux/mproxy v0.2.1-0.20200603122422-b08e1fa2cf5c h1:vLMYmnA4W2GzUwpSAnRAB0eBJ9USQpNUo0+WTaVB9uI= github.com/mainflux/mproxy v0.2.1-0.20200603122422-b08e1fa2cf5c/go.mod h1:lFD56bDgNTslCLoTlZfo2DyQbkQOnoxEXmbE4VumRm4= -github.com/mainflux/mproxy v0.2.1 h1:Qu+r9GwXO3zhWEXPHYbW/TopDbMDQlfJx2VrmVEElJc= -github.com/mainflux/mproxy v0.2.1/go.mod h1:+T8h6ZupYPl6Lx9A0hqpcUQtcLyOBdzm/lfkjvPfGXo= +github.com/mainflux/mproxy v0.2.2 h1:Kq8x5iWINlZejlZatDdgIsM427oRWKr1gw7mrkh5j9M= +github.com/mainflux/mproxy v0.2.2/go.mod h1:+T8h6ZupYPl6Lx9A0hqpcUQtcLyOBdzm/lfkjvPfGXo= github.com/mainflux/senml v1.0.0 h1:oLS5aBhvdHjgQ8kfq3jX7yD+DaquhvpyvIWNsPil3X0= github.com/mainflux/senml v1.0.0/go.mod h1:g9i8pj4WMs29KkUpXivbe/PP0qJd1kt3b1CF77S8A3s= github.com/mainflux/senml v1.0.1 h1:qWKIGeUe7YEygM3xZcJ9Lbq+DHuT8V23dz1hgAYkYEY= diff --git a/vendor/github.com/mainflux/mproxy/pkg/mqtt/mqtt.go b/vendor/github.com/mainflux/mproxy/pkg/mqtt/mqtt.go index 913992bf..52fe4e9d 100644 --- a/vendor/github.com/mainflux/mproxy/pkg/mqtt/mqtt.go +++ b/vendor/github.com/mainflux/mproxy/pkg/mqtt/mqtt.go @@ -1,6 +1,7 @@ package mqtt import ( + "crypto/tls" "fmt" "io" "net" @@ -8,6 +9,11 @@ import ( "github.com/mainflux/mainflux/logger" "github.com/mainflux/mainflux/pkg/errors" "github.com/mainflux/mproxy/pkg/session" + mptls "github.com/mainflux/mproxy/pkg/tls" +) + +var ( + errCreateListener = errors.New("failed creating TLS listener") ) // Proxy is main MQTT proxy struct @@ -51,15 +57,21 @@ func (p Proxy) handle(inbound net.Conn) { } defer p.close(outbound) - s := session.New(inbound, outbound, p.handler, p.logger) + clientCert, err := mptls.ClientCert(inbound) + if err != nil { + p.logger.Error("Failed to get client certificate: " + err.Error()) + return + } + + s := session.New(inbound, outbound, p.handler, p.logger, clientCert) if err = s.Stream(); !errors.Contains(err, io.EOF) { p.logger.Warn("Broken connection for client: " + s.Client.ID + " with error: " + err.Error()) } } -// Proxy of the server, this will block. -func (p Proxy) Proxy() error { +// Listen of the server, this will block. +func (p Proxy) Listen() error { l, err := net.Listen("tcp", p.address) if err != nil { return err @@ -73,6 +85,22 @@ func (p Proxy) Proxy() error { return nil } +// ListenTLS - version of Listen with TLS encryption +func (p Proxy) ListenTLS(tlsCfg *tls.Config) error { + + l, err := tls.Listen("tcp", p.address, tlsCfg) + if err != nil { + return errors.Wrap(errCreateListener, err) + } + defer l.Close() + + // Acceptor loop + p.accept(l) + + p.logger.Info("Server Exiting...") + return nil +} + func (p Proxy) close(conn net.Conn) { if err := conn.Close(); err != nil { p.logger.Warn(fmt.Sprintf("Error closing connection %s", err.Error())) diff --git a/vendor/github.com/mainflux/mproxy/pkg/session/client.go b/vendor/github.com/mainflux/mproxy/pkg/session/client.go index 5b827b2f..e6a2465d 100644 --- a/vendor/github.com/mainflux/mproxy/pkg/session/client.go +++ b/vendor/github.com/mainflux/mproxy/pkg/session/client.go @@ -1,8 +1,11 @@ package session +import "crypto/x509" + // Client stores MQTT client data. type Client struct { ID string Username string Password []byte + Cert x509.Certificate } diff --git a/vendor/github.com/mainflux/mproxy/pkg/session/session.go b/vendor/github.com/mainflux/mproxy/pkg/session/session.go index 1030b534..48c3653a 100644 --- a/vendor/github.com/mainflux/mproxy/pkg/session/session.go +++ b/vendor/github.com/mainflux/mproxy/pkg/session/session.go @@ -1,6 +1,7 @@ package session import ( + "crypto/x509" "net" "github.com/eclipse/paho.mqtt.golang/packets" @@ -14,8 +15,8 @@ const ( ) var ( - errBroker = errors.New("error between mProxy and MQTT broker") - errClient = errors.New("error between mProxy and MQTT client") + errBroker = errors.New("failed proxying from MQTT client to MQTT broker") + errClient = errors.New("failed proxying from MQTT broker to MQTT client") ) type direction int @@ -30,12 +31,15 @@ type Session struct { } // New creates a new Session. -func New(inbound, outbound net.Conn, handler Handler, logger logger.Logger) *Session { +func New(inbound, outbound net.Conn, handler Handler, logger logger.Logger, cert x509.Certificate) *Session { return &Session{ logger: logger, inbound: inbound, outbound: outbound, handler: handler, + Client: Client{ + Cert: cert, + }, } } @@ -88,11 +92,9 @@ func (s *Session) stream(dir direction, r, w net.Conn, errs chan error) { func (s *Session) authorize(pkt packets.ControlPacket) error { switch p := pkt.(type) { case *packets.ConnectPacket: - s.Client = Client{ - ID: p.ClientIdentifier, - Username: p.Username, - Password: p.Password, - } + s.Client.ID = p.ClientIdentifier + s.Client.Username = p.Username + s.Client.Password = p.Password if err := s.handler.AuthConnect(&s.Client); err != nil { return err } diff --git a/vendor/github.com/mainflux/mproxy/pkg/tls/tls.go b/vendor/github.com/mainflux/mproxy/pkg/tls/tls.go new file mode 100644 index 00000000..e707e682 --- /dev/null +++ b/vendor/github.com/mainflux/mproxy/pkg/tls/tls.go @@ -0,0 +1,56 @@ +package tls + +import ( + "crypto/tls" + "crypto/x509" + "io/ioutil" + "net" + + "github.com/mainflux/mainflux/pkg/errors" +) + +var ( + errTLSdetails = errors.New("failed to get TLS details of connection") + errParseRoot = errors.New("failed to parse root certificate") +) + +// LoadTLSCfg return a TLS configuration that can be used in TLS servers +func LoadTLSCfg(ca, crt, key string) (*tls.Config, error) { + caCertPEM, err := ioutil.ReadFile(ca) + if err != nil { + return nil, err + } + + roots := x509.NewCertPool() + if ok := roots.AppendCertsFromPEM(caCertPEM); !ok { + return nil, errParseRoot + } + + cert, err := tls.LoadX509KeyPair(crt, key) + if err != nil { + return nil, err + } + return &tls.Config{ + Certificates: []tls.Certificate{cert}, + ClientAuth: tls.RequireAndVerifyClientCert, + ClientCAs: roots, + }, nil +} + +// ClientCert returns client certificate +func ClientCert(conn net.Conn) (x509.Certificate, error) { + switch connVal := conn.(type) { + case *tls.Conn: + if err := connVal.Handshake(); err != nil { + return x509.Certificate{}, err + } + state := connVal.ConnectionState() + if state.Version == 0 { + return x509.Certificate{}, errTLSdetails + } + cert := *state.PeerCertificates[0] + return cert, nil + default: + return x509.Certificate{}, nil + } +} diff --git a/vendor/github.com/mainflux/mproxy/pkg/websocket/websocket.go b/vendor/github.com/mainflux/mproxy/pkg/websocket/websocket.go index 30850c06..f542ac06 100644 --- a/vendor/github.com/mainflux/mproxy/pkg/websocket/websocket.go +++ b/vendor/github.com/mainflux/mproxy/pkg/websocket/websocket.go @@ -1,6 +1,8 @@ package websocket import ( + "crypto/tls" + "fmt" "net/http" "net/url" "time" @@ -8,6 +10,7 @@ import ( "github.com/gorilla/websocket" "github.com/mainflux/mainflux/logger" "github.com/mainflux/mproxy/pkg/session" + mptls "github.com/mainflux/mproxy/pkg/tls" ) // Proxy represents WS Proxy. @@ -19,7 +22,7 @@ type Proxy struct { logger logger.Logger } -// New - creates new HTTP proxy +// New - creates new WS proxy func New(target, path, scheme string, event session.Handler, logger logger.Logger) *Proxy { return &Proxy{ target: target, @@ -34,14 +37,14 @@ var upgrader = websocket.Upgrader{ // Timeout for WS upgrade request handshake HandshakeTimeout: 10 * time.Second, // Paho JS client expecting header Sec-WebSocket-Protocol:mqtt in Upgrade response during handshake. - Subprotocols: []string{"mqtt"}, + Subprotocols: []string{"mqttv3.1", "mqtt"}, // Allow CORS CheckOrigin: func(r *http.Request) bool { return true }, } -// Handler - proxies HTTP traffic +// Handler - proxies WS traffic func (p Proxy) Handler() http.Handler { return p.handle() } @@ -74,7 +77,7 @@ func (p Proxy) pass(in *websocket.Conn) { srv, _, err := dialer.Dial(url.String(), nil) if err != nil { - p.logger.Error("Unable to connect to broker, reason: " + err.Error()) + p.logger.Error("Unable to connect to broker: " + err.Error()) return } @@ -85,8 +88,30 @@ func (p Proxy) pass(in *websocket.Conn) { defer s.Close() defer c.Close() - session := session.New(c, s, p.event, p.logger) + clientCert, err := mptls.ClientCert(in.UnderlyingConn()) + if err != nil { + p.logger.Error("Failed to get client certificate: " + err.Error()) + return + } + + session := session.New(c, s, p.event, p.logger, clientCert) err = session.Stream() errc <- err p.logger.Warn("Broken connection for client: " + session.Client.ID + " with error: " + err.Error()) } + +// Listen of the server +func (p Proxy) Listen(wsPort string) error { + port := fmt.Sprintf(":%s", wsPort) + return http.ListenAndServe(port, nil) +} + +// ListenTLS - version of Listen with TLS encryption +func (p Proxy) ListenTLS(tlsCfg *tls.Config, crt, key, wssPort string) error { + port := fmt.Sprintf(":%s", wssPort) + server := &http.Server{ + Addr: port, + TLSConfig: tlsCfg, + } + return server.ListenAndServeTLS(crt, key) +} diff --git a/vendor/modules.txt b/vendor/modules.txt index 84f86724..7b75c7a3 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -170,10 +170,11 @@ github.com/lib/pq/oid github.com/lib/pq/scram # github.com/magiconair/properties v1.8.1 github.com/magiconair/properties -# github.com/mainflux/mproxy v0.2.1 +# github.com/mainflux/mproxy v0.2.2 ## explicit github.com/mainflux/mproxy/pkg/mqtt github.com/mainflux/mproxy/pkg/session +github.com/mainflux/mproxy/pkg/tls github.com/mainflux/mproxy/pkg/websocket # github.com/mainflux/senml v1.5.0 ## explicit