1
0
mirror of https://github.com/mainflux/mainflux.git synced 2025-04-26 13:48:53 +08:00
Signed-off-by: Drasko DRASKOVIC <drasko.draskovic@gmail.com>
This commit is contained in:
Drasko DRASKOVIC 2016-11-04 23:10:13 +01:00
parent c12a88d12f
commit d38ea79dde
11 changed files with 124 additions and 133 deletions

View File

@ -1,37 +1,37 @@
package clients package clients
import ( import (
"fmt"
"strings"
"time"
"log"
"encoding/json" "encoding/json"
"fmt"
"log"
"net/http" "net/http"
"strconv" "strconv"
"strings"
"time"
"github.com/mainflux/mainflux/config"
"github.com/mainflux/mainflux/db" "github.com/mainflux/mainflux/db"
"github.com/mainflux/mainflux/models" "github.com/mainflux/mainflux/models"
"github.com/mainflux/mainflux/config"
mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/krylovsk/gosenml" "github.com/krylovsk/gosenml"
"gopkg.in/mgo.v2/bson" "gopkg.in/mgo.v2/bson"
mqtt "github.com/eclipse/paho.mqtt.golang"
) )
type ( type (
ChannelWriteStatus struct { ChannelWriteStatus struct {
Nb int Nb int
Str string Str string
} }
MqttConn struct { MqttConn struct {
Opts *mqtt.ClientOptions Opts *mqtt.ClientOptions
Client mqtt.Client Client mqtt.Client
} }
) )
var ( var (
MqttClient mqtt.Client MqttClient mqtt.Client
WriteStatusChannel chan ChannelWriteStatus WriteStatusChannel chan ChannelWriteStatus
) )
@ -44,7 +44,7 @@ var msgHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message)
chanId := s[len(s)-1] chanId := s[len(s)-1]
status := WriteChannel(chanId, msg.Payload()) status := WriteChannel(chanId, msg.Payload())
// Send status to HTTP publisher // Send status to HTTP publisher
WriteStatusChannel <- status WriteStatusChannel <- status
fmt.Println(status) fmt.Println(status)
@ -75,7 +75,6 @@ func (mqc *MqttConn) MqttSub(cfg config.Config) {
WriteStatusChannel = make(chan ChannelWriteStatus) WriteStatusChannel = make(chan ChannelWriteStatus)
} }
/** /**
* WriteChannel() * WriteChannel()
* Generic function that updates the channel value. * Generic function that updates the channel value.

View File

@ -16,25 +16,24 @@ import (
type Config struct { type Config struct {
// HTTP // HTTP
HttpHost string HttpHost string
HttpPort int HttpPort int
// Mongo // Mongo
MongoHost string MongoHost string
MongoPort int MongoPort int
MongoDatabase string MongoDatabase string
// MQTT // MQTT
MqttHost string MqttHost string
MqttPort int MqttPort int
// Influx // Influx
InfluxHost string InfluxHost string
InfluxPort int InfluxPort int
InfluxDatabase string InfluxDatabase string
} }
func (cfg *Config) Parse() { func (cfg *Config) Parse() {
var confFile string var confFile string

View File

@ -12,19 +12,19 @@ import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"log" "log"
"time"
"strconv" "strconv"
"time"
"github.com/mainflux/mainflux/clients"
"github.com/mainflux/mainflux/db" "github.com/mainflux/mainflux/db"
"github.com/mainflux/mainflux/models" "github.com/mainflux/mainflux/models"
"github.com/mainflux/mainflux/clients"
"github.com/satori/go.uuid" "github.com/satori/go.uuid"
"gopkg.in/mgo.v2/bson" "gopkg.in/mgo.v2/bson"
"io" "io"
"net/http"
"io/ioutil" "io/ioutil"
"net/http"
"github.com/go-zoo/bone" "github.com/go-zoo/bone"
) )
@ -38,9 +38,9 @@ func CreateChannel(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json; charset=utf-8") w.Header().Set("Content-Type", "application/json; charset=utf-8")
data, err := ioutil.ReadAll(r.Body) data, err := ioutil.ReadAll(r.Body)
if err != nil { if err != nil {
panic(err) panic(err)
} }
if len(data) == 0 { if len(data) == 0 {
w.WriteHeader(http.StatusBadRequest) w.WriteHeader(http.StatusBadRequest)
@ -106,7 +106,7 @@ func CreateChannel(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK) w.WriteHeader(http.StatusOK)
str := `{"response": "created", "id": "` + c.Id + `"}` str := `{"response": "created", "id": "` + c.Id + `"}`
io.WriteString(w, str) io.WriteString(w, str)
} }
/** /**
@ -127,7 +127,8 @@ func GetChannels(w http.ResponseWriter, r *http.Request) {
// Set default limit to -5 // Set default limit to -5
climit = -100 climit = -100
} else { } else {
climit, err = strconv.Atoi(s); if err != nil { climit, err = strconv.Atoi(s)
if err != nil {
w.WriteHeader(http.StatusBadRequest) w.WriteHeader(http.StatusBadRequest)
str := `{"response": "wrong count limit"}` str := `{"response": "wrong count limit"}`
io.WriteString(w, str) io.WriteString(w, str)
@ -140,7 +141,8 @@ func GetChannels(w http.ResponseWriter, r *http.Request) {
// Set default limit to -5 // Set default limit to -5
vlimit = -100 vlimit = -100
} else { } else {
vlimit, err = strconv.Atoi(s); if err != nil { vlimit, err = strconv.Atoi(s)
if err != nil {
w.WriteHeader(http.StatusBadRequest) w.WriteHeader(http.StatusBadRequest)
str := `{"response": "wrong value limit"}` str := `{"response": "wrong value limit"}`
io.WriteString(w, str) io.WriteString(w, str)
@ -151,8 +153,8 @@ func GetChannels(w http.ResponseWriter, r *http.Request) {
// Query DB // Query DB
results := []models.Channel{} results := []models.Channel{}
if err := Db.C("channels").Find(nil). if err := Db.C("channels").Find(nil).
Select(bson.M{"values": bson.M{"$slice": vlimit}}). Select(bson.M{"values": bson.M{"$slice": vlimit}}).
Sort("-_id").Limit(climit).All(&results); err != nil { Sort("-_id").Limit(climit).All(&results); err != nil {
log.Print(err) log.Print(err)
} }
@ -162,7 +164,7 @@ func GetChannels(w http.ResponseWriter, r *http.Request) {
if err != nil { if err != nil {
log.Print(err) log.Print(err)
} }
io.WriteString(w, string(res)) io.WriteString(w, string(res))
} }
/** /**
@ -177,7 +179,6 @@ func GetChannel(w http.ResponseWriter, r *http.Request) {
id := bone.GetValue(r, "channel_id") id := bone.GetValue(r, "channel_id")
var vlimit int var vlimit int
var err error var err error
s := r.URL.Query().Get("vlimit") s := r.URL.Query().Get("vlimit")
@ -185,7 +186,8 @@ func GetChannel(w http.ResponseWriter, r *http.Request) {
// Set default limit to -5 // Set default limit to -5
vlimit = -5 vlimit = -5
} else { } else {
vlimit, err = strconv.Atoi(s); if err != nil { vlimit, err = strconv.Atoi(s)
if err != nil {
w.WriteHeader(http.StatusBadRequest) w.WriteHeader(http.StatusBadRequest)
str := `{"response": "wrong limit"}` str := `{"response": "wrong limit"}`
io.WriteString(w, str) io.WriteString(w, str)
@ -195,8 +197,8 @@ func GetChannel(w http.ResponseWriter, r *http.Request) {
result := models.Channel{} result := models.Channel{}
if err := Db.C("channels").Find(bson.M{"id": id}). if err := Db.C("channels").Find(bson.M{"id": id}).
Select(bson.M{"values": bson.M{"$slice": vlimit}}). Select(bson.M{"values": bson.M{"$slice": vlimit}}).
One(&result); err != nil { One(&result); err != nil {
log.Print(err) log.Print(err)
w.WriteHeader(http.StatusNotFound) w.WriteHeader(http.StatusNotFound)
str := `{"response": "not found", "id": "` + id + `"}` str := `{"response": "not found", "id": "` + id + `"}`
@ -204,13 +206,12 @@ func GetChannel(w http.ResponseWriter, r *http.Request) {
return return
} }
w.WriteHeader(http.StatusOK) w.WriteHeader(http.StatusOK)
res, err := json.Marshal(result) res, err := json.Marshal(result)
if err != nil { if err != nil {
log.Print(err) log.Print(err)
} }
io.WriteString(w, string(res)) io.WriteString(w, string(res))
} }
/** /**
@ -224,9 +225,9 @@ func UpdateChannel(w http.ResponseWriter, r *http.Request) {
defer Db.Close() defer Db.Close()
data, err := ioutil.ReadAll(r.Body) data, err := ioutil.ReadAll(r.Body)
if err != nil { if err != nil {
panic(err) panic(err)
} }
if len(data) == 0 { if len(data) == 0 {
w.WriteHeader(http.StatusBadRequest) w.WriteHeader(http.StatusBadRequest)
@ -255,7 +256,7 @@ func UpdateChannel(w http.ResponseWriter, r *http.Request) {
// Publish the channel update. // Publish the channel update.
// This will be catched by the MQTT main client (subscribed to all channel topics) // This will be catched by the MQTT main client (subscribed to all channel topics)
// and then written in the DB in the MQTT handler // and then written in the DB in the MQTT handler
token := clients.MqttClient.Publish("mainflux/" + id, 0, false, string(data)) token := clients.MqttClient.Publish("mainflux/"+id, 0, false, string(data))
token.Wait() token.Wait()
// Wait on status from MQTT handler (which executes DB write) // Wait on status from MQTT handler (which executes DB write)
@ -278,7 +279,7 @@ func DeleteChannel(w http.ResponseWriter, r *http.Request) {
id := bone.GetValue(r, "channel_id") id := bone.GetValue(r, "channel_id")
err := Db.C("channels").Remove(bson.M{"id": id}) err := Db.C("channels").Remove(bson.M{"id": id})
if err != nil { if err != nil {
log.Print(err) log.Print(err)
w.WriteHeader(http.StatusNotFound) w.WriteHeader(http.StatusNotFound)
str := `{"response": "not deleted", "id": "` + id + `"}` str := `{"response": "not deleted", "id": "` + id + `"}`
@ -288,7 +289,5 @@ func DeleteChannel(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK) w.WriteHeader(http.StatusOK)
str := `{"response": "deleted", "id": "` + id + `"}` str := `{"response": "deleted", "id": "` + id + `"}`
io.WriteString(w, str) io.WriteString(w, str)
} }

View File

@ -21,8 +21,8 @@ import (
"gopkg.in/mgo.v2/bson" "gopkg.in/mgo.v2/bson"
"io" "io"
"net/http"
"io/ioutil" "io/ioutil"
"net/http"
"github.com/go-zoo/bone" "github.com/go-zoo/bone"
) )
@ -35,10 +35,10 @@ func CreateDevice(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json; charset=utf-8") w.Header().Set("Content-Type", "application/json; charset=utf-8")
data, err := ioutil.ReadAll(r.Body) data, err := ioutil.ReadAll(r.Body)
if err != nil { if err != nil {
println("HERE") println("HERE")
panic(err) panic(err)
} }
if len(data) == 0 { if len(data) == 0 {
w.WriteHeader(http.StatusBadRequest) w.WriteHeader(http.StatusBadRequest)
@ -53,13 +53,13 @@ func CreateDevice(w http.ResponseWriter, r *http.Request) {
} }
/* /*
if validateJsonSchema("device", body) != true { if validateJsonSchema("device", body) != true {
println("Invalid schema") println("Invalid schema")
w.WriteHeader(http.StatusBadRequest) w.WriteHeader(http.StatusBadRequest)
str := `{"response": "invalid json schema in request"}` str := `{"response": "invalid json schema in request"}`
io.WriteString(w, str) io.WriteString(w, str)
return return
} }
*/ */
// Init new Mongo session // Init new Mongo session
@ -97,7 +97,7 @@ func CreateDevice(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK) w.WriteHeader(http.StatusOK)
str := `{"response": "created", "id": "` + d.Id + `"}` str := `{"response": "created", "id": "` + d.Id + `"}`
io.WriteString(w, str) io.WriteString(w, str)
} }
/** /**
@ -119,7 +119,7 @@ func GetDevices(w http.ResponseWriter, r *http.Request) {
if err != nil { if err != nil {
log.Print(err) log.Print(err)
} }
io.WriteString(w, string(res)) io.WriteString(w, string(res))
} }
/** /**
@ -152,7 +152,7 @@ func GetDevice(w http.ResponseWriter, r *http.Request) {
if err != nil { if err != nil {
log.Print(err) log.Print(err)
} }
io.WriteString(w, string(res)) io.WriteString(w, string(res))
} }
/** /**
@ -162,10 +162,10 @@ func UpdateDevice(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json; charset=utf-8") w.Header().Set("Content-Type", "application/json; charset=utf-8")
data, err := ioutil.ReadAll(r.Body) data, err := ioutil.ReadAll(r.Body)
if err != nil { if err != nil {
println("HERE") println("HERE")
panic(err) panic(err)
} }
if len(data) == 0 { if len(data) == 0 {
w.WriteHeader(http.StatusBadRequest) w.WriteHeader(http.StatusBadRequest)
@ -180,16 +180,15 @@ func UpdateDevice(w http.ResponseWriter, r *http.Request) {
} }
/* /*
if validateJsonSchema("device", body) != true { if validateJsonSchema("device", body) != true {
println("Invalid schema") println("Invalid schema")
w.WriteHeader(http.StatusBadRequest) w.WriteHeader(http.StatusBadRequest)
str := `{"response": "invalid json schema in request"}` str := `{"response": "invalid json schema in request"}`
io.WriteString(w, str) io.WriteString(w, str)
return return
} }
*/ */
Db := db.MgoDb{} Db := db.MgoDb{}
Db.Init() Db.Init()
defer Db.Close() defer Db.Close()
@ -254,5 +253,5 @@ func DeleteDevice(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK) w.WriteHeader(http.StatusOK)
str := `{"response": "deleted", "id": "` + id + `"}` str := `{"response": "deleted", "id": "` + id + `"}`
io.WriteString(w, str) io.WriteString(w, str)
} }

View File

@ -21,5 +21,5 @@ func GetStatus(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json; charset=utf-8") w.Header().Set("Content-Type", "application/json; charset=utf-8")
w.WriteHeader(http.StatusOK) w.WriteHeader(http.StatusOK)
str := `{"running": true}` str := `{"running": true}`
io.WriteString(w, str) io.WriteString(w, str)
} }

12
main.go
View File

@ -9,17 +9,17 @@
package main package main
import ( import (
"flag"
"fmt" "fmt"
"os" "github.com/fatih/color"
"strings" "github.com/mainflux/mainflux/clients"
"strconv"
"github.com/mainflux/mainflux/config" "github.com/mainflux/mainflux/config"
"github.com/mainflux/mainflux/db" "github.com/mainflux/mainflux/db"
"github.com/mainflux/mainflux/servers" "github.com/mainflux/mainflux/servers"
"github.com/mainflux/mainflux/clients" "os"
"github.com/fatih/color"
"runtime" "runtime"
"flag" "strconv"
"strings"
) )
type MainfluxLite struct { type MainfluxLite struct {

View File

@ -14,19 +14,19 @@ import (
type ( type (
Channel struct { Channel struct {
Id string `json:"id"` Id string `json:"id"`
Device string `json:"device"` Device string `json:"device"`
// Name is optional. If present, it is pre-pended to `bn` member of SenML. // Name is optional. If present, it is pre-pended to `bn` member of SenML.
Name string `json:"name"` Name string `json:"name"`
// Unit is optional. If present, it is pre-pended to `bu` member of SenML. // Unit is optional. If present, it is pre-pended to `bu` member of SenML.
Unit string `json:"unit"` Unit string `json:"unit"`
Values []gosenml.Entry `json:"values"` Values []gosenml.Entry `json:"values"`
Created string `json:"created"` Created string `json:"created"`
Updated string `json:"updated"` Updated string `json:"updated"`
Metadata map[string]interface{} `json:"metadata"` Metadata map[string]interface{} `json:"metadata"`
} }
) )

View File

@ -13,17 +13,17 @@ type (
Id string `json:"id"` Id string `json:"id"`
Name string `json:"name"` Name string `json:"name"`
Description string `json:"description"` Description string `json:"description"`
Online bool `json:"online"` Online bool `json:"online"`
ConnectedAt string `json:"connected_at"` ConnectedAt string `json:"connected_at"`
DisonnectedAt string `json:"disconnected_at"` DisonnectedAt string `json:"disconnected_at"`
Channels []Channel `json:"channels"` Channels []Channel `json:"channels"`
Created string `json:"created"` Created string `json:"created"`
Updated string `json:"updated"` Updated string `json:"updated"`
Metadata map[string]interface{} `json:"metadata"` Metadata map[string]interface{} `json:"metadata"`
} }
) )

22
opts.go
View File

@ -10,15 +10,15 @@ package main
// Options block for gnatsd server. // Options block for gnatsd server.
type Options struct { type Options struct {
Host string Host string
Port int Port int
Trace bool Trace bool
Debug bool Debug bool
MaxConn int MaxConn int
Logtime bool Logtime bool
Authorization string Authorization string
Username string Username string
Password string Password string
PidFile string PidFile string
LogFile string LogFile string
} }

View File

@ -9,17 +9,15 @@
package servers package servers
import ( import (
"strconv"
"net/http" "net/http"
"strconv"
"github.com/mainflux/mainflux/controllers"
"github.com/mainflux/mainflux/config" "github.com/mainflux/mainflux/config"
"github.com/mainflux/mainflux/controllers"
"github.com/go-zoo/bone" "github.com/go-zoo/bone"
) )
func HttpServer(cfg config.Config) { func HttpServer(cfg config.Config) {
mux := bone.New() mux := bone.New()
@ -51,7 +49,7 @@ func HttpServer(cfg config.Config) {
/** /**
* Server * Server
*/ */
http.ListenAndServe(cfg.HttpHost + ":" + strconv.Itoa(cfg.HttpPort), mux) http.ListenAndServe(cfg.HttpHost+":"+strconv.Itoa(cfg.HttpPort), mux)
// Use following to start HTTPS server on the same port // Use following to start HTTPS server on the same port
//iris.ListenTLS(cfg.HttpHost + ":" + strconv.Itoa(cfg.HttpPort), "tls/mainflux.crt", "tls/mainflux.key") //iris.ListenTLS(cfg.HttpHost + ":" + strconv.Itoa(cfg.HttpPort), "tls/mainflux.crt", "tls/mainflux.key")

View File

@ -9,12 +9,12 @@
package servers package servers
import ( import (
"log"
"net/http"
"net/http/httptest"
"os"
"testing" "testing"
"time" "time"
"log"
"os"
"net/http"
"net/http/httptest"
"github.com/mainflux/mainflux/config" "github.com/mainflux/mainflux/config"
"github.com/mainflux/mainflux/controllers" "github.com/mainflux/mainflux/controllers"
@ -24,10 +24,9 @@ import (
"gopkg.in/mgo.v2" "gopkg.in/mgo.v2"
) )
func TestMain(m *testing.M) { func TestMain(m *testing.M) {
// We are in testing - notify the program // We are in testing - notify the program
// so that it is not confused if some other commad line // so that it is not confused if some other commad line
// arguments come in - for example when test is started with `go test -v ./...` // arguments come in - for example when test is started with `go test -v ./...`
// which is what Travis does // which is what Travis does
os.Setenv("TEST_ENV", "1") os.Setenv("TEST_ENV", "1")
@ -75,31 +74,29 @@ func TestServer(t *testing.T) {
var cfg config.Config var cfg config.Config
cfg.Parse() cfg.Parse()
// Create a request to pass to our handler. We don't have any query parameters for now, so we'll // Create a request to pass to our handler. We don't have any query parameters for now, so we'll
// pass 'nil' as the third parameter. // pass 'nil' as the third parameter.
req, err := http.NewRequest("GET", "/status", nil) req, err := http.NewRequest("GET", "/status", nil)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
// We create a ResponseRecorder (which satisfies http.ResponseWriter) to record the response. // We create a ResponseRecorder (which satisfies http.ResponseWriter) to record the response.
rr := httptest.NewRecorder() rr := httptest.NewRecorder()
handler := http.HandlerFunc(controllers.GetStatus) handler := http.HandlerFunc(controllers.GetStatus)
// Our handlers satisfy http.Handler, so we can call their ServeHTTP method // Our handlers satisfy http.Handler, so we can call their ServeHTTP method
// directly and pass in our Request and ResponseRecorder. // directly and pass in our Request and ResponseRecorder.
handler.ServeHTTP(rr, req) handler.ServeHTTP(rr, req)
// Check the status code is what we expect. // Check the status code is what we expect.
if status := rr.Code; status != http.StatusOK { if status := rr.Code; status != http.StatusOK {
t.Errorf("handler returned wrong status code: got %v want %v", status, http.StatusOK) t.Errorf("handler returned wrong status code: got %v want %v", status, http.StatusOK)
} }
// Check the response body is what we expect. // Check the response body is what we expect.
expected := `{"running": true}` expected := `{"running": true}`
if rr.Body.String() != expected { if rr.Body.String() != expected {
t.Errorf("handler returned unexpected body: got %v want %v", rr.Body.String(), expected) t.Errorf("handler returned unexpected body: got %v want %v", rr.Body.String(), expected)
} }
} }