Commit 95f1eaee by 郭峰

Merge branch 'feature/mqtt-feature' into 'release'

mqtt断线重连,配置修改重新创建连接06

See merge request !21
parents ae7815db 1d4fc2b1
#include "MqttSubscriber.h" #include "MqttSubscriber.h"
#include "mainwindow.h" #include "mainwindow.h"
#include "CameraHandle.h" #include "CameraHandle.h"
MqttSubscriber* MqttSubscriber::instance = nullptr; MqttSubscriber* MqttSubscriber::instance = nullptr;
MqttSubscriber* MqttSubscriber::getInstance(QObject* parent) { MqttSubscriber* MqttSubscriber::getInstance(QObject* parent) {
...@@ -10,54 +11,73 @@ MqttSubscriber* MqttSubscriber::getInstance(QObject* parent) { ...@@ -10,54 +11,73 @@ MqttSubscriber* MqttSubscriber::getInstance(QObject* parent) {
return instance; return instance;
} }
void MqttSubscriber::init(vides_data::MqttConfig &config,QString &httpUrl,QString &serialNumber){ void MqttSubscriber::init(vides_data::MqttConfig &config, QString &httpUrl, QString &serialNumber) {
this->config=config; // 清理之前的客户端资源
this->httpUrl=httpUrl; if (client != nullptr) {
this->serialNumber=serialNumber; MQTTAsync_destroy(&client); // 释放之前的MQTT客户端
QByteArray bAddress = config.address.toUtf8(); client = nullptr; // 确保指针清空
char* cAddress=bAddress.data(); }
// 保存配置信息
this->config = config;
this->httpUrl = httpUrl;
this->serialNumber = serialNumber;
// 初始化MQTT客户端
QByteArray bAddress = config.address.toUtf8();
char* cAddress = bAddress.data();
QByteArray bClientId = config.clientId.toUtf8(); QByteArray bClientId = config.clientId.toUtf8();
char* cClientId=bClientId.data(); char* cClientId = bClientId.data();
MQTTAsync_create(&client,cAddress,cClientId, MQTTCLIENT_PERSISTENCE_NONE, nullptr); MQTTAsync_create(&client, cAddress, cClientId, MQTTCLIENT_PERSISTENCE_NONE, nullptr);
MQTTAsync_setCallbacks(client, this, [](void* context, char* cause) { MQTTAsync_setCallbacks(client, this, [](void* context, char* cause) {
static_cast<MqttSubscriber*>(context)->connectionLost(cause); static_cast<MqttSubscriber*>(context)->connectionLost(cause);
}, [](void* context, char* topicName, int topicLen, MQTTAsync_message* m) { }, [](void* context, char* topicName, int topicLen, MQTTAsync_message* m) {
return static_cast<MqttSubscriber*>(context)->messageArrived(topicName, topicLen, m); return static_cast<MqttSubscriber*>(context)->messageArrived(topicName, topicLen, m);
}, nullptr); }, nullptr);
// 连接信号和槽
connect(this, &MqttSubscriber::connectionLostSignal, this, &MqttSubscriber::reconnectAndFetchConfig,Qt::QueuedConnection);
} }
MqttSubscriber::MqttSubscriber(QObject* parent) MqttSubscriber::MqttSubscriber(QObject* parent)
: QObject(parent), retryTimer(new QTimer(this)) { : QObject(parent), retryTimer(new QTimer(this)) {
// 连接信号和槽
connect(this, &MqttSubscriber::connectionLostSignal, this, &MqttSubscriber::reconnectAndFetchConfig, Qt::QueuedConnection);
retryTimer->setInterval(10000); // 设置重试间隔为10秒 retryTimer->setInterval(10000); // 设置重试间隔为10秒
retryTimer->setSingleShot(true); // 单次触发 retryTimer->setSingleShot(true); // 单次触发
connect(retryTimer, &QTimer::timeout, this, &MqttSubscriber::reconnectAndFetchConfig,Qt::QueuedConnection); connect(retryTimer, &QTimer::timeout, this, &MqttSubscriber::reconnectAndFetchConfig, Qt::QueuedConnection);
} }
MqttSubscriber::~MqttSubscriber() { MqttSubscriber::~MqttSubscriber() {
MQTTAsync_destroy(&client); if (client != nullptr) {
MQTTAsync_destroy(&client);
client = nullptr;
}
instance = nullptr; instance = nullptr;
} }
void MqttSubscriber::start() { void MqttSubscriber::start() {
retryTimer->stop(); // 确保每次开始连接前停止定时器 // 确保每次开始连接前停止定时器
retryTimer->stop();
// 如果客户端已经连接,应该先断开旧的连接
if (client != nullptr) {
MQTTAsync_disconnectOptions disc_opts = MQTTAsync_disconnectOptions_initializer;
int rc = MQTTAsync_disconnect(client, &disc_opts);
if (rc != MQTTASYNC_SUCCESS) {
qInfo() << "旧连接断开失败,返回编码" << rc;
}
}
MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer; MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
conn_opts.keepAliveInterval = 20; conn_opts.keepAliveInterval = 20;
conn_opts.cleansession = 1; conn_opts.cleansession = 1;
QByteArray bUsername = config.username.toUtf8(); QByteArray bUsername = config.username.toUtf8();
char* cUsername=bUsername.data(); char* cUsername = bUsername.data();
QByteArray bPassword = config.password.toUtf8(); QByteArray bPassword = config.password.toUtf8();
char* cPassword=bPassword.data(); char* cPassword = bPassword.data();
conn_opts.username = cUsername; conn_opts.username = cUsername;
conn_opts.password = cPassword; conn_opts.password = cPassword;
conn_opts.onSuccess = [](void* context, MQTTAsync_successData* response) { conn_opts.onSuccess = [](void* context, MQTTAsync_successData* response) {
...@@ -67,7 +87,7 @@ void MqttSubscriber::start() { ...@@ -67,7 +87,7 @@ void MqttSubscriber::start() {
static_cast<MqttSubscriber*>(context)->onConnectFailure(response); static_cast<MqttSubscriber*>(context)->onConnectFailure(response);
}; };
conn_opts.context = this; conn_opts.context = this;
int rc; int rc;
if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS) { if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS) {
qInfo() << "启动连接失败,返回编码" << rc; qInfo() << "启动连接失败,返回编码" << rc;
...@@ -84,29 +104,30 @@ void MqttSubscriber::onConnect(MQTTAsync_successData* response) { ...@@ -84,29 +104,30 @@ void MqttSubscriber::onConnect(MQTTAsync_successData* response) {
static_cast<MqttSubscriber*>(context)->onSubscribeFailure(response); static_cast<MqttSubscriber*>(context)->onSubscribeFailure(response);
}; };
opts.context = this; opts.context = this;
QByteArray bTopic = config.topic.toUtf8(); QByteArray bTopic = config.topic.toUtf8();
char* cTopic=bTopic.data(); char* cTopic = bTopic.data();
int rc; int rc;
if ((rc = MQTTAsync_subscribe(client, cTopic, config.qos, &opts)) != MQTTASYNC_SUCCESS) { if ((rc = MQTTAsync_subscribe(client, cTopic, config.qos, &opts)) != MQTTASYNC_SUCCESS) {
qInfo() << "启动订阅失败,返回编码" << rc<<response->token; qInfo() << "启动订阅失败,返回编码" << rc << response->token;
} }
} }
void MqttSubscriber::reconnectAndFetchConfig() { void MqttSubscriber::reconnectAndFetchConfig() {
qInfo() << "重新连接并获取配置 "; qInfo() << "重新连接并获取配置";
Common & instace= Common::getInstance(); Common& instace = Common::getInstance();
vides_data::responseConfig re_config; vides_data::responseConfig re_config;
// 使用 HttpService 从远程云端拉取配置 // 使用 HttpService 从远程云端拉取配置
HttpService httpService(httpUrl); // 替换为实际的远程URL HttpService httpService(httpUrl); // 替换为实际的远程URL
vides_data::response *res = httpService.httpDeviceConfig(serialNumber,re_config); vides_data::response* res = httpService.httpDeviceConfig(serialNumber, re_config);
if (res->code == 0) { if (res->code == 0) {
instace.deleteObj(res); instace.deleteObj(res);
re_config.mqttConfig.clientId=serialNumber; re_config.mqttConfig.clientId = serialNumber;
QString topic = QStringLiteral("/thingshub/%1/device/reply").arg(serialNumber); QString topic = QStringLiteral("/thingshub/%1/device/reply").arg(serialNumber);
re_config.mqttConfig.topic=topic; re_config.mqttConfig.topic = topic;
this->config=re_config.mqttConfig; this->config = re_config.mqttConfig;
this->init(re_config.mqttConfig,httpUrl,serialNumber); this->init(re_config.mqttConfig, httpUrl, serialNumber);
start(); start();
} else { } else {
qInfo() << "配置拉取失败,等待10秒后重试"; qInfo() << "配置拉取失败,等待10秒后重试";
...@@ -118,11 +139,10 @@ void MqttSubscriber::reconnectAndFetchConfig() { ...@@ -118,11 +139,10 @@ void MqttSubscriber::reconnectAndFetchConfig() {
void MqttSubscriber::onConnectFailure(MQTTAsync_failureData* response) { void MqttSubscriber::onConnectFailure(MQTTAsync_failureData* response) {
qInfo() << "连接失败, rc" << (response ? response->code : -1); qInfo() << "连接失败, rc" << (response ? response->code : -1);
retryTimer->start(); retryTimer->start();
} }
void MqttSubscriber::onSubscribe(MQTTAsync_successData* response) { void MqttSubscriber::onSubscribe(MQTTAsync_successData* response) {
qInfo() << "订阅成功"<<response->token; qInfo() << "订阅成功" << response->token;
} }
void MqttSubscriber::onSubscribeFailure(MQTTAsync_failureData* response) { void MqttSubscriber::onSubscribeFailure(MQTTAsync_failureData* response) {
...@@ -135,13 +155,12 @@ void MqttSubscriber::connectionLost(char* cause) { ...@@ -135,13 +155,12 @@ void MqttSubscriber::connectionLost(char* cause) {
qInfo() << "Cause:" << cause; qInfo() << "Cause:" << cause;
} }
emit connectionLostSignal(); // 发出连接丢失信号 emit connectionLostSignal(); // 发出连接丢失信号
} }
int MqttSubscriber::messageArrived(char* topicName, int topicLen, MQTTAsync_message* m) { int MqttSubscriber::messageArrived(char* topicName, int topicLen, MQTTAsync_message* m) {
QString topic(topicName); QString topic(topicName);
QString payload = QString::fromUtf8(reinterpret_cast<const char*>(m->payload), m->payloadlen); QString payload = QString::fromUtf8(reinterpret_cast<const char*>(m->payload), m->payloadlen);
vides_data::responseMqttData response; vides_data::responseMqttData response;
QJsonDocument jsonDoc = QJsonDocument::fromJson(payload.toUtf8()); QJsonDocument jsonDoc = QJsonDocument::fromJson(payload.toUtf8());
if (!jsonDoc.isNull() && jsonDoc.isObject()) { if (!jsonDoc.isNull() && jsonDoc.isObject()) {
...@@ -152,57 +171,53 @@ int MqttSubscriber::messageArrived(char* topicName, int topicLen, MQTTAsync_mess ...@@ -152,57 +171,53 @@ int MqttSubscriber::messageArrived(char* topicName, int topicLen, MQTTAsync_mess
} else { } else {
qInfo() << "Failed to parse JSON payload"; qInfo() << "Failed to parse JSON payload";
} }
int res=-2;
//1开 2关 3 重启 4 GB28181开 5 GB28181关 int res = -2;
CameraHandle*cameraHandle= MainWindow::sp_this->findHandle(response.sn); CameraHandle* cameraHandle = MainWindow::sp_this->findHandle(response.sn);
if(cameraHandle==nullptr){ if (cameraHandle == nullptr) {
qInfo() << "不存在该相机"; qInfo() << "不存在该相机";
res=-1; res = -1;
}else{ } else {
if(response.msg_type==2){ if (response.msg_type == 2) {
res=cameraHandle->deviceShutdown(); res = cameraHandle->deviceShutdown();
} } else if (response.msg_type == 3) {
if(response.msg_type==3){ res = cameraHandle->deviceReboot();
res= cameraHandle->deviceReboot(); } else if (response.msg_type == 4) {
} res = cameraHandle->updateSdkDevStatus(true);
if(response.msg_type==4){ } else if (response.msg_type == 5) {
res=cameraHandle->updateSdkDevStatus(true); res = cameraHandle->updateSdkDevStatus(false);
}
if(response.msg_type==5){
res= cameraHandle->updateSdkDevStatus(false);
} }
} }
vides_data::requestMqttData request; vides_data::requestMqttData request;
request.code =res>=0?0:0x01; request.code = (res >= 0) ? 0 : 0x01;
request.uniq = response.uniq; request.uniq = response.uniq;
request.sn=response.sn ; request.sn = response.sn;
sendSubscriptionConfirmation(request); sendSubscriptionConfirmation(request);
MQTTAsync_freeMessage(&m); MQTTAsync_freeMessage(&m);
MQTTAsync_free(topicName); MQTTAsync_free(topicName);
return 1; return 1;
} }
void MqttSubscriber::sendSubscriptionConfirmation(const vides_data::requestMqttData& response) { void MqttSubscriber::sendSubscriptionConfirmation(const vides_data::requestMqttData& response) {
QString responseTopic = "/thingshub/" + response.sn + "/device/post"; QString responseTopic = "/thingshub/" + response.sn + "/device/post";
QByteArray bResponseTopic = responseTopic.toUtf8(); QByteArray bResponseTopic = responseTopic.toUtf8();
char* cResponseTopic = bResponseTopic.data(); char* cResponseTopic = bResponseTopic.data();
qInfo() << "sendSubscriptionConfirmation"<<cResponseTopic; qInfo() << "sendSubscriptionConfirmation" << cResponseTopic;
// 将 struct 转换成 JSON 格式
QJsonObject json; QJsonObject json;
json["code"] = response.code; json["code"] = response.code;
json["uniq"] = response.uniq; json["uniq"] = response.uniq;
QJsonDocument jsonDoc(json); QJsonDocument jsonDoc(json);
QByteArray payload = jsonDoc.toJson(QJsonDocument::Compact); QByteArray payload = jsonDoc.toJson(QJsonDocument::Compact);
MQTTAsync_message pubmsg = MQTTAsync_message_initializer; MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
pubmsg.payload = const_cast<char*>(payload.data()); pubmsg.payload = const_cast<char*>(payload.data());
pubmsg.payloadlen = payload.size(); pubmsg.payloadlen = payload.size();
pubmsg.qos = config.qos; pubmsg.qos = config.qos;
pubmsg.retained = 0; pubmsg.retained = 0;
MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer; MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
opts.onSuccess = [](void* context, MQTTAsync_successData* response) { opts.onSuccess = [](void* context, MQTTAsync_successData* response) {
static_cast<MqttSubscriber*>(context)->onPublishSuccess(response); static_cast<MqttSubscriber*>(context)->onPublishSuccess(response);
...@@ -211,7 +226,7 @@ void MqttSubscriber::sendSubscriptionConfirmation(const vides_data::requestMqttD ...@@ -211,7 +226,7 @@ void MqttSubscriber::sendSubscriptionConfirmation(const vides_data::requestMqttD
static_cast<MqttSubscriber*>(context)->onPublishFailure(response); static_cast<MqttSubscriber*>(context)->onPublishFailure(response);
}; };
opts.context = this; opts.context = this;
int rc; int rc;
if ((rc = MQTTAsync_sendMessage(client, cResponseTopic, &pubmsg, &opts)) != MQTTASYNC_SUCCESS) { if ((rc = MQTTAsync_sendMessage(client, cResponseTopic, &pubmsg, &opts)) != MQTTASYNC_SUCCESS) {
qInfo() << "发送消息失败,返回编码" << rc; qInfo() << "发送消息失败,返回编码" << rc;
...@@ -219,9 +234,10 @@ void MqttSubscriber::sendSubscriptionConfirmation(const vides_data::requestMqttD ...@@ -219,9 +234,10 @@ void MqttSubscriber::sendSubscriptionConfirmation(const vides_data::requestMqttD
} }
void MqttSubscriber::onPublishSuccess(MQTTAsync_successData* response) { void MqttSubscriber::onPublishSuccess(MQTTAsync_successData* response) {
qInfo() << "消息已成功发布"<<response->token; qInfo() << "消息已成功发布" << response->token;
} }
void MqttSubscriber::onPublishFailure(MQTTAsync_failureData* response) { void MqttSubscriber::onPublishFailure(MQTTAsync_failureData* response) {
qInfo() << "消息发布失败, rc" << (response ? response->code : -1); qInfo() << "消息发布失败, rc" << (response ? response->code : -1);
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment