cellsysBase/cellsysPush.js

863 lines
27 KiB
JavaScript
Raw Normal View History

2024-08-14 16:20:56 +08:00
import { Query, QueryTask, QueryType } from './cellsysUtil.js';
import './utils/paho-mqtt.js';
2024-12-27 09:41:41 +08:00
import cellsysSystem from './cellsysSystem';
2024-08-14 16:20:56 +08:00
let clientManager = {
//全局clinet管理
};
let clientLock = {
//全局clinet连接状态锁
};
let callBackManager = {
//全局回调管理
};
let topicManager = {
//全局topic管理,包含onReceive,onSuccess,onFail
};
let subscribeQueue = {
//全局订阅队列
};
//通配符订阅处理
let wildcardTopic = [];
//断线倒计时相关逻辑
let isLost = false; //用于进行是否开始倒计时判断
let lostTimeCount = null;
const TopicType = {
SysMessage: 'SysMessage',
MemberLocation: 'MemberLocation',
GroupLocation: 'GroupLocation',
Fence: 'Fence',
Equipment: 'Equipment',
Task: 'Task',
Chat: 'Chat',
};
const createMqttClient = (cellsysPush, cellsysTopic) => {
// let clientId = 'cellsys' + (new Date().getTime()) + Math.floor(Math.random() * 100)
2024-12-27 09:41:41 +08:00
let clientId = cellsysSystem.token + cellsysTopic.topicType + new Date().getTime();
2024-08-14 16:20:56 +08:00
let { ip, port, aliasPath, isSSL, username, password } = cellsysPush;
let client;
if (aliasPath) {
client = new Paho.Client(ip, Number(port), `/${aliasPath}`, clientId);
} else {
client = new Paho.Client(ip, Number(port), clientId);
}
return new Promise((resolve, reject) => {
let options = {
userName: username,
useSSL: isSSL,
password: password,
reconnect: true,
onFailure: function (err) {
console.error('MQ创建客户端异常', err);
reject(err);
},
};
client.connect(options);
client.onConnected = (reconnect) => {
2024-12-26 11:21:07 +08:00
console.log(clientId, '连接建立的clientId');
2024-08-14 16:20:56 +08:00
if (reconnect) {
isLost = false;
clearTimeout(lostTimeCount);
lostTimeCount = null;
2024-12-26 11:21:07 +08:00
/* console.log(cellsysTopic, 'mq重连topic');*/
2024-08-14 16:20:56 +08:00
}
resolve(client);
};
client.onConnectionLost = (responseObject) => {
console.log(responseObject, '中断对象');
console.log(cellsysTopic, '中断的topic');
2024-12-26 11:21:07 +08:00
console.log(clientId, '中断的clientId');
2024-08-14 16:20:56 +08:00
//连接中断时开始倒计时2分钟后没有重连成功则执行错误回调;
isLost = true;
lostTimeCount = setTimeout(() => {
if (responseObject.errorCode !== 0) {
let topicName = cellsysTopic.getTopicName();
let failOperator = topicManager[topicName]['onFail'];
if (failOperator) {
failOperator(responseObject);
}
}
}, 2 * 60 * 1000);
};
/**
* 全局MQTT接收消息处理
* @param res
*/
client.onMessageArrived = (res) => {
try {
let topic = res.topic, //主题
payloadStringArr = res.payloadString.split(';'); //返回数据
console.log(`${topic}${res.payloadString}`, '接收消息');
let wildcard = topic.substring(0, topic.lastIndexOf('/')) + '/+'; //获取通用主题
let topicName = wildcardTopic.some((name) => name === wildcard) ? wildcard : topic; //是否是通配符订阅的
let callback = callBackManager[topicName];
let topicSign = topic.split('/'),
message = null;
if (topicSign[0] === 'cellsys') {
//系统消息,但是现在好像没有系统消息了(1.5.0)
message = payloadStringArr;
} else if (topicSign[0] === 'lite') {
//cellsysLite给web端回应的消息
message = {
topic: res.topic,
payloadString: res.payloadString,
};
} else {
//非系统消息
switch (topicSign[1]) {
case 'chat': //增加聊天消息处理
case 'eventFeature': //自定义上报事件消息
case 'markerFeature': //标记上报消息
case 'memberJoin': //标记上报消息
message = JSON.parse(res.payloadString);
break;
case 'mqtt': //增加mqtt消息处理
message = res.payloadString;
break;
case 'loc':
message = new MemberMessage(payloadStringArr);
break;
case 'equ':
let type = topicSign[2];
switch (type) {
case 'PLQ':
break;
/*case 'sensing': //传感器 break*/
case 'sos':
message = res.payloadString;
break;
case '6':
message = res.payloadString;
break;
case 'status':
message = res.payloadString;
break;
default:
message = new EquipmentMessage(res);
break;
}
break;
case 'app':
message = new appNoticeMessage(payloadStringArr);
break;
case 'fence': //围栏事件
message = payloadStringArr;
break;
case 'equToEdge':
message = new EdgeCommanderMessage(res);
break;
default:
message = res.payloadString;
break;
}
}
if (callback) {
callback(message);
}
} catch (error) {
console.error(error);
}
};
window.addEventListener('unload', (e) => {
client.disconnect();
console.log('mqtt连接关闭');
});
});
};
class CellsysPush {
constructor(params) {
this.port = params.port;
this.ip = params.ip;
this.username = params.username;
this.password = params.password;
this.aliasPath = params.path; //别名路径
this.isSSL = params.isSSL;
}
}
class pushTask {
constructor() {
this.cellsysTopic = null;
this.client = null;
this.clientConnecting = false;
this.cellsysOrg = window.CELLSYSORG; //这里的全局变量CELLSYSORG是cellsysOrg这个类在初始化时就埋进window上的
}
/**
* 连接服务器
* @param topic
* @returns {Promise<unknown>}
* @private
*/
_connect(topic) {
let query = new Query();
// let _queryType = location.protocol === 'https:' ? QueryType.MqWss : QueryType.MqWs;
//连接地址
let queryTask = new QueryTask(this.cellsysTopic ? this.cellsysTopic.broker : topic.broker);
return new Promise((resolve, reject) => {
queryTask
.execute(query)
.then((res) => {
let isSSL = false;
let data = res.filter((item) => {
if (location.protocol === 'https:') {
//ws协议好像已经不允许不同域名情况下连接了因此本地开发要走wss协议
if (item.type === 'wss') {
isSSL = true;
return true;
}
} else {
if (item.type === 'ws') {
isSSL = false;
return true;
}
}
});
let config = data[0].config;
let params = {
port: config.port,
ip: config.ip ? config.ip : location.hostname, //如果mq配置里没有配置ip则用hostname这种情况会出现在边缘工作站上
username: config.username,
password: config.password,
aliasPath: null,
isSSL: isSSL,
};
if (process.env.NODE_ENV === 'development') {
2024-12-26 11:21:07 +08:00
params['ip'] = '192.168.2.10';
2024-08-14 16:20:56 +08:00
}
let cloudOptions = this.cellsysOrg ? this.cellsysOrg.cloudOptions : {};
//如果组织开通了外部访问
if (cloudOptions.account && cloudOptions.token && cloudOptions.connectId) {
//如果cloudHostname不是一级域名需要先转成一级域名再拼接connectId
let cloucHostnameSplit = location.hostname.split('.');
let _ip;
if (cloucHostnameSplit.length > 2) {
_ip = `${cloudOptions['connectId']}.${cloucHostnameSplit[1]}.${cloucHostnameSplit[2]}`;
} else {
_ip = `${cloudOptions['connectId']}.airkoon.cn`;
}
//当前访问路径和外部访问的域名一致,才视为从外部进行访问
if (location.hostname === _ip) {
params['ip'] = _ip;
params['port'] = 80;
params['path'] = config.path; //只有开启了外部访问的情况下才会读取别名路径
isSSL = false;
}
}
let cellsysPush = new CellsysPush(params);
return createMqttClient(cellsysPush, this.cellsysTopic);
})
.then((client) => {
resolve(client);
})
.catch((err) => {
console.error(err);
reject(err);
});
});
}
_removeFromQueue(topicName) {
let topicType = this.cellsysTopic.topicType,
queue = subscribeQueue[topicType];
let index = queue.indexOf(queue);
if (index !== -1) {
queue.splice(index, 1);
}
}
_subscribe(topicName, onReceive, onSuccess, onFail, topicType) {
this.client.subscribe(topicName, {
qos: 0,
onSuccess: (res) => {
callBackManager[topicName] = onReceive;
this._removeFromQueue(topicName);
if (topicType) {
clientLock[topicType] = false;
}
if (onSuccess) {
onSuccess(res, this.client);
}
},
onFailure: (e) => {
console.log(e);
this._removeFromQueue(topicName);
if (topicType) {
clientLock[topicType] = false;
}
if (onFail) {
onFail(e);
}
},
});
}
_unSubscribe(topicName, onSuccess, onFail) {
this.client.unsubscribe(topicName, {
onSuccess: (res) => {
console.log('取消订阅:', topicName);
if (onSuccess) {
onSuccess(res);
}
},
onFailure: (e) => {
if (onFail) {
onFail(e);
}
},
});
}
/**
* 订阅
* @param params
*/
subscribe(params) {
let cellsysTopic = params.cellsysTopic,
onReceive = params.onReceive,
onFail = params.onFail,
onSuccess = params.onSuccess;
this.cellsysTopic = cellsysTopic; //主题对象
let topicType = cellsysTopic.topicType, //主题类型
topicName = cellsysTopic.getTopicName(); //主题
if (!subscribeQueue[topicType]) {
//主题类型是否存在队列中
subscribeQueue[topicType] = [];
subscribeQueue[topicType].push(topicName);
} else {
if (subscribeQueue[topicType].indexOf(topicName) === -1) {
subscribeQueue[topicType].push(topicName);
}
}
topicManager[topicName] = {
onReceive: onReceive,
onSuccess: onSuccess,
onFail: onFail,
};
if (topicName.lastIndexOf('/+') !== -1) {
//有通配符
wildcardTopic.push(topicName);
}
if (!clientLock[topicType]) {
if (clientManager[topicType]) {
//是否有client
this.client = clientManager[topicType];
this._subscribe(topicName, onReceive, onSuccess, onFail);
} else {
clientLock[topicType] = true;
this._connect()
.then((res) => {
clientManager[topicType] = res;
this.client = res;
let queue = subscribeQueue[topicType];
if (queue.length > 0) {
queue.forEach((tname) => {
let topicOperator = topicManager[tname];
this._subscribe(
tname,
topicOperator['onReceive'],
topicOperator['onSuccess'],
topicOperator['onFail'],
topicType,
);
});
}
})
.catch((err) => {
console.error(err);
onFail(err);
})
.finally(() => {
clientLock[topicType] = false;
});
}
}
}
unSubscribe(params) {
let cellsysTopic = params.cellsysTopic,
onFail = params.onFail,
onSuccess = params.onSuccess;
this.cellsysTopic = cellsysTopic;
let topicType = cellsysTopic.topicType,
topicName = cellsysTopic.getTopicName();
if (subscribeQueue[topicType]) {
this._removeFromQueue(topicName);
}
if (clientManager[topicType]) {
this.client = clientManager[topicType];
this._unSubscribe(topicName, onSuccess, onFail);
} else {
if (onFail) {
onFail('尚未订阅该主题');
}
}
}
/**
* 发送mqtt消息
* @param params
*/
send(params) {
let cellsysTopic = params.cellsysTopic,
payload = params.message; //消息体
let topicType = cellsysTopic.topicType,
topicName = cellsysTopic.getTopicName();
if (!clientLock[topicType]) {
this.client = clientManager[topicType];
if (this.client) {
this.client.publish(topicName, payload, 0, false);
console.log(`${topicName}${payload}`, '消息推送');
} else {
clientLock[topicType] = true;
this._connect(cellsysTopic)
.then((res) => {
clientManager[topicType] = res;
this.client = res;
this.client.publish(topicName, payload, 0, false);
console.log(`${topicName}${payload}`, '推送消息');
})
.catch((err) => {
console.log(err);
})
.finally(() => {
clientLock[topicType] = false;
});
}
}
}
}
/**
* 订阅主题基本类所有继承
* @param orgId 组织id
* @param topicType 主题类型
* @param broker 主题url
*/
class TopicBase {
constructor(orgId, topicType, broker) {
if (new.target === TopicBase) {
throw new Error('TopicBase cannot be instantiated, can only be inherited');
}
// if (!orgId) { 订阅和发布话题也不一定要orgid吧
// throw new Error('orgId not undefined');
// }
this.orgId = orgId;
this.topicType = topicType;
this.broker = broker;
this.all = false;
}
getTopicType() {
return this.topicType;
}
}
/********************************* 人员模块 **********************************************/
/**
* 人员位置专题
*/
class MemberLocTopic extends TopicBase {
constructor(cellsysMember) {
super(cellsysMember.orgId, TopicType.MemberLocation, QueryType.LocationBroker);
this.about = cellsysMember;
}
getTopicName() {
let cellsysMember = this.about;
return `${this.orgId}/loc/${cellsysMember.userId}`;
}
}
class MemberMessage {
constructor(msgArr) {
this.userId = parseInt(msgArr[0]);
this.lngLat = msgArr[1];
this.timeStamp = Number(msgArr[2]);
}
}
/********************************* 设备模块 **********************************************/
/**
* 设备模块主题
*/
class DeviceTopic extends TopicBase {
constructor(device) {
if (new.target === DeviceTopic) {
throw new Error('DeviceTopic cannot be instantiated, can only be inherited');
}
super(device.orgId, TopicType.Equipment, QueryType.EquipmentBroker);
this.about = device;
}
}
/**
* 设备专题
*/
class EquipmentTopic extends DeviceTopic {
constructor(equipment) {
super(equipment);
}
getTopicName() {
let equipment = this.about;
return `${this.orgId}/equ/${equipment.type}/${equipment.macid}`;
}
}
/**
* sos订阅
*/
class SosTopic extends DeviceTopic {
constructor(device) {
super(device);
}
getTopicName() {
return `${this.orgId}/equ/sos/+`;
}
subTopicAllName() {
return `${this.orgId}/equ/sos/+`;
}
}
/**
* 北斗盒子订阅(卡号360348)
*/
class BDBoxTopic extends DeviceTopic {
constructor(device) {
super(device);
}
getTopicName() {
//系统类型id
return `${this.orgId}/equ/6/+`;
}
}
class EquipmentMonitorTopic extends DeviceTopic {
constructor(device) {
super(device);
this.topicName = device.topicName;
}
getTopicName() {
//系统类型id
return this.orgId + this.topicName + '+';
}
}
/**
* 设备状态订阅目前用于人员在线状态逻辑后续可能用于网优部分
*/
class EquipmentStatusTopic extends TopicBase {
constructor(orgId) {
super(orgId, TopicType.Equipment, QueryType.EquipmentBroker);
}
getTopicName() {
let equipment = this.about;
return `${this.orgId}/equ/status`;
}
}
/**
* 边缘工作站设备给边缘工作站的消息
*/
class EquToEdgeTopic extends TopicBase {
constructor(orgId, macId) {
super(orgId, TopicType.Equipment, QueryType.EquipmentBroker);
this.macId = macId;
}
getTopicName() {
let equipment = this.about;
return `${this.orgId}/equToEdge/${this.macId}/+`;
}
}
/********************************* 系统消息模块 **********************************************/
/**
* 自定义事件上传消息专题
* 主要用于接受自定义事件上传的消息提示
*/
class eventNoticeTopic extends TopicBase {
constructor(orgId) {
super(orgId, TopicType.SysMessage, QueryType.SystemBroker);
}
getTopicName() {
return `${this.orgId}/eventFeature`;
}
}
/**
* 标记上传消息专题
* 主要用于接受标记上传的消息提示
*/
class markerNoticeTopic extends TopicBase {
constructor(orgId) {
super(orgId, TopicType.SysMessage, QueryType.SystemBroker);
}
getTopicName() {
return `${this.orgId}/markerFeature`;
}
}
/**
* 成员申请加入专题
*/
class memberJoinTopic extends TopicBase {
constructor(orgId) {
super(orgId, TopicType.SysMessage, QueryType.SystemBroker);
}
getTopicName() {
return `${this.orgId}/memberJoin`;
}
}
/**
* 成员审核桩体
*/
class memberAuditTopic extends TopicBase {
constructor(orgId, userId) {
super(orgId, TopicType.SysMessage, QueryType.SystemBroker);
this.userId = userId;
}
getTopicName() {
return `${this.orgId}/${this.userId}/orgAudit`;
}
}
/**
* 离线地图状态推送
*/
class offlineFenceTopic extends TopicBase {
constructor(orgId) {
super(orgId, TopicType.SysMessage, QueryType.SystemBroker);
}
getTopicName() {
return `${this.orgId}/offlineFence`;
}
}
/**
* 下载离线地图时推送MQ目的是让cellsysLite去下载对应的地图
*/
class SendDownloadOfflineData extends TopicBase {
constructor(OfflineData) {
super(null, TopicType.SysMessage, QueryType.SystemBroker);
this.OfflineData = OfflineData;
}
getTopicName() {
return `edge/lite/offlineFence`;
}
}
/**
* 边缘工作站连接的终端设备的状态如电量
*/
class EdgeDeviceStatusTopic extends TopicBase {
constructor(orgId) {
super(orgId, TopicType.SysMessage, QueryType.SystemBroker);
}
getTopicName() {
return `${this.orgId}/equToEdge/status`;
}
}
/********************************* 应用模块 **********************************************/
/**
* 应用消息专题
*/
class appNoticeTopic extends TopicBase {
constructor(cellsysApp) {
super(cellsysApp.orgId, TopicType.SysMessage, QueryType.SystemBroker);
this.about = cellsysApp;
}
getTopicName() {
let cellsysApp = this.about;
return `${this.orgId}/app/${cellsysApp.id}`;
}
}
class appNoticeMessage {
constructor(msgArr) {
this.timeStamp = msgArr[0];
this.title = msgArr[1];
this.message = msgArr[2];
}
}
/********************************* 聊天模块 **********************************************/
/**
* 聊天基类
*/
class ChatBase extends TopicBase {
constructor(orgId) {
if (new.target === ChatBase) {
throw new Error('ChatBase cannot be instantiated, can only be inherited');
}
super(orgId, TopicType.Chat, QueryType.ChatBroker);
}
}
//单聊消息对应单聊mq目前针对explorer逻辑
class SingleChatTopic extends ChatBase {
constructor(orgId, conversationId, senderId) {
super(orgId);
this.conversationId = conversationId;
this.senderId = senderId;
}
getTopicName() {
return `${this.orgId}/chat/single/${this.conversationId}/${this.senderId}`;
}
}
class SingleChatReplyTopic extends ChatBase {
constructor(orgId, conversationId) {
super(orgId);
this.conversationId = conversationId;
}
getTopicName() {
return `${this.orgId}/chat/single/${this.conversationId}/+`;
}
}
/**
* 系统后台-单聊-发送消息指挥机中转
*/
class ChatSystemTopic extends ChatBase {
constructor(orgId) {
super(orgId);
}
getTopicName() {
return this.orgId + '/chat/portal/single';
}
}
/**
* 系统后台-单聊-回复消息指挥机中转
*/
class ChatSystemReplyTopic extends ChatBase {
constructor(orgId, receiverId) {
super(orgId);
this.receiverId = receiverId;
}
getTopicName() {
return this.orgId + '/chat/replyPortal/single/' + this.receiverId;
}
}
/**
* 围栏事件订阅主题
*/
class FenceEventTopic extends TopicBase {
constructor(orgId) {
super(orgId, TopicType.Fence, QueryType.LocationBroker);
}
getTopicName() {
return this.orgId + '/fence/+';
}
}
/******************************* MQTT消息接收处理类 ***************************************/
class MessageBase {
constructor(param) {
if (new.target === MessageBase) {
throw new Error('MessageBase cannot be instantiated, can only be inherited');
}
let msg = param || {};
this.topicName = msg['destinationName'];
this.messageString = msg['payloadString'];
}
get message() {
return this.messageString;
}
}
class EquipmentMessage extends MessageBase {
constructor(param) {
super(param);
this.$_resolveMsg();
}
$_resolveMsg() {
let msg = JSON.parse(this.messageString);
this.data = msg.data;
this.macId = msg.macid;
this.sysTypeId = msg.type;
this.transportMode = msg.transportMode;
this.dataType = msg.dataType;
this.orgId = msg.orgId;
this.dateTime = msg.dateTime; //上传时间
}
}
//edgeCommander发送的消息
class EdgeCommanderMessage extends MessageBase {
constructor(param) {
super(param);
let { topic } = param;
let topicSign = topic.split('/');
if (topicSign.length > 3) {
this.macId = topicSign[2];
this.msgType = topicSign[3];
}
}
}
export {
pushTask,
TopicBase,
TopicType,
MemberLocTopic,
EquipmentTopic,
EquipmentMonitorTopic,
SosTopic,
BDBoxTopic,
EquipmentMessage,
appNoticeTopic,
ChatSystemTopic,
SingleChatTopic,
ChatSystemReplyTopic,
SingleChatReplyTopic,
FenceEventTopic,
eventNoticeTopic,
markerNoticeTopic,
EquipmentStatusTopic,
memberJoinTopic,
memberAuditTopic,
offlineFenceTopic,
EquToEdgeTopic,
EdgeDeviceStatusTopic,
SendDownloadOfflineData,
};