1
0
mirror of https://github.com/mainflux/mainflux.git synced 2025-04-26 13:48:53 +08:00

Add debug logs to the WS adapter (#848)

Signed-off-by: Aleksandar Novakovic <anovakovic01@gmail.com>
This commit is contained in:
Aleksandar Novaković 2019-09-11 15:34:04 +02:00 committed by Drasko DRASKOVIC
parent 97327ab05f
commit 4870119270
4 changed files with 32 additions and 10 deletions

View File

@ -87,7 +87,7 @@ func main() {
defer thingsCloser.Close()
cc := thingsapi.NewClient(conn, thingsTracer, cfg.thingsTimeout)
pubsub := nats.New(nc)
pubsub := nats.New(nc, logger)
svc := newService(pubsub, logger)
errs := make(chan error, 2)

View File

@ -103,5 +103,6 @@ func (as *adapterService) Subscribe(chanID, subtopic string, channel *Channel) e
if err := as.pubsub.Subscribe(chanID, subtopic, channel); err != nil {
return ErrFailedSubscription
}
return nil
}

View File

@ -78,7 +78,7 @@ func handshake(svc ws.Service) http.HandlerFunc {
w.WriteHeader(http.StatusForbidden)
return
default:
logger.Warn(fmt.Sprintf("Failed to authorize: %s", err))
logger.Warn(fmt.Sprintf("Failed to authorize: %s", err.Error()))
w.WriteHeader(http.StatusServiceUnavailable)
return
}
@ -88,14 +88,14 @@ func handshake(svc ws.Service) http.HandlerFunc {
channelParts := channelPartRegExp.FindStringSubmatch(r.RequestURI)
if len(channelParts) < 2 {
logger.Warn(fmt.Sprintf("Empty channel id or malformed url"))
logger.Warn("Empty channel id or malformed url")
w.WriteHeader(http.StatusBadRequest)
return
}
sub.subtopic, err = parseSubtopic(channelParts[2])
if err != nil {
logger.Warn(fmt.Sprintf("Empty channel id or malformed url"))
logger.Warn("Empty channel id or malformed url")
w.WriteHeader(http.StatusBadRequest)
return
}
@ -108,12 +108,17 @@ func handshake(svc ws.Service) http.HandlerFunc {
}
sub.conn = conn
logger.Debug(fmt.Sprintf("Successfully upgraded communication to WS on channel %s", sub.chanID))
sub.channel = ws.NewChannel()
if err := svc.Subscribe(sub.chanID, sub.subtopic, sub.channel); err != nil {
logger.Warn(fmt.Sprintf("Failed to subscribe to NATS subject: %s", err))
conn.Close()
return
}
logger.Debug(fmt.Sprintf("Successfully subscribed to NATS channel %s", sub.chanID))
go sub.listen()
// Start listening for messages from NATS.
@ -126,8 +131,7 @@ func parseSubtopic(subtopic string) (string, error) {
return subtopic, nil
}
var err error
subtopic, err = url.QueryUnescape(subtopic)
subtopic, err := url.QueryUnescape(subtopic)
if err != nil {
return "", errMalformedSubtopic
}
@ -158,6 +162,7 @@ func authorize(r *http.Request) (subscription, error) {
if authKey == "" {
authKeys := bone.GetQuery(r, "authorization")
if len(authKeys) == 0 {
logger.Debug("Missing authorization key.")
return subscription{}, things.ErrUnauthorizedAccess
}
authKey = authKeys[0]
@ -176,6 +181,7 @@ func authorize(r *http.Request) (subscription, error) {
}
return subscription{}, err
}
logger.Debug(fmt.Sprintf("Successfully authorized client %s on channel %s", id.GetValue(), chanID))
sub := subscription{
pubID: id.GetValue(),
@ -210,6 +216,7 @@ func (sub subscription) broadcast(svc ws.Service, contentType string) {
for {
_, payload, err := sub.conn.ReadMessage()
if websocket.IsUnexpectedCloseError(err) {
logger.Debug(fmt.Sprintf("Closing WS connection: %s", err.Error()))
sub.channel.Close()
return
}
@ -233,6 +240,8 @@ func (sub subscription) broadcast(svc ws.Service, contentType string) {
return
}
}
logger.Debug(fmt.Sprintf("Successfully published message to the channel %s", sub.chanID))
}
}
@ -246,5 +255,7 @@ func (sub subscription) listen() {
if err := sub.conn.WriteMessage(format, msg.Payload); err != nil {
logger.Warn(fmt.Sprintf("Failed to broadcast message to thing: %s", err))
}
logger.Debug("Wrote message successfully")
}
}

View File

@ -16,6 +16,7 @@ import (
"github.com/gogo/protobuf/proto"
"github.com/mainflux/mainflux"
log "github.com/mainflux/mainflux/logger"
"github.com/mainflux/mainflux/ws"
broker "github.com/nats-io/go-nats"
)
@ -29,12 +30,13 @@ const (
var _ ws.Service = (*natsPubSub)(nil)
type natsPubSub struct {
nc *broker.Conn
cb *gobreaker.CircuitBreaker
nc *broker.Conn
cb *gobreaker.CircuitBreaker
logger log.Logger
}
// New instantiates NATS message publisher.
func New(nc *broker.Conn) ws.Service {
func New(nc *broker.Conn, logger log.Logger) ws.Service {
st := gobreaker.Settings{
Name: "NATS",
ReadyToTrip: func(counts gobreaker.Counts) bool {
@ -43,7 +45,11 @@ func New(nc *broker.Conn) ws.Service {
},
}
cb := gobreaker.NewCircuitBreaker(st)
return &natsPubSub{nc, cb}
return &natsPubSub{
nc: nc,
cb: cb,
logger: logger,
}
}
func (pubsub *natsPubSub) fmtSubject(chanID, subtopic string) string {
@ -69,14 +75,18 @@ func (pubsub *natsPubSub) Subscribe(chanID, subtopic string, channel *ws.Channel
sub, err := pubsub.nc.Subscribe(pubsub.fmtSubject(chanID, subtopic), func(msg *broker.Msg) {
if msg == nil {
pubsub.logger.Warn("Received nil message")
return
}
var rawMsg mainflux.RawMessage
if err := proto.Unmarshal(msg.Data, &rawMsg); err != nil {
pubsub.logger.Warn(fmt.Sprintf("Failed to deserialize received message: %s", err.Error()))
return
}
pubsub.logger.Debug(fmt.Sprintf("Successfully received message from NATS from channel %s", rawMsg.GetChannel()))
// Sends message to messages channel
channel.Send(rawMsg)
})