1
0
mirror of https://github.com/mainflux/mainflux.git synced 2025-04-27 13:48:49 +08:00

MF-898 - Change thing's service to use bulk connect (#946)

* Change thing's service to use bulk connect

Signed-off-by: Nick Neisen <nwneisen@gmail.com>

* Move owner check outside of loop

Signed-off-by: Nick Neisen <nwneisen@gmail.com>

* Change invalid user to unauthorized access error

Signed-off-by: Nick Neisen <nwneisen@gmail.com>
This commit is contained in:
Nick Neisen 2019-11-11 17:24:29 -07:00 committed by Manuel Imperiale
parent 5446d88abf
commit 67d518821a
5 changed files with 28 additions and 25 deletions

View File

@ -70,7 +70,7 @@ func (svc *mainfluxThings) ViewThing(_ context.Context, owner, id string) (thing
return things.Thing{}, things.ErrNotFound
}
func (svc *mainfluxThings) Connect(_ context.Context, owner, chanID, thingID string) error {
func (svc *mainfluxThings) Connect(_ context.Context, owner, chID string, thIDs ...string) error {
svc.mu.Lock()
defer svc.mu.Unlock()
@ -78,12 +78,13 @@ func (svc *mainfluxThings) Connect(_ context.Context, owner, chanID, thingID str
if err != nil {
return things.ErrUnauthorizedAccess
}
if svc.channels[chanID].Owner != userID.Value {
return things.ErrNotFound
if svc.channels[chID].Owner != userID.Value {
return things.ErrUnauthorizedAccess
}
for _, thID := range thIDs {
svc.connections[chID] = append(svc.connections[chID], thID)
}
svc.connections[chanID] = append(svc.connections[chanID], thingID)
return nil
}

View File

@ -201,9 +201,9 @@ func (lm *loggingMiddleware) RemoveChannel(ctx context.Context, token, id string
return lm.svc.RemoveChannel(ctx, token, id)
}
func (lm *loggingMiddleware) Connect(ctx context.Context, token, chanID, thingID string) (err error) {
func (lm *loggingMiddleware) Connect(ctx context.Context, token, chID string, thIDs ...string) (err error) {
defer func(begin time.Time) {
message := fmt.Sprintf("Method connect for token %s, channel %s and thing %s took %s to complete", token, chanID, thingID, time.Since(begin))
message := fmt.Sprintf("Method connect for token %s, channel %s and things %s took %s to complete", token, chID, thIDs, time.Since(begin))
if err != nil {
lm.logger.Warn(fmt.Sprintf("%s with error: %s.", message, err))
return
@ -211,7 +211,7 @@ func (lm *loggingMiddleware) Connect(ctx context.Context, token, chanID, thingID
lm.logger.Info(fmt.Sprintf("%s without errors.", message))
}(time.Now())
return lm.svc.Connect(ctx, token, chanID, thingID)
return lm.svc.Connect(ctx, token, chID, thIDs...)
}
func (lm *loggingMiddleware) Disconnect(ctx context.Context, token, chanID, thingID string) (err error) {

View File

@ -148,13 +148,13 @@ func (ms *metricsMiddleware) RemoveChannel(ctx context.Context, token, id string
return ms.svc.RemoveChannel(ctx, token, id)
}
func (ms *metricsMiddleware) Connect(ctx context.Context, token, chanID, thingID string) error {
func (ms *metricsMiddleware) Connect(ctx context.Context, token, chanID string, thIDs ...string) error {
defer func(begin time.Time) {
ms.counter.With("method", "connect").Add(1)
ms.latency.With("method", "connect").Observe(time.Since(begin).Seconds())
}(time.Now())
return ms.svc.Connect(ctx, token, chanID, thingID)
return ms.svc.Connect(ctx, token, chanID, thIDs...)
}
func (ms *metricsMiddleware) Disconnect(ctx context.Context, token, chanID, thingID string) error {

View File

@ -186,21 +186,23 @@ func (es eventStore) RemoveChannel(ctx context.Context, token, id string) error
return nil
}
func (es eventStore) Connect(ctx context.Context, token, chanID, thingID string) error {
if err := es.svc.Connect(ctx, token, chanID, thingID); err != nil {
func (es eventStore) Connect(ctx context.Context, token, chID string, thIDs ...string) error {
if err := es.svc.Connect(ctx, token, chID, thIDs...); err != nil {
return err
}
event := connectThingEvent{
chanID: chanID,
thingID: thingID,
for _, thID := range thIDs {
event := connectThingEvent{
chanID: chID,
thingID: thID,
}
record := &redis.XAddArgs{
Stream: streamID,
MaxLenApprox: streamLen,
Values: event.Encode(),
}
es.client.XAdd(record).Err()
}
record := &redis.XAddArgs{
Stream: streamID,
MaxLenApprox: streamLen,
Values: event.Encode(),
}
es.client.XAdd(record).Err()
return nil
}

View File

@ -84,8 +84,8 @@ type Service interface {
// belongs to the user identified by the provided key.
RemoveChannel(context.Context, string, string) error
// Connect adds thing to the channel's list of connected things.
Connect(context.Context, string, string, string) error
// Connect adds things to the channel's list of connected things.
Connect(context.Context, string, string, ...string) error
// Disconnect removes thing from the channel's list of connected
// things.
@ -283,13 +283,13 @@ func (ts *thingsService) RemoveChannel(ctx context.Context, token, id string) er
return ts.channels.Remove(ctx, res.GetValue(), id)
}
func (ts *thingsService) Connect(ctx context.Context, token, chanID, thingID string) error {
func (ts *thingsService) Connect(ctx context.Context, token, chID string, thIDs ...string) error {
res, err := ts.users.Identify(ctx, &mainflux.Token{Value: token})
if err != nil {
return ErrUnauthorizedAccess
}
return ts.channels.Connect(ctx, res.GetValue(), chanID, thingID)
return ts.channels.Connect(ctx, res.GetValue(), chID, thIDs...)
}
func (ts *thingsService) Disconnect(ctx context.Context, token, chanID, thingID string) error {