Commit bbc9b471 by “liusq”

mqtt断线重连,配置修改重新创建连接

parent ed7719b4
...@@ -556,6 +556,7 @@ vides_data::response *HttpService::httpDeviceConfig(const QString &serialNumber, ...@@ -556,6 +556,7 @@ vides_data::response *HttpService::httpDeviceConfig(const QString &serialNumber,
config.mqttConfig.timeout = mqttConfigObj["timeout"].toVariant().toULongLong(); config.mqttConfig.timeout = mqttConfigObj["timeout"].toVariant().toULongLong();
config.mqttConfig.username=mqttConfigObj["username"].toString(); config.mqttConfig.username=mqttConfigObj["username"].toString();
config.mqttConfig.password=mqttConfigObj["password"].toString(); config.mqttConfig.password=mqttConfigObj["password"].toString();
config.mqttConfig.updateAt=mqttConfigObj["updateAt"].toVariant().toULongLong();
resp->msg=map["message"].toString(); resp->msg=map["message"].toString();
}else{ }else{
......
...@@ -3,30 +3,41 @@ ...@@ -3,30 +3,41 @@
#include "CameraHandle.h" #include "CameraHandle.h"
MqttSubscriber* MqttSubscriber::instance = nullptr; MqttSubscriber* MqttSubscriber::instance = nullptr;
MqttSubscriber* MqttSubscriber::getInstance(vides_data::MqttConfig& config, QObject* parent) { MqttSubscriber* MqttSubscriber::getInstance(QObject* parent) {
if (!instance) { if (!instance) {
instance = new MqttSubscriber(config, parent); instance = new MqttSubscriber(parent);
} }
return instance; return instance;
} }
MqttSubscriber::MqttSubscriber(vides_data::MqttConfig& config, QObject* parent) void MqttSubscriber::init(vides_data::MqttConfig &config,QString &httpUrl,QString &serialNumber){
: QObject(parent), config(config) { this->config=config;
this->httpUrl=httpUrl;
this->serialNumber=serialNumber;
QByteArray bAddress = config.address.toUtf8(); QByteArray bAddress = config.address.toUtf8();
char* cAddress=bAddress.data(); char* cAddress=bAddress.data();
QByteArray bClientId = config.clientId.toUtf8(); QByteArray bClientId = config.clientId.toUtf8();
char* cClientId=bClientId.data(); char* cClientId=bClientId.data();
MQTTAsync_create(&client,cAddress,cClientId, MQTTCLIENT_PERSISTENCE_NONE, nullptr); MQTTAsync_create(&client,cAddress,cClientId, MQTTCLIENT_PERSISTENCE_NONE, nullptr);
MQTTAsync_setCallbacks(client, this, [](void* context, char* cause) { MQTTAsync_setCallbacks(client, this, [](void* context, char* cause) {
static_cast<MqttSubscriber*>(context)->connectionLost(cause); static_cast<MqttSubscriber*>(context)->connectionLost(cause);
}, [](void* context, char* topicName, int topicLen, MQTTAsync_message* m) { }, [](void* context, char* topicName, int topicLen, MQTTAsync_message* m) {
return static_cast<MqttSubscriber*>(context)->messageArrived(topicName, topicLen, m); return static_cast<MqttSubscriber*>(context)->messageArrived(topicName, topicLen, m);
}, nullptr); }, nullptr);
// 连接信号和槽
connect(this, &MqttSubscriber::connectionLostSignal, this, &MqttSubscriber::reconnectAndFetchConfig);
}
MqttSubscriber::MqttSubscriber(QObject* parent)
: QObject(parent), retryTimer(new QTimer(this)) {
retryTimer->setInterval(10000); // 设置重试间隔为10秒
retryTimer->setSingleShot(true); // 单次触发
connect(retryTimer, &QTimer::timeout, this, &MqttSubscriber::reconnectAndFetchConfig);
} }
MqttSubscriber::~MqttSubscriber() { MqttSubscriber::~MqttSubscriber() {
...@@ -35,6 +46,7 @@ MqttSubscriber::~MqttSubscriber() { ...@@ -35,6 +46,7 @@ MqttSubscriber::~MqttSubscriber() {
} }
void MqttSubscriber::start() { void MqttSubscriber::start() {
retryTimer->stop(); // 确保每次开始连接前停止定时器
MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer; MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
conn_opts.keepAliveInterval = 20; conn_opts.keepAliveInterval = 20;
conn_opts.cleansession = 1; conn_opts.cleansession = 1;
...@@ -63,6 +75,7 @@ void MqttSubscriber::start() { ...@@ -63,6 +75,7 @@ void MqttSubscriber::start() {
} }
void MqttSubscriber::onConnect(MQTTAsync_successData* response) { void MqttSubscriber::onConnect(MQTTAsync_successData* response) {
retryTimer->stop();
MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer; MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
opts.onSuccess = [](void* context, MQTTAsync_successData* response) { opts.onSuccess = [](void* context, MQTTAsync_successData* response) {
static_cast<MqttSubscriber*>(context)->onSubscribe(response); static_cast<MqttSubscriber*>(context)->onSubscribe(response);
...@@ -80,8 +93,32 @@ void MqttSubscriber::onConnect(MQTTAsync_successData* response) { ...@@ -80,8 +93,32 @@ void MqttSubscriber::onConnect(MQTTAsync_successData* response) {
} }
} }
void MqttSubscriber::reconnectAndFetchConfig() {
Common & instace= Common::getInstance();
vides_data::responseConfig re_config;
// 使用 HttpService 从远程云端拉取配置
HttpService httpService(httpUrl); // 替换为实际的远程URL
vides_data::response *res = httpService.httpDeviceConfig(serialNumber,re_config);
if (res->code != 0) {
instace.deleteObj(res);
re_config.mqttConfig.clientId=serialNumber;
QString topic = QStringLiteral("/thingshub/%1/device/reply").arg(serialNumber);
re_config.mqttConfig.topic=topic;
this->config=re_config.mqttConfig;
init(re_config.mqttConfig,httpUrl,serialNumber);
start();
} else {
qInfo() << "配置拉取失败,等待10秒后重试";
instace.deleteObj(res);
retryTimer->start(); // 启动定时器,10秒后重试
}
}
void MqttSubscriber::onConnectFailure(MQTTAsync_failureData* response) { void MqttSubscriber::onConnectFailure(MQTTAsync_failureData* response) {
qInfo() << "连接失败, rc" << (response ? response->code : -1); qInfo() << "连接失败, rc" << (response ? response->code : -1);
retryTimer->start();
} }
void MqttSubscriber::onSubscribe(MQTTAsync_successData* response) { void MqttSubscriber::onSubscribe(MQTTAsync_successData* response) {
...@@ -97,6 +134,8 @@ void MqttSubscriber::connectionLost(char* cause) { ...@@ -97,6 +134,8 @@ void MqttSubscriber::connectionLost(char* cause) {
if (cause) { if (cause) {
qInfo() << "Cause:" << cause; qInfo() << "Cause:" << cause;
} }
emit connectionLostSignal(); // 发出连接丢失信号
} }
int MqttSubscriber::messageArrived(char* topicName, int topicLen, MQTTAsync_message* m) { int MqttSubscriber::messageArrived(char* topicName, int topicLen, MQTTAsync_message* m) {
...@@ -133,7 +172,7 @@ int MqttSubscriber::messageArrived(char* topicName, int topicLen, MQTTAsync_mess ...@@ -133,7 +172,7 @@ int MqttSubscriber::messageArrived(char* topicName, int topicLen, MQTTAsync_mess
res= cameraHandle->updateSdkDevStatus(false); res= cameraHandle->updateSdkDevStatus(false);
} }
} }
vides_data::requestMqttData request; vides_data::requestMqttData request;
request.code =res>=0?0:0x01; request.code =res>=0?0:0x01;
request.uniq = response.uniq; request.uniq = response.uniq;
request.sn=response.sn ; request.sn=response.sn ;
......
...@@ -2,6 +2,7 @@ ...@@ -2,6 +2,7 @@
#define MQTTSUBSCRIBER_H #define MQTTSUBSCRIBER_H
#include <MQTTClient.h> #include <MQTTClient.h>
#include <MQTTAsync.h> #include <MQTTAsync.h>
#include <QTimer>
#include <QObject> #include <QObject>
#include "VidesData.h" #include "VidesData.h"
...@@ -9,15 +10,25 @@ class MqttSubscriber : public QObject ...@@ -9,15 +10,25 @@ class MqttSubscriber : public QObject
{ {
Q_OBJECT Q_OBJECT
public: public:
static MqttSubscriber* getInstance(vides_data::MqttConfig& config, QObject* parent = nullptr); ~MqttSubscriber(); static MqttSubscriber* getInstance( QObject* parent = nullptr); ~MqttSubscriber();
void init(vides_data:: MqttConfig& config,QString &httpUrl,QString &serialNumber);
void start(); void start();
signals:
void connectionLostSignal();
private slots:
void reconnectAndFetchConfig();
private: private:
MqttSubscriber(vides_data:: MqttConfig& config, QObject* parent = nullptr); MqttSubscriber(QObject* parent = nullptr);
MqttSubscriber(const MqttSubscriber&) = delete; MqttSubscriber() = delete;
MqttSubscriber& operator=(const MqttSubscriber&) = delete; MqttSubscriber& operator=(const MqttSubscriber&) = delete;
QTimer *retryTimer;
MQTTAsync client; MQTTAsync client;
QString httpUrl;
QString serialNumber;
vides_data::MqttConfig config; vides_data::MqttConfig config;
......
...@@ -287,6 +287,7 @@ struct MqttConfig { ...@@ -287,6 +287,7 @@ struct MqttConfig {
QString topic; QString topic;
QString username; QString username;
QString password; QString password;
quint64 updateAt;
}; };
struct responseConfig { struct responseConfig {
......
...@@ -126,13 +126,8 @@ HEADERS += \ ...@@ -126,13 +126,8 @@ HEADERS += \
BaseAlgorithm.h \ BaseAlgorithm.h \
MqttSubscriber.h MqttSubscriber.h
#FORMS += \
# mainwindow.ui
# Default rules for deployment. # Default rules for deployment.
qnx: target.path = /tmp/$${TARGET}/bin qnx: target.path = /tmp/$${TARGET}/bin
else: unix:!android: target.path = /opt/$${TARGET}/bin else: unix:!android: target.path = /opt/$${TARGET}/bin
!isEmpty(target.path): INSTALLS += target !isEmpty(target.path): INSTALLS += target
#RESOURCES += \
# BG.qrc
...@@ -131,11 +131,15 @@ MainWindow::MainWindow() ...@@ -131,11 +131,15 @@ MainWindow::MainWindow()
QString topic = QStringLiteral("/thingshub/%1/device/reply").arg(serialNumber); QString topic = QStringLiteral("/thingshub/%1/device/reply").arg(serialNumber);
config.mqttConfig.topic=topic; config.mqttConfig.topic=topic;
this->mqttConfig= config.mqttConfig; this->mqttConfig= config.mqttConfig;
MqttSubscriber* subscriber = MqttSubscriber::getInstance(mqttConfig); runOrRebootMqtt(mqttConfig,httpurl,serialNumber);
}
void MainWindow::runOrRebootMqtt(vides_data::MqttConfig &mqtt_config,QString &httpUrl,QString &serialNumber){
MqttSubscriber* subscriber = MqttSubscriber::getInstance(this);
subscriber->init(mqtt_config,httpUrl,serialNumber);
subscriber->start(); subscriber->start();
} }
void MainWindow::divParameterUpdate(vides_data::responseConfig &cloudConfig ){ void MainWindow::divParameterUpdate(vides_data::responseConfig &cloudConfig,QString &httpUrl,QString &serialNumber ){
bool faceAlgorithm = false, licensePlateAlgorithm = false, uniformAlgorithm = false, timeChange = false; bool faceAlgorithm = false, licensePlateAlgorithm = false, uniformAlgorithm = false, timeChange = false;
AlgorithmTaskManage &algorithmTaskManage= AlgorithmTaskManage::getInstance(); AlgorithmTaskManage &algorithmTaskManage= AlgorithmTaskManage::getInstance();
...@@ -162,6 +166,10 @@ void MainWindow::divParameterUpdate(vides_data::responseConfig &cloudConfig ){ ...@@ -162,6 +166,10 @@ void MainWindow::divParameterUpdate(vides_data::responseConfig &cloudConfig ){
if(!faceAlgorithm && !licensePlateAlgorithm && !uniformAlgorithm && !timeChange){ if(!faceAlgorithm && !licensePlateAlgorithm && !uniformAlgorithm && !timeChange){
return; return;
} }
if(config.mqttConfig.updateAt!=cloudConfig.mqttConfig.updateAt){
cloudConfig.mqttConfig.topic=config.mqttConfig.topic;
runOrRebootMqtt(cloudConfig.mqttConfig,httpUrl,serialNumber);
}
__uint8_t alg= this->intToUint8t(faceAlgorithm,licensePlateAlgorithm,uniformAlgorithm) ; __uint8_t alg= this->intToUint8t(faceAlgorithm,licensePlateAlgorithm,uniformAlgorithm) ;
vides_data::DetectionParams params; vides_data::DetectionParams params;
...@@ -500,7 +508,7 @@ void MainWindow::startCamera(const QString &httpurl){ ...@@ -500,7 +508,7 @@ void MainWindow::startCamera(const QString &httpurl){
return ; return ;
} }
instace.deleteObj(res_config); instace.deleteObj(res_config);
divParameterUpdate(cloudConfig); divParameterUpdate(cloudConfig,nonConstHttpUrl,serialNumber);
for (const auto& device : devices.list) { for (const auto& device : devices.list) {
if(localDevices.count(device.sSn)>0 ){ if(localDevices.count(device.sSn)>0 ){
......
...@@ -38,13 +38,14 @@ public: ...@@ -38,13 +38,14 @@ public:
void createDirectory(int flag,const QString& dirName, const QString& successMsg, const QString& failureMsg); void createDirectory(int flag,const QString& dirName, const QString& successMsg, const QString& failureMsg);
void runOrRebootMqtt(vides_data::MqttConfig &mqtt_config,QString &httpUrl,QString &serialNumber);
void initFaceFaceRecognition(); void initFaceFaceRecognition();
void initCameras(vides_data::cameraParameters &parameter, vides_data::responseConfig &devConfig, const std::list<vides_data::responseArea>&areas,std::list<vides_data::requestCameraInfo>&camera_info_list); void initCameras(vides_data::cameraParameters &parameter, vides_data::responseConfig &devConfig, const std::list<vides_data::responseArea>&areas,std::list<vides_data::requestCameraInfo>&camera_info_list);
__uint8_t intToUint8t(bool faceAlgorithm,bool licensePlateAlgorithm,bool uniformAlgorithm); __uint8_t intToUint8t(bool faceAlgorithm,bool licensePlateAlgorithm,bool uniformAlgorithm);
//盒子参数更新 //盒子参数更新
void divParameterUpdate(vides_data::responseConfig &cloudConfig ); void divParameterUpdate(vides_data::responseConfig &cloudConfig,QString &httpUrl,QString &serialNumber );
static MainWindow * sp_this; static MainWindow * sp_this;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment