mirror of
https://github.com/mainflux/mainflux.git
synced 2025-04-26 13:48:53 +08:00
Fix ws adapter communication with broker and things service (#1899)
Signed-off-by: Rodney Osodo <socials@rodneyosodo.com>
This commit is contained in:
parent
320921a63e
commit
7c3add6949
@ -111,7 +111,7 @@ func main() {
|
|||||||
|
|
||||||
svc := newService(tc, nps, logger, tracer)
|
svc := newService(tc, nps, logger, tracer)
|
||||||
|
|
||||||
hs := httpserver.New(ctx, cancel, svcName, httpServerConfig, api.MakeHandler(svc, logger, cfg.InstanceID), logger)
|
hs := httpserver.New(ctx, cancel, svcName, httpServerConfig, api.MakeHandler(ctx, svc, logger, cfg.InstanceID), logger)
|
||||||
|
|
||||||
if cfg.SendTelemetry {
|
if cfg.SendTelemetry {
|
||||||
chc := chclient.New(svcName, mainflux.Version, logger, cancel)
|
chc := chclient.New(svcName, mainflux.Version, logger, cancel)
|
||||||
|
@ -79,7 +79,7 @@ func (svc *adapterService) Subscribe(ctx context.Context, key, chanID, subtopic
|
|||||||
Subject: key,
|
Subject: key,
|
||||||
Object: chanID,
|
Object: chanID,
|
||||||
Action: policies.ReadAction,
|
Action: policies.ReadAction,
|
||||||
EntityType: policies.GroupEntityType,
|
EntityType: policies.ThingEntityType,
|
||||||
}
|
}
|
||||||
res, err := svc.auth.Authorize(ctx, ar)
|
res, err := svc.auth.Authorize(ctx, ar)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -100,7 +100,7 @@ func (svc *adapterService) Unsubscribe(ctx context.Context, key, chanID, subtopi
|
|||||||
Subject: key,
|
Subject: key,
|
||||||
Object: chanID,
|
Object: chanID,
|
||||||
Action: policies.ReadAction,
|
Action: policies.ReadAction,
|
||||||
EntityType: policies.GroupEntityType,
|
EntityType: policies.ThingEntityType,
|
||||||
}
|
}
|
||||||
res, err := svc.auth.Authorize(ctx, ar)
|
res, err := svc.auth.Authorize(ctx, ar)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -97,7 +97,7 @@ func (h *handler) AuthConnect(ctx context.Context) error {
|
|||||||
return errors.ErrAuthentication
|
return errors.ErrAuthentication
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := h.es.Connect(ctx, string(s.Password)); err != nil {
|
if err := h.es.Connect(ctx, pwd); err != nil {
|
||||||
h.logger.Error(errors.Wrap(ErrFailedPublishConnectEvent, err).Error())
|
h.logger.Error(errors.Wrap(ErrFailedPublishConnectEvent, err).Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -249,7 +249,7 @@ func (h *handler) authAccess(ctx context.Context, password, topic, action string
|
|||||||
return errors.ErrAuthorization
|
return errors.ErrAuthorization
|
||||||
}
|
}
|
||||||
|
|
||||||
return err
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func parseSubtopic(subtopic string) (string, error) {
|
func parseSubtopic(subtopic string) (string, error) {
|
||||||
|
@ -75,7 +75,7 @@ func New(auth policies.AuthServiceClient, pubsub messaging.PubSub) Service {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (svc *adapterService) Publish(ctx context.Context, thingKey string, msg *messaging.Message) error {
|
func (svc *adapterService) Publish(ctx context.Context, thingKey string, msg *messaging.Message) error {
|
||||||
thid, err := svc.authorize(ctx, thingKey, msg.GetChannel())
|
thid, err := svc.authorize(ctx, thingKey, msg.GetChannel(), policies.WriteAction)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return ErrUnauthorizedAccess
|
return ErrUnauthorizedAccess
|
||||||
}
|
}
|
||||||
@ -98,7 +98,7 @@ func (svc *adapterService) Subscribe(ctx context.Context, thingKey, chanID, subt
|
|||||||
return ErrUnauthorizedAccess
|
return ErrUnauthorizedAccess
|
||||||
}
|
}
|
||||||
|
|
||||||
thid, err := svc.authorize(ctx, thingKey, chanID)
|
thid, err := svc.authorize(ctx, thingKey, chanID, policies.ReadAction)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return ErrUnauthorizedAccess
|
return ErrUnauthorizedAccess
|
||||||
}
|
}
|
||||||
@ -122,7 +122,7 @@ func (svc *adapterService) Unsubscribe(ctx context.Context, thingKey, chanID, su
|
|||||||
return ErrUnauthorizedAccess
|
return ErrUnauthorizedAccess
|
||||||
}
|
}
|
||||||
|
|
||||||
thid, err := svc.authorize(ctx, thingKey, chanID)
|
thid, err := svc.authorize(ctx, thingKey, chanID, policies.ReadAction)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return ErrUnauthorizedAccess
|
return ErrUnauthorizedAccess
|
||||||
}
|
}
|
||||||
@ -137,11 +137,11 @@ func (svc *adapterService) Unsubscribe(ctx context.Context, thingKey, chanID, su
|
|||||||
|
|
||||||
// authorize checks if the thingKey is authorized to access the channel
|
// authorize checks if the thingKey is authorized to access the channel
|
||||||
// and returns the thingID if it is.
|
// and returns the thingID if it is.
|
||||||
func (svc *adapterService) authorize(ctx context.Context, thingKey, chanID string) (string, error) {
|
func (svc *adapterService) authorize(ctx context.Context, thingKey, chanID, action string) (string, error) {
|
||||||
ar := &policies.AuthorizeReq{
|
ar := &policies.AuthorizeReq{
|
||||||
Subject: thingKey,
|
Subject: thingKey,
|
||||||
Object: chanID,
|
Object: chanID,
|
||||||
Action: policies.ReadAction,
|
Action: action,
|
||||||
EntityType: policies.ThingEntityType,
|
EntityType: policies.ThingEntityType,
|
||||||
}
|
}
|
||||||
res, err := svc.auth.Authorize(ctx, ar)
|
res, err := svc.auth.Authorize(ctx, ar)
|
||||||
|
@ -4,6 +4,7 @@
|
|||||||
package api_test
|
package api_test
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/http/httptest"
|
"net/http/httptest"
|
||||||
@ -37,7 +38,7 @@ func newService(cc policies.AuthServiceClient) (ws.Service, mocks.MockPubSub) {
|
|||||||
|
|
||||||
func newHTTPServer(svc ws.Service) *httptest.Server {
|
func newHTTPServer(svc ws.Service) *httptest.Server {
|
||||||
logger := mflog.NewMock()
|
logger := mflog.NewMock()
|
||||||
mux := api.MakeHandler(svc, logger, instanceID)
|
mux := api.MakeHandler(context.Background(), svc, logger, instanceID)
|
||||||
return httptest.NewServer(mux)
|
return httptest.NewServer(mux)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -21,9 +21,8 @@ import (
|
|||||||
|
|
||||||
var channelPartRegExp = regexp.MustCompile(`^/channels/([\w\-]+)/messages(/[^?]*)?(\?.*)?$`)
|
var channelPartRegExp = regexp.MustCompile(`^/channels/([\w\-]+)/messages(/[^?]*)?(\?.*)?$`)
|
||||||
|
|
||||||
func handshake(svc ws.Service) http.HandlerFunc {
|
func handshake(ctx context.Context, svc ws.Service) http.HandlerFunc {
|
||||||
return func(w http.ResponseWriter, r *http.Request) {
|
return func(w http.ResponseWriter, r *http.Request) {
|
||||||
ctx := r.Context()
|
|
||||||
req, err := decodeRequest(r)
|
req, err := decodeRequest(r)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
encodeError(w, err)
|
encodeError(w, err)
|
||||||
@ -146,10 +145,10 @@ func process(ctx context.Context, svc ws.Service, req connReq, msgs <-chan []byt
|
|||||||
Payload: msg,
|
Payload: msg,
|
||||||
Created: time.Now().UnixNano(),
|
Created: time.Now().UnixNano(),
|
||||||
}
|
}
|
||||||
_ = svc.Publish(ctx, req.thingKey, &m)
|
|
||||||
}
|
if err := svc.Publish(ctx, req.thingKey, &m); err != nil {
|
||||||
if err := svc.Unsubscribe(ctx, req.thingKey, req.chanID, req.subtopic); err != nil {
|
logger.Warn(fmt.Sprintf("Failed to publish message: %s", err.Error()))
|
||||||
req.conn.Close()
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -4,6 +4,7 @@
|
|||||||
package api
|
package api
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
|
||||||
@ -35,12 +36,12 @@ var (
|
|||||||
)
|
)
|
||||||
|
|
||||||
// MakeHandler returns http handler with handshake endpoint.
|
// MakeHandler returns http handler with handshake endpoint.
|
||||||
func MakeHandler(svc ws.Service, l mflog.Logger, instanceID string) http.Handler {
|
func MakeHandler(ctx context.Context, svc ws.Service, l mflog.Logger, instanceID string) http.Handler {
|
||||||
logger = l
|
logger = l
|
||||||
|
|
||||||
mux := bone.New()
|
mux := bone.New()
|
||||||
mux.GetFunc("/channels/:chanID/messages", handshake(svc))
|
mux.GetFunc("/channels/:chanID/messages", handshake(ctx, svc))
|
||||||
mux.GetFunc("/channels/:chanID/messages/*", handshake(svc))
|
mux.GetFunc("/channels/:chanID/messages/*", handshake(ctx, svc))
|
||||||
mux.GetFunc("/version", mainflux.Health(protocol, instanceID))
|
mux.GetFunc("/version", mainflux.Health(protocol, instanceID))
|
||||||
mux.Handle("/metrics", promhttp.Handler())
|
mux.Handle("/metrics", promhttp.Handler())
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user