import { Query, QueryTask, QueryType } from './cellsysUtil.js';
import './utils/paho-mqtt.js';
import cellsysSystem from './cellsysSystem';

// Shared MQTT clients, one per topic type (keyed by a TopicType value).
const clientManager = {};

// Per-topic-type "connection in progress" lock (true while a client is being created).
const clientLock = {};

// Message callbacks keyed by subscribed topic name (set once a subscription succeeds).
const callBackManager = {};

// Registered handlers per topic name: { onReceive, onSuccess, onFail }.
const topicManager = {};

// Topic names waiting to be subscribed, grouped by topic type.
const subscribeQueue = {};

// Topic names that were subscribed with a trailing "/+" wildcard.
const wildcardTopic = [];

// How long a lost connection may stay down before the failure callback is fired.
const CONNECTION_LOST_TIMEOUT_MS = 2 * 60 * 1000;

const TopicType = {
  SysMessage: 'SysMessage',
  MemberLocation: 'MemberLocation',
  GroupLocation: 'GroupLocation',
  Fence: 'Fence',
  Equipment: 'Equipment',
  Task: 'Task',
  Chat: 'Chat',
};

/**
 * Converts a raw MQTT message into the object handed to subscriber callbacks.
 * The shape of the result depends on the topic:
 *  - "cellsys/..."            -> payload split on ";"
 *  - "lite/..."               -> { topic, payloadString }
 *  - "<org>/<kind>/..."       -> depends on <kind>, see the switch below
 *
 * @param {{topic: string, payloadString: string}} res message delivered by the MQTT client
 * @returns {*} parsed message; null for "equ/PLQ" topics
 * @throws {SyntaxError} when a JSON topic carries an invalid JSON payload
 */
const parseArrivedMessage = (res) => {
  const payloadStringArr = res.payloadString.split(';');
  const topicSign = res.topic.split('/');

  if (topicSign[0] === 'cellsys') {
    // System message.
    return payloadStringArr;
  }
  if (topicSign[0] === 'lite') {
    // Reply sent by cellsysLite to the web client.
    return {
      topic: res.topic,
      payloadString: res.payloadString,
    };
  }

  switch (topicSign[1]) {
    case 'chat': // chat message
    case 'eventFeature': // custom reported event
    case 'markerFeature': // reported marker
    case 'memberJoin': // member join request
      return JSON.parse(res.payloadString);
    case 'mqtt':
      return res.payloadString;
    case 'loc':
      return new MemberMessage(payloadStringArr);
    case 'equ':
      switch (topicSign[2]) {
        case 'PLQ':
          // Intentionally not parsed; subscribers receive null.
          return null;
        case 'sos':
        case '6':
        case 'status':
          return res.payloadString;
        default:
          return new EquipmentMessage(res);
      }
    case 'app':
      return new appNoticeMessage(payloadStringArr);
    case 'fence': // fence event
      return payloadStringArr;
    case 'equToEdge':
      return new EdgeCommanderMessage(res);
    default:
      return res.payloadString;
  }
};

/**
 * Creates a Paho MQTT client and connects it.
 *
 * @param {CellsysPush} cellsysPush connection settings (host, port, credentials, ...)
 * @param {TopicBase} cellsysTopic topic the client is created for; its type is part of
 *        the client id and its onFail handler is used when the connection stays lost
 * @returns {Promise} resolves with the connected client, rejects with the Paho error
 */
const createMqttClient = (cellsysPush, cellsysTopic) => {
  const clientId = cellsysSystem.token + cellsysTopic.topicType + new Date().getTime();
  const { ip, port, aliasPath, isSSL, username, password } = cellsysPush;
  const client = aliasPath
    ? new Paho.Client(ip, Number(port), `/${aliasPath}`, clientId)
    : new Paho.Client(ip, Number(port), clientId);

  // The lost-connection countdown is kept per client so that one client
  // reconnecting can never cancel (or leak) the countdown of another one.
  let lostTimer = null;
  const clearLostTimer = () => {
    if (lostTimer !== null) {
      clearTimeout(lostTimer);
      lostTimer = null;
    }
  };

  return new Promise((resolve, reject) => {
    client.onConnected = (reconnect) => {
      console.log(clientId, '连接建立的clientId');
      if (reconnect) {
        clearLostTimer();
      }
      // Resolving again on a reconnect is a no-op.
      resolve(client);
    };

    client.onConnectionLost = (responseObject) => {
      console.log(responseObject, '中断对象');
      console.log(cellsysTopic, '中断的topic');
      console.log(clientId, '中断的clientId');
      // Start the countdown; if the client has not reconnected when it
      // expires, the failure callback registered for the topic is invoked.
      clearLostTimer();
      lostTimer = setTimeout(() => {
        lostTimer = null;
        if (responseObject.errorCode === 0) {
          return;
        }
        const operators = topicManager[cellsysTopic.getTopicName()];
        const failOperator = operators && operators['onFail'];
        if (failOperator) {
          failOperator(responseObject);
        }
      }, CONNECTION_LOST_TIMEOUT_MS);
    };

    // Global handler for incoming MQTT messages.
    client.onMessageArrived = (res) => {
      try {
        const topic = res.topic;
        console.log(`【${topic}】${res.payloadString}`, '接收消息');
        // A message on "a/b/c" may belong to a wildcard subscription "a/b/+".
        const wildcard = topic.substring(0, topic.lastIndexOf('/')) + '/+';
        const topicName = wildcardTopic.some((name) => name === wildcard) ? wildcard : topic;
        const callback = callBackManager[topicName];
        const message = parseArrivedMessage(res);
        if (callback) {
          callback(message);
        }
      } catch (error) {
        console.error(error);
      }
    };

    window.addEventListener('unload', () => {
      try {
        client.disconnect();
        console.log('mqtt连接关闭');
      } catch (error) {
        // disconnect() throws when the client is not connected; nothing to clean up then.
        console.error(error);
      }
    });

    client.connect({
      userName: username,
      useSSL: isSSL,
      password: password,
      reconnect: true,
      onFailure: function (err) {
        console.error('MQ创建客户端异常', err);
        reject(err);
      },
    });
  });
};

/**
 * Connection settings for one MQTT broker.
 */
class CellsysPush {
  /**
   * @param {Object} params
   * @param {number|string} params.port broker port
   * @param {string} params.ip broker host
   * @param {string} params.username
   * @param {string} params.password
   * @param {string} [params.path] alias path appended to the broker URL
   * @param {boolean} params.isSSL whether to connect over SSL
   */
  constructor(params) {
    this.port = params.port;
    this.ip = params.ip;
    this.username = params.username;
    this.password = params.password;
    this.aliasPath = params.path; // alias path
    this.isSSL = params.isSSL;
  }
}

/**
 * Subscribes to, unsubscribes from and publishes on MQTT topics.
 * Clients are shared between all instances: one client per topic type.
 */
class pushTask {
  constructor() {
    this.cellsysTopic = null;
    this.client = null;
    this.clientConnecting = false;
    // window.CELLSYSORG is read here; it is expected to be set before this class is instantiated.
    this.cellsysOrg = window.CELLSYSORG;
  }

  /**
   * Looks up the broker configuration and connects a new MQTT client.
   * The topic last passed to subscribe/unSubscribe takes precedence over the argument.
   *
   * @param {TopicBase} [topic] topic to fall back to when this instance has no current topic
   * @returns {Promise} resolves with the connected client
   * @private
   */
  _connect(topic) {
    const activeTopic = this.cellsysTopic || topic;
    const query = new Query();
    const queryTask = new QueryTask(activeTopic.broker);

    // Wrapping execute() keeps a synchronous throw inside the promise chain.
    return new Promise((resolve) => {
      resolve(queryTask.execute(query));
    })
      .then((res) => {
        // An https page has to use wss; otherwise plain ws is used.
        const wantedType = location.protocol === 'https:' ? 'wss' : 'ws';
        const brokerEntry = res.find((item) => item.type === wantedType);
        if (!brokerEntry) {
          throw new Error(`No MQTT broker configuration of type "${wantedType}" was returned`);
        }
        const config = brokerEntry.config;
        const params = {
          port: config.port,
          // Without a configured ip the page hostname is used (happens on edge workstations).
          ip: config.ip ? config.ip : location.hostname,
          username: config.username,
          password: config.password,
          aliasPath: null,
          isSSL: wantedType === 'wss',
        };
        if (process.env.NODE_ENV === 'development') {
          params['ip'] = '192.168.2.10';
        }

        const cloudOptions = (this.cellsysOrg && this.cellsysOrg.cloudOptions) || {};
        // The organization has external access enabled.
        if (cloudOptions.account && cloudOptions.token && cloudOptions.connectId) {
          // Reduce the hostname to its domain before prefixing the connectId.
          const hostParts = location.hostname.split('.');
          const externalHost =
            hostParts.length > 2
              ? `${cloudOptions['connectId']}.${hostParts[1]}.${hostParts[2]}`
              : `${cloudOptions['connectId']}.airkoon.cn`;
          // Only a page served from the external domain counts as external access.
          if (location.hostname === externalHost) {
            params['ip'] = externalHost;
            params['port'] = 80;
            params['path'] = config.path; // the alias path is only used for external access
            // NOTE(review): the original code assigned `isSSL = false` here, after
            // params.isSSL had already been captured, so it never took effect. That
            // effective behaviour is preserved; confirm whether external access is
            // meant to force a non-SSL connection.
          }
        }

        // Use the same topic the broker was resolved from (this.cellsysTopic may be null
        // when only send() has been used on this instance).
        return createMqttClient(new CellsysPush(params), activeTopic);
      })
      .catch((err) => {
        console.error(err);
        throw err;
      });
  }

  /**
   * Removes a topic name from the pending queue of the current topic's type.
   *
   * @param {string} topicName
   * @private
   */
  _removeFromQueue(topicName) {
    const queue = subscribeQueue[this.cellsysTopic.topicType];
    if (!queue) {
      return;
    }
    const index = queue.indexOf(topicName);
    if (index !== -1) {
      queue.splice(index, 1);
    }
  }

  /**
   * Subscribes the current client to a topic.
   *
   * @param {string} topicName
   * @param {Function} onReceive called with every parsed message on the topic
   * @param {Function} [onSuccess] called with (response, client) once subscribed
   * @param {Function} [onFail] called with the error when subscribing fails
   * @param {string} [topicType] when given, the connection lock of that type is released
   * @private
   */
  _subscribe(topicName, onReceive, onSuccess, onFail, topicType) {
    const settle = () => {
      this._removeFromQueue(topicName);
      if (topicType) {
        clientLock[topicType] = false;
      }
    };

    this.client.subscribe(topicName, {
      qos: 0,
      onSuccess: (res) => {
        callBackManager[topicName] = onReceive;
        settle();
        if (onSuccess) {
          onSuccess(res, this.client);
        }
      },
      onFailure: (e) => {
        console.log(e);
        settle();
        if (onFail) {
          onFail(e);
        }
      },
    });
  }

  /**
   * Unsubscribes the current client from a topic.
   *
   * @param {string} topicName
   * @param {Function} [onSuccess]
   * @param {Function} [onFail]
   * @private
   */
  _unSubscribe(topicName, onSuccess, onFail) {
    this.client.unsubscribe(topicName, {
      onSuccess: (res) => {
        console.log('取消订阅:', topicName);
        if (onSuccess) {
          onSuccess(res);
        }
      },
      onFailure: (e) => {
        if (onFail) {
          onFail(e);
        }
      },
    });
  }

  /**
   * Subscribes to a topic, creating the shared client for its type when needed.
   * While a client of that type is still connecting the topic is only queued;
   * it is subscribed as soon as the connection is established.
   *
   * @param {Object} params
   * @param {TopicBase} params.cellsysTopic topic to subscribe to
   * @param {Function} params.onReceive called with every parsed message
   * @param {Function} [params.onSuccess]
   * @param {Function} [params.onFail]
   */
  subscribe(params) {
    const { cellsysTopic, onReceive, onFail, onSuccess } = params;
    this.cellsysTopic = cellsysTopic;
    const topicType = cellsysTopic.topicType;
    const topicName = cellsysTopic.getTopicName();

    if (!subscribeQueue[topicType]) {
      subscribeQueue[topicType] = [];
    }
    if (!subscribeQueue[topicType].includes(topicName)) {
      subscribeQueue[topicType].push(topicName);
    }

    topicManager[topicName] = {
      onReceive: onReceive,
      onSuccess: onSuccess,
      onFail: onFail,
    };

    // Remember wildcard subscriptions once, so incoming topics can be matched to them.
    if (topicName.lastIndexOf('/+') !== -1 && !wildcardTopic.includes(topicName)) {
      wildcardTopic.push(topicName);
    }

    if (clientLock[topicType]) {
      // A connection is in progress; the queued topic is picked up when it completes.
      return;
    }

    if (clientManager[topicType]) {
      this.client = clientManager[topicType];
      this._subscribe(topicName, onReceive, onSuccess, onFail);
      return;
    }

    clientLock[topicType] = true;
    this._connect()
      .then((res) => {
        clientManager[topicType] = res;
        this.client = res;
        // Iterate over a copy: entries are removed from the queue as subscriptions settle.
        [...subscribeQueue[topicType]].forEach((tname) => {
          const topicOperator = topicManager[tname];
          this._subscribe(
            tname,
            topicOperator['onReceive'],
            topicOperator['onSuccess'],
            topicOperator['onFail'],
            topicType,
          );
        });
      })
      .catch((err) => {
        console.error(err);
        if (onFail) {
          onFail(err);
        }
      })
      .finally(() => {
        clientLock[topicType] = false;
      });
  }

  /**
   * Unsubscribes from a topic.
   *
   * @param {Object} params
   * @param {TopicBase} params.cellsysTopic
   * @param {Function} [params.onSuccess]
   * @param {Function} [params.onFail] also called when no client exists for the topic type
   */
  unSubscribe(params) {
    const { cellsysTopic, onFail, onSuccess } = params;
    this.cellsysTopic = cellsysTopic;
    const topicType = cellsysTopic.topicType;
    const topicName = cellsysTopic.getTopicName();

    if (subscribeQueue[topicType]) {
      this._removeFromQueue(topicName);
    }

    if (clientManager[topicType]) {
      this.client = clientManager[topicType];
      this._unSubscribe(topicName, onSuccess, onFail);
    } else if (onFail) {
      onFail('尚未订阅该主题');
    }
  }

  /**
   * Publishes an MQTT message, connecting a client for the topic type first when needed.
   *
   * @param {Object} params
   * @param {TopicBase} params.cellsysTopic topic to publish on
   * @param {string} params.message payload
   */
  send(params) {
    const { cellsysTopic, message: payload } = params;
    const topicType = cellsysTopic.topicType;
    const topicName = cellsysTopic.getTopicName();

    if (clientLock[topicType]) {
      // Messages are not queued while a client of this type is connecting.
      console.warn(`MQTT client for "${topicType}" is still connecting; message to "${topicName}" was not sent`);
      return;
    }

    this.client = clientManager[topicType];
    if (this.client) {
      this.client.publish(topicName, payload, 0, false);
      console.log(`【${topicName}】${payload}`, '消息推送');
      return;
    }

    clientLock[topicType] = true;
    this._connect(cellsysTopic)
      .then((res) => {
        clientManager[topicType] = res;
        this.client = res;
        this.client.publish(topicName, payload, 0, false);
        console.log(`【${topicName}】${payload}`, '推送消息');
      })
      .catch((err) => {
        console.log(err);
      })
      .finally(() => {
        clientLock[topicType] = false;
      });
  }
}

/**
 * Base class of every topic; cannot be instantiated directly.
 * Subclasses provide getTopicName().
 */
class TopicBase {
  /**
   * @param orgId organization id
   * @param topicType one of the TopicType values
   * @param broker broker query type used to look up the connection settings
   */
  constructor(orgId, topicType, broker) {
    if (new.target === TopicBase) {
      throw new Error('TopicBase cannot be instantiated, can only be inherited');
    }
    // orgId is deliberately not validated: some topics do not need one.
    this.orgId = orgId;
    this.topicType = topicType;
    this.broker = broker;
    this.all = false;
  }

  getTopicType() {
    return this.topicType;
  }
}

/********************************* Member module **********************************************/

/**
 * Location topic of a single member.
 */
class MemberLocTopic extends TopicBase {
  constructor(cellsysMember) {
    super(cellsysMember.orgId, TopicType.MemberLocation, QueryType.LocationBroker);
    this.about = cellsysMember;
  }

  getTopicName() {
    const cellsysMember = this.about;
    return `${this.orgId}/loc/${cellsysMember.userId}`;
  }
}

/**
 * Member location message built from a ";"-separated payload:
 * [userId, lngLat, timeStamp].
 */
class MemberMessage {
  constructor(msgArr) {
    this.userId = parseInt(msgArr[0], 10);
    this.lngLat = msgArr[1];
    this.timeStamp = Number(msgArr[2]);
  }
}

/********************************* Equipment module **********************************************/

/**
 * Base class of the equipment topics; cannot be instantiated directly.
 */
class DeviceTopic extends TopicBase {
  constructor(device) {
    if (new.target === DeviceTopic) {
      throw new Error('DeviceTopic cannot be instantiated, can only be inherited');
    }
    super(device.orgId, TopicType.Equipment, QueryType.EquipmentBroker);
    this.about = device;
  }
}

/**
 * Topic of a single piece of equipment.
 */
class EquipmentTopic extends DeviceTopic {
  constructor(equipment) {
    super(equipment);
  }

  getTopicName() {
    const equipment = this.about;
    return `${this.orgId}/equ/${equipment.type}/${equipment.macid}`;
  }
}

/**
 * SOS topic (wildcard over the last topic level).
 */
class SosTopic extends DeviceTopic {
  constructor(device) {
    super(device);
  }

  getTopicName() {
    return `${this.orgId}/equ/sos/+`;
  }

  subTopicAllName() {
    return `${this.orgId}/equ/sos/+`;
  }
}

/**
 * BeiDou box topic (wildcard over the last topic level).
 */
class BDBoxTopic extends DeviceTopic {
  constructor(device) {
    super(device);
  }

  getTopicName() {
    // "6" is the system type id.
    return `${this.orgId}/equ/6/+`;
  }
}

/**
 * Equipment monitoring topic; the middle part of the name comes from device.topicName.
 */
class EquipmentMonitorTopic extends DeviceTopic {
  constructor(device) {
    super(device);
    this.topicName = device.topicName;
  }

  getTopicName() {
    return this.orgId + this.topicName + '+';
  }
}

/**
 * Equipment status topic; currently used for the member online-status logic.
 */
class EquipmentStatusTopic extends TopicBase {
  constructor(orgId) {
    super(orgId, TopicType.Equipment, QueryType.EquipmentBroker);
  }

  getTopicName() {
    return `${this.orgId}/equ/status`;
  }
}

/**
 * Messages sent by an edge workstation device to the edge workstation.
 */
class EquToEdgeTopic extends TopicBase {
  constructor(orgId, macId) {
    super(orgId, TopicType.Equipment, QueryType.EquipmentBroker);
    this.macId = macId;
  }

  getTopicName() {
    return `${this.orgId}/equToEdge/${this.macId}/+`;
  }
}

/********************************* System message module **********************************************/

/**
 * Notification topic for uploaded custom events.
 */
class eventNoticeTopic extends TopicBase {
  constructor(orgId) {
    super(orgId, TopicType.SysMessage, QueryType.SystemBroker);
  }

  getTopicName() {
    return `${this.orgId}/eventFeature`;
  }
}

/**
 * Notification topic for uploaded markers.
 */
class markerNoticeTopic extends TopicBase {
  constructor(orgId) {
    super(orgId, TopicType.SysMessage, QueryType.SystemBroker);
  }

  getTopicName() {
    return `${this.orgId}/markerFeature`;
  }
}

/**
 * Topic for member join requests.
 */
class memberJoinTopic extends TopicBase {
  constructor(orgId) {
    super(orgId, TopicType.SysMessage, QueryType.SystemBroker);
  }

  getTopicName() {
    return `${this.orgId}/memberJoin`;
  }
}

/**
 * Topic for the audit result of a member.
 */
class memberAuditTopic extends TopicBase {
  constructor(orgId, userId) {
    super(orgId, TopicType.SysMessage, QueryType.SystemBroker);
    this.userId = userId;
  }

  getTopicName() {
    return `${this.orgId}/${this.userId}/orgAudit`;
  }
}

/**
 * Topic on which the offline map status is pushed.
 */
class offlineFenceTopic extends TopicBase {
  constructor(orgId) {
    super(orgId, TopicType.SysMessage, QueryType.SystemBroker);
  }

  getTopicName() {
    return `${this.orgId}/offlineFence`;
  }
}

/**
 * Topic used to tell cellsysLite to download an offline map.
 */
class SendDownloadOfflineData extends TopicBase {
  constructor(OfflineData) {
    super(null, TopicType.SysMessage, QueryType.SystemBroker);
    this.OfflineData = OfflineData;
  }

  getTopicName() {
    return `edge/lite/offlineFence`;
  }
}

/**
 * Status (for example battery level) of terminal devices connected to an edge workstation.
 */
class EdgeDeviceStatusTopic extends TopicBase {
  constructor(orgId) {
    super(orgId, TopicType.SysMessage, QueryType.SystemBroker);
  }

  getTopicName() {
    return `${this.orgId}/equToEdge/status`;
  }
}

/********************************* Application module **********************************************/

/**
 * Notification topic of an application.
 */
class appNoticeTopic extends TopicBase {
  constructor(cellsysApp) {
    super(cellsysApp.orgId, TopicType.SysMessage, QueryType.SystemBroker);
    this.about = cellsysApp;
  }

  getTopicName() {
    const cellsysApp = this.about;
    return `${this.orgId}/app/${cellsysApp.id}`;
  }
}

/**
 * Application notification built from a ";"-separated payload:
 * [timeStamp, title, message].
 */
class appNoticeMessage {
  constructor(msgArr) {
    this.timeStamp = msgArr[0];
    this.title = msgArr[1];
    this.message = msgArr[2];
  }
}

/********************************* Chat module **********************************************/

/**
 * Base class of the chat topics; cannot be instantiated directly.
 */
class ChatBase extends TopicBase {
  constructor(orgId) {
    if (new.target === ChatBase) {
      throw new Error('ChatBase cannot be instantiated, can only be inherited');
    }
    super(orgId, TopicType.Chat, QueryType.ChatBroker);
  }
}

/**
 * Single-chat topic of one sender within a conversation.
 */
class SingleChatTopic extends ChatBase {
  constructor(orgId, conversationId, senderId) {
    super(orgId);
    this.conversationId = conversationId;
    this.senderId = senderId;
  }

  getTopicName() {
    return `${this.orgId}/chat/single/${this.conversationId}/${this.senderId}`;
  }
}

/**
 * Single-chat topic covering every sender of a conversation (wildcard).
 */
class SingleChatReplyTopic extends ChatBase {
  constructor(orgId, conversationId) {
    super(orgId);
    this.conversationId = conversationId;
  }

  getTopicName() {
    return `${this.orgId}/chat/single/${this.conversationId}/+`;
  }
}
/** * 系统后台-单聊-发送消息(指挥机中转) */ class ChatSystemTopic extends ChatBase { constructor(orgId) { super(orgId); } getTopicName() { return this.orgId + '/chat/portal/single'; } } /** * 系统后台-单聊-回复消息(指挥机中转) */ class ChatSystemReplyTopic extends ChatBase { constructor(orgId, receiverId) { super(orgId); this.receiverId = receiverId; } getTopicName() { return this.orgId + '/chat/replyPortal/single/' + this.receiverId; } } /** * 围栏事件订阅主题 */ class FenceEventTopic extends TopicBase { constructor(orgId) { super(orgId, TopicType.Fence, QueryType.LocationBroker); } getTopicName() { return this.orgId + '/fence/+'; } } /******************************* MQTT消息接收处理类 ***************************************/ class MessageBase { constructor(param) { if (new.target === MessageBase) { throw new Error('MessageBase cannot be instantiated, can only be inherited'); } let msg = param || {}; this.topicName = msg['destinationName']; this.messageString = msg['payloadString']; } get message() { return this.messageString; } } class EquipmentMessage extends MessageBase { constructor(param) { super(param); this.$_resolveMsg(); } $_resolveMsg() { let msg = JSON.parse(this.messageString); this.data = msg.data; this.macId = msg.macid; this.sysTypeId = msg.type; this.transportMode = msg.transportMode; this.dataType = msg.dataType; this.orgId = msg.orgId; this.dateTime = msg.dateTime; //上传时间 } } //edgeCommander发送的消息 class EdgeCommanderMessage extends MessageBase { constructor(param) { super(param); let { topic } = param; let topicSign = topic.split('/'); if (topicSign.length > 3) { this.macId = topicSign[2]; this.msgType = topicSign[3]; } } } export { pushTask, TopicBase, TopicType, MemberLocTopic, EquipmentTopic, EquipmentMonitorTopic, SosTopic, BDBoxTopic, EquipmentMessage, appNoticeTopic, ChatSystemTopic, SingleChatTopic, ChatSystemReplyTopic, SingleChatReplyTopic, FenceEventTopic, eventNoticeTopic, markerNoticeTopic, EquipmentStatusTopic, memberJoinTopic, memberAuditTopic, offlineFenceTopic, 
EquToEdgeTopic, EdgeDeviceStatusTopic, SendDownloadOfflineData, };