import {
  MqttAsync,
  MqttClient,
  MqttClientOptions,
  MqttConnectOptions,
  MqttQos,
  MqttMessage,
  MqttPublishOptions
} from '@ohos/mqtt';
import CommonConstants from '../../Common/constants/CommonConstants';
import AntiWristStrap from '../../viewmodel/device/AntiWristStrap';
import CardReader from '../../viewmodel/device/CardReader';
import ElectricScrewdriver from '../../viewmodel/device/ElectricScrewdriver';
import ElectricSolderingIron from '../../viewmodel/device/ElectricSolderingIron';
import Lighting from '../../viewmodel/device/Lighting';
import TempHumiditySensor from '../../viewmodel/device/TempHumiditySensor';
import ThreeColourLight from '../../viewmodel/device/ThreeColourLight';
import WeldFumeExtractor from '../../viewmodel/device/WeldFumeExtractor';
import BusinessConstant, { DeviceType } from '../constants/CommonConstants';

/** Tag passed as the first argument of every console log emitted by this module. */
const TAG = 'hhtest';

/** Signature of a per-topic subscriber registered through `subscribe`. */
type MessageCallback = (topic: string, payload: string) => void;

/** One tag/value entry of an MQTT payload. */
interface TagValuePair {
  tag: string;
  value: number;
}

/**
 * Singleton wrapper around an `@ohos/mqtt` client.
 *
 * It owns the client instance, keeps the per-topic subscriber callbacks, and
 * mirrors the content of the three known subscription topics into AppStorage.
 */
class MqttManager {
  // static readonly MQTT_SERVICE = '192.168.1.10'
  private static instance: MqttManager;
  private client: MqttClient | null = null;
  /** Subscriber callbacks keyed by topic. */
  private callbacks: Map<string, MessageCallback[]> = new Map<string, MessageCallback[]>();

  private constructor() {}

  /** Returns the shared manager, creating it on first use. */
  public static getInstance(): MqttManager {
    if (!MqttManager.instance) {
      MqttManager.instance = new MqttManager();
    }
    return MqttManager.instance;
  }

  /**
   * Creates the underlying MQTT client and wires the message / connection-lost
   * handlers. Failures are logged, not thrown.
   *
   * @param options Client creation options forwarded to `MqttAsync.createMqtt`.
   */
  public init(options: MqttClientOptions): void {
    try {
      this.client = MqttAsync.createMqtt(options);
      this.registerCallbacks();
      console.info(TAG, 'MQTT client initialized');
    } catch (err) {
      console.error(TAG, `Initialization failed: ${JSON.stringify(err)}`);
    }
  }

  /** Registers the message-arrived and connection-lost handlers on the client. */
  private registerCallbacks(): void {
    if (!this.client) {
      return;
    }
    this.client.messageArrived((err, message) => {
      if (err) {
        console.error(TAG, `Message error: ${err.message}`);
        return;
      }
      this.handleMessage(message);
    });
    this.client.connectLost((err) => {
      if (err) {
        console.error(TAG, `Connection lost: ${err.message}`);
      }
      this.reconnect();
    });
  }

  /**
   * Dispatches one incoming message: first to the subscriber callbacks
   * registered for its topic, then to the built-in handler of whichever known
   * topic it belongs to. Parsing/handling errors are logged, not thrown.
   *
   * @param message The message delivered by the MQTT client.
   */
  private handleMessage(message: MqttMessage): void {
    const topic = message.topic;
    const payload = message.payload.toString();

    const topicCallbacks = this.callbacks.get(topic) ?? [];
    topicCallbacks.forEach(cb => cb(topic, payload));

    try {
      const valueJson: MQTTReceiveData = JSON.parse(payload);
      if (CommonConstants.mqttSubscribeTopic1 === topic) {
        this.storeTopic1Devices(valueJson);
      } else if (CommonConstants.mqttSubscribeTopic2 === topic) {
        this.storeTopic2Sensor(valueJson);
      } else if (CommonConstants.mqttSubscribeTopic3 === topic) {
        this.storeTopic3Station(valueJson);
      }
    } catch (e) {
      console.error("MQTT消息处理异常:", e);
    }
  }

  /**
   * Handles subscription topic 1: builds a fresh snapshot of every device from
   * the received tags and writes the snapshots to AppStorage.
   *
   * The snapshots are written once, after all tags have been applied, so
   * observers never see a half-populated device. Nothing is written when the
   * payload carries no entries at all.
   */
  private storeTopic1Devices(data: MQTTReceiveData): void {
    const light: ThreeColourLight = {
      LedRed: 0,
      LedOrange: 0,
      LedGreen: 0,
      Buzzer: 0,
      OpenStatus: 0
    };
    const extractor: WeldFumeExtractor = { ExhaustFan: 0, OnlineStatus: 0 };
    const solderingIron: ElectricSolderingIron = { SolderingCurrTemp: 0, OnlineStatus: 0 };
    const screwdriver: ElectricScrewdriver = {
      TighteRotationDirection: 0,
      TorqueHoldTime: 0,
      TightenTorqueTarget: 0,
      TorqueUpperLimit: 0,
      TorqueLowerLimit: 0,
      EnableFloatSlipDetection: 0,
      FloatDetectionTurns: 0,
      OnlineStatus: 0
    };
    const wristStrap: AntiWristStrap = {
      RingWearNormal: 0,
      RingWearStandby: 0,
      RingWearFault: 0,
      OnlineStatus: 0
    };
    const cardReader: CardReader = { RfidCardNum1: '', RfidCardNum2: '', OnlineStatus: 0 };
    const lighting: Lighting = { lighting: 0, OnlineStatus: 0 };
    const sensor: TempHumiditySensor = { Temperature: 0, Humidity: 0, OnlineStatus: 0 };

    for (const element of data.d) {
      if (!BusinessConstant.attrMap.has(element.tag)) {
        continue;
      }
      const deviceType: string = BusinessConstant.attrMap.get(element.tag) ?? '';
      switch (deviceType) {
        case DeviceType.ThreeColourLight:
          if (element.tag === 'LedRed') {
            light.LedRed = element.value;
          } else if (element.tag === 'LedOrange') {
            light.LedOrange = element.value;
          } else if (element.tag === 'LedGreen') {
            light.LedGreen = element.value;
          } else if (element.tag === 'Buzzer') {
            light.Buzzer = element.value;
          }
          // The light counts as "open" as soon as any lamp or the buzzer is on.
          if (light.LedRed === 1 || light.LedOrange === 1 || light.LedGreen === 1 || light.Buzzer === 1) {
            light.OpenStatus = 1;
          }
          light.OnlineStatus = 1;
          break;
        case DeviceType.WeldFumeExtractor:
          if (element.tag === 'ExhaustFan') {
            extractor.ExhaustFan = element.value;
          }
          extractor.OnlineStatus = 1;
          break;
        case DeviceType.ElectricSolderingIron:
          if (element.tag === 'SolderingCurrTemp') {
            solderingIron.SolderingCurrTemp = element.value;
          }
          solderingIron.OnlineStatus = 1;
          break;
        case DeviceType.ElectricScrewdriver:
          if (element.tag === 'TightenTorqueTarget') {
            screwdriver.TightenTorqueTarget = element.value;
          } else if (element.tag === 'TorqueHoldTime') {
            screwdriver.TorqueHoldTime = element.value;
          }
          screwdriver.OnlineStatus = 1;
          break;
        case DeviceType.AntiWristStrap:
          if (element.tag === 'RingWearNormal') {
            wristStrap.RingWearNormal = element.value;
          } else if (element.tag === 'RingWearStandby') {
            wristStrap.RingWearStandby = element.value;
          } else if (element.tag === 'RingWearFault') {
            wristStrap.RingWearFault = element.value;
          }
          wristStrap.OnlineStatus = 1;
          break;
        case DeviceType.CardReader:
          // Card numbers are stored as hexadecimal text; a value of 0 is stored as ''.
          if (element.tag === 'RfidCardNum1') {
            const hex = element.value?.toString(16);
            cardReader.RfidCardNum1 = hex === '0' ? '' : hex;
          } else if (element.tag === 'RfidCardNum2') {
            const hex = element.value?.toString(16);
            cardReader.RfidCardNum2 = hex === '0' ? '' : hex;
          }
          cardReader.OnlineStatus = 1;
          break;
        case DeviceType.Lighting:
          if (element.tag === 'lighting') {
            lighting.lighting = element.value;
          }
          lighting.OnlineStatus = 1;
          break;
        case DeviceType.TempHumiditySensor:
          if (element.tag === 'Temperature') {
            sensor.Temperature = element.value;
          } else if (element.tag === 'Humidity') {
            sensor.Humidity = element.value;
          }
          sensor.OnlineStatus = 1;
          break;
      }
    }

    // An empty payload leaves the previously stored device state untouched.
    if (data.d.length > 0) {
      AppStorage.setOrCreate('ThreeColourLight', light);
      AppStorage.setOrCreate('WeldFumeExtractor', extractor);
      AppStorage.setOrCreate('ElectricSolderingIron', solderingIron);
      AppStorage.setOrCreate('ElectricScrewdriver', screwdriver);
      AppStorage.setOrCreate('AntiWristStrap', wristStrap);
      AppStorage.setOrCreate('CardReader', cardReader);
      AppStorage.setOrCreate('Lighting', lighting);
      AppStorage.setOrCreate('TempHumiditySensor', sensor);
    }
  }

  /**
   * Handles subscription topic 2: rebuilds the temperature/humidity sensor
   * snapshot from the received tags and writes it to AppStorage (even when no
   * matching tag was received).
   */
  private storeTopic2Sensor(data: MQTTReceiveData): void {
    const sensor: TempHumiditySensor = { Temperature: 0, Humidity: 0, OnlineStatus: 0 };
    for (const element of data.d) {
      // NOTE(review): this branch reads attrMap from CommonConstants while the
      // topic-1 handler reads it from BusinessConstant — confirm both are intended.
      if (!CommonConstants.attrMap.has(element.tag)) {
        continue;
      }
      const deviceType: string = CommonConstants.attrMap.get(element.tag) ?? '';
      if (deviceType === DeviceType.TempHumiditySensor) {
        if (element.tag === 'Temperature') {
          sensor.Temperature = element.value;
        } else if (element.tag === 'Humidity') {
          sensor.Humidity = element.value;
        }
        sensor.OnlineStatus = 1;
      }
    }
    AppStorage.setOrCreate('TempHumiditySensor', sensor);
  }

  /**
   * Handles subscription topic 3: extracts the station-1 set value, the decoded
   * weight and the decoded four-register barcode string, and writes them to
   * AppStorage.
   */
  private storeTopic3Station(data: MQTTReceiveData): void {
    const station1Set = findTagValue(data, 'Station1Set');
    const station1Weight = decodeWeight(findTagValue(data, 'Station1Weight'));
    const rfidStringIn = decodeRfidString(
      findTagValue(data, 'Barcode1Data1'),
      findTagValue(data, 'Barcode1Data2'),
      findTagValue(data, 'Barcode1Data3'),
      findTagValue(data, 'Barcode1Data4')
    );
    AppStorage.setOrCreate('drawerPositionStatus', station1Set);
    AppStorage.setOrCreate('materialBoxWeight', station1Weight);
    AppStorage.setOrCreate('materialBoxInRfid', rfidStringIn);
  }

  /**
   * Connects the client to the broker.
   *
   * @param options Connection options forwarded to the client.
   * @returns `true` when the broker reports code 0; `false` when the client is
   *          not initialized, the broker reports another code, or the call throws.
   */
  public async connect(options: MqttConnectOptions): Promise<boolean> {
    if (!this.client) {
      return false;
    }
    try {
      const res = await this.client.connect(options);
      if (res.code === 0) {
        console.info(TAG, 'Connected to broker');
        return true;
      }
      console.error(TAG, `Connect failed: ${res.message}`);
      return false;
    } catch (err) {
      console.error(TAG, `Connect error: ${err.message}`);
      return false;
    }
  }

  /**
   * Subscribes to a topic with QoS 1 and, on success, registers the optional
   * callback for messages arriving on that topic.
   *
   * @param topic    Topic to subscribe to.
   * @param callback Optional subscriber invoked with the topic and payload text.
   */
  public async subscribe(topic: string, callback?: MessageCallback): Promise<void> {
    if (!this.client) {
      return;
    }
    try {
      const res = await this.client.subscribe({ topic, qos: 1 });
      if (res.code === 0) {
        console.info(TAG, `Subscribed to ${topic}`);
        if (callback) {
          this.addCallback(topic, callback);
        }
      } else {
        // Surface a rejected subscription instead of failing silently.
        console.error(TAG, `Subscribe failed: ${res.message}`);
      }
    } catch (err) {
      console.error(TAG, `Subscribe error: ${err.message}`);
    }
  }

