mirror of
https://github.com/mainflux/mainflux.git
synced 2025-04-24 13:48:49 +08:00

* update WS service to mproxy Signed-off-by: SammyOina <sammyoina@gmail.com> * Fix brokerstracing initialization in main.go The commit fixes the initialization of the brokerstracing package in the main.go file. The target server configuration is now correctly passed to the NewPubSub function, ensuring that the correct host and port are used for the PubSub service. This resolves an issue where the wrong server configuration was being used, leading to incorrect behavior. Signed-off-by: SammyOina <sammyoina@gmail.com> * Fix goroutine issue in main.go The commit fixes a goroutine issue in the main.go file. Previously, the `hs.Start()` function was not being executed in a goroutine, which caused the program to block. This commit wraps the `hs.Start()` function call in a goroutine to ensure it runs concurrently. This resolves the issue and allows the program to continue execution without blocking. Signed-off-by: SammyOina <sammyoina@gmail.com> * update to current mproxy Signed-off-by: SammyOina <sammyoina@gmail.com> * update dependencies Signed-off-by: SammyOina <sammyoina@gmail.com> * Fix targetWSHost value in proxyWS function The targetWSHost value in the proxyWS function was empty. This commit fixes the issue by setting the targetWSHost value to "localhost". Additionally, the target variable in the proxyWS function is updated to include the "ws://" protocol prefix. Signed-off-by: SammyOina <sammyoina@gmail.com> * update deps Signed-off-by: SammyOina <sammyoina@gmail.com> * remove authorize from unsubscribe Signed-off-by: SammyOina <sammyoina@gmail.com> --------- Signed-off-by: SammyOina <sammyoina@gmail.com>
116 lines
3.3 KiB
Go
116 lines
3.3 KiB
Go
// Copyright (c) Mainflux
|
|
// SPDX-License-Identifier: Apache-2.0
|
|
|
|
package ws
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
|
|
"github.com/mainflux/mainflux"
|
|
"github.com/mainflux/mainflux/pkg/errors"
|
|
"github.com/mainflux/mainflux/pkg/messaging"
|
|
)
|
|
|
|
// chansPrefix is the leading segment of every broker subject built by this
// adapter; Subscribe joins it to the channel ID with a dot.
const chansPrefix = "channels"
|
|
|
|
// Sentinel errors reported by the WebSocket adapter.
var (
	// ErrFailedMessagePublish is reported when a message could not be published.
	ErrFailedMessagePublish = errors.New("failed to publish message")

	// ErrFailedSubscription is reported when a client could not be subscribed
	// to the requested channel.
	ErrFailedSubscription = errors.New("failed to subscribe to a channel")

	// ErrFailedUnsubscribe is reported when a client could not be unsubscribed
	// from the requested channel.
	ErrFailedUnsubscribe = errors.New("failed to unsubscribe from a channel")

	// ErrFailedConnection is reported when the service could not connect to
	// the message broker.
	ErrFailedConnection = errors.New("failed to connect to message broker")

	// ErrInvalidConnection is reported when the client could not subscribe to
	// the message broker because the connection is invalid.
	ErrInvalidConnection = errors.New("nats: invalid connection")

	// ErrUnauthorizedAccess is reported when the client supplied missing or
	// invalid credentials.
	ErrUnauthorizedAccess = errors.New("missing or invalid credentials provided")

	// ErrEmptyTopic indicates an empty topic in the request.
	// NOTE(review): the previous comment said this signals a missing thingKey;
	// confirm which request field this error actually guards.
	ErrEmptyTopic = errors.New("empty topic")

	// ErrEmptyID indicates an empty ID in the request.
	// NOTE(review): the previous comment said this signals a missing
	// channelID; confirm against the callers.
	ErrEmptyID = errors.New("empty id")
)
|
|
|
|
// Service specifies web socket service API.
|
|
type Service interface {
|
|
// Subscribe subscribes message from the broker using the thingKey for authorization,
|
|
// and the channelID for subscription. Subtopic is optional.
|
|
// If the subscription is successful, nil is returned otherwise error is returned.
|
|
Subscribe(ctx context.Context, thingKey, chanID, subtopic string, client *Client) error
|
|
}
|
|
|
|
var _ Service = (*adapterService)(nil)
|
|
|
|
type adapterService struct {
|
|
auth mainflux.AuthzServiceClient
|
|
pubsub messaging.PubSub
|
|
}
|
|
|
|
// New instantiates the WS adapter implementation.
|
|
func New(auth mainflux.AuthzServiceClient, pubsub messaging.PubSub) Service {
|
|
return &adapterService{
|
|
auth: auth,
|
|
pubsub: pubsub,
|
|
}
|
|
}
|
|
|
|
func (svc *adapterService) Subscribe(ctx context.Context, thingKey, chanID, subtopic string, c *Client) error {
|
|
if chanID == "" || thingKey == "" {
|
|
return ErrUnauthorizedAccess
|
|
}
|
|
|
|
thingID, err := svc.authorize(ctx, thingKey, chanID, "subscribe")
|
|
if err != nil {
|
|
return ErrUnauthorizedAccess
|
|
}
|
|
|
|
c.id = thingID
|
|
|
|
subject := fmt.Sprintf("%s.%s", chansPrefix, chanID)
|
|
if subtopic != "" {
|
|
subject = fmt.Sprintf("%s.%s", subject, subtopic)
|
|
}
|
|
|
|
subCfg := messaging.SubscriberConfig{
|
|
ID: thingID,
|
|
Topic: subject,
|
|
Handler: c,
|
|
}
|
|
if err := svc.pubsub.Subscribe(ctx, subCfg); err != nil {
|
|
return ErrFailedSubscription
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// authorize checks if the thingKey is authorized to access the channel
|
|
// and returns the thingID if it is.
|
|
func (svc *adapterService) authorize(ctx context.Context, thingKey, chanID, action string) (string, error) {
|
|
ar := &mainflux.AuthorizeReq{
|
|
Namespace: "",
|
|
SubjectType: "thing",
|
|
Permission: action,
|
|
Subject: thingKey,
|
|
Object: chanID,
|
|
ObjectType: "group",
|
|
}
|
|
res, err := svc.auth.Authorize(ctx, ar)
|
|
if err != nil {
|
|
return "", errors.Wrap(errors.ErrAuthorization, err)
|
|
}
|
|
if !res.GetAuthorized() {
|
|
return "", errors.Wrap(errors.ErrAuthorization, err)
|
|
}
|
|
|
|
return res.GetId(), nil
|
|
}
|