mirror of
https://github.com/mainflux/mainflux.git
synced 2025-05-08 19:29:17 +08:00

* MF-1263 - Move repeating errors to the separate package (#1540) * MF-1263 - Mv duplicated errors to pkg/errors Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com> * Revert test build flags Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com> * Fix merge Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com> * Fix comment Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com> Co-authored-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> Signed-off-by: 0x6f736f646f <blackd0t@protonmail.com> * NOISSUE - Fix auth members list response (#1555) * NOISSUE - Fix auth members list response Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com> * Move group type next to page details Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com> * Rm membersRes Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com> * Fix typo Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com> Signed-off-by: 0x6f736f646f <blackd0t@protonmail.com> * MF-1261 - Use StatusUnauthorized for authn and StatusForbidden for authz (#1538) * MF-1261 - Use StatusUnauthorized for authn and StatusForbidden for authz Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com> * ErrExternalKey typo Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com> * Rename ErrUnauthorizedAcces -> ErrAuthentication Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com> * Fix bootstrap error Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com> * Fix status code in openapi Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com> * Fix test description Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com> * Fix test description Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com> * Fix test description Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com> * Add errors cases Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com> * Fix status codes Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com> * Add gRPC stutus code Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com> * Fix tests description Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com> * Fix openapi and encodeError Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com> * Fix grpc message Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com> * Fix test descriptions Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com> * Revert sdk error Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com> * Fix typo Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com> Signed-off-by: 0x6f736f646f <blackd0t@protonmail.com> * MF-1059 - Add TLS support for email (#1560) * Use gomail package for sending emails Signed-off-by: Ivan Milosevic <iva@blokovi.com> * remove print err Signed-off-by: Ivan Milosevic <iva@blokovi.com> * Add vendor Signed-off-by: Ivan Milosevic <iva@blokovi.com> * Rename email structure remove logger Signed-off-by: Ivan Milosevic <iva@blokovi.com> * typo in var name Signed-off-by: Ivan Milosevic <iva@blokovi.com> * rename var Signed-off-by: Ivan Milosevic <iva@blokovi.com> * remove MF_EMAIL_SECRET Signed-off-by: Ivan Milosevic <iva@blokovi.com> Signed-off-by: 0x6f736f646f <blackd0t@protonmail.com> * NOISSUE - Refactor MQTT subscriber (#1561) * correct suscriber interface validator + refactore token error handling Signed-off-by: tzzed <zerouali.t@gmail.com> * apply review suggestion Signed-off-by: tzzed <zerouali.t@gmail.com> Co-authored-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> Signed-off-by: 0x6f736f646f <blackd0t@protonmail.com> * MF-1257 - Access messages from readers endpoint with user access token (#1470) * remove owner id Signed-off-by: Mirko Teodorovic <mirko.teodorovic@gmail.com> * add user auth for db reader Signed-off-by: mteodor <mirko.teodorovic@gmail.com> * add user auth for db reader Signed-off-by: mteodor <mirko.teodorovic@gmail.com> * enable mongodb reader for user token reading Signed-off-by: mteodor <mirko.teodorovic@gmail.com> * use uuid check for auth switch between thing key and user tok Signed-off-by: mteodor <mirko.teodorovic@gmail.com> * enable user token reading Signed-off-by: mteodor <mirko.teodorovic@gmail.com> * revert to correct version Signed-off-by: mteodor <mirko.teodorovic@gmail.com> * fix endpoint test, add additional tests Signed-off-by: mteodor <mirko.teodorovic@gmail.com> * remove logs,dead code Signed-off-by: mteodor <mirko.teodorovic@gmail.com> * fix logging messages Signed-off-by: mteodor <mirko.teodorovic@gmail.com> * remove auth interface, add authorization header type Signed-off-by: mteodor <mirko.teodorovic@gmail.com> * update api doc Signed-off-by: mteodor <mirko.teodorovic@gmail.com> * remove unused package Signed-off-by: mteodor <mirko.teodorovic@gmail.com> * some refactor of cases for authorization switch Signed-off-by: mteodor <mirko.teodorovic@gmail.com> * correct description in openapi Signed-off-by: mteodor <mirko.teodorovic@gmail.com> * fix endpoint test to match auth service change Signed-off-by: mteodor <mirko.teodorovic@gmail.com> * some rename Signed-off-by: mteodor <mirko.teodorovic@gmail.com> * initialize auth url Signed-off-by: mteodor <mirko.teodorovic@gmail.com> * add env variables for auth service Signed-off-by: mteodor <mirko.teodorovic@gmail.com> * fix spelling Signed-off-by: mteodor <mirko.teodorovic@gmail.com> * Things prefix and no prefix for Thing authorization, Bearer for user Signed-off-by: mteodor <mirko.teodorovic@gmail.com> * update readme file Signed-off-by: mteodor <mirko.teodorovic@gmail.com> * fix default things grpc port Signed-off-by: mteodor <mirko.teodorovic@gmail.com> * enable user reading for timescaledb Signed-off-by: mteodor <mirko.teodorovic@gmail.com> * remove not used error Signed-off-by: mteodor <mirko.teodorovic@gmail.com> * improve errors Signed-off-by: mteodor <mirko.teodorovic@gmail.com> * refactor authorize Signed-off-by: mteodor <mirko.teodorovic@gmail.com> * add chanID check Signed-off-by: mteodor <mirko.teodorovic@gmail.com> * inline some error checking Signed-off-by: mteodor <mirko.teodorovic@gmail.com> * fixing errors Signed-off-by: mteodor <mirko.teodorovic@gmail.com> * fixing errors Signed-off-by: mteodor <mirko.teodorovic@gmail.com> * improve test case description Signed-off-by: mteodor <mirko.teodorovic@gmail.com> * remove test code Signed-off-by: mteodor <mirko.teodorovic@gmail.com> * dont inline Signed-off-by: mteodor <mirko.teodorovic@gmail.com> * refactor a bit encodeError Signed-off-by: mteodor <mirko.teodorovic@gmail.com> * remove unused error Signed-off-by: mteodor <mirko.teodorovic@gmail.com> * remove unused error Signed-off-by: mteodor <mirko.teodorovic@gmail.com> * fix things auth grpc url Signed-off-by: mteodor <mirko.teodorovic@gmail.com> * rename variables for header prefix Signed-off-by: mteodor <mirko.teodorovic@gmail.com> Co-authored-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> Signed-off-by: 0x6f736f646f <blackd0t@protonmail.com> * Initial commit of adding rabbitmq broker Signed-off-by: 0x6f736f646f <blackd0t@protonmail.com> * Initial commit of adding rabbitmq broker Signed-off-by: 0x6f736f646f <blackd0t@protonmail.com> * Initial commit for tests Signed-off-by: 0x6f736f646f <blackd0t@protonmail.com> * Bump up tests Signed-off-by: 0x6f736f646f <blackd0t@protonmail.com> * Add more tests Signed-off-by: 0x6f736f646f <blackd0t@protonmail.com> * Add go routines Signed-off-by: 0x6f736f646f <blackd0t@protonmail.com> * Initial commit of adding rabbitmq broker Signed-off-by: 0x6f736f646f <blackd0t@protonmail.com> * Initial commit for tests Signed-off-by: 0x6f736f646f <blackd0t@protonmail.com> * Bump up tests Signed-off-by: 0x6f736f646f <blackd0t@protonmail.com> * Add more tests Signed-off-by: 0x6f736f646f <blackd0t@protonmail.com> * Add go routines Signed-off-by: 0x6f736f646f <blackd0t@protonmail.com> * Fix tests Signed-off-by: 0x6f736f646f <blackd0t@protonmail.com> * Fix with wait groups Signed-off-by: 0x6f736f646f <blackd0t@protonmail.com> * unsubscribe to stop delivering messages Signed-off-by: 0x6f736f646f <blackd0t@protonmail.com> * Remove exclusivity Signed-off-by: 0x6f736f646f <blackd0t@protonmail.com> * MF-1551 - Fix Cobra usage commands and clean unnecessary struct types (#1558) * MF-1551 - Fix Cobra usage commands and clean unnecessary struct types Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com> * Use linux syntax for cmd usage description Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com> * Fix typo Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com> * Fix cmd.Use Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com> Signed-off-by: 0x6f736f646f <blackd0t@protonmail.com> * NOISSUE - Separate Keto hosts for read and write (#1563) * Separate keto hosts for read and write Signed-off-by: Ivan Milosevic <iva@blokovi.com> * update readme with new envars Signed-off-by: Ivan Milosevic <iva@blokovi.com> * rename read connection name Signed-off-by: Ivan Milosevic <iva@blokovi.com> Co-authored-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> Co-authored-by: Drasko DRASKOVIC <drasko.draskovic@gmail.com> Signed-off-by: 0x6f736f646f <blackd0t@protonmail.com> * Update dependencies (#1564) Signed-off-by: dusanb94 <dusan.borovcanin@mainflux.com> Signed-off-by: 0x6f736f646f <blackd0t@protonmail.com> * MF-1240 - Return to service transport layer only service errors (#1559) * MF-1240 - Return to service transport layer only service errors Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com> * Remove unecessary errors Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com> * Rm duplicated errors and fix transport Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com> * Revert http endpoint_test Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com> * Fix conflict Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com> Co-authored-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> Signed-off-by: 0x6f736f646f <blackd0t@protonmail.com> * Implement cancel mechanisms Signed-off-by: 0x6f736f646f <blackd0t@protonmail.com> * Queuename as parameter Signed-off-by: 0x6f736f646f <blackd0t@protonmail.com> * Queuename as parameter Signed-off-by: 0x6f736f646f <blackd0t@protonmail.com> * MF-1469 - Indicate proper authentication scheme in Authorization header (#1523) * MF-1469 - Indicate proper authentication scheme in Authorization header Signed-off-by: Stefan Kovacevic <jen2tri@gmail.com> * Fixing the remarks on the last push Signed-off-by: Stefan Kovacevic <jen2tri@gmail.com> * Remove Bearer prefix in all services and fix tests Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com> * Fix remarks Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com> Co-authored-by: Manuel Imperiale <manuel.imperiale@gmail.com> Signed-off-by: 0x6f736f646f <blackd0t@protonmail.com> * NOISSUE - Add nats wrapper for COAP (#1569) * Add nats wrapper for COAP Signed-off-by: 0x6f736f646f <blackd0t@protonmail.com> * Pass pubsub as argument Signed-off-by: 0x6f736f646f <blackd0t@protonmail.com> * Defer close connection Signed-off-by: 0x6f736f646f <blackd0t@protonmail.com> * Defer close connection Signed-off-by: 0x6f736f646f <blackd0t@protonmail.com> * Rename endpoint to topic Signed-off-by: 0x6f736f646f <blackd0t@protonmail.com> * MF-1348 - Add transport errors logging (#1544) * MF-1348 - Add go-kit transport level logging Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com> * Fix reviews Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com> * Fix reviews Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com> * Fix merge Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com> * Fix remark Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com> * Fix go test flags Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com> * Use httputil errors in things and http service Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com> * Fix SDK tests Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com> * Use httputil errors in certs and provision service Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com> * Use httputil errors in consumers service Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com> * General renaming and add ErrMissingToken Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com> * Rename httputil -> apiutil and use errors in users servive Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com> * Use apiutil errors in auth, bootstrap, readers, things and twins Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com> * Replace errors.Contain by comparison Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com> * Fix remarks Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com> * Simplify validateID Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com> * Simplify validateID Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com> * Simplify and rename ExtractAuthToken -> ExtractBearerToken Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com> * Fix readers Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com> * Fix auth key test and remarks Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com> * Improve comment Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com> * Simplify validateUUID check Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com> * Fix typo Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com> Co-authored-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> Signed-off-by: 0x6f736f646f <blackd0t@protonmail.com> * MF-1567 - Use Bearer, Thing or Basic scheme in Authorization header (#1568) Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com> Signed-off-by: 0x6f736f646f <blackd0t@protonmail.com> * MF-1565 - Document Bearer, Thing and Basic Authorization header (#1566) * MF-1565 - Document Bearer Authorization header Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com> * Fix auth, bootstrap, http and readers openapi Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com> * Fix openapi Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com> * Add enc key for bootstrap Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com> * Fix typo Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com> * Use global security Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com> * Fix bearer formats Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com> * Polish descriptions Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com> * Fix boostrap and typo Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com> Co-authored-by: Drasko DRASKOVIC <drasko.draskovic@gmail.com> Signed-off-by: 0x6f736f646f <blackd0t@protonmail.com> * MF-1575 Add 'Name' field to ListMembers response in things svc (#1576) Signed-off-by: Ivan Balboteo <ivan@submer.com> Co-authored-by: Ivan Balboteo <ivan@submer.com> Signed-off-by: 0x6f736f646f <blackd0t@protonmail.com> * MF-1580 - Influxdb Writer changes format of update-time to string (#1581) * - MF-1580 - Modified consumers/writers/influxdb/fields.go - influxdb-writer used to update data type of update-time to string - Commented line 12 of consumers/writers/influxdb/fields.go to resolve uneccessary data type conversion issue Signed-off-by: Hasan Tariq <hasantariqashraf@gmail.com> * - MF-1580 - Removed strconv package from consumers/writers/influxdb/fields.go since it is no longer needed - Removed line 12 from consumers/writers/influxdb/fields.go - Replaced retrun value of updateTime with msg.UpdateTime (line 16 in fields.go) Signed-off-by: Hasan Tariq <hasantariqashraf@gmail.com> * Fix InflxuDB readers Signed-off-by: dusanb94 <dusan.borovcanin@mainflux.com> Co-authored-by: Hasan Tariq <hasant@plcgroup.com> Co-authored-by: dusanb94 <dusan.borovcanin@mainflux.com> Signed-off-by: 0x6f736f646f <blackd0t@protonmail.com> * NOISSUE - Unify MF_INFLUX_READER_DB_HOST and MF_INFLUX_WRITER_DB_HOST envars (#1585) Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com> Signed-off-by: 0x6f736f646f <blackd0t@protonmail.com> * NOISSUE - Fix CoAP adapter (#1572) * Revert "NOISSUE - Add nats wrapper for COAP (#1569)" This reverts commit cc5d5195ab27fa94270ada616487b7053fd9c7bd. Signed-off-by: dusanb94 <dusan.borovcanin@mainflux.com> * Fix CoAP adapter Signed-off-by: dusanb94 <dusan.borovcanin@mainflux.com> * Update CoAP observation cancel Signed-off-by: dusanb94 <dusan.borovcanin@mainflux.com> * Fix observe Signed-off-by: dusanb94 <dusan.borovcanin@mainflux.com> * Fix GET handling Signed-off-by: dusanb94 <dusan.borovcanin@mainflux.com> * Revert authorization Signed-off-by: dusanb94 <dusan.borovcanin@mainflux.com> * Use constants instead of magic numbers Signed-off-by: dusanb94 <dusan.borovcanin@mainflux.com> * Remove an empty line Signed-off-by: dusanb94 <dusan.borovcanin@mainflux.com> * Extract special observe value to constant Signed-off-by: dusanb94 <dusan.borovcanin@mainflux.com> Signed-off-by: 0x6f736f646f <blackd0t@protonmail.com> * MF-1582 - Fix lora-adapter MQTT client (#1583) * MF-1582 - Fix lora-adapter MQTT clien Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com> * Add timeout config to the mqtt subscriber Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com> * Rm comment Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com> * Add sub timeout Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com> Signed-off-by: 0x6f736f646f <blackd0t@protonmail.com> * NOISSUE - Update changelog and readme for release 0.13.0 (#1592) * Update release example Signed-off-by: dusanb94 <dusan.borovcanin@mainflux.com> * Update changelog and examples for 0.13.0 release Signed-off-by: dusanb94 <dusan.borovcanin@mainflux.com> Signed-off-by: 0x6f736f646f <blackd0t@protonmail.com> * Update VerneMQ release (#1593) Signed-off-by: dusanb94 <dusan.borovcanin@mainflux.com> Signed-off-by: 0x6f736f646f <blackd0t@protonmail.com> * NOISSUE - Update changelog for release 0.13.0 (#1595) Signed-off-by: dusanb94 <dusan.borovcanin@mainflux.com> Signed-off-by: 0x6f736f646f <blackd0t@protonmail.com> * unexport constants Signed-off-by: 0x6f736f646f <blackd0t@protonmail.com> * Change routingkey Signed-off-by: 0x6f736f646f <blackd0t@protonmail.com> * Remove wait groups Signed-off-by: 0x6f736f646f <blackd0t@protonmail.com> * protecting map Signed-off-by: 0x6f736f646f <blackd0t@protonmail.com> * Add publisher to pubsub Signed-off-by: 0x6f736f646f <blackd0t@protonmail.com> * Change proto library Signed-off-by: 0x6f736f646f <blackd0t@protonmail.com> * Fix typos Signed-off-by: 0x6f736f646f <blackd0t@protonmail.com> * Reduce pubsub tests based on implementation Signed-off-by: 0x6f736f646f <blackd0t@protonmail.com> * Remove channel cancel Signed-off-by: 0x6f736f646f <blackd0t@protonmail.com> * Export constant Signed-off-by: 0x6f736f646f <blackd0t@protonmail.com> * NOISSUE - Move invariant statements out of loop for cassandra-writer (#1596) Signed-off-by: fuzhy <fuzhy1997@outlook.com> Signed-off-by: 0x6f736f646f <blackd0t@protonmail.com> * Embedding publisher into pubsub Signed-off-by: 0x6f736f646f <blackd0t@protonmail.com> * Naming publisher Signed-off-by: 0x6f736f646f <blackd0t@protonmail.com> * NOISSUE - Fix Nginx entrypoint script (#1597) * Fix Nginx entrypoint script Signed-off-by: dusanb94 <dusan.borovcanin@mainflux.com> * Update dependencies Signed-off-by: dusanb94 <dusan.borovcanin@mainflux.com> * Fix NginX entrypoint Signed-off-by: dusanb94 <dusan.borovcanin@mainflux.com> * Revert Makefile changes Signed-off-by: dusanb94 <dusan.borovcanin@mainflux.com> Signed-off-by: 0x6f736f646f <blackd0t@protonmail.com> * MF-1525 - Add graceful stop for HTTP and GRPC servers (#1548) * Add : errgroup to cmd/auth Signed-off-by: Arvindh <arvindh91@gmail.com> * Add : Handle graceful stop for auth service Remove : errgroups from auth service Signed-off-by: Arvindh <arvindh91@gmail.com> * Add : Wait till server shutdown Signed-off-by: Arvindh <arvindh91@gmail.com> * Change : instead of waitgroup changed to errgroups Signed-off-by: Arvindh <arvindh91@gmail.com> * change : KillSignalHandler return type to error Signed-off-by: Arvindh <arvindh91@gmail.com> * Empty Commit Signed-off-by: Arvindh <arvindh91@gmail.com> * Add : Context to http server shutdown Rename : varaible from proto to protocol Signed-off-by: Arvindh <arvindh91@gmail.com> * change : to default log level Signed-off-by: Arvindh <arvindh91@gmail.com> * Add : Sign-off Signed-off-by: Arvindh <arvindh91@gmail.com> * Add: graceful stop of http and grpc server Signed-off-by: Arvindh <arvindh91@gmail.com> * Fix: typos and caps Signed-off-by: Arvindh <arvindh91@gmail.com> * Add: Signed-off Signed-off-by: Arvindh <arvindh91@gmail.com> * Rename: Func KillSignalHandler to SignalHandler Add: SIGABRT Signed-off-by: Arvindh <arvindh91@gmail.com> * Fix: auth service Signed-off-by: Arvindh <arvindh91@gmail.com> * Add: timeout for grpc gracefulstop Fix: typos Signed-off-by: Arvindh <arvindh91@gmail.com> * Add: .vscode folder to git ignore Signed-off-by: Arvindh <arvindh91@gmail.com> * change: variable name to stopWaitTime Signed-off-by: Arvindh <arvindh91@gmail.com> * remove: .vscode folder Signed-off-by: Arvindh <arvindh91@gmail.com> * remove: .vscode from .gitignore Signed-off-by: Arvindh <arvindh91@gmail.com> * Add : logger to handlers Signed-off-by: Arvindh <arvindh91@gmail.com> * Add : New line at end of .gitignore file Signed-off-by: Arvindh <arvindh91@gmail.com> * Fix : variable naming Add : graceful stop for timescale Signed-off-by: Arvindh <arvindh91@gmail.com> * Remove : unsued NATS library from import Signed-off-by: Arvindh <arvindh91@gmail.com> * Move: "https" and "https" to moved to const var Signed-off-by: Arvindh <arvindh91@gmail.com> * Move: "http" and "https" to moved to const var Signed-off-by: Arvindh <arvindh91@gmail.com> * update: branch with master Signed-off-by: Arvindh <arvindh91@gmail.com> Co-authored-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> Co-authored-by: Drasko DRASKOVIC <drasko.draskovic@gmail.com> Signed-off-by: 0x6f736f646f <blackd0t@protonmail.com> * MF-1588 - Update Subscriber interface (#1598) * Initial commit Signed-off-by: 0x6f736f646f <blackd0t@protonmail.com> * Update subscriber interface Signed-off-by: dusanb94 <dusan.borovcanin@mainflux.com> * Add tests Signed-off-by: 0x6f736f646f <blackd0t@protonmail.com> * Add tests Signed-off-by: 0x6f736f646f <blackd0t@protonmail.com> * check subscription map Signed-off-by: 0x6f736f646f <blackd0t@protonmail.com> * Check topic id after topic Signed-off-by: 0x6f736f646f <blackd0t@protonmail.com> * reword description Signed-off-by: 0x6f736f646f <blackd0t@protonmail.com> * Setup empty queue Signed-off-by: 0x6f736f646f <blackd0t@protonmail.com> * Change mqtt implementation Signed-off-by: 0x6f736f646f <blackd0t@protonmail.com> * Switch statements Signed-off-by: 0x6f736f646f <blackd0t@protonmail.com> * Simplify Signed-off-by: 0x6f736f646f <blackd0t@protonmail.com> * Change mqtt subscriber Signed-off-by: 0x6f736f646f <blackd0t@protonmail.com> * Protect subscription map Signed-off-by: 0x6f736f646f <blackd0t@protonmail.com> * Fix subscription Signed-off-by: 0x6f736f646f <blackd0t@protonmail.com> * Set client id Signed-off-by: 0x6f736f646f <blackd0t@protonmail.com> * Format Signed-off-by: 0x6f736f646f <blackd0t@protonmail.com> * Change delete method Signed-off-by: 0x6f736f646f <blackd0t@protonmail.com> Co-authored-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> Signed-off-by: 0x6f736f646f <blackd0t@protonmail.com> * Update rabbitmq subscriber interface Signed-off-by: 0x6f736f646f <blackd0t@protonmail.com> * using publisher composition Signed-off-by: 0x6f736f646f <blackd0t@protonmail.com> * Change contenttype Signed-off-by: 0x6f736f646f <blackd0t@protonmail.com> * rename topic for publish and subscribe Signed-off-by: 0x6f736f646f <blackd0t@protonmail.com> * Change errors to lower case Signed-off-by: 0x6f736f646f <blackd0t@protonmail.com> * Change errors to lower case Signed-off-by: 0x6f736f646f <blackd0t@protonmail.com> * export errors Signed-off-by: 0x6f736f646f <blackd0t@protonmail.com> * MF - 1590 - Fix fetching list of users with a zero limit (#1594) * Add max and min limit size Signed-off-by: 0x6f736f646f <blackd0t@protonmail.com> * Format Signed-off-by: 0x6f736f646f <blackd0t@protonmail.com> * Format Signed-off-by: 0x6f736f646f <blackd0t@protonmail.com> Co-authored-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> Signed-off-by: 0x6f736f646f <blackd0t@protonmail.com> * NOISSUE - Retrieve client key on cert issuing (#1607) Signed-off-by: Manuel Imperiale <manuel.imperiale@gmail.com> Signed-off-by: 0x6f736f646f <blackd0t@protonmail.com> * fix bug (#1604) Signed-off-by: zhangchuanfeng <654300242@qq.com> Signed-off-by: 0x6f736f646f <blackd0t@protonmail.com> * queue per subscription Signed-off-by: 0x6f736f646f <blackd0t@protonmail.com> * queue per subscription Signed-off-by: 0x6f736f646f <blackd0t@protonmail.com> * Change routing method Signed-off-by: 0x6f736f646f <blackd0t@protonmail.com> * Direct method with one exchange to many queues, one consumer per queue Signed-off-by: 0x6f736f646f <blackd0t@protonmail.com> * ♻️ Not casting data Signed-off-by: 0x6f736f646f <blackd0t@protonmail.com> * ✏️ Fix typo Signed-off-by: 0x6f736f646f <blackd0t@protonmail.com> * ♻️ remove passed queue name Signed-off-by: 0x6f736f646f <blackd0t@protonmail.com> * 🔥 removing echange kind Signed-off-by: 0x6f736f646f <blackd0t@protonmail.com> * Combine tests Signed-off-by: 0x6f736f646f <blackd0t@protonmail.com> * Refactor unsubscribe method Signed-off-by: 0x6f736f646f <blackd0t@protonmail.com> * Fix merge conflict Signed-off-by: 0x6f736f646f <blackd0t@protonmail.com> * ✅ sub and unsub to dummy topic Signed-off-by: 0x6f736f646f <blackd0t@protonmail.com> * generate client id from topic and ID Signed-off-by: 0x6f736f646f <blackd0t@protonmail.com> * Rename topicID to clientID Signed-off-by: 0x6f736f646f <blackd0t@protonmail.com> * update tests Signed-off-by: 0x6f736f646f <blackd0t@protonmail.com> * Reuse clientID Signed-off-by: 0x6f736f646f <blackd0t@protonmail.com> * Fix typos Signed-off-by: 0x6f736f646f <blackd0t@protonmail.com> * Seperate testpublish and testpubsub Signed-off-by: 0x6f736f646f <blackd0t@protonmail.com> Co-authored-by: Manuel Imperiale <manuel.Imperiale@gmail.com> Co-authored-by: Dušan Borovčanin <dusan.borovcanin@mainflux.com> Co-authored-by: Ivan Milošević <iva@blokovi.com> Co-authored-by: __touk__ <zerouali.t@gmail.com> Co-authored-by: Mirko Teodorovic <mirko.teodorovic@gmail.com> Co-authored-by: Drasko DRASKOVIC <drasko.draskovic@gmail.com> Co-authored-by: stefankovacevic123 <jen2tri@gmail.com> Co-authored-by: ibalboteo <ivanbalboteo@gmail.com> Co-authored-by: Ivan Balboteo <ivan@submer.com> Co-authored-by: Hasan98-git <67228396+Hasan98-git@users.noreply.github.com> Co-authored-by: Hasan Tariq <hasant@plcgroup.com> Co-authored-by: fuzhy <fuzhy1997@outlook.com> Co-authored-by: Arvindh <30824765+arvindh123@users.noreply.github.com> Co-authored-by: 张传峰 <59160162+zhang-chuanfeng@users.noreply.github.com>
871 lines
24 KiB
Go
871 lines
24 KiB
Go
// Copyright (c) 2021 VMware, Inc. or its affiliates. All Rights Reserved.
|
|
// Copyright (c) 2012-2021, Sean Treadway, SoundCloud Ltd.
|
|
// Use of this source code is governed by a BSD-style
|
|
// license that can be found in the LICENSE file.
|
|
|
|
package amqp091
|
|
|
|
import (
|
|
"bufio"
|
|
"crypto/tls"
|
|
"io"
|
|
"net"
|
|
"reflect"
|
|
"strconv"
|
|
"strings"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
)
|
|
|
|
const (
|
|
maxChannelMax = (2 << 15) - 1
|
|
|
|
defaultHeartbeat = 10 * time.Second
|
|
defaultConnectionTimeout = 30 * time.Second
|
|
defaultProduct = "https://github.com/streadway/amqp"
|
|
defaultVersion = "β"
|
|
// Safer default that makes channel leaks a lot easier to spot
|
|
// before they create operational headaches. See https://github.com/rabbitmq/rabbitmq-server/issues/1593.
|
|
defaultChannelMax = (2 << 10) - 1
|
|
defaultLocale = "en_US"
|
|
)
|
|
|
|
// Config is used in DialConfig and Open to specify the desired tuning
|
|
// parameters used during a connection open handshake. The negotiated tuning
|
|
// will be stored in the returned connection's Config field.
|
|
type Config struct {
|
|
// The SASL mechanisms to try in the client request, and the successful
|
|
// mechanism used on the Connection object.
|
|
// If SASL is nil, PlainAuth from the URL is used.
|
|
SASL []Authentication
|
|
|
|
// Vhost specifies the namespace of permissions, exchanges, queues and
|
|
// bindings on the server. Dial sets this to the path parsed from the URL.
|
|
Vhost string
|
|
|
|
ChannelMax int // 0 max channels means 2^16 - 1
|
|
FrameSize int // 0 max bytes means unlimited
|
|
Heartbeat time.Duration // less than 1s uses the server's interval
|
|
|
|
// TLSClientConfig specifies the client configuration of the TLS connection
|
|
// when establishing a tls transport.
|
|
// If the URL uses an amqps scheme, then an empty tls.Config with the
|
|
// ServerName from the URL is used.
|
|
TLSClientConfig *tls.Config
|
|
|
|
// Properties is table of properties that the client advertises to the server.
|
|
// This is an optional setting - if the application does not set this,
|
|
// the underlying library will use a generic set of client properties.
|
|
Properties Table
|
|
|
|
// Connection locale that we expect to always be en_US
|
|
// Even though servers must return it as per the AMQP 0-9-1 spec,
|
|
// we are not aware of it being used other than to satisfy the spec requirements
|
|
Locale string
|
|
|
|
// Dial returns a net.Conn prepared for a TLS handshake with TSLClientConfig,
|
|
// then an AMQP connection handshake.
|
|
// If Dial is nil, net.DialTimeout with a 30s connection and 30s deadline is
|
|
// used during TLS and AMQP handshaking.
|
|
Dial func(network, addr string) (net.Conn, error)
|
|
}
|
|
|
|
// Connection manages the serialization and deserialization of frames from IO
|
|
// and dispatches the frames to the appropriate channel. All RPC methods and
|
|
// asynchronous Publishing, Delivery, Ack, Nack and Return messages are
|
|
// multiplexed on this channel. There must always be active receivers for
|
|
// every asynchronous message on this connection.
|
|
type Connection struct {
|
|
destructor sync.Once // shutdown once
|
|
sendM sync.Mutex // conn writer mutex
|
|
m sync.Mutex // struct field mutex
|
|
|
|
conn io.ReadWriteCloser
|
|
|
|
rpc chan message
|
|
writer *writer
|
|
sends chan time.Time // timestamps of each frame sent
|
|
deadlines chan readDeadliner // heartbeater updates read deadlines
|
|
|
|
allocator *allocator // id generator valid after openTune
|
|
channels map[uint16]*Channel
|
|
|
|
noNotify bool // true when we will never notify again
|
|
closes []chan *Error
|
|
blocks []chan Blocking
|
|
|
|
errors chan *Error
|
|
|
|
Config Config // The negotiated Config after connection.open
|
|
|
|
Major int // Server's major version
|
|
Minor int // Server's minor version
|
|
Properties Table // Server properties
|
|
Locales []string // Server locales
|
|
|
|
closed int32 // Will be 1 if the connection is closed, 0 otherwise. Should only be accessed as atomic
|
|
}
|
|
|
|
type readDeadliner interface {
|
|
SetReadDeadline(time.Time) error
|
|
}
|
|
|
|
// DefaultDial establishes a connection when config.Dial is not provided
|
|
func DefaultDial(connectionTimeout time.Duration) func(network, addr string) (net.Conn, error) {
|
|
return func(network, addr string) (net.Conn, error) {
|
|
conn, err := net.DialTimeout(network, addr, connectionTimeout)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Heartbeating hasn't started yet, don't stall forever on a dead server.
|
|
// A deadline is set for TLS and AMQP handshaking. After AMQP is established,
|
|
// the deadline is cleared in openComplete.
|
|
if err := conn.SetDeadline(time.Now().Add(connectionTimeout)); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return conn, nil
|
|
}
|
|
}
|
|
|
|
// Dial accepts a string in the AMQP URI format and returns a new Connection
|
|
// over TCP using PlainAuth. Defaults to a server heartbeat interval of 10
|
|
// seconds and sets the handshake deadline to 30 seconds. After handshake,
|
|
// deadlines are cleared.
|
|
//
|
|
// Dial uses the zero value of tls.Config when it encounters an amqps://
|
|
// scheme. It is equivalent to calling DialTLS(amqp, nil).
|
|
func Dial(url string) (*Connection, error) {
|
|
return DialConfig(url, Config{
|
|
Heartbeat: defaultHeartbeat,
|
|
Locale: defaultLocale,
|
|
})
|
|
}
|
|
|
|
// DialTLS accepts a string in the AMQP URI format and returns a new Connection
|
|
// over TCP using PlainAuth. Defaults to a server heartbeat interval of 10
|
|
// seconds and sets the initial read deadline to 30 seconds.
|
|
//
|
|
// DialTLS uses the provided tls.Config when encountering an amqps:// scheme.
|
|
func DialTLS(url string, amqps *tls.Config) (*Connection, error) {
|
|
return DialConfig(url, Config{
|
|
Heartbeat: defaultHeartbeat,
|
|
TLSClientConfig: amqps,
|
|
Locale: defaultLocale,
|
|
})
|
|
}
|
|
|
|
// DialTLS_ExternalAuth accepts a string in the AMQP URI format and returns a
|
|
// new Connection over TCP using EXTERNAL auth. Defaults to a server heartbeat
|
|
// interval of 10 seconds and sets the initial read deadline to 30 seconds.
|
|
//
|
|
// This mechanism is used, when RabbitMQ is configured for EXTERNAL auth with
|
|
// ssl_cert_login plugin for userless/passwordless logons
|
|
//
|
|
// DialTLS_ExternalAuth uses the provided tls.Config when encountering an
|
|
// amqps:// scheme.
|
|
func DialTLS_ExternalAuth(url string, amqps *tls.Config) (*Connection, error) {
|
|
return DialConfig(url, Config{
|
|
Heartbeat: defaultHeartbeat,
|
|
TLSClientConfig: amqps,
|
|
SASL: []Authentication{&ExternalAuth{}},
|
|
})
|
|
}
|
|
|
|
// DialConfig accepts a string in the AMQP URI format and a configuration for
|
|
// the transport and connection setup, returning a new Connection. Defaults to
|
|
// a server heartbeat interval of 10 seconds and sets the initial read deadline
|
|
// to 30 seconds.
|
|
func DialConfig(url string, config Config) (*Connection, error) {
|
|
var err error
|
|
var conn net.Conn
|
|
|
|
uri, err := ParseURI(url)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if config.SASL == nil {
|
|
config.SASL = []Authentication{uri.PlainAuth()}
|
|
}
|
|
|
|
if config.Vhost == "" {
|
|
config.Vhost = uri.Vhost
|
|
}
|
|
|
|
addr := net.JoinHostPort(uri.Host, strconv.FormatInt(int64(uri.Port), 10))
|
|
|
|
dialer := config.Dial
|
|
if dialer == nil {
|
|
dialer = DefaultDial(defaultConnectionTimeout)
|
|
}
|
|
|
|
conn, err = dialer("tcp", addr)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if uri.Scheme == "amqps" {
|
|
if config.TLSClientConfig == nil {
|
|
config.TLSClientConfig = new(tls.Config)
|
|
}
|
|
|
|
// If ServerName has not been specified in TLSClientConfig,
|
|
// set it to the URI host used for this connection.
|
|
if config.TLSClientConfig.ServerName == "" {
|
|
config.TLSClientConfig.ServerName = uri.Host
|
|
}
|
|
|
|
client := tls.Client(conn, config.TLSClientConfig)
|
|
if err := client.Handshake(); err != nil {
|
|
|
|
conn.Close()
|
|
return nil, err
|
|
}
|
|
|
|
conn = client
|
|
}
|
|
|
|
return Open(conn, config)
|
|
}
|
|
|
|
/*
|
|
Open accepts an already established connection, or other io.ReadWriteCloser as
|
|
a transport. Use this method if you have established a TLS connection or wish
|
|
to use your own custom transport.
|
|
|
|
*/
|
|
func Open(conn io.ReadWriteCloser, config Config) (*Connection, error) {
|
|
c := &Connection{
|
|
conn: conn,
|
|
writer: &writer{bufio.NewWriter(conn)},
|
|
channels: make(map[uint16]*Channel),
|
|
rpc: make(chan message),
|
|
sends: make(chan time.Time),
|
|
errors: make(chan *Error, 1),
|
|
deadlines: make(chan readDeadliner, 1),
|
|
}
|
|
go c.reader(conn)
|
|
return c, c.open(config)
|
|
}
|
|
|
|
/*
|
|
LocalAddr returns the local TCP peer address, or ":0" (the zero value of net.TCPAddr)
|
|
as a fallback default value if the underlying transport does not support LocalAddr().
|
|
*/
|
|
func (c *Connection) LocalAddr() net.Addr {
|
|
if conn, ok := c.conn.(interface {
|
|
LocalAddr() net.Addr
|
|
}); ok {
|
|
return conn.LocalAddr()
|
|
}
|
|
return &net.TCPAddr{}
|
|
}
|
|
|
|
// ConnectionState returns basic TLS details of the underlying transport.
|
|
// Returns a zero value when the underlying connection does not implement
|
|
// ConnectionState() tls.ConnectionState.
|
|
func (c *Connection) ConnectionState() tls.ConnectionState {
|
|
if conn, ok := c.conn.(interface {
|
|
ConnectionState() tls.ConnectionState
|
|
}); ok {
|
|
return conn.ConnectionState()
|
|
}
|
|
return tls.ConnectionState{}
|
|
}
|
|
|
|
/*
|
|
NotifyClose registers a listener for close events either initiated by an error
|
|
accompanying a connection.close method or by a normal shutdown.
|
|
|
|
The chan provided will be closed when the Channel is closed and on a
|
|
graceful close, no error will be sent.
|
|
|
|
To reconnect after a transport or protocol error, register a listener here and
|
|
re-run your setup process.
|
|
|
|
*/
|
|
func (c *Connection) NotifyClose(receiver chan *Error) chan *Error {
|
|
c.m.Lock()
|
|
defer c.m.Unlock()
|
|
|
|
if c.noNotify {
|
|
close(receiver)
|
|
} else {
|
|
c.closes = append(c.closes, receiver)
|
|
}
|
|
|
|
return receiver
|
|
}
|
|
|
|
/*
|
|
NotifyBlocked registers a listener for RabbitMQ specific TCP flow control
|
|
method extensions connection.blocked and connection.unblocked. Flow control is
|
|
active with a reason when Blocking.Blocked is true. When a Connection is
|
|
blocked, all methods will block across all connections until server resources
|
|
become free again.
|
|
|
|
This optional extension is supported by the server when the
|
|
"connection.blocked" server capability key is true.
|
|
|
|
*/
|
|
func (c *Connection) NotifyBlocked(receiver chan Blocking) chan Blocking {
|
|
c.m.Lock()
|
|
defer c.m.Unlock()
|
|
|
|
if c.noNotify {
|
|
close(receiver)
|
|
} else {
|
|
c.blocks = append(c.blocks, receiver)
|
|
}
|
|
|
|
return receiver
|
|
}
|
|
|
|
/*
|
|
Close requests and waits for the response to close the AMQP connection.
|
|
|
|
It's advisable to use this message when publishing to ensure all kernel buffers
|
|
have been flushed on the server and client before exiting.
|
|
|
|
An error indicates that server may not have received this request to close but
|
|
the connection should be treated as closed regardless.
|
|
|
|
After returning from this call, all resources associated with this connection,
|
|
including the underlying io, Channels, Notify listeners and Channel consumers
|
|
will also be closed.
|
|
*/
|
|
func (c *Connection) Close() error {
|
|
if c.IsClosed() {
|
|
return ErrClosed
|
|
}
|
|
|
|
defer c.shutdown(nil)
|
|
return c.call(
|
|
&connectionClose{
|
|
ReplyCode: replySuccess,
|
|
ReplyText: "kthxbai",
|
|
},
|
|
&connectionCloseOk{},
|
|
)
|
|
}
|
|
|
|
func (c *Connection) closeWith(err *Error) error {
|
|
if c.IsClosed() {
|
|
return ErrClosed
|
|
}
|
|
|
|
defer c.shutdown(err)
|
|
return c.call(
|
|
&connectionClose{
|
|
ReplyCode: uint16(err.Code),
|
|
ReplyText: err.Reason,
|
|
},
|
|
&connectionCloseOk{},
|
|
)
|
|
}
|
|
|
|
// IsClosed returns true if the connection is marked as closed, otherwise false
|
|
// is returned.
|
|
func (c *Connection) IsClosed() bool {
|
|
return (atomic.LoadInt32(&c.closed) == 1)
|
|
}
|
|
|
|
func (c *Connection) send(f frame) error {
|
|
if c.IsClosed() {
|
|
return ErrClosed
|
|
}
|
|
|
|
c.sendM.Lock()
|
|
err := c.writer.WriteFrame(f)
|
|
c.sendM.Unlock()
|
|
|
|
if err != nil {
|
|
// shutdown could be re-entrant from signaling notify chans
|
|
go c.shutdown(&Error{
|
|
Code: FrameError,
|
|
Reason: err.Error(),
|
|
})
|
|
} else {
|
|
// Broadcast we sent a frame, reducing heartbeats, only
|
|
// if there is something that can receive - like a non-reentrant
|
|
// call or if the heartbeater isn't running
|
|
select {
|
|
case c.sends <- time.Now():
|
|
default:
|
|
}
|
|
}
|
|
|
|
return err
|
|
}
|
|
|
|
func (c *Connection) shutdown(err *Error) {
|
|
atomic.StoreInt32(&c.closed, 1)
|
|
|
|
c.destructor.Do(func() {
|
|
c.m.Lock()
|
|
defer c.m.Unlock()
|
|
|
|
if err != nil {
|
|
for _, c := range c.closes {
|
|
c <- err
|
|
}
|
|
}
|
|
|
|
if err != nil {
|
|
c.errors <- err
|
|
}
|
|
// Shutdown handler goroutine can still receive the result.
|
|
close(c.errors)
|
|
|
|
for _, c := range c.closes {
|
|
close(c)
|
|
}
|
|
|
|
for _, c := range c.blocks {
|
|
close(c)
|
|
}
|
|
|
|
// Shutdown the channel, but do not use closeChannel() as it calls
|
|
// releaseChannel() which requires the connection lock.
|
|
//
|
|
// Ranging over c.channels and calling releaseChannel() that mutates
|
|
// c.channels is racy - see commit 6063341 for an example.
|
|
for _, ch := range c.channels {
|
|
ch.shutdown(err)
|
|
}
|
|
|
|
c.conn.Close()
|
|
|
|
c.channels = map[uint16]*Channel{}
|
|
c.allocator = newAllocator(1, c.Config.ChannelMax)
|
|
c.noNotify = true
|
|
})
|
|
}
|
|
|
|
// All methods sent to the connection channel should be synchronous so we
|
|
// can handle them directly without a framing component
|
|
func (c *Connection) demux(f frame) {
|
|
if f.channel() == 0 {
|
|
c.dispatch0(f)
|
|
} else {
|
|
c.dispatchN(f)
|
|
}
|
|
}
|
|
|
|
func (c *Connection) dispatch0(f frame) {
|
|
switch mf := f.(type) {
|
|
case *methodFrame:
|
|
switch m := mf.Method.(type) {
|
|
case *connectionClose:
|
|
// Send immediately as shutdown will close our side of the writer.
|
|
c.send(&methodFrame{
|
|
ChannelId: 0,
|
|
Method: &connectionCloseOk{},
|
|
})
|
|
|
|
c.shutdown(newError(m.ReplyCode, m.ReplyText))
|
|
case *connectionBlocked:
|
|
for _, c := range c.blocks {
|
|
c <- Blocking{Active: true, Reason: m.Reason}
|
|
}
|
|
case *connectionUnblocked:
|
|
for _, c := range c.blocks {
|
|
c <- Blocking{Active: false}
|
|
}
|
|
default:
|
|
c.rpc <- m
|
|
}
|
|
case *heartbeatFrame:
|
|
// kthx - all reads reset our deadline. so we can drop this
|
|
default:
|
|
// lolwat - channel0 only responds to methods and heartbeats
|
|
c.closeWith(ErrUnexpectedFrame)
|
|
}
|
|
}
|
|
|
|
func (c *Connection) dispatchN(f frame) {
|
|
c.m.Lock()
|
|
channel := c.channels[f.channel()]
|
|
c.m.Unlock()
|
|
|
|
if channel != nil {
|
|
channel.recv(channel, f)
|
|
} else {
|
|
c.dispatchClosed(f)
|
|
}
|
|
}
|
|
|
|
// section 2.3.7: "When a peer decides to close a channel or connection, it
|
|
// sends a Close method. The receiving peer MUST respond to a Close with a
|
|
// Close-Ok, and then both parties can close their channel or connection. Note
|
|
// that if peers ignore Close, deadlock can happen when both peers send Close
|
|
// at the same time."
|
|
//
|
|
// When we don't have a channel, so we must respond with close-ok on a close
|
|
// method. This can happen between a channel exception on an asynchronous
|
|
// method like basic.publish and a synchronous close with channel.close.
|
|
// In that case, we'll get both a channel.close and channel.close-ok in any
|
|
// order.
|
|
func (c *Connection) dispatchClosed(f frame) {
|
|
// Only consider method frames, drop content/header frames
|
|
if mf, ok := f.(*methodFrame); ok {
|
|
switch mf.Method.(type) {
|
|
case *channelClose:
|
|
c.send(&methodFrame{
|
|
ChannelId: f.channel(),
|
|
Method: &channelCloseOk{},
|
|
})
|
|
case *channelCloseOk:
|
|
// we are already closed, so do nothing
|
|
default:
|
|
// unexpected method on closed channel
|
|
c.closeWith(ErrClosed)
|
|
}
|
|
}
|
|
}
|
|
|
|
// Reads each frame off the IO and hand off to the connection object that
|
|
// will demux the streams and dispatch to one of the opened channels or
|
|
// handle on channel 0 (the connection channel).
|
|
func (c *Connection) reader(r io.Reader) {
|
|
buf := bufio.NewReader(r)
|
|
frames := &reader{buf}
|
|
conn, haveDeadliner := r.(readDeadliner)
|
|
|
|
for {
|
|
frame, err := frames.ReadFrame()
|
|
|
|
if err != nil {
|
|
c.shutdown(&Error{Code: FrameError, Reason: err.Error()})
|
|
return
|
|
}
|
|
|
|
c.demux(frame)
|
|
|
|
if haveDeadliner {
|
|
select {
|
|
case c.deadlines <- conn:
|
|
default:
|
|
// On c.Close() c.heartbeater() might exit just before c.deadlines <- conn is called.
|
|
// Which results in this goroutine being stuck forever.
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Ensures that at least one frame is being sent at the tuned interval with a
|
|
// jitter tolerance of 1s
|
|
func (c *Connection) heartbeater(interval time.Duration, done chan *Error) {
|
|
const maxServerHeartbeatsInFlight = 3
|
|
|
|
var sendTicks <-chan time.Time
|
|
if interval > 0 {
|
|
ticker := time.NewTicker(interval)
|
|
defer ticker.Stop()
|
|
sendTicks = ticker.C
|
|
}
|
|
|
|
lastSent := time.Now()
|
|
|
|
for {
|
|
select {
|
|
case at, stillSending := <-c.sends:
|
|
// When actively sending, depend on sent frames to reset server timer
|
|
if stillSending {
|
|
lastSent = at
|
|
} else {
|
|
return
|
|
}
|
|
|
|
case at := <-sendTicks:
|
|
// When idle, fill the space with a heartbeat frame
|
|
if at.Sub(lastSent) > interval-time.Second {
|
|
if err := c.send(&heartbeatFrame{}); err != nil {
|
|
// send heartbeats even after close/closeOk so we
|
|
// tick until the connection starts erroring
|
|
return
|
|
}
|
|
}
|
|
|
|
case conn := <-c.deadlines:
|
|
// When reading, reset our side of the deadline, if we've negotiated one with
|
|
// a deadline that covers at least 2 server heartbeats
|
|
if interval > 0 {
|
|
conn.SetReadDeadline(time.Now().Add(maxServerHeartbeatsInFlight * interval))
|
|
}
|
|
|
|
case <-done:
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// Convenience method to inspect the Connection.Properties["capabilities"]
|
|
// Table for server identified capabilities like "basic.ack" or
|
|
// "confirm.select".
|
|
func (c *Connection) isCapable(featureName string) bool {
|
|
capabilities, _ := c.Properties["capabilities"].(Table)
|
|
hasFeature, _ := capabilities[featureName].(bool)
|
|
return hasFeature
|
|
}
|
|
|
|
// allocateChannel records but does not open a new channel with a unique id.
|
|
// This method is the initial part of the channel lifecycle and paired with
|
|
// releaseChannel
|
|
func (c *Connection) allocateChannel() (*Channel, error) {
|
|
c.m.Lock()
|
|
defer c.m.Unlock()
|
|
|
|
if c.IsClosed() {
|
|
return nil, ErrClosed
|
|
}
|
|
|
|
id, ok := c.allocator.next()
|
|
if !ok {
|
|
return nil, ErrChannelMax
|
|
}
|
|
|
|
ch := newChannel(c, uint16(id))
|
|
c.channels[uint16(id)] = ch
|
|
|
|
return ch, nil
|
|
}
|
|
|
|
// releaseChannel removes a channel from the registry as the final part of the
|
|
// channel lifecycle
|
|
func (c *Connection) releaseChannel(id uint16) {
|
|
c.m.Lock()
|
|
defer c.m.Unlock()
|
|
|
|
delete(c.channels, id)
|
|
c.allocator.release(int(id))
|
|
}
|
|
|
|
// openChannel allocates and opens a channel, must be paired with closeChannel
|
|
func (c *Connection) openChannel() (*Channel, error) {
|
|
ch, err := c.allocateChannel()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if err := ch.open(); err != nil {
|
|
c.releaseChannel(ch.id)
|
|
return nil, err
|
|
}
|
|
return ch, nil
|
|
}
|
|
|
|
// closeChannel releases and initiates a shutdown of the channel. All channel
|
|
// closures should be initiated here for proper channel lifecycle management on
|
|
// this connection.
|
|
func (c *Connection) closeChannel(ch *Channel, e *Error) {
|
|
ch.shutdown(e)
|
|
c.releaseChannel(ch.id)
|
|
}
|
|
|
|
/*
|
|
Channel opens a unique, concurrent server channel to process the bulk of AMQP
|
|
messages. Any error from methods on this receiver will render the receiver
|
|
invalid and a new Channel should be opened.
|
|
|
|
*/
|
|
func (c *Connection) Channel() (*Channel, error) {
|
|
return c.openChannel()
|
|
}
|
|
|
|
func (c *Connection) call(req message, res ...message) error {
|
|
// Special case for when the protocol header frame is sent insted of a
|
|
// request method
|
|
if req != nil {
|
|
if err := c.send(&methodFrame{ChannelId: 0, Method: req}); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
select {
|
|
case err, ok := <-c.errors:
|
|
if !ok {
|
|
return ErrClosed
|
|
}
|
|
return err
|
|
|
|
case msg := <-c.rpc:
|
|
// Try to match one of the result types
|
|
for _, try := range res {
|
|
if reflect.TypeOf(msg) == reflect.TypeOf(try) {
|
|
// *res = *msg
|
|
vres := reflect.ValueOf(try).Elem()
|
|
vmsg := reflect.ValueOf(msg).Elem()
|
|
vres.Set(vmsg)
|
|
return nil
|
|
}
|
|
}
|
|
return ErrCommandInvalid
|
|
}
|
|
// unreachable
|
|
}
|
|
|
|
// Connection = open-Connection *use-Connection close-Connection
|
|
// open-Connection = C:protocol-header
|
|
// S:START C:START-OK
|
|
// *challenge
|
|
// S:TUNE C:TUNE-OK
|
|
// C:OPEN S:OPEN-OK
|
|
// challenge = S:SECURE C:SECURE-OK
|
|
// use-Connection = *channel
|
|
// close-Connection = C:CLOSE S:CLOSE-OK
|
|
// / S:CLOSE C:CLOSE-OK
|
|
func (c *Connection) open(config Config) error {
|
|
if err := c.send(&protocolHeader{}); err != nil {
|
|
return err
|
|
}
|
|
|
|
return c.openStart(config)
|
|
}
|
|
|
|
func (c *Connection) openStart(config Config) error {
|
|
start := &connectionStart{}
|
|
|
|
if err := c.call(nil, start); err != nil {
|
|
return err
|
|
}
|
|
|
|
c.Major = int(start.VersionMajor)
|
|
c.Minor = int(start.VersionMinor)
|
|
c.Properties = start.ServerProperties
|
|
c.Locales = strings.Split(start.Locales, " ")
|
|
|
|
// eventually support challenge/response here by also responding to
|
|
// connectionSecure.
|
|
auth, ok := pickSASLMechanism(config.SASL, strings.Split(start.Mechanisms, " "))
|
|
if !ok {
|
|
return ErrSASL
|
|
}
|
|
|
|
// Save this mechanism off as the one we chose
|
|
c.Config.SASL = []Authentication{auth}
|
|
|
|
// Set the connection locale to client locale
|
|
c.Config.Locale = config.Locale
|
|
|
|
return c.openTune(config, auth)
|
|
}
|
|
|
|
func (c *Connection) openTune(config Config, auth Authentication) error {
|
|
if len(config.Properties) == 0 {
|
|
config.Properties = Table{
|
|
"product": defaultProduct,
|
|
"version": defaultVersion,
|
|
}
|
|
}
|
|
|
|
config.Properties["capabilities"] = Table{
|
|
"connection.blocked": true,
|
|
"consumer_cancel_notify": true,
|
|
}
|
|
|
|
ok := &connectionStartOk{
|
|
ClientProperties: config.Properties,
|
|
Mechanism: auth.Mechanism(),
|
|
Response: auth.Response(),
|
|
Locale: config.Locale,
|
|
}
|
|
tune := &connectionTune{}
|
|
|
|
if err := c.call(ok, tune); err != nil {
|
|
// per spec, a connection can only be closed when it has been opened
|
|
// so at this point, we know it's an auth error, but the socket
|
|
// was closed instead. Return a meaningful error.
|
|
return ErrCredentials
|
|
}
|
|
|
|
// When the server and client both use default 0, then the max channel is
|
|
// only limited by uint16.
|
|
c.Config.ChannelMax = pick(config.ChannelMax, int(tune.ChannelMax))
|
|
if c.Config.ChannelMax == 0 {
|
|
c.Config.ChannelMax = defaultChannelMax
|
|
}
|
|
c.Config.ChannelMax = min(c.Config.ChannelMax, maxChannelMax)
|
|
|
|
// Frame size includes headers and end byte (len(payload)+8), even if
|
|
// this is less than FrameMinSize, use what the server sends because the
|
|
// alternative is to stop the handshake here.
|
|
c.Config.FrameSize = pick(config.FrameSize, int(tune.FrameMax))
|
|
|
|
// Save this off for resetDeadline()
|
|
c.Config.Heartbeat = time.Second * time.Duration(pick(
|
|
int(config.Heartbeat/time.Second),
|
|
int(tune.Heartbeat)))
|
|
|
|
// "The client should start sending heartbeats after receiving a
|
|
// Connection.Tune method"
|
|
go c.heartbeater(c.Config.Heartbeat/2, c.NotifyClose(make(chan *Error, 1)))
|
|
|
|
if err := c.send(&methodFrame{
|
|
ChannelId: 0,
|
|
Method: &connectionTuneOk{
|
|
ChannelMax: uint16(c.Config.ChannelMax),
|
|
FrameMax: uint32(c.Config.FrameSize),
|
|
Heartbeat: uint16(c.Config.Heartbeat / time.Second),
|
|
},
|
|
}); err != nil {
|
|
return err
|
|
}
|
|
|
|
return c.openVhost(config)
|
|
}
|
|
|
|
func (c *Connection) openVhost(config Config) error {
|
|
req := &connectionOpen{VirtualHost: config.Vhost}
|
|
res := &connectionOpenOk{}
|
|
|
|
if err := c.call(req, res); err != nil {
|
|
// Cannot be closed yet, but we know it's a vhost problem
|
|
return ErrVhost
|
|
}
|
|
|
|
c.Config.Vhost = config.Vhost
|
|
|
|
return c.openComplete()
|
|
}
|
|
|
|
// openComplete performs any final Connection initialization dependent on the
|
|
// connection handshake and clears any state needed for TLS and AMQP handshaking.
|
|
func (c *Connection) openComplete() error {
|
|
// We clear the deadlines and let the heartbeater reset the read deadline if requested.
|
|
// RabbitMQ uses TCP flow control at this point for pushback so Writes can
|
|
// intentionally block.
|
|
if deadliner, ok := c.conn.(interface {
|
|
SetDeadline(time.Time) error
|
|
}); ok {
|
|
_ = deadliner.SetDeadline(time.Time{})
|
|
}
|
|
|
|
c.allocator = newAllocator(1, c.Config.ChannelMax)
|
|
return nil
|
|
}
|
|
|
|
func max(a, b int) int {
|
|
if a > b {
|
|
return a
|
|
}
|
|
return b
|
|
}
|
|
|
|
func min(a, b int) int {
|
|
if a < b {
|
|
return a
|
|
}
|
|
return b
|
|
}
|
|
|
|
func pick(client, server int) int {
|
|
if client == 0 || server == 0 {
|
|
return max(client, server)
|
|
}
|
|
return min(client, server)
|
|
}
|