diff --git a/cmd/ws/main.go b/cmd/ws/main.go index 0bc4a661..bab00e4c 100644 --- a/cmd/ws/main.go +++ b/cmd/ws/main.go @@ -87,7 +87,7 @@ func main() { defer thingsCloser.Close() cc := thingsapi.NewClient(conn, thingsTracer, cfg.thingsTimeout) - pubsub := nats.New(nc) + pubsub := nats.New(nc, logger) svc := newService(pubsub, logger) errs := make(chan error, 2) diff --git a/ws/adapter.go b/ws/adapter.go index 4173d9fe..185aed2f 100644 --- a/ws/adapter.go +++ b/ws/adapter.go @@ -103,5 +103,6 @@ func (as *adapterService) Subscribe(chanID, subtopic string, channel *Channel) e if err := as.pubsub.Subscribe(chanID, subtopic, channel); err != nil { return ErrFailedSubscription } + return nil } diff --git a/ws/api/transport.go b/ws/api/transport.go index a1a7d108..9a315442 100644 --- a/ws/api/transport.go +++ b/ws/api/transport.go @@ -78,7 +78,7 @@ func handshake(svc ws.Service) http.HandlerFunc { w.WriteHeader(http.StatusForbidden) return default: - logger.Warn(fmt.Sprintf("Failed to authorize: %s", err)) + logger.Warn(fmt.Sprintf("Failed to authorize: %s", err.Error())) w.WriteHeader(http.StatusServiceUnavailable) return } @@ -88,14 +88,14 @@ func handshake(svc ws.Service) http.HandlerFunc { channelParts := channelPartRegExp.FindStringSubmatch(r.RequestURI) if len(channelParts) < 2 { - logger.Warn(fmt.Sprintf("Empty channel id or malformed url")) + logger.Warn("Empty channel id or malformed url") w.WriteHeader(http.StatusBadRequest) return } sub.subtopic, err = parseSubtopic(channelParts[2]) if err != nil { - logger.Warn(fmt.Sprintf("Empty channel id or malformed url")) + logger.Warn("Empty channel id or malformed url") w.WriteHeader(http.StatusBadRequest) return } @@ -108,12 +108,17 @@ func handshake(svc ws.Service) http.HandlerFunc { } sub.conn = conn + logger.Debug(fmt.Sprintf("Successfully upgraded communication to WS on channel %s", sub.chanID)) + sub.channel = ws.NewChannel() if err := svc.Subscribe(sub.chanID, sub.subtopic, sub.channel); err != nil { logger.Warn(fmt.Sprintf("Failed to subscribe to NATS subject: %s", err)) conn.Close() return } + + logger.Debug(fmt.Sprintf("Successfully subscribed to NATS channel %s", sub.chanID)) + go sub.listen() // Start listening for messages from NATS. @@ -126,8 +131,7 @@ func parseSubtopic(subtopic string) (string, error) { return subtopic, nil } - var err error - subtopic, err = url.QueryUnescape(subtopic) + subtopic, err := url.QueryUnescape(subtopic) if err != nil { return "", errMalformedSubtopic } @@ -158,6 +162,7 @@ func authorize(r *http.Request) (subscription, error) { if authKey == "" { authKeys := bone.GetQuery(r, "authorization") if len(authKeys) == 0 { + logger.Debug("Missing authorization key.") return subscription{}, things.ErrUnauthorizedAccess } authKey = authKeys[0] @@ -176,6 +181,7 @@ func authorize(r *http.Request) (subscription, error) { } return subscription{}, err } + logger.Debug(fmt.Sprintf("Successfully authorized client %s on channel %s", id.GetValue(), chanID)) sub := subscription{ pubID: id.GetValue(), @@ -210,6 +216,7 @@ func (sub subscription) broadcast(svc ws.Service, contentType string) { for { _, payload, err := sub.conn.ReadMessage() if websocket.IsUnexpectedCloseError(err) { + logger.Debug(fmt.Sprintf("Closing WS connection: %s", err.Error())) sub.channel.Close() return } @@ -233,6 +240,8 @@ func (sub subscription) broadcast(svc ws.Service, contentType string) { return } } + + logger.Debug(fmt.Sprintf("Successfully published message to the channel %s", sub.chanID)) } } @@ -246,5 +255,7 @@ func (sub subscription) listen() { if err := sub.conn.WriteMessage(format, msg.Payload); err != nil { logger.Warn(fmt.Sprintf("Failed to broadcast message to thing: %s", err)) } + + logger.Debug("Wrote message successfully") } } diff --git a/ws/nats/publisher.go b/ws/nats/publisher.go index 17cef1ea..e654d5d4 100644 --- a/ws/nats/publisher.go +++ b/ws/nats/publisher.go @@ -16,6 +16,7 @@ import ( "github.com/gogo/protobuf/proto" "github.com/mainflux/mainflux" + log "github.com/mainflux/mainflux/logger" "github.com/mainflux/mainflux/ws" broker "github.com/nats-io/go-nats" ) @@ -29,12 +30,13 @@ const ( var _ ws.Service = (*natsPubSub)(nil) type natsPubSub struct { - nc *broker.Conn - cb *gobreaker.CircuitBreaker + nc *broker.Conn + cb *gobreaker.CircuitBreaker + logger log.Logger } // New instantiates NATS message publisher. -func New(nc *broker.Conn) ws.Service { +func New(nc *broker.Conn, logger log.Logger) ws.Service { st := gobreaker.Settings{ Name: "NATS", ReadyToTrip: func(counts gobreaker.Counts) bool { @@ -43,7 +45,11 @@ func New(nc *broker.Conn) ws.Service { }, } cb := gobreaker.NewCircuitBreaker(st) - return &natsPubSub{nc, cb} + return &natsPubSub{ + nc: nc, + cb: cb, + logger: logger, + } } func (pubsub *natsPubSub) fmtSubject(chanID, subtopic string) string { @@ -69,14 +75,18 @@ func (pubsub *natsPubSub) Subscribe(chanID, subtopic string, channel *ws.Channel sub, err := pubsub.nc.Subscribe(pubsub.fmtSubject(chanID, subtopic), func(msg *broker.Msg) { if msg == nil { + pubsub.logger.Warn("Received nil message") return } var rawMsg mainflux.RawMessage if err := proto.Unmarshal(msg.Data, &rawMsg); err != nil { + pubsub.logger.Warn(fmt.Sprintf("Failed to deserialize received message: %s", err.Error())) return } + pubsub.logger.Debug(fmt.Sprintf("Successfully received message from NATS from channel %s", rawMsg.GetChannel())) + // Sends message to messages channel channel.Send(rawMsg) })