mirror of
https://github.com/shirou/mqttcli.git
synced 2025-04-26 13:49:17 +08:00
implement pubsub command.
This commit is contained in:
parent
6d6dd5455e
commit
c2426ef217
61
main.go
61
main.go
@ -107,6 +107,51 @@ func subscribe(c *cli.Context) {
|
||||
}
|
||||
}
|
||||
|
||||
func pubsub(c *cli.Context) {
|
||||
if c.Bool("d") {
|
||||
log.SetLevel(log.DebugLevel)
|
||||
}
|
||||
opts := NewOption(c)
|
||||
client, err := connect(c, opts)
|
||||
if err != nil {
|
||||
log.Error(err)
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
qos := c.Int("q")
|
||||
subtopic := c.String("sub")
|
||||
if subtopic == "" {
|
||||
log.Errorf("Please specify sub topic")
|
||||
os.Exit(1)
|
||||
}
|
||||
log.Infof("Sub Topic: %s", subtopic)
|
||||
pubtopic := c.String("pub")
|
||||
if pubtopic == "" {
|
||||
log.Errorf("Please specify pub topic")
|
||||
os.Exit(1)
|
||||
}
|
||||
log.Infof("Pub Topic: %s", pubtopic)
|
||||
retain := c.Bool("r")
|
||||
|
||||
go func() {
|
||||
// Read from Stdin
|
||||
scanner := bufio.NewScanner(os.Stdin)
|
||||
for scanner.Scan() {
|
||||
err = client.Publish(pubtopic, []byte(scanner.Text()), qos, retain)
|
||||
if err != nil {
|
||||
log.Error(err)
|
||||
}
|
||||
|
||||
}
|
||||
}()
|
||||
|
||||
err = client.Subscribe(subtopic, qos)
|
||||
if err != nil {
|
||||
log.Error(err)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func main() {
|
||||
initFunc()
|
||||
|
||||
@ -168,6 +213,16 @@ func main() {
|
||||
},
|
||||
)
|
||||
subFlags := commonFlags
|
||||
pubsubFlags := append(commonFlags,
|
||||
cli.StringFlag{
|
||||
Name: "pub",
|
||||
Usage: "publish topic",
|
||||
},
|
||||
cli.StringFlag{
|
||||
Name: "sub",
|
||||
Usage: "subscribe topic",
|
||||
},
|
||||
)
|
||||
|
||||
app.Commands = []cli.Command{
|
||||
{
|
||||
@ -182,6 +237,12 @@ func main() {
|
||||
Flags: subFlags,
|
||||
Action: subscribe,
|
||||
},
|
||||
{
|
||||
Name: "pubsub",
|
||||
Usage: "subscribe and publish",
|
||||
Flags: pubsubFlags,
|
||||
Action: pubsub,
|
||||
},
|
||||
}
|
||||
app.Run(os.Args)
|
||||
}
|
||||
|
3
mqtt.go
3
mqtt.go
@ -42,8 +42,9 @@ func (m *MQTTClient) Publish(topic string, payload []byte, qos int, retain bool)
|
||||
}
|
||||
|
||||
func onMessageReceived(client *MQTT.MqttClient, message MQTT.Message) {
|
||||
fmt.Print(string(message.Payload()))
|
||||
fmt.Println(string(message.Payload()))
|
||||
}
|
||||
|
||||
func (m *MQTTClient) Subscribe(topic string, qos int) error {
|
||||
topicFilter, err := MQTT.NewTopicFilter(topic, byte(qos))
|
||||
if err != nil {
|
||||
|
Loading…
x
Reference in New Issue
Block a user