博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
mosquitto简单应用
阅读量:7128 次
发布时间:2019-06-28

本文共 8886 字,大约阅读时间需要 29 分钟。

1. 简述

一款实现了消息推送协议 MQTT v3.1 的开源消息代理软件,提供轻量级的,支持可发布/可订阅的的消息推送模式,使设备对设备之间的短消息通信变得简单,比如现在应用广泛的低功耗传感器,手机、嵌入式计算机、微型控制器等移动设备。一个典型的应用案例就是 Andy Stanford-ClarkMosquitto(MQTT协议创始人之一)在家中实现的远程监控和自动化,并在 OggCamp 的演讲上,对MQTT协议进行详细阐述。

官网:

  

  

 IBM的一个server(broker)示例用法说明

2. 函数

mosquitto结构体:

struct mosquitto;
struct mosquitto_message{    int mid;    char *topic;    void *payload;    int payloadlen;    int qos;    bool retain;};

 mosquitto支持推送和订阅消息模式:

/*  * Function: mosquitto_publish * * Publish a message on a given topic. *  * Parameters: *  mosq -       a valid mosquitto instance. *  mid -        pointer to an int. If not NULL, the function will set this *               to the message id of this particular message. This can be then *               used with the publish callback to determine when the message *               has been sent. *               Note that although the MQTT protocol doesn't use message ids *               for messages with QoS=0, libmosquitto assigns them message ids *               so they can be tracked with this parameter. *  topic -      null terminated string of the topic to publish to. *  payloadlen - the size of the payload (bytes). Valid values are between 0 and *               268,435,455. *  payload -    pointer to the data to send. If payloadlen > 0 this must be a *               valid memory location. *  qos -        integer value 0, 1 or 2 indicating the Quality of Service to be *               used for the message. *  retain -     set to true to make the message retained. * * Returns: *  MOSQ_ERR_SUCCESS -      on success. *  MOSQ_ERR_INVAL -        if the input parameters were invalid. *  MOSQ_ERR_NOMEM -        if an out of memory condition occurred. *  MOSQ_ERR_NO_CONN -      if the client isn't connected to a broker. *  MOSQ_ERR_PROTOCOL -     if there is a protocol error communicating with the *                          broker. *  MOSQ_ERR_PAYLOAD_SIZE - if payloadlen is too large. * * See Also:  *  
*/libmosq_EXPORT int mosquitto_publish(struct mosquitto *mosq, int *mid, const char *topic, int payloadlen, const void *payload, int qos, bool retain);
/* * Function: mosquitto_subscribe * * Subscribe to a topic. * * Parameters: *  mosq - a valid mosquitto instance. *  mid -  a pointer to an int. If not NULL, the function will set this to *         the message id of this particular message. This can be then used *         with the subscribe callback to determine when the message has been *         sent. *  sub -  the subscription pattern. *  qos -  the requested Quality of Service for this subscription. * * Returns: *  MOSQ_ERR_SUCCESS - on success. *  MOSQ_ERR_INVAL -   if the input parameters were invalid. *  MOSQ_ERR_NOMEM -   if an out of memory condition occurred. *  MOSQ_ERR_NO_CONN - if the client isn't connected to a broker. */libmosq_EXPORT int mosquitto_subscribe(struct mosquitto *mosq, int *mid, const char *sub, int qos);/* * Function: mosquitto_unsubscribe * * Unsubscribe from a topic. * * Parameters: *  mosq - a valid mosquitto instance. *  mid -  a pointer to an int. If not NULL, the function will set this to *         the message id of this particular message. This can be then used *         with the unsubscribe callback to determine when the message has been *         sent. *  sub -  the unsubscription pattern. * * Returns: *  MOSQ_ERR_SUCCESS - on success. *  MOSQ_ERR_INVAL -   if the input parameters were invalid. *  MOSQ_ERR_NOMEM -   if an out of memory condition occurred. *  MOSQ_ERR_NO_CONN - if the client isn't connected to a broker. */libmosq_EXPORT int mosquitto_unsubscribe(struct mosquitto *mosq, int *mid, const char *sub);

一般使用流程如下:

mosquitto_lib_init();mosq=mosquitto_new();mosquitto_connect_callback_set();  ----mosquitto_subscribe();mosquitto_disconnect_callback_set();mosquitto_message_callback_set();  ----接收解析消息 并推送mosquitto_publish()mosquitto_username_pw_set(mosq, "user", "pw");mosquitto_connect();mosquitto_loop(mosq, timeout, 1);  // 需要不断循环判断, 也可根据需要永远运行:mosquitto_loop_forever(mosq, timeout, 1)mosquitto_publish();mosquitto_destroy(mosq);mosquitto_lib_cleanup();

3. 应用

一般使用流程:

0. 依赖库安装

apt-get install openssl

apt-get install libssl-dev

apt-get install uuid-dev

1. 编译安装make make install2. 创建mosquitto用户mosquitto默认以mosquitto用户启动groupadd mosquittouseradd -g mosquitto mosquitto3. 配置文件修改根据需求修改配置文件/etc/mosquitto/mosquitto.conf 一般不修改直接可用,本机所有IP都可达,外部访问本机IP可达。4. 启动mosquitto -c /etc/mosquitto/mosquitto.conf -d5. 测试mosquitto_sub -t wang/mingmosquitto_pub -m "hello" mosquitto_pub -h 172.16.1.20 -p 1883 -t data_topic -m "hello_wang"

mosquitto是一个server broker,可直接运行测试客户端。

manpage上一个示例:

#include 
#include
void my_message_callback(struct mosquitto *mosq, void *userdata, const struct mosquitto_message *message){ if(message->payloadlen){ printf("%s %s\n", message->topic, message->payload); }else{ printf("%s (null)\n", message->topic); } fflush(stdout); } void my_connect_callback(struct mosquitto *mosq, void *userdata, int result) { int i; if(!result){ /* Subscribe to broker information topics on successful connect. */ mosquitto_subscribe(mosq, NULL, "$SYS/#", 2); }else{ fprintf(stderr, "Connect failed\n"); } } void my_subscribe_callback(struct mosquitto *mosq, void *userdata, int mid, int qos_count, const int *granted_qos) { int i; printf("Subscribed (mid: %d): %d", mid, granted_qos[0]); for(i=1; i

 自测程序

#include 
#include
#include
#include
#include
char peerid[] = "wangq";char host[] = "127.0.0.1";int port = 1883;int keepalive = 60;bool clean_session = true;struct mosquitto *mosq = NULL;pthread_t pmosid = 0;void my_message_callback(struct mosquitto *mosq, void *userdata, const struct mosquitto_message *message){ if(message->payloadlen){ printf("====>recv:%s %s\n", message->topic, message->payload); }else{ printf("%s (null)\n", message->topic); } mosquitto_publish(mosq, NULL, "wang/result", sizeof("loveresult"), "loveresult", 2, false); sleep(2); fflush(stdout);}void my_connect_callback(struct mosquitto *mosq, void *userdata, int result){ if(!result){ /* Subscribe to broker information topics on successful connect. */ //mosquitto_subscribe(mosq, NULL, "$SYS/broker/uptime", 2); mosquitto_subscribe(mosq, NULL, "wang/hua", 1); }else{ fprintf(stderr, "Connect failed\n"); }}void my_subscribe_callback(struct mosquitto *mosq, void *userdata, int mid, int qos_count, const int *granted_qos){ int i; printf("Subscribed (mid: %d): %d", mid, granted_qos[0]); for(i=1; i
log:%s\n", str);}void mos_init(){ mosquitto_lib_init(); mosq = mosquitto_new(peerid, clean_session, NULL); if(!mosq){ fprintf(stderr, "Error: Out of memory.\n"); exit(-1); } mosquitto_log_callback_set(mosq, my_log_callback); mosquitto_connect_callback_set(mosq, my_connect_callback); mosquitto_message_callback_set(mosq, my_message_callback); mosquitto_subscribe_callback_set(mosq, my_subscribe_callback); mosquitto_will_set(mosq,"xiao/ming", sizeof("livewill"), "livewill", 2, false); mosquitto_threaded_set(mosq, 1);}void * pthread_mos(void *arg){ int toserver = -1; int timeout = 0; while(toserver){ toserver = mosquitto_connect(mosq, host, port, keepalive); if(toserver){ timeout++; fprintf(stderr, "Unable to connect server [%d] times.\n", timeout); if(timeout > 3){ fprintf(stderr, "Unable to connect server, exit.\n" ); pthread_exit(NULL); } sleep(10); } } mosquitto_loop_forever(mosq, -1, 1); mosquitto_disconnect(mosq); mosquitto_destroy(mosq); mosquitto_lib_cleanup(); pthread_exit(NULL);}int main(int argc, char *argv[]){ int ret = 0; mos_init(); ret = pthread_create(&pmosid, NULL, pthread_mos, NULL); if(ret != 0){ printf("create pthread mos error.\n"); exit(-1); } pthread_detach(pmosid); while(1){ mosquitto_publish(mosq, NULL, "wang/ming", sizeof("love"), "love", 2, false); sleep(2); }/* mosquitto_loop_forever(mosq, -1, 1); while(1){ mosquitto_publish(mosq, NULL, "wang/qin/hua", sizeof("love"), "love", 2, false); sleep(2); }*/ return 0;}

 运行结果:

#./a.out====>log:Client wangq sending CONNECT====>log:Client wangq received CONNACK (0)====>log:Client wangq sending SUBSCRIBE (Mid: 2, Topic: wang/hua, QoS: 1)====>log:Client wangq sending PUBLISH (d1, q2, r0, m1, 'wang/ming', ... (5 bytes))====>log:Client wangq received SUBACKSubscribed (mid: 2): 1====>log:Client wangq received PUBREC (Mid: 1)====>log:Client wangq sending PUBREL (Mid: 1)====>log:Client wangq received PUBCOMP (Mid: 1)====>log:Client wangq sending PUBLISH (d0, q2, r0, m3, 'wang/ming', ... (5 bytes))====>log:Client wangq received PUBREC (Mid: 3)====>log:Client wangq sending PUBREL (Mid: 3)====>log:Client wangq received PUBCOMP (Mid: 3)

订阅:

#mosquitto_sub -t wang/minglovelove

will订阅:

#mosquitto_sub -t xiao/ming livewill

 

参考:

1. 

2. 

转载地址:http://kehel.baihongyu.com/

你可能感兴趣的文章
数据库系统原理中的基本概念
查看>>
我的友情链接
查看>>
Windows sever 2008 R2 ---虚拟机安装
查看>>
PC 加入AD域的要求
查看>>
B-tree vs B+tree
查看>>
Eclipse中10个最有用的快捷键组合
查看>>
LAMP一键安装脚本
查看>>
vsphere层级架构
查看>>
我的友情链接
查看>>
我的友情链接
查看>>
转载-Linux新人必读,Linux发行版选择和软件安装的一些原则性问题
查看>>
Linux从入门到精通之监控软件Cacti
查看>>
徹底解決 Windows Server 2012 R2 惱人的輸入法問題
查看>>
linux 手动整理内存
查看>>
Android 向右滑动销毁(finish)Activity, 随着手势的滑动而滑动的效果
查看>>
Vue.js 自定义指令
查看>>
打开Nginx的rewrite日志
查看>>
[李景山php]每天laravel-20161121|StatusCommand.php
查看>>
通过Rancher部署并扩容Kubernetes集群基础篇二
查看>>
zabbix监控WEB页面及告警 实战
查看>>