cellsysBase/cellsysPush.js
2024-12-26 11:21:07 +08:00

863 lines
27 KiB
JavaScript
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import { Query, QueryTask, QueryType } from './cellsysUtil.js';
import './utils/paho-mqtt.js';
import { getToken } from '@/utils/auth';
// Shared MQTT clients, keyed by topic type.
const clientManager = {};
// Per-topic-type connection lock; true while a client for that type is being set up.
const clientLock = {};
// Message callbacks, keyed by topic name.
const callBackManager = {};
// Per-topic handler bundles ({ onReceive, onSuccess, onFail }), keyed by topic name.
const topicManager = {};
// Subscription queues: topic type -> array of topic names.
const subscribeQueue = {};
// Topic names that were subscribed with a wildcard.
const wildcardTopic = [];
// Disconnect countdown state.
let isLost = false; // set once a connection loss has started a countdown
let lostTimeCount = null; // timer handle of the disconnect countdown
// Topic categories; createMqttClient embeds the topic type in its client id and
// pushTask keys clients, locks and queues by it.
const TopicType = {
  SysMessage: 'SysMessage',
  MemberLocation: 'MemberLocation',
  GroupLocation: 'GroupLocation',
  Fence: 'Fence',
  Equipment: 'Equipment',
  Task: 'Task',
  Chat: 'Chat',
};
/**
 * Converts a received MQTT message into the value handed to subscriber callbacks.
 *
 * The shape of the result depends on the topic segments:
 *  - first segment `cellsys`: the payload split on ';'
 *  - first segment `lite`: `{ topic, payloadString }`
 *  - otherwise the second segment selects the format (JSON, raw string,
 *    MemberMessage, EquipmentMessage, appNoticeMessage, EdgeCommanderMessage, ...).
 *
 * @param {{topic: string, payloadString: string}} res received message
 * @returns {*} parsed message; null for `equ/PLQ` topics, which are not parsed
 * @throws {SyntaxError} when a JSON topic carries an invalid JSON payload
 */
const parseMqttMessage = (res) => {
  const { topic, payloadString } = res;
  const payloadStringArr = payloadString.split(';');
  const topicSign = topic.split('/');
  if (topicSign[0] === 'cellsys') {
    // System message (the original note says these may no longer exist as of 1.5.0).
    return payloadStringArr;
  }
  if (topicSign[0] === 'lite') {
    // Reply sent by cellsysLite to the web client.
    return { topic, payloadString };
  }
  switch (topicSign[1]) {
    case 'chat': // chat message
    case 'eventFeature': // custom event report
    case 'markerFeature': // marker report
    case 'memberJoin': // member join request
      return JSON.parse(payloadString);
    case 'loc':
      return new MemberMessage(payloadStringArr);
    case 'equ':
      switch (topicSign[2]) {
        case 'PLQ':
          return null;
        case 'sos':
        case '6':
        case 'status':
          return payloadString;
        default:
          return new EquipmentMessage(res);
      }
    case 'app':
      return new appNoticeMessage(payloadStringArr);
    case 'fence': // fence event
      return payloadStringArr;
    case 'equToEdge':
      return new EdgeCommanderMessage(res);
    case 'mqtt':
    default:
      return payloadString;
  }
};

/**
 * Creates a Paho MQTT client, connects it and wires the global message dispatch.
 *
 * All handlers are installed before `connect()` is called, so a connection that
 * completes immediately cannot slip past them. Each client owns its own
 * disconnect countdown: two minutes after an abnormal connection loss without a
 * successful reconnect, the `onFail` handler registered in `topicManager` for
 * `cellsysTopic` is invoked (if there is one).
 *
 * @param {{ip: string, port: (number|string), aliasPath: ?string, isSSL: boolean,
 *          username: string, password: string}} cellsysPush broker settings
 * @param {{topicType: string, getTopicName: function(): string}} cellsysTopic
 *        topic that triggered the connection
 * @returns {Promise<Object>} resolves with the connected client; rejects with
 *          the Paho failure object when the connection attempt fails
 */
const createMqttClient = (cellsysPush, cellsysTopic) => {
  const LOST_TIMEOUT_MS = 2 * 60 * 1000;
  const clientId = getToken() + cellsysTopic.topicType + new Date().getTime();
  const { ip, port, aliasPath, isSSL, username, password } = cellsysPush;
  const client = aliasPath
    ? new Paho.Client(ip, Number(port), `/${aliasPath}`, clientId)
    : new Paho.Client(ip, Number(port), clientId);

  // Countdown owned by this client. A single module-wide handle would let one
  // client's reconnect cancel (or orphan) another client's countdown.
  let lostTimer = null;
  const clearLostTimer = () => {
    if (lostTimer === null) {
      return;
    }
    clearTimeout(lostTimer);
    if (lostTimeCount === lostTimer) {
      lostTimeCount = null;
    }
    lostTimer = null;
  };

  return new Promise((resolve, reject) => {
    client.onConnected = (reconnect) => {
      console.log(clientId, '连接建立的clientId');
      if (reconnect) {
        isLost = false;
      }
      clearLostTimer();
      resolve(client);
    };

    client.onConnectionLost = (responseObject) => {
      console.log(responseObject, '中断对象');
      console.log(cellsysTopic, '中断的topic');
      console.log(clientId, '中断的clientId');
      isLost = true;
      // Never let two countdowns of the same client run side by side.
      clearLostTimer();
      if (responseObject.errorCode === 0) {
        // Error code 0 never triggers the failure callback, so no countdown is needed.
        return;
      }
      lostTimer = setTimeout(() => {
        if (lostTimeCount === lostTimer) {
          lostTimeCount = null;
        }
        lostTimer = null;
        // The topic may have no registered handlers; do not throw inside the timer.
        const handlers = topicManager[cellsysTopic.getTopicName()];
        const failOperator = handlers && handlers['onFail'];
        if (failOperator) {
          failOperator(responseObject);
        }
      }, LOST_TIMEOUT_MS);
      lostTimeCount = lostTimer;
    };

    // Global MQTT message dispatch.
    client.onMessageArrived = (res) => {
      try {
        const topic = res.topic;
        console.log(`${topic}${res.payloadString}`, '接收消息');
        // Messages under a wildcard subscription are routed to the wildcard's callback.
        const wildcard = topic.substring(0, topic.lastIndexOf('/')) + '/+';
        const topicName = wildcardTopic.includes(wildcard) ? wildcard : topic;
        const callback = callBackManager[topicName];
        const message = parseMqttMessage(res);
        if (callback) {
          callback(message);
        }
      } catch (error) {
        console.error(error);
      }
    };

    window.addEventListener('unload', () => {
      try {
        client.disconnect();
        console.log('mqtt连接关闭');
      } catch (err) {
        // disconnect() throws when the client is not connected; nothing to close then.
        console.warn(err);
      }
    });

    client.connect({
      userName: username,
      useSSL: isSSL,
      password: password,
      reconnect: true,
      onFailure: function (err) {
        console.error('MQ创建客户端异常', err);
        reject(err);
      },
    });
  });
};
/**
 * Connection settings for one MQTT broker endpoint.
 */
class CellsysPush {
  /**
   * @param {Object} params
   * @param {number|string} params.port broker port
   * @param {string} params.ip broker host
   * @param {string} params.username login user name
   * @param {string} params.password login password
   * @param {string} [params.path] alias path, stored as `aliasPath`
   * @param {boolean} params.isSSL whether the connection uses SSL
   */
  constructor(params) {
    const { port, ip, username, password, path, isSSL } = params;
    Object.assign(this, {
      port,
      ip,
      username,
      password,
      aliasPath: path, // note: read from `path`, not from `aliasPath`
      isSSL,
    });
  }
}
/**
 * Publish/subscribe facade over the shared MQTT clients.
 *
 * One client is kept per topic type in `clientManager`; `clientLock` marks a
 * topic type whose client is currently being created, and `subscribeQueue`
 * holds the topic names that still have to be subscribed once it is ready.
 */
class pushTask {
  constructor() {
    this.cellsysTopic = null; // topic of the most recent subscribe/unSubscribe call
    this.client = null; // client used by the most recent operation
    this.clientConnecting = false;
    // Per the original note, the cellsysOrg class stores itself on
    // window.CELLSYSORG when it is initialised.
    this.cellsysOrg = window.CELLSYSORG;
  }

  /**
   * Looks up the broker configuration and opens an MQTT client for it.
   *
   * @param {Object} [topic] topic whose `broker` is queried; falls back to the
   *        topic of the last subscribe/unSubscribe call when omitted
   * @returns {Promise<Object>} resolves with the connected client
   * @private
   */
  _connect(topic) {
    // Prefer the explicit topic: the client is stored under that topic's type,
    // so it must be created from that topic's broker, not from a stale one.
    const cellsysTopic = topic || this.cellsysTopic;
    if (!cellsysTopic) {
      return Promise.reject(
        new Error('pushTask._connect: no topic available to resolve the broker address'),
      );
    }
    // The executor turns a synchronous throw of the query into a rejection, so
    // callers can always release their lock in `finally`.
    return new Promise((resolve) => {
      resolve(new QueryTask(cellsysTopic.broker).execute(new Query()));
    })
      .then((res) => {
        // Pages served over https must use wss; otherwise plain ws is used.
        const isSSL = location.protocol === 'https:';
        const wantedType = isSSL ? 'wss' : 'ws';
        const entry = res.find((item) => item.type === wantedType);
        if (!entry) {
          throw new Error(`pushTask._connect: no "${wantedType}" broker configuration returned`);
        }
        const config = entry.config;
        const params = {
          port: config.port,
          // Without a configured ip the page's own host is used (the original
          // note says this happens on edge workstations).
          ip: config.ip ? config.ip : location.hostname,
          username: config.username,
          password: config.password,
          aliasPath: null,
          isSSL: isSSL,
        };
        if (process.env.NODE_ENV === 'development') {
          params['ip'] = '192.168.2.10';
        }
        const cloudOptions = (this.cellsysOrg && this.cellsysOrg.cloudOptions) || {};
        // Organisation has external access enabled.
        if (cloudOptions.account && cloudOptions.token && cloudOptions.connectId) {
          // Build "<connectId>.<domain>" from the 2nd and 3rd labels of the current host name.
          const hostParts = location.hostname.split('.');
          const externalHost =
            hostParts.length > 2
              ? `${cloudOptions['connectId']}.${hostParts[1]}.${hostParts[2]}`
              : `${cloudOptions['connectId']}.airkoon.cn`;
          // Only treat the page as externally accessed when its host matches.
          if (location.hostname === externalHost) {
            params['ip'] = externalHost;
            params['port'] = 80;
            params['path'] = config.path; // the alias path is only read for external access
            // NOTE(review): the original reset a local `isSSL` here after
            // `params` was already built, which never reached the client.
            // The effective SSL setting is left unchanged - confirm whether
            // external access is really meant to disable SSL.
          }
        }
        return createMqttClient(new CellsysPush(params), cellsysTopic);
      })
      .catch((err) => {
        console.error(err);
        throw err;
      });
  }

  /**
   * Removes a topic name from the pending subscription queue.
   *
   * @param {string} topicName topic name to remove
   * @param {string} [topicType] queue to use; defaults to the current topic's type
   * @private
   */
  _removeFromQueue(topicName, topicType) {
    const type = topicType || (this.cellsysTopic && this.cellsysTopic.topicType);
    const queue = subscribeQueue[type];
    if (!queue) {
      return;
    }
    const index = queue.indexOf(topicName);
    if (index !== -1) {
      queue.splice(index, 1);
    }
  }

  /**
   * Subscribes on the current client and registers the receive callback.
   *
   * @param {string} topicName topic name to subscribe
   * @param {Function} onReceive callback stored in `callBackManager` on success
   * @param {Function} [onSuccess] called with (response, client) on success
   * @param {Function} [onFail] called with the error on failure
   * @param {string} [topicType] when given, the lock of this type is released afterwards
   * @private
   */
  _subscribe(topicName, onReceive, onSuccess, onFail, topicType) {
    // Captured now: `this.cellsysTopic` / `this.client` may point elsewhere by
    // the time the broker answers.
    const queueType = topicType || (this.cellsysTopic && this.cellsysTopic.topicType);
    const client = this.client;
    const settle = () => {
      this._removeFromQueue(topicName, queueType);
      if (topicType) {
        clientLock[topicType] = false;
      }
    };
    client.subscribe(topicName, {
      qos: 0,
      onSuccess: (res) => {
        callBackManager[topicName] = onReceive;
        settle();
        if (onSuccess) {
          onSuccess(res, client);
        }
      },
      onFailure: (e) => {
        console.log(e);
        settle();
        if (onFail) {
          onFail(e);
        }
      },
    });
  }

  /**
   * Unsubscribes on the current client and drops the routing state of the topic.
   *
   * @param {string} topicName topic name to unsubscribe
   * @param {Function} [onSuccess] called with the response on success
   * @param {Function} [onFail] called with the error on failure
   * @private
   */
  _unSubscribe(topicName, onSuccess, onFail) {
    this.client.unsubscribe(topicName, {
      onSuccess: (res) => {
        console.log('取消订阅:', topicName);
        // Without this cleanup a stale wildcard entry keeps capturing messages
        // of later exact-topic subscriptions and feeds them to the old callback.
        delete callBackManager[topicName];
        const wildcardIndex = wildcardTopic.indexOf(topicName);
        if (wildcardIndex !== -1) {
          wildcardTopic.splice(wildcardIndex, 1);
        }
        if (onSuccess) {
          onSuccess(res);
        }
      },
      onFailure: (e) => {
        if (onFail) {
          onFail(e);
        }
      },
    });
  }

  /**
   * Subscribes to a topic, creating the client for its topic type on demand.
   *
   * @param {Object} params
   * @param {Object} params.cellsysTopic topic object (`topicType`, `getTopicName()`, `broker`)
   * @param {Function} params.onReceive message callback
   * @param {Function} [params.onSuccess] subscription success callback
   * @param {Function} [params.onFail] failure callback
   */
  subscribe(params) {
    const { cellsysTopic, onReceive, onFail, onSuccess } = params;
    this.cellsysTopic = cellsysTopic;
    const topicType = cellsysTopic.topicType;
    const topicName = cellsysTopic.getTopicName();

    if (!subscribeQueue[topicType]) {
      subscribeQueue[topicType] = [];
    }
    if (!subscribeQueue[topicType].includes(topicName)) {
      subscribeQueue[topicType].push(topicName);
    }
    topicManager[topicName] = {
      onReceive: onReceive,
      onSuccess: onSuccess,
      onFail: onFail,
    };
    // Remember wildcard topics once, so received messages can be routed to them.
    if (topicName.includes('/+') && !wildcardTopic.includes(topicName)) {
      wildcardTopic.push(topicName);
    }

    if (clientLock[topicType]) {
      // A client is being created; the queued topic is subscribed once it is ready.
      return;
    }
    if (clientManager[topicType]) {
      this.client = clientManager[topicType];
      this._subscribe(topicName, onReceive, onSuccess, onFail);
      return;
    }
    clientLock[topicType] = true;
    this._connect(cellsysTopic)
      .then((res) => {
        clientManager[topicType] = res;
        this.client = res;
        // Copy: successful subscriptions remove their entry from the live queue.
        [...subscribeQueue[topicType]].forEach((tname) => {
          const topicOperator = topicManager[tname];
          this._subscribe(
            tname,
            topicOperator['onReceive'],
            topicOperator['onSuccess'],
            topicOperator['onFail'],
            topicType,
          );
        });
      })
      .catch((err) => {
        console.error(err);
        if (onFail) {
          onFail(err);
        }
      })
      .finally(() => {
        clientLock[topicType] = false;
      });
  }

  /**
   * Cancels a subscription.
   *
   * @param {Object} params
   * @param {Object} params.cellsysTopic topic object
   * @param {Function} [params.onSuccess] success callback
   * @param {Function} [params.onFail] failure callback; also called when no
   *        client exists for the topic type
   */
  unSubscribe(params) {
    const { cellsysTopic, onFail, onSuccess } = params;
    this.cellsysTopic = cellsysTopic;
    const topicType = cellsysTopic.topicType;
    const topicName = cellsysTopic.getTopicName();
    this._removeFromQueue(topicName, topicType);
    if (clientManager[topicType]) {
      this.client = clientManager[topicType];
      this._unSubscribe(topicName, onSuccess, onFail);
    } else if (onFail) {
      onFail('尚未订阅该主题');
    }
  }

  /**
   * Publishes an MQTT message, creating the client for the topic type on demand.
   *
   * @param {Object} params
   * @param {Object} params.cellsysTopic topic object
   * @param {*} params.message message payload
   */
  send(params) {
    const cellsysTopic = params.cellsysTopic;
    const payload = params.message;
    const topicType = cellsysTopic.topicType;
    const topicName = cellsysTopic.getTopicName();
    if (clientLock[topicType]) {
      // Same as before the message is not sent while the client is being
      // created, but the drop is no longer silent.
      console.warn(`${topicName}: client is still connecting, message not sent`);
      return;
    }
    this.client = clientManager[topicType];
    if (this.client) {
      this.client.publish(topicName, payload, 0, false);
      console.log(`${topicName}${payload}`, '消息推送');
      return;
    }
    clientLock[topicType] = true;
    this._connect(cellsysTopic)
      .then((res) => {
        clientManager[topicType] = res;
        this.client = res;
        this.client.publish(topicName, payload, 0, false);
        console.log(`${topicName}${payload}`, '推送消息');
      })
      .catch((err) => {
        console.log(err);
      })
      .finally(() => {
        clientLock[topicType] = false;
      });
  }
}
/**
 * Abstract base class of every subscription topic; it can only be inherited.
 * Subclasses in this file provide `getTopicName()`.
 */
class TopicBase {
  /**
   * @param {*} orgId organisation id (not validated)
   * @param {string} topicType topic type
   * @param {*} broker broker query key of the topic
   * @throws {Error} when `TopicBase` itself is instantiated
   */
  constructor(orgId, topicType, broker) {
    const instantiatedDirectly = new.target === TopicBase;
    if (instantiatedDirectly) {
      throw new Error('TopicBase cannot be instantiated, can only be inherited');
    }
    // orgId is deliberately not required: the original author noted that
    // subscribing/publishing does not necessarily need one.
    Object.assign(this, { orgId, topicType, broker, all: false });
  }

  /**
   * @returns {string} the topic type given at construction
   */
  getTopicType() {
    const { topicType } = this;
    return topicType;
  }
}
/********************************* 人员模块 **********************************************/
/**
 * Member location topic: `<orgId>/loc/<userId>`.
 */
class MemberLocTopic extends TopicBase {
  /**
   * @param {Object} cellsysMember member object; `orgId` and `userId` are read
   */
  constructor(cellsysMember) {
    const { orgId } = cellsysMember;
    super(orgId, TopicType.MemberLocation, QueryType.LocationBroker);
    this.about = cellsysMember;
  }

  /**
   * @returns {string} topic name of this member's location updates
   */
  getTopicName() {
    const { orgId, about } = this;
    return `${orgId}/loc/${about.userId}`;
  }
}
/**
 * Member location message built from a ';'-separated payload.
 */
class MemberMessage {
  /**
   * @param {string[]} msgArr payload parts: [userId, lngLat, timeStamp]
   */
  constructor(msgArr) {
    // Always parse the id as decimal; without a radix a "0x"-prefixed value
    // would be read as hexadecimal.
    this.userId = Number.parseInt(msgArr[0], 10);
    this.lngLat = msgArr[1];
    this.timeStamp = Number(msgArr[2]);
  }
}
/********************************* 设备模块 **********************************************/
/**
 * Abstract base of the device topics; it can only be inherited.
 */
class DeviceTopic extends TopicBase {
  /**
   * @param {Object} device device object; its `orgId` becomes the topic's orgId
   * @throws {Error} when `DeviceTopic` itself is instantiated
   */
  constructor(device) {
    const instantiatedDirectly = new.target === DeviceTopic;
    if (instantiatedDirectly) {
      throw new Error('DeviceTopic cannot be instantiated, can only be inherited');
    }
    const { orgId } = device;
    super(orgId, TopicType.Equipment, QueryType.EquipmentBroker);
    this.about = device;
  }
}
/**
 * Equipment topic: `<orgId>/equ/<type>/<macid>`.
 */
class EquipmentTopic extends DeviceTopic {
  /**
   * @param {Object} equipment equipment object; `orgId`, `type` and `macid` are read
   */
  constructor(equipment) {
    super(equipment);
  }

  /**
   * @returns {string} topic name of this piece of equipment
   */
  getTopicName() {
    const { type, macid } = this.about;
    return `${this.orgId}/equ/${type}/${macid}`;
  }
}
/**
 * SOS subscription: wildcard topic `<orgId>/equ/sos/+`.
 */
class SosTopic extends DeviceTopic {
  /**
   * @param {Object} device device object; its `orgId` is read
   */
  constructor(device) {
    super(device);
  }

  /**
   * @returns {string} the wildcard SOS topic name
   */
  getTopicName() {
    const { orgId } = this;
    return `${orgId}/equ/sos/+`;
  }

  /**
   * @returns {string} the same wildcard SOS topic name as `getTopicName()`
   */
  subTopicAllName() {
    const { orgId } = this;
    return `${orgId}/equ/sos/+`;
  }
}
/**
 * BeiDou box subscription (card number 360348): wildcard topic `<orgId>/equ/6/+`.
 */
class BDBoxTopic extends DeviceTopic {
  /**
   * @param {Object} device device object; its `orgId` is read
   */
  constructor(device) {
    super(device);
  }

  /**
   * @returns {string} topic name; per the original note the `6` segment is a system type id
   */
  getTopicName() {
    const { orgId } = this;
    return `${orgId}/equ/6/+`;
  }
}
/**
 * Equipment monitoring topic whose middle part is supplied by the device object.
 */
class EquipmentMonitorTopic extends DeviceTopic {
  /**
   * @param {Object} device device object; `orgId` and `topicName` are read
   */
  constructor(device) {
    super(device);
    const { topicName } = device;
    this.topicName = topicName;
  }

  /**
   * @returns {string} orgId, then the device's `topicName`, then a trailing '+'
   */
  getTopicName() {
    const prefix = this.orgId + this.topicName;
    return prefix + '+';
  }
}
/**
 * Device status subscription: `<orgId>/equ/status`.
 * Per the original note it currently backs the member online-status logic and
 * may later be used for network optimisation.
 */
class EquipmentStatusTopic extends TopicBase {
  /**
   * @param {*} orgId organisation id
   */
  constructor(orgId) {
    super(orgId, TopicType.Equipment, QueryType.EquipmentBroker);
  }

  /**
   * @returns {string} the device status topic name
   */
  getTopicName() {
    // The unused read of `this.about` (never set on this class) was removed.
    return `${this.orgId}/equ/status`;
  }
}
/**
 * Messages sent by an edge workstation's devices to the edge workstation:
 * wildcard topic `<orgId>/equToEdge/<macId>/+`.
 */
class EquToEdgeTopic extends TopicBase {
  /**
   * @param {*} orgId organisation id
   * @param {*} macId device identifier used as the third topic segment
   */
  constructor(orgId, macId) {
    super(orgId, TopicType.Equipment, QueryType.EquipmentBroker);
    this.macId = macId;
  }

  /**
   * @returns {string} the wildcard topic name for this device
   */
  getTopicName() {
    // The unused read of `this.about` (never set on this class) was removed.
    return `${this.orgId}/equToEdge/${this.macId}/+`;
  }
}
/********************************* 系统消息模块 **********************************************/
/**
 * Custom event upload notice topic: `<orgId>/eventFeature`.
 * Used to receive the notifications raised when a custom event is uploaded.
 */
class eventNoticeTopic extends TopicBase {
  /**
   * @param {*} orgId organisation id
   */
  constructor(orgId) {
    super(orgId, TopicType.SysMessage, QueryType.SystemBroker);
  }

  /**
   * @returns {string} the event notice topic name
   */
  getTopicName() {
    const { orgId } = this;
    return `${orgId}/eventFeature`;
  }
}
/**
 * Marker upload notice topic: `<orgId>/markerFeature`.
 * Used to receive the notifications raised when a marker is uploaded.
 */
class markerNoticeTopic extends TopicBase {
  /**
   * @param {*} orgId organisation id
   */
  constructor(orgId) {
    super(orgId, TopicType.SysMessage, QueryType.SystemBroker);
  }

  /**
   * @returns {string} the marker notice topic name
   */
  getTopicName() {
    const { orgId } = this;
    return `${orgId}/markerFeature`;
  }
}
/**
 * Member join request topic: `<orgId>/memberJoin`.
 */
class memberJoinTopic extends TopicBase {
  /**
   * @param {*} orgId organisation id
   */
  constructor(orgId) {
    super(orgId, TopicType.SysMessage, QueryType.SystemBroker);
  }

  /**
   * @returns {string} the member join topic name
   */
  getTopicName() {
    const { orgId } = this;
    return `${orgId}/memberJoin`;
  }
}
/**
 * Member audit topic: `<orgId>/<userId>/orgAudit`.
 */
class memberAuditTopic extends TopicBase {
  /**
   * @param {*} orgId organisation id
   * @param {*} userId id of the audited user
   */
  constructor(orgId, userId) {
    super(orgId, TopicType.SysMessage, QueryType.SystemBroker);
    this.userId = userId;
  }

  /**
   * @returns {string} the member audit topic name
   */
  getTopicName() {
    const { orgId, userId } = this;
    return `${orgId}/${userId}/orgAudit`;
  }
}
/**
 * Offline map status push topic: `<orgId>/offlineFence`.
 */
class offlineFenceTopic extends TopicBase {
  /**
   * @param {*} orgId organisation id
   */
  constructor(orgId) {
    super(orgId, TopicType.SysMessage, QueryType.SystemBroker);
  }

  /**
   * @returns {string} the offline map status topic name
   */
  getTopicName() {
    const { orgId } = this;
    return `${orgId}/offlineFence`;
  }
}
/**
 * Topic published when an offline map is downloaded; per the original note the
 * purpose is to make cellsysLite download the corresponding map.
 */
class SendDownloadOfflineData extends TopicBase {
  /**
   * @param {*} OfflineData offline map data kept on the instance (no orgId is used)
   */
  constructor(OfflineData) {
    super(null, TopicType.SysMessage, QueryType.SystemBroker);
    this.OfflineData = OfflineData;
  }

  /**
   * @returns {string} the fixed topic name
   */
  getTopicName() {
    return 'edge/lite/offlineFence';
  }
}
/**
 * Status (e.g. battery level) of the terminal devices connected to an edge
 * workstation: `<orgId>/equToEdge/status`.
 */
class EdgeDeviceStatusTopic extends TopicBase {
  /**
   * @param {*} orgId organisation id
   */
  constructor(orgId) {
    super(orgId, TopicType.SysMessage, QueryType.SystemBroker);
  }

  /**
   * @returns {string} the edge device status topic name
   */
  getTopicName() {
    const { orgId } = this;
    return `${orgId}/equToEdge/status`;
  }
}
/********************************* 应用模块 **********************************************/
/**
 * Application message topic: `<orgId>/app/<appId>`.
 */
class appNoticeTopic extends TopicBase {
  /**
   * @param {Object} cellsysApp application object; `orgId` and `id` are read
   */
  constructor(cellsysApp) {
    const { orgId } = cellsysApp;
    super(orgId, TopicType.SysMessage, QueryType.SystemBroker);
    this.about = cellsysApp;
  }

  /**
   * @returns {string} the application notice topic name
   */
  getTopicName() {
    const { orgId, about } = this;
    return `${orgId}/app/${about.id}`;
  }
}
/**
 * Application notice built from a ';'-separated payload.
 */
class appNoticeMessage {
  /**
   * @param {Array} msgArr payload parts: [timeStamp, title, message]
   */
  constructor(msgArr) {
    const { 0: timeStamp, 1: title, 2: message } = msgArr;
    Object.assign(this, { timeStamp, title, message });
  }
}
/********************************* 聊天模块 **********************************************/
/**
 * Abstract base of the chat topics; it can only be inherited.
 */
class ChatBase extends TopicBase {
  /**
   * @param {*} orgId organisation id
   * @throws {Error} when `ChatBase` itself is instantiated
   */
  constructor(orgId) {
    const instantiatedDirectly = new.target === ChatBase;
    if (instantiatedDirectly) {
      throw new Error('ChatBase cannot be instantiated, can only be inherited');
    }
    super(orgId, TopicType.Chat, QueryType.ChatBroker);
  }
}
/**
 * One-to-one chat topic: `<orgId>/chat/single/<conversationId>/<senderId>`.
 * Per the original note it currently targets the explorer logic.
 */
class SingleChatTopic extends ChatBase {
  /**
   * @param {*} orgId organisation id
   * @param {*} conversationId conversation id
   * @param {*} senderId sender id
   */
  constructor(orgId, conversationId, senderId) {
    super(orgId);
    Object.assign(this, { conversationId, senderId });
  }

  /**
   * @returns {string} the single chat topic name
   */
  getTopicName() {
    const { orgId, conversationId, senderId } = this;
    return `${orgId}/chat/single/${conversationId}/${senderId}`;
  }
}
/**
 * Wildcard topic covering every sender of one conversation:
 * `<orgId>/chat/single/<conversationId>/+`.
 */
class SingleChatReplyTopic extends ChatBase {
  /**
   * @param {*} orgId organisation id
   * @param {*} conversationId conversation id
   */
  constructor(orgId, conversationId) {
    super(orgId);
    this.conversationId = conversationId;
  }

  /**
   * @returns {string} the wildcard single chat topic name
   */
  getTopicName() {
    const { orgId, conversationId } = this;
    return `${orgId}/chat/single/${conversationId}/+`;
  }
}
/**
 * System back office - single chat - send message (relayed via the command unit).
 */
class ChatSystemTopic extends ChatBase {
  /**
   * @param {*} orgId organisation id
   */
  constructor(orgId) {
    super(orgId);
  }

  /**
   * @returns {string} orgId followed by '/chat/portal/single'
   */
  getTopicName() {
    const { orgId } = this;
    return orgId + '/chat/portal/single';
  }
}
/**
 * System back office - single chat - reply message (relayed via the command unit).
 */
class ChatSystemReplyTopic extends ChatBase {
  /**
   * @param {*} orgId organisation id
   * @param {*} receiverId receiver id appended to the topic name
   */
  constructor(orgId, receiverId) {
    super(orgId);
    this.receiverId = receiverId;
  }

  /**
   * @returns {string} orgId, then '/chat/replyPortal/single/', then receiverId
   */
  getTopicName() {
    const prefix = this.orgId + '/chat/replyPortal/single/';
    return prefix + this.receiverId;
  }
}
/**
 * Fence event subscription topic: wildcard `<orgId>/fence/+`.
 */
class FenceEventTopic extends TopicBase {
  /**
   * @param {*} orgId organisation id
   */
  constructor(orgId) {
    super(orgId, TopicType.Fence, QueryType.LocationBroker);
  }

  /**
   * @returns {string} orgId followed by '/fence/+'
   */
  getTopicName() {
    const { orgId } = this;
    return orgId + '/fence/+';
  }
}
/******************************* MQTT消息接收处理类 ***************************************/
/**
 * Abstract base of the received-message wrappers; it can only be inherited.
 */
class MessageBase {
  /**
   * @param {Object} [param] received message; `destinationName` and
   *        `payloadString` are read, a missing value is treated as `{}`
   * @throws {Error} when `MessageBase` itself is instantiated
   */
  constructor(param) {
    const instantiatedDirectly = new.target === MessageBase;
    if (instantiatedDirectly) {
      throw new Error('MessageBase cannot be instantiated, can only be inherited');
    }
    const { destinationName, payloadString } = param || {};
    this.topicName = destinationName;
    this.messageString = payloadString;
  }

  /**
   * @returns {string|undefined} the raw payload string
   */
  get message() {
    const { messageString } = this;
    return messageString;
  }
}
/**
 * Equipment message whose payload is a JSON document.
 */
class EquipmentMessage extends MessageBase {
  /**
   * @param {Object} param received message; its `payloadString` must be valid JSON
   * @throws {SyntaxError} when the payload is not valid JSON
   */
  constructor(param) {
    super(param);
    this.$_resolveMsg();
  }

  /**
   * Parses the raw payload and copies its fields onto the instance
   * (`macid` becomes `macId`, `type` becomes `sysTypeId`).
   */
  $_resolveMsg() {
    const { data, macid, type, transportMode, dataType, orgId, dateTime } = JSON.parse(
      this.messageString,
    );
    Object.assign(this, {
      data,
      macId: macid,
      sysTypeId: type,
      transportMode,
      dataType,
      orgId,
      dateTime, // upload time
    });
  }
}
/**
 * Message sent by edgeCommander. When the topic has more than three segments,
 * the third and fourth are exposed as `macId` and `msgType`.
 */
class EdgeCommanderMessage extends MessageBase {
  /**
   * @param {Object} param received message; its `topic` is split on '/'
   */
  constructor(param) {
    super(param);
    const segments = param.topic.split('/');
    if (segments.length <= 3) {
      return;
    }
    this.macId = segments[2];
    this.msgType = segments[3];
  }
}
export {
pushTask,
TopicBase,
TopicType,
MemberLocTopic,
EquipmentTopic,
EquipmentMonitorTopic,
SosTopic,
BDBoxTopic,
EquipmentMessage,
appNoticeTopic,
ChatSystemTopic,
SingleChatTopic,
ChatSystemReplyTopic,
SingleChatReplyTopic,
FenceEventTopic,
eventNoticeTopic,
markerNoticeTopic,
EquipmentStatusTopic,
memberJoinTopic,
memberAuditTopic,
offlineFenceTopic,
EquToEdgeTopic,
EdgeDeviceStatusTopic,
SendDownloadOfflineData,
};