using Microsoft.AspNetCore.SignalR; using MQTTnet; using MQTTnet.Client; using Newtonsoft.Json; using Newtonsoft.Json.Converters; using System.Text; namespace AmrControl.Common { /// /// AMR车的控制器,监听Zigbee的消息,转发mqtt消息 /// 在系统中是单例实现 /// public class AmrManager : IAmrManager,IDisposable { public static bool ProcessExit = false; public bool LinkState { get; private set; } public int AmrOnlineCount { get; private set; } private readonly IHubContext hubContext; MqttFactory Factory = null; IMqttClient mqttClient = null; string subscribeTopic = "device/agv-control/handle"; string publicTopic = "srv/device/handle"; IsoDateTimeConverter timeConverter; RS485 rs485; public AmrManager(IHubContext context) { LinkState = false; AmrOnlineCount = 0; hubContext = context; timeConverter = new IsoDateTimeConverter(); //这里使用自定义日期格式,如果不使用的话,默认是ISO8601格式 timeConverter.DateTimeFormat = "yyyy-MM-dd HH:mm:ss"; rs485 = new RS485(); //四种事件使用的mqtt主题是一样的,可以改动消息类型来适应 rs485.ReceiveCarStateCmdEvent += Rs485_ReceiveCarStateCmdEvent; rs485.CarConnectedEvent += Rs485_CarConnectedEvent; rs485.CarDisconnectedEvent += Rs485_CarDisconnectedEvent; rs485.CarArrivedEvent += Rs485_CarArrivedEvent; //模拟一个数据 //cars.Add(new CarStateCmd() { carId="0xfe01", loading =true ,msgType = MessageType.MSG_TYPE_CAR_REPORT, state = CarState.Moving, type = CarType.AMR, speed="0.98", powerLevel="90%", curPos= new Position(0,0,0,"0"),destPos= new Position(10,22,33, "0") }); // cars.Add(new CarStateCmd() { carId="0xfe02", loading =true ,msgType = MessageType.MSG_TYPE_CAR_REPORT, state = CarState.Moving, type = CarType.AMR, speed="1.8", powerLevel="85%", curPos= new Position(1,1,1,"0"),destPos= new Position(20,40,60, "0") }); //AmrOnlineCount = 2; } //车到达某个指定位置的消息 private void Rs485_CarArrivedEvent(CarStateCmd carState) { string str = encodeCarStateCmd(publicTopic, carState); if (str != "") { ClientPublish(publicTopic, str); } } //车上线的消息 private void Rs485_CarDisconnectedEvent(CarStateCmd carState) { AmrOnlineCount = rs485.CarCount; string str = encodeCarStateCmd(publicTopic, carState); if (str != "") { ClientPublish(publicTopic, str); } } //车下线的消息 private void Rs485_CarConnectedEvent(CarStateCmd carState) { AmrOnlineCount = rs485.CarCount; string str = encodeCarStateCmd(publicTopic, carState); if (str != "") { ClientPublish(publicTopic, str); } } //车定时上报状态的消息 private void Rs485_ReceiveCarStateCmdEvent(CarStateCmd carState) { string str = encodeCarStateCmd(publicTopic, carState); if (str != "") { ClientPublish(publicTopic, str); } } public bool ResetSystem() { bool ok = CreateMqttClient(); if (rs485 != null) { rs485.Stop(); } rs485.Start(); if (hubContext != null) { if(ok) { ChatHubMsg("AmrManager", "mqttclient conntcted success."); //创建心跳上报的程序,定期上报设备的状态 //CreateHeartBeat(); // encodeCarStateCmd(publicTopic, cars[0]); } else { ChatHubMsg("AmrManager", "mqttclient conntcted fail."); } } return ok; } public IList GetAmrMessage() { List messages = new List(); messages.Add(new AmrMessage() { Topic = "amrtopic", Time = DateTime.Now, Message = "amr message" }); return messages; } public IList GetMqMessages() { List messages = new List(); messages.Add(new MqMessage() { Topic = "mqtopic", Time = DateTime.Now, Message = "mq message" }); return messages; } public bool GetLinkState() { return LinkState; } public int GetAmrOnlineCount() { return AmrOnlineCount; } public bool SendToMqtt(string topic, string msg) { if(mqttClient.IsConnected) { ClientPublish(topic, msg); return true; } else { return false; } } bool CreateMqttClient() { try { if(mqttClient != null) { mqttClient.Dispose(); } string CId = "AmrManager"; //用户标识ID string userName = "jg-admin"; //用户名 string passWord = "jg-admin@2022"; //密码 MqttClientOptions Option = new MqttClientOptionsBuilder().WithTcpServer("192.168.1.4", 1883) // ("192.168.1.10", 1883)//地址端口号 .WithClientId(CId) //客户端标识Id要唯一。 .WithCredentials(userName, passWord) //用户名,密码 .WithProtocolVersion(MQTTnet.Formatter.MqttProtocolVersion.V311) .WithCleanSession(false) .WithWillQualityOfServiceLevel(MQTTnet.Protocol.MqttQualityOfServiceLevel.AtMostOnce) .WithTimeout(new TimeSpan(0, 0, 10)) .WithKeepAlivePeriod(TimeSpan.FromSeconds(20)) .Build(); Factory = new MqttFactory(); mqttClient = Factory.CreateMqttClient(); mqttClient.ApplicationMessageReceivedAsync += MqttClient_ApplicationMessageReceivedAsync; mqttClient.ConnectedAsync += MqttClient_ConnectedAsync; mqttClient.DisconnectedAsync += MqttClient_DisconnectedAsync; //调用异步方法连接到服务端 mqttClient.ConnectAsync(Option).Wait(); //订阅主题 ClientSubscribeTopic(subscribeTopic); return mqttClient.IsConnected; } catch (Exception ex) { ChatHubMsg("AmrManager", "mqttclient connect failed."); LinkState = false; return false; } } //失去连接 private Task MqttClient_DisconnectedAsync(MqttClientDisconnectedEventArgs arg) { if(mqttClient != null) { mqttClient.Dispose(); } mqttClient = null; LinkState = false; //新建线程定时连接,连接成功,LinkState= true Task.Run(() => { while(LinkState == false) { Thread.Sleep(10000); CreateMqttClient(); } }); return Task.CompletedTask; } //已经连接 private Task MqttClient_ConnectedAsync(MqttClientConnectedEventArgs arg) { ChatHubMsg("AmrManager", "mqttclient connect success."); LinkState = true; return Task.CompletedTask; } //接收到消息 private Task MqttClient_ApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs arg) { ///收到消息 string topic = arg.ApplicationMessage.Topic; string content = System.Text.Encoding.UTF8.GetString(arg.ApplicationMessage.Payload); //转到界面上显示 if(topic == "device/agv-control/handle") { decodeMsg(topic,content); } else { ChatHubMsg(topic , "不识别的主题"); } return Task.CompletedTask; } //mqtt解码为消息对象 void decodeMsg(string topic,string s) { try { int index = s.IndexOf('.'); string substr = s.Substring(index + 1); string headStr = s.Substring(0, index); byte[] bytes = Convert.FromBase64String(substr); byte[] headBytes = Convert.FromBase64String(headStr); short msgType = (short)((short)(headBytes[3] << 8) + headBytes[4]); MessageType type = (MessageType)Enum.Parse(typeof(MessageType), Enum.GetName(typeof(MessageType), msgType)); switch(type) { case MessageType.MSG_TYPE_CALL_CAR: CallCarCmd cmdCall = JsonConvert.DeserializeObject(Encoding.UTF8.GetString(bytes)); cmdCall.msgType = MessageType.MSG_TYPE_CALL_CAR; rs485.SendCarCmd(cmdCall); break; case MessageType.MSG_TYPE_STOP_CAR: StopCarCmd cmdStop = JsonConvert.DeserializeObject(Encoding.UTF8.GetString(bytes)); cmdStop.msgType = MessageType.MSG_TYPE_STOP_CAR; rs485.SendCarStop(cmdStop); break; case MessageType.MSG_TYPE_ENABLE_CAR: EnableCarCmd cmdEnable = JsonConvert.DeserializeObject(Encoding.UTF8.GetString(bytes)); cmdEnable.msgType = MessageType.MSG_TYPE_ENABLE_CAR; rs485.SendCarEnable(cmdEnable); break; case MessageType.MSG_TYPE_DISABLE_CAR: DisableCarCmd cmdDisable = JsonConvert.DeserializeObject(Encoding.UTF8.GetString(bytes)); cmdDisable.msgType = MessageType.MSG_TYPE_DISABLE_CAR; rs485.SendCarDisable(cmdDisable); break; case MessageType.MSG_TYPE_CAR_SHOW: //显示图片 CarShowCmd cmdCarshow = JsonConvert.DeserializeObject(Encoding.UTF8.GetString(bytes)); cmdCarshow.msgType = MessageType.MSG_TYPE_CAR_SHOW; rs485.SendCarShowImage(cmdCarshow); break; default: break; } //转换回字符串 //Encoding.UTF8.GetString(bytes) } catch (Exception ex) { ChatHubMsg(topic, "解码失败!"); } } //消息对象编码为mqtt消息 string encodeCarStateCmd(string topic , CarStateCmd carState) { try { string msg = JsonConvert.SerializeObject(carState, Formatting.Indented, timeConverter); string strMsg = Convert.ToBase64String(Encoding.UTF8.GetBytes(msg)); byte[] headBytes = new byte[7]; short msgType = (short)carState.msgType; headBytes[0] = 0x09; headBytes[1] = 0x12; headBytes[2] = 1; //软件版本 headBytes[3] = (byte)((msgType >> 8) & 0xff); //消息类型 headBytes[4] = (byte)((msgType) & 0xff); headBytes[5] = 1;//加密类型 headBytes[6] = 1;//客户端 string strHead = Convert.ToBase64String(headBytes); return strHead + "." + strMsg; } catch (Exception ex) { ChatHubMsg(topic, "编码失败!"); return ""; } } //消息发送到界面上 private bool ChatHubMsg(string topic , string msg) { if (hubContext != null) { hubContext.Clients.All.SendAsync("ReceiveMessage", topic, msg); //SendMessage("ArmMessage", "reset system"); } return true; } //订阅主题 private async void ClientSubscribeTopic(string topic) { topic = topic.Trim(); if (string.IsNullOrEmpty(topic)) { //Console.Write("订阅主题不能为空!"); return; } // 判断客户端是否连接 if (mqttClient != null && !mqttClient.IsConnected) { //Console.WriteLine("MQTT 客户端尚未连接!"); return; } // 设置订阅参数 var subscribeOptions = new MqttClientSubscribeOptionsBuilder() .WithTopicFilter(topic) .Build(); try { // 订阅 await mqttClient.SubscribeAsync( subscribeOptions, System.Threading.CancellationToken.None); } catch (Exception ex) { } } private async void ClientUnsubscribeTopic(string topic) { topic = topic.Trim(); if (string.IsNullOrEmpty(topic)) { // Console.Write("退订主题不能为空!"); return; } // 判断客户端是否连接 if (!mqttClient.IsConnected) { // Console.WriteLine("MQTT 客户端尚未连接!"); return; } // 设置订阅参数 var subscribeOptions = new MqttClientUnsubscribeOptionsBuilder() .WithTopicFilter(topic) .Build(); try { // 退订 await mqttClient.UnsubscribeAsync( subscribeOptions, System.Threading.CancellationToken.None); } catch (Exception ex) { } } private async void ClientPublish(string topic, string message) { topic = topic.Trim(); message = message.Trim(); if (string.IsNullOrEmpty(topic)) { // Console.Write("退订主题不能为空!"); return ; } // 判断客户端是否连接 if (mqttClient == null || !mqttClient.IsConnected) { // Console.WriteLine("MQTT 客户端尚未连接!"); return ; } // 填充消息 var applicationMessage = new MqttApplicationMessageBuilder() .WithTopic(topic) // 主题 .WithPayload(message) // 消息 .WithRetainFlag(false) // 持久化为空,不持久化 .Build(); try { await mqttClient.PublishAsync(applicationMessage); } catch (Exception ex) { } } public void Dispose() { if (rs485 != null) { rs485.Dispose(); } ClientUnsubscribeTopic(subscribeTopic); mqttClient.DisconnectAsync(); mqttClient.Dispose(); } } public interface IAmrManager { bool LinkState { get; } int AmrOnlineCount { get; } bool ResetSystem(); //msg和mqtt上序列化得消息内容一致 // bool SendToAmr(string topic,string msg); //msg和mqtt上序列化得消息内容一致 bool SendToMqtt(string topic, string msg); IList GetMqMessages(); IList GetAmrMessage(); bool GetLinkState(); int GetAmrOnlineCount(); } }