Commit b86676af by “liusq”

mqtt断线重连,配置修改重新创建连接07

parent 1d4fc2b1
#include "MqttSubscriber.h" #include "MqttSubscriber.h"
#include "mainwindow.h" #include "mainwindow.h"
#include "CameraHandle.h" #include "CameraHandle.h"
#include "HttpService.h" // 确保包含 HttpService 头文件
#include <QTimer>
MqttSubscriber* MqttSubscriber::instance = nullptr; MqttSubscriber* MqttSubscriber::instance = nullptr;
...@@ -12,10 +14,20 @@ MqttSubscriber* MqttSubscriber::getInstance(QObject* parent) { ...@@ -12,10 +14,20 @@ MqttSubscriber* MqttSubscriber::getInstance(QObject* parent) {
} }
void MqttSubscriber::init(vides_data::MqttConfig &config, QString &httpUrl, QString &serialNumber) { void MqttSubscriber::init(vides_data::MqttConfig &config, QString &httpUrl, QString &serialNumber) {
// 清理之前的客户端资源 // 如果客户端已存在,先断开连接再销毁
if (client != nullptr) { if (client != nullptr) {
MQTTAsync_destroy(&client); // 释放之前的MQTT客户端 if (MQTTAsync_isConnected(client)) {
client = nullptr; // 确保指针清空 // 断开连接
MQTTAsync_disconnectOptions disc_opts = MQTTAsync_disconnectOptions_initializer;
int rc = MQTTAsync_disconnect(client, &disc_opts);
if (rc != MQTTASYNC_SUCCESS) {
qInfo() << "客户端断开连接失败,返回编码" << rc;
}
}
// 销毁现有的MQTT客户端
MQTTAsync_destroy(&client);
client = nullptr; // 重置客户端指针
} }
// 保存配置信息 // 保存配置信息
...@@ -23,13 +35,19 @@ void MqttSubscriber::init(vides_data::MqttConfig &config, QString &httpUrl, QStr ...@@ -23,13 +35,19 @@ void MqttSubscriber::init(vides_data::MqttConfig &config, QString &httpUrl, QStr
this->httpUrl = httpUrl; this->httpUrl = httpUrl;
this->serialNumber = serialNumber; this->serialNumber = serialNumber;
// 初始化MQTT客户端 // 初始化新的MQTT客户端
QByteArray bAddress = config.address.toUtf8(); QByteArray bAddress = config.address.toUtf8();
char* cAddress = bAddress.data(); char* cAddress = bAddress.data();
QByteArray bClientId = config.clientId.toUtf8(); QByteArray bClientId = config.clientId.toUtf8();
char* cClientId = bClientId.data(); char* cClientId = bClientId.data();
MQTTAsync_create(&client, cAddress, cClientId, MQTTCLIENT_PERSISTENCE_NONE, nullptr); int rc = MQTTAsync_create(&client, cAddress, cClientId, MQTTCLIENT_PERSISTENCE_NONE, nullptr);
if (rc != MQTTASYNC_SUCCESS) {
qInfo() << "MQTT客户端创建失败,返回编码" << rc;
return;
}
// 设置回调函数
MQTTAsync_setCallbacks(client, this, [](void* context, char* cause) { MQTTAsync_setCallbacks(client, this, [](void* context, char* cause) {
static_cast<MqttSubscriber*>(context)->connectionLost(cause); static_cast<MqttSubscriber*>(context)->connectionLost(cause);
}, [](void* context, char* topicName, int topicLen, MQTTAsync_message* m) { }, [](void* context, char* topicName, int topicLen, MQTTAsync_message* m) {
...@@ -38,7 +56,7 @@ void MqttSubscriber::init(vides_data::MqttConfig &config, QString &httpUrl, QStr ...@@ -38,7 +56,7 @@ void MqttSubscriber::init(vides_data::MqttConfig &config, QString &httpUrl, QStr
} }
MqttSubscriber::MqttSubscriber(QObject* parent) MqttSubscriber::MqttSubscriber(QObject* parent)
: QObject(parent), retryTimer(new QTimer(this)) { : QObject(parent), retryTimer(new QTimer(this)), client(nullptr) {
// 连接信号和槽 // 连接信号和槽
connect(this, &MqttSubscriber::connectionLostSignal, this, &MqttSubscriber::reconnectAndFetchConfig, Qt::QueuedConnection); connect(this, &MqttSubscriber::connectionLostSignal, this, &MqttSubscriber::reconnectAndFetchConfig, Qt::QueuedConnection);
...@@ -56,18 +74,10 @@ MqttSubscriber::~MqttSubscriber() { ...@@ -56,18 +74,10 @@ MqttSubscriber::~MqttSubscriber() {
} }
void MqttSubscriber::start() { void MqttSubscriber::start() {
// 确保每次开始连接前停止定时器 // 确保定时器停止
retryTimer->stop(); retryTimer->stop();
// 如果客户端已经连接,应该先断开旧的连接 // 设置连接选项
if (client != nullptr) {
MQTTAsync_disconnectOptions disc_opts = MQTTAsync_disconnectOptions_initializer;
int rc = MQTTAsync_disconnect(client, &disc_opts);
if (rc != MQTTASYNC_SUCCESS) {
qInfo() << "旧连接断开失败,返回编码" << rc;
}
}
MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer; MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
conn_opts.keepAliveInterval = 20; conn_opts.keepAliveInterval = 20;
conn_opts.cleansession = 1; conn_opts.cleansession = 1;
...@@ -88,12 +98,14 @@ void MqttSubscriber::start() { ...@@ -88,12 +98,14 @@ void MqttSubscriber::start() {
}; };
conn_opts.context = this; conn_opts.context = this;
// 启动连接
int rc; int rc;
if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS) { if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS) {
qInfo() << "启动连接失败,返回编码" << rc; qInfo() << "启动连接失败,返回编码" << rc;
} }
} }
void MqttSubscriber::onConnect(MQTTAsync_successData* response) { void MqttSubscriber::onConnect(MQTTAsync_successData* response) {
retryTimer->stop(); retryTimer->stop();
MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer; MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
...@@ -128,17 +140,19 @@ void MqttSubscriber::reconnectAndFetchConfig() { ...@@ -128,17 +140,19 @@ void MqttSubscriber::reconnectAndFetchConfig() {
re_config.mqttConfig.topic = topic; re_config.mqttConfig.topic = topic;
this->config = re_config.mqttConfig; this->config = re_config.mqttConfig;
this->init(re_config.mqttConfig, httpUrl, serialNumber); this->init(re_config.mqttConfig, httpUrl, serialNumber);
start(); this->start();
} else { } else {
qInfo() << "配置拉取失败,等待10秒后重试"; qInfo() << "配置拉取失败,等待10秒后重试";
instace.deleteObj(res); instace.deleteObj(res);
retryTimer->start(); // 启动定时器,10秒后重试 QMetaObject::invokeMethod(retryTimer, "start", Qt::QueuedConnection);
} }
} }
void MqttSubscriber::onConnectFailure(MQTTAsync_failureData* response) { void MqttSubscriber::onConnectFailure(MQTTAsync_failureData* response) {
qInfo() << "连接失败, rc" << (response ? response->code : -1); qInfo() << "连接失败, rc" << (response ? response->code : -1);
retryTimer->start(); retryTimer->stop(); // 确保定时器在启动前被停止
// 确保定时器在主线程启动
QMetaObject::invokeMethod(retryTimer, "start", Qt::QueuedConnection);
} }
void MqttSubscriber::onSubscribe(MQTTAsync_successData* response) { void MqttSubscriber::onSubscribe(MQTTAsync_successData* response) {
...@@ -240,4 +254,3 @@ void MqttSubscriber::onPublishSuccess(MQTTAsync_successData* response) { ...@@ -240,4 +254,3 @@ void MqttSubscriber::onPublishSuccess(MQTTAsync_successData* response) {
void MqttSubscriber::onPublishFailure(MQTTAsync_failureData* response) { void MqttSubscriber::onPublishFailure(MQTTAsync_failureData* response) {
qInfo() << "消息发布失败, rc" << (response ? response->code : -1); qInfo() << "消息发布失败, rc" << (response ? response->code : -1);
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment