From c2426ef2172576d0468a8259cac887a53bf0283c Mon Sep 17 00:00:00 2001 From: Shirou WAKAYAMA Date: Mon, 1 Sep 2014 23:31:27 +0900 Subject: [PATCH] implement pubsub command. --- main.go | 61 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++ mqtt.go | 3 ++- 2 files changed, 63 insertions(+), 1 deletion(-) diff --git a/main.go b/main.go index cc6fcc5..d0dbe91 100644 --- a/main.go +++ b/main.go @@ -107,6 +107,51 @@ func subscribe(c *cli.Context) { } } +func pubsub(c *cli.Context) { + if c.Bool("d") { + log.SetLevel(log.DebugLevel) + } + opts := NewOption(c) + client, err := connect(c, opts) + if err != nil { + log.Error(err) + os.Exit(1) + } + + qos := c.Int("q") + subtopic := c.String("sub") + if subtopic == "" { + log.Errorf("Please specify sub topic") + os.Exit(1) + } + log.Infof("Sub Topic: %s", subtopic) + pubtopic := c.String("pub") + if pubtopic == "" { + log.Errorf("Please specify pub topic") + os.Exit(1) + } + log.Infof("Pub Topic: %s", pubtopic) + retain := c.Bool("r") + + go func() { + // Read from Stdin + scanner := bufio.NewScanner(os.Stdin) + for scanner.Scan() { + err = client.Publish(pubtopic, []byte(scanner.Text()), qos, retain) + if err != nil { + log.Error(err) + } + + } + }() + + err = client.Subscribe(subtopic, qos) + if err != nil { + log.Error(err) + } + +} + func main() { initFunc() @@ -168,6 +213,16 @@ func main() { }, ) subFlags := commonFlags + pubsubFlags := append(commonFlags, + cli.StringFlag{ + Name: "pub", + Usage: "publish topic", + }, + cli.StringFlag{ + Name: "sub", + Usage: "subscribe topic", + }, + ) app.Commands = []cli.Command{ { @@ -182,6 +237,12 @@ func main() { Flags: subFlags, Action: subscribe, }, + { + Name: "pubsub", + Usage: "subscribe and publish", + Flags: pubsubFlags, + Action: pubsub, + }, } app.Run(os.Args) } diff --git a/mqtt.go b/mqtt.go index 3507dab..bca4cf7 100644 --- a/mqtt.go +++ b/mqtt.go @@ -42,8 +42,9 @@ func (m *MQTTClient) Publish(topic string, payload []byte, qos int, retain bool) } func onMessageReceived(client *MQTT.MqttClient, message MQTT.Message) { - fmt.Print(string(message.Payload())) + fmt.Println(string(message.Payload())) } + func (m *MQTTClient) Subscribe(topic string, qos int) error { topicFilter, err := MQTT.NewTopicFilter(topic, byte(qos)) if err != nil {