  /** Appends a callback to the list kept for the given topic. */
  private addCallback(topic: string, callback: MessageCallback): void {
    const callbacks = this.callbacks.get(topic) ?? [];
    callbacks.push(callback);
    this.callbacks.set(topic, callbacks);
  }

  /** Schedules a single reconnect attempt 5 seconds from now. */
  private reconnect(): void {
    setTimeout(() => {
      this.client?.reconnect()
        .then(success => {
          if (success) {
            console.info(TAG, 'Reconnected successfully');
          }
        })
        .catch((err: Error) => {
          // Without this handler a failed attempt is an unhandled rejection.
          console.error(TAG, `Reconnect error: ${err?.message}`);
        });
    }, 5000);
  }

  /** Disconnects the client; failures are logged, not thrown. */
  public async disconnect(): Promise<void> {
    if (!this.client) {
      return;
    }
    try {
      await this.client.disconnect();
      console.info(TAG, 'Disconnected');
    } catch (err) {
      console.error(TAG, `Disconnect error: ${err.message}`);
    }
  }

  /**
   * Publishes a message. Failures are logged, not thrown.
   *
   * @param topic    Destination topic.
   * @param payload  Text payload, or an object that is serialized with `JSON.stringify`.
   * @param qos      Quality-of-service level (defaults to 1).
   * @param retained Whether the broker should retain the message (defaults to false).
   */
  public async publish(
    topic: string,
    payload: string | object,
    qos: MqttQos = 1,
    retained: boolean = false
  ): Promise<void> {
    if (!this.client) {
      console.error(TAG, 'MQTT客户端未初始化');
      return;
    }
    try {
      // Normalize the payload to text before handing it to the client.
      const payloadStr = typeof payload === 'string' ? payload : JSON.stringify(payload);
      const options: MqttPublishOptions = {
        topic: topic,
        payload: payloadStr,
        qos: qos,
        retained: retained
      };
      const res = await this.client.publish(options);
      if (res.code === 0) {
        console.info(TAG, `消息发布成功: ${topic}`);
      } else {
        console.error(TAG, `发布失败: ${res.message}`);
      }
    } catch (err) {
      console.error(TAG, `发布异常: ${JSON.stringify(err)}`);
    }
  }
}

export default MqttManager.getInstance();

/** Shape of a payload published by this application. */
export interface MQTTPublishData {
  w: TagValuePair[];
}

/** Shape of a payload received from the broker. */
export interface MQTTReceiveData {
  d: TagValuePair[];
}

/**
 * Returns the value of the first entry carrying the given tag, or `undefined`
 * when the payload (or its `d` list) is missing or holds no such entry.
 */
function findTagValue(data: MQTTReceiveData, tag: string): number | undefined {
  return data?.d?.find(item => item.tag === tag)?.value;
}

/**
 * Decodes one register into two characters: the character for the high byte
 * followed by the character for the low byte of the low 16 bits.
 *
 * @returns '' when the register is undefined or 0.
 */
function decodeRegister(regValue: number | undefined): string {
  if (regValue === undefined || regValue === 0) {
    return '';
  }
  // Keep only the low 16 bits.
  const value = regValue & 0xFFFF;
  const highByte = (value >> 8) & 0xFF;
  const lowByte = value & 0xFF;
  // NOTE(review): a zero byte inside a non-zero register is emitted as a NUL
  // character — confirm whether such padding should be dropped instead.
  return String.fromCharCode(highByte) + String.fromCharCode(lowByte);
}

/** Decodes four registers with `decodeRegister` and concatenates the results in order. */
function decodeRfidString(
  rfidData1?: number,
  rfidData2?: number,
  rfidData3?: number,
  rfidData4?: number
): string {
  return [
    decodeRegister(rfidData1),
    decodeRegister(rfidData2),
    decodeRegister(rfidData3),
    decodeRegister(rfidData4)
  ].join('');
}

/**
 * Decodes a weight register: the high byte of the low 16 bits is taken as the
 * integer part and the low byte, divided by 100, is added to it.
 *
 * @returns 0 when the register is undefined.
 */
function decodeWeight(regValue: number | undefined): number {
  if (regValue === undefined) {
    return 0;
  }
  // Keep only the low 16 bits.
  const value = regValue & 0xFFFF;
  const highByte = (value >> 8) & 0xFF;
  const lowByte = value & 0xFF;
  return highByte + lowByte / 100;
}