Commit 1d4fc2b1 by “liusq”

mqtt断线重连,配置修改重新创建连接06

parent 0d1cc66f
#include "MqttSubscriber.h"
#include "mainwindow.h"
#include "CameraHandle.h"
MqttSubscriber* MqttSubscriber::instance = nullptr;
MqttSubscriber* MqttSubscriber::getInstance(QObject* parent) {
......@@ -10,54 +11,73 @@ MqttSubscriber* MqttSubscriber::getInstance(QObject* parent) {
return instance;
}
void MqttSubscriber::init(vides_data::MqttConfig &config,QString &httpUrl,QString &serialNumber){
this->config=config;
this->httpUrl=httpUrl;
this->serialNumber=serialNumber;
QByteArray bAddress = config.address.toUtf8();
char* cAddress=bAddress.data();
void MqttSubscriber::init(vides_data::MqttConfig &config, QString &httpUrl, QString &serialNumber) {
// 清理之前的客户端资源
if (client != nullptr) {
MQTTAsync_destroy(&client); // 释放之前的MQTT客户端
client = nullptr; // 确保指针清空
}
// 保存配置信息
this->config = config;
this->httpUrl = httpUrl;
this->serialNumber = serialNumber;
// 初始化MQTT客户端
QByteArray bAddress = config.address.toUtf8();
char* cAddress = bAddress.data();
QByteArray bClientId = config.clientId.toUtf8();
char* cClientId=bClientId.data();
char* cClientId = bClientId.data();
MQTTAsync_create(&client,cAddress,cClientId, MQTTCLIENT_PERSISTENCE_NONE, nullptr);
MQTTAsync_create(&client, cAddress, cClientId, MQTTCLIENT_PERSISTENCE_NONE, nullptr);
MQTTAsync_setCallbacks(client, this, [](void* context, char* cause) {
static_cast<MqttSubscriber*>(context)->connectionLost(cause);
}, [](void* context, char* topicName, int topicLen, MQTTAsync_message* m) {
return static_cast<MqttSubscriber*>(context)->messageArrived(topicName, topicLen, m);
}, nullptr);
// 连接信号和槽
connect(this, &MqttSubscriber::connectionLostSignal, this, &MqttSubscriber::reconnectAndFetchConfig,Qt::QueuedConnection);
}
MqttSubscriber::MqttSubscriber(QObject* parent)
: QObject(parent), retryTimer(new QTimer(this)) {
// 连接信号和槽
connect(this, &MqttSubscriber::connectionLostSignal, this, &MqttSubscriber::reconnectAndFetchConfig, Qt::QueuedConnection);
retryTimer->setInterval(10000); // 设置重试间隔为10秒
retryTimer->setSingleShot(true); // 单次触发
connect(retryTimer, &QTimer::timeout, this, &MqttSubscriber::reconnectAndFetchConfig,Qt::QueuedConnection);
connect(retryTimer, &QTimer::timeout, this, &MqttSubscriber::reconnectAndFetchConfig, Qt::QueuedConnection);
}
MqttSubscriber::~MqttSubscriber() {
MQTTAsync_destroy(&client);
if (client != nullptr) {
MQTTAsync_destroy(&client);
client = nullptr;
}
instance = nullptr;
}
void MqttSubscriber::start() {
retryTimer->stop(); // 确保每次开始连接前停止定时器
// 确保每次开始连接前停止定时器
retryTimer->stop();
// 如果客户端已经连接,应该先断开旧的连接
if (client != nullptr) {
MQTTAsync_disconnectOptions disc_opts = MQTTAsync_disconnectOptions_initializer;
int rc = MQTTAsync_disconnect(client, &disc_opts);
if (rc != MQTTASYNC_SUCCESS) {
qInfo() << "旧连接断开失败,返回编码" << rc;
}
}
MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
conn_opts.keepAliveInterval = 20;
conn_opts.cleansession = 1;
QByteArray bUsername = config.username.toUtf8();
char* cUsername=bUsername.data();
char* cUsername = bUsername.data();
QByteArray bPassword = config.password.toUtf8();
char* cPassword=bPassword.data();
char* cPassword = bPassword.data();
conn_opts.username = cUsername;
conn_opts.password = cPassword;
conn_opts.onSuccess = [](void* context, MQTTAsync_successData* response) {
......@@ -67,7 +87,7 @@ void MqttSubscriber::start() {
static_cast<MqttSubscriber*>(context)->onConnectFailure(response);
};
conn_opts.context = this;
int rc;
if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS) {
qInfo() << "启动连接失败,返回编码" << rc;
......@@ -84,29 +104,30 @@ void MqttSubscriber::onConnect(MQTTAsync_successData* response) {
static_cast<MqttSubscriber*>(context)->onSubscribeFailure(response);
};
opts.context = this;
QByteArray bTopic = config.topic.toUtf8();
char* cTopic=bTopic.data();
char* cTopic = bTopic.data();
int rc;
if ((rc = MQTTAsync_subscribe(client, cTopic, config.qos, &opts)) != MQTTASYNC_SUCCESS) {
qInfo() << "启动订阅失败,返回编码" << rc<<response->token;
qInfo() << "启动订阅失败,返回编码" << rc << response->token;
}
}
void MqttSubscriber::reconnectAndFetchConfig() {
qInfo() << "重新连接并获取配置 ";
Common & instace= Common::getInstance();
qInfo() << "重新连接并获取配置";
Common& instace = Common::getInstance();
vides_data::responseConfig re_config;
// 使用 HttpService 从远程云端拉取配置
HttpService httpService(httpUrl); // 替换为实际的远程URL
vides_data::response *res = httpService.httpDeviceConfig(serialNumber,re_config);
vides_data::response* res = httpService.httpDeviceConfig(serialNumber, re_config);
if (res->code == 0) {
instace.deleteObj(res);
re_config.mqttConfig.clientId=serialNumber;
re_config.mqttConfig.clientId = serialNumber;
QString topic = QStringLiteral("/thingshub/%1/device/reply").arg(serialNumber);
re_config.mqttConfig.topic=topic;
this->config=re_config.mqttConfig;
this->init(re_config.mqttConfig,httpUrl,serialNumber);
re_config.mqttConfig.topic = topic;
this->config = re_config.mqttConfig;
this->init(re_config.mqttConfig, httpUrl, serialNumber);
start();
} else {
qInfo() << "配置拉取失败,等待10秒后重试";
......@@ -118,11 +139,10 @@ void MqttSubscriber::reconnectAndFetchConfig() {
void MqttSubscriber::onConnectFailure(MQTTAsync_failureData* response) {
qInfo() << "连接失败, rc" << (response ? response->code : -1);
retryTimer->start();
}
void MqttSubscriber::onSubscribe(MQTTAsync_successData* response) {
qInfo() << "订阅成功"<<response->token;
qInfo() << "订阅成功" << response->token;
}
void MqttSubscriber::onSubscribeFailure(MQTTAsync_failureData* response) {
......@@ -135,13 +155,12 @@ void MqttSubscriber::connectionLost(char* cause) {
qInfo() << "Cause:" << cause;
}
emit connectionLostSignal(); // 发出连接丢失信号
}
int MqttSubscriber::messageArrived(char* topicName, int topicLen, MQTTAsync_message* m) {
QString topic(topicName);
QString payload = QString::fromUtf8(reinterpret_cast<const char*>(m->payload), m->payloadlen);
vides_data::responseMqttData response;
QJsonDocument jsonDoc = QJsonDocument::fromJson(payload.toUtf8());
if (!jsonDoc.isNull() && jsonDoc.isObject()) {
......@@ -152,57 +171,53 @@ int MqttSubscriber::messageArrived(char* topicName, int topicLen, MQTTAsync_mess
} else {
qInfo() << "Failed to parse JSON payload";
}
int res=-2;
//1开 2关 3 重启 4 GB28181开 5 GB28181关
CameraHandle*cameraHandle= MainWindow::sp_this->findHandle(response.sn);
if(cameraHandle==nullptr){
int res = -2;
CameraHandle* cameraHandle = MainWindow::sp_this->findHandle(response.sn);
if (cameraHandle == nullptr) {
qInfo() << "不存在该相机";
res=-1;
}else{
if(response.msg_type==2){
res=cameraHandle->deviceShutdown();
}
if(response.msg_type==3){
res= cameraHandle->deviceReboot();
}
if(response.msg_type==4){
res=cameraHandle->updateSdkDevStatus(true);
}
if(response.msg_type==5){
res= cameraHandle->updateSdkDevStatus(false);
res = -1;
} else {
if (response.msg_type == 2) {
res = cameraHandle->deviceShutdown();
} else if (response.msg_type == 3) {
res = cameraHandle->deviceReboot();
} else if (response.msg_type == 4) {
res = cameraHandle->updateSdkDevStatus(true);
} else if (response.msg_type == 5) {
res = cameraHandle->updateSdkDevStatus(false);
}
}
vides_data::requestMqttData request;
request.code =res>=0?0:0x01;
request.code = (res >= 0) ? 0 : 0x01;
request.uniq = response.uniq;
request.sn=response.sn ;
request.sn = response.sn;
sendSubscriptionConfirmation(request);
MQTTAsync_freeMessage(&m);
MQTTAsync_free(topicName);
return 1;
}
void MqttSubscriber::sendSubscriptionConfirmation(const vides_data::requestMqttData& response) {
QString responseTopic = "/thingshub/" + response.sn + "/device/post";
QByteArray bResponseTopic = responseTopic.toUtf8();
char* cResponseTopic = bResponseTopic.data();
qInfo() << "sendSubscriptionConfirmation"<<cResponseTopic;
// 将 struct 转换成 JSON 格式
qInfo() << "sendSubscriptionConfirmation" << cResponseTopic;
QJsonObject json;
json["code"] = response.code;
json["uniq"] = response.uniq;
QJsonDocument jsonDoc(json);
QByteArray payload = jsonDoc.toJson(QJsonDocument::Compact);
MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
pubmsg.payload = const_cast<char*>(payload.data());
pubmsg.payloadlen = payload.size();
pubmsg.qos = config.qos;
pubmsg.retained = 0;
MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
opts.onSuccess = [](void* context, MQTTAsync_successData* response) {
static_cast<MqttSubscriber*>(context)->onPublishSuccess(response);
......@@ -211,7 +226,7 @@ void MqttSubscriber::sendSubscriptionConfirmation(const vides_data::requestMqttD
static_cast<MqttSubscriber*>(context)->onPublishFailure(response);
};
opts.context = this;
int rc;
if ((rc = MQTTAsync_sendMessage(client, cResponseTopic, &pubmsg, &opts)) != MQTTASYNC_SUCCESS) {
qInfo() << "发送消息失败,返回编码" << rc;
......@@ -219,9 +234,10 @@ void MqttSubscriber::sendSubscriptionConfirmation(const vides_data::requestMqttD
}
void MqttSubscriber::onPublishSuccess(MQTTAsync_successData* response) {
qInfo() << "消息已成功发布"<<response->token;
qInfo() << "消息已成功发布" << response->token;
}
void MqttSubscriber::onPublishFailure(MQTTAsync_failureData* response) {
qInfo() << "消息发布失败, rc" << (response ? response->code : -1);
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment