using System.Collections;
using System.Collections.Generic;
using System.Text;
using Microsoft.AspNetCore.SignalR;
using MQTTnet;
using MQTTnet.Client;
using Newtonsoft.Json;
using Newtonsoft.Json.Converters;
namespace AmrControl.Common
{
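/// <summary>
/// Controller for the AMR vehicles: listens for Zigbee messages and forwards them as MQTT messages.
/// Implemented as a singleton within the system.
/// </summary>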
public class AmrManager : IAmrManager,IDisposable
{
// Process-wide exit flag; declared here but not read anywhere in this class.
public static bool ProcessExit = false;
// Set to true by the MQTT connected callback; reset to false on disconnect or a failed connect.
public bool LinkState { get; private set; }
// Number of cars online, refreshed from rs485.CarCount when a car connects or disconnects.
public int AmrOnlineCount { get; private set; }
// SignalR hub context used to push status text to all connected clients.
private readonly IHubContext hubContext;
// MQTT factory and the client it creates; both are (re)built by CreateMqttClient.
MqttFactory Factory = null;
IMqttClient mqttClient = null;
// Topic this manager subscribes to for incoming commands.
string subscribeTopic = "device/agv-control/handle";
// Topic on which encoded car state messages are published.
string publicTopic = "srv/device/handle";
// Date converter handed to the JSON serializer when car states are encoded.
IsoDateTimeConverter timeConverter;
// Car-side link: raises the car events handled below and accepts the decoded car commands.
RS485 rs485;
/// <summary>
/// Builds the manager: keeps the hub context, prepares the JSON date converter and
/// hooks the car events of a new RS485 link. No MQTT connection is opened here.
/// </summary>
/// <param name="context">SignalR hub context used for UI notifications.</param>
public AmrManager(IHubContext context)
{
    hubContext = context;
    LinkState = false;
    AmrOnlineCount = 0;

    // A custom date format is used here; without it the converter falls back to ISO 8601.
    timeConverter = new IsoDateTimeConverter { DateTimeFormat = "yyyy-MM-dd HH:mm:ss" };

    rs485 = new RS485();

    // All four events publish on the same MQTT topic; the message type tells them apart.
    rs485.ReceiveCarStateCmdEvent += Rs485_ReceiveCarStateCmdEvent;
    rs485.CarConnectedEvent += Rs485_CarConnectedEvent;
    rs485.CarDisconnectedEvent += Rs485_CarDisconnectedEvent;
    rs485.CarArrivedEvent += Rs485_CarArrivedEvent;
}
/// <summary>
/// Handles the "car arrived at a designated position" event by publishing the
/// encoded car state on the publish topic.
/// </summary>
private void Rs485_CarArrivedEvent(CarStateCmd carState)
{
    string payload = encodeCarStateCmd(publicTopic, carState);
    if (payload == "")
    {
        // Encoding failed (the encoder already reported it); nothing to publish.
        return;
    }

    ClientPublish(publicTopic, payload);
}
/// <summary>
/// Handles a car going offline: refreshes the online count from the RS485 link and
/// publishes the encoded car state on the publish topic.
/// </summary>
private void Rs485_CarDisconnectedEvent(CarStateCmd carState)
{
    AmrOnlineCount = rs485.CarCount;

    string payload = encodeCarStateCmd(publicTopic, carState);
    if (payload == "")
    {
        // Encoding failed (the encoder already reported it); nothing to publish.
        return;
    }

    ClientPublish(publicTopic, payload);
}
/// <summary>
/// Handles a car coming online: refreshes the online count from the RS485 link and
/// publishes the encoded car state on the publish topic.
/// </summary>
private void Rs485_CarConnectedEvent(CarStateCmd carState)
{
    AmrOnlineCount = rs485.CarCount;

    string payload = encodeCarStateCmd(publicTopic, carState);
    if (payload == "")
    {
        // Encoding failed (the encoder already reported it); nothing to publish.
        return;
    }

    ClientPublish(publicTopic, payload);
}
/// <summary>
/// Handles the periodic car status report by publishing the encoded car state
/// on the publish topic.
/// </summary>
private void Rs485_ReceiveCarStateCmdEvent(CarStateCmd carState)
{
    string payload = encodeCarStateCmd(publicTopic, carState);
    if (payload == "")
    {
        // Encoding failed (the encoder already reported it); nothing to publish.
        return;
    }

    ClientPublish(publicTopic, payload);
}
/// <summary>
/// Rebuilds the MQTT client, restarts the RS485 car link and reports the MQTT
/// connection result to the hub.
/// </summary>
/// <returns>True when the freshly created MQTT client reports that it is connected.</returns>
public bool ResetSystem()
{
    bool ok = CreateMqttClient();

    // Stop and Start share one guard; previously Start was called outside the
    // null check and would throw if the link was missing.
    if (rs485 != null)
    {
        rs485.Stop();
        rs485.Start();
    }

    // ChatHubMsg itself does nothing when no hub context is available.
    ChatHubMsg("AmrManager", ok ? "mqttclient conntcted success." : "mqttclient conntcted fail.");
    return ok;
}
/// <summary>
/// Returns the AMR-side messages. Currently this is a single placeholder entry
/// stamped with the current local time.
/// </summary>
/// <returns>A new list holding one <c>AmrMessage</c>.</returns>
public IList GetAmrMessage()
{
    // The list must be generic: there is no non-generic List type to instantiate.
    var messages = new List<AmrMessage>
    {
        new AmrMessage() { Topic = "amrtopic", Time = DateTime.Now, Message = "amr message" }
    };
    return messages;
}
/// <summary>
/// Returns the MQ-side messages. Currently this is a single placeholder entry
/// stamped with the current local time.
/// </summary>
/// <returns>A new list holding one <c>MqMessage</c>.</returns>
public IList GetMqMessages()
{
    // The list must be generic: there is no non-generic List type to instantiate.
    var messages = new List<MqMessage>
    {
        new MqMessage() { Topic = "mqtopic", Time = DateTime.Now, Message = "mq message" }
    };
    return messages;
}
/// <summary>Method-style accessor for <see cref="LinkState"/>.</summary>
/// <returns>The current value of <see cref="LinkState"/>.</returns>
public bool GetLinkState() => LinkState;
/// <summary>Method-style accessor for <see cref="AmrOnlineCount"/>.</summary>
/// <returns>The current value of <see cref="AmrOnlineCount"/>.</returns>
public int GetAmrOnlineCount() => AmrOnlineCount;
/// <summary>
/// Hands a message to the MQTT client for publishing when a connected client exists.
/// </summary>
/// <param name="topic">MQTT topic to publish on.</param>
/// <param name="msg">Message text to publish.</param>
/// <returns>
/// True when a connected client was available and the publish was started; false otherwise.
/// The publish itself runs asynchronously, so true does not confirm delivery.
/// </returns>
public bool SendToMqtt(string topic, string msg)
{
    // Read the field once: it is null before the first connect and can be
    // replaced or cleared by the connection callbacks at any time.
    IMqttClient client = mqttClient;
    if (client == null || !client.IsConnected)
    {
        return false;
    }

    ClientPublish(topic, msg);
    return true;
}
/// <summary>
/// Disposes any previous MQTT client, creates a new one, connects it to the broker
/// (blocking until the connect attempt finishes) and starts the topic subscription.
/// </summary>
/// <returns>True when the new client reports connected; false on any failure.</returns>
bool CreateMqttClient()
{
    try
    {
        // Retire the previous client. Its handlers are detached first so that
        // events raised by the old instance can no longer reach this manager.
        IMqttClient previous = mqttClient;
        if (previous != null)
        {
            previous.ApplicationMessageReceivedAsync -= MqttClient_ApplicationMessageReceivedAsync;
            previous.ConnectedAsync -= MqttClient_ConnectedAsync;
            previous.DisconnectedAsync -= MqttClient_DisconnectedAsync;
            previous.Dispose();
        }

        // NOTE(review): broker address and credentials are hard-coded in source;
        // they should be moved to configuration / a secret store.
        string CId = "AmrManager"; // client id
        string userName = "jg-admin"; // user name
        string passWord = "jg-admin@2022"; // password
        MqttClientOptions Option = new MqttClientOptionsBuilder().WithTcpServer("192.168.1.4", 1883) // broker address and port
            .WithClientId(CId) // the client id has to be unique
            .WithCredentials(userName, passWord) // user name, password
            .WithProtocolVersion(MQTTnet.Formatter.MqttProtocolVersion.V311)
            .WithCleanSession(false)
            .WithWillQualityOfServiceLevel(MQTTnet.Protocol.MqttQualityOfServiceLevel.AtMostOnce)
            .WithTimeout(new TimeSpan(0, 0, 10))
            .WithKeepAlivePeriod(TimeSpan.FromSeconds(20))
            .Build();

        Factory = new MqttFactory();

        // Work on a local reference so a concurrent callback that swaps the
        // field cannot make the remaining steps dereference null.
        IMqttClient client = Factory.CreateMqttClient();
        client.ApplicationMessageReceivedAsync += MqttClient_ApplicationMessageReceivedAsync;
        client.ConnectedAsync += MqttClient_ConnectedAsync;
        client.DisconnectedAsync += MqttClient_DisconnectedAsync;
        mqttClient = client;

        // The method is synchronous, so the connect is awaited by blocking.
        // GetAwaiter().GetResult() surfaces the real exception instead of an AggregateException.
        client.ConnectAsync(Option).GetAwaiter().GetResult();

        // Subscribe to the command topic (runs asynchronously).
        ClientSubscribeTopic(subscribeTopic);
        return client.IsConnected;
    }
    catch (Exception ex)
    {
        // Keep the cause visible instead of dropping it.
        ChatHubMsg("AmrManager", "mqttclient connect failed. " + ex.Message);
        LinkState = false;
        return false;
    }
}
// 0 = no reconnect loop is running, 1 = one is active. Only changed through Interlocked.
private int reconnectLoopActive;

/// <summary>
/// Disconnected callback: marks the link as down and makes sure exactly one
/// background reconnect loop is running.
/// </summary>
/// <remarks>
/// The client is not disposed here, inside its own callback; CreateMqttClient
/// disposes the previous client before building a new one.
/// </remarks>
private Task MqttClient_DisconnectedAsync(MqttClientDisconnectedEventArgs arg)
{
    LinkState = false;

    // Repeated disconnect events must not stack up additional endless retry loops.
    if (Interlocked.CompareExchange(ref reconnectLoopActive, 1, 0) == 0)
    {
        _ = Task.Run(() => ReconnectUntilLinkedAsync());
    }

    return Task.CompletedTask;
}

/// <summary>
/// Retries CreateMqttClient every 10 seconds until LinkState becomes true.
/// </summary>
private async Task ReconnectUntilLinkedAsync()
{
    do
    {
        try
        {
            while (!LinkState)
            {
                // Task.Delay keeps the pool thread free while waiting.
                await Task.Delay(10000);
                CreateMqttClient();
            }
        }
        finally
        {
            Interlocked.Exchange(ref reconnectLoopActive, 0);
        }
    }
    // A disconnect that arrived while the flag was still set would have been
    // skipped by the callback; pick it up here instead of leaving the link down.
    while (!LinkState && Interlocked.CompareExchange(ref reconnectLoopActive, 1, 0) == 0);
}
/// <summary>
/// Connected callback: announces the successful connection on the hub and marks the link as up.
/// </summary>
private Task MqttClient_ConnectedAsync(MqttClientConnectedEventArgs arg)
{
    const string source = "AmrManager";
    const string text = "mqttclient connect success.";

    ChatHubMsg(source, text);
    LinkState = true;

    return Task.CompletedTask;
}
/// <summary>
/// Message callback: decodes messages that arrive on the subscribed command topic
/// and reports any other topic to the hub as unrecognized.
/// </summary>
private Task MqttClient_ApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs arg)
{
    string topic = arg.ApplicationMessage.Topic;

    // A message without payload would otherwise make GetString throw inside the callback.
    byte[] payload = arg.ApplicationMessage.Payload ?? Array.Empty<byte>();
    string content = System.Text.Encoding.UTF8.GetString(payload);

    // Compare against the subscribed topic field rather than a second copy of the literal.
    if (topic == subscribeTopic)
    {
        decodeMsg(topic, content);
    }
    else
    {
        // Shown on the UI as "unrecognized topic".
        ChatHubMsg(topic, "不识别的主题");
    }

    return Task.CompletedTask;
}
/// <summary>
/// Decodes an MQTT text message of the form "base64(header).base64(json)" and
/// forwards the resulting command to the RS485 link.
/// </summary>
/// <param name="topic">Topic the message arrived on; used when reporting a decode failure.</param>
/// <param name="s">Raw message text.</param>
/// <remarks>
/// Header bytes 3 and 4 carry the message type (high byte first). Any failure is
/// reported to the hub as a decode failure; defined message types without a case
/// below are ignored.
/// </remarks>
void decodeMsg(string topic,string s)
{
    // Shown on the UI as "decode failed!".
    const string decodeFailed = "解码失败!";
    try
    {
        int index = s.IndexOf('.');
        if (index < 0)
        {
            // No header/body separator.
            ChatHubMsg(topic, decodeFailed);
            return;
        }

        byte[] headBytes = Convert.FromBase64String(s.Substring(0, index));
        byte[] bytes = Convert.FromBase64String(s.Substring(index + 1));
        if (headBytes.Length < 5)
        {
            // Header is too short to contain the message type.
            ChatHubMsg(topic, decodeFailed);
            return;
        }

        short msgType = unchecked((short)((headBytes[3] << 8) | headBytes[4]));
        MessageType type = (MessageType)msgType;
        if (!Enum.IsDefined(typeof(MessageType), type))
        {
            // Unknown type code.
            ChatHubMsg(topic, decodeFailed);
            return;
        }

        string json = Encoding.UTF8.GetString(bytes);
        switch (type)
        {
            case MessageType.MSG_TYPE_CALL_CAR:
                // The generic overload is required: the non-generic one returns object.
                CallCarCmd cmdCall = JsonConvert.DeserializeObject<CallCarCmd>(json);
                cmdCall.msgType = MessageType.MSG_TYPE_CALL_CAR;
                rs485.SendCarCmd(cmdCall);
                break;
            case MessageType.MSG_TYPE_STOP_CAR:
                StopCarCmd cmdStop = JsonConvert.DeserializeObject<StopCarCmd>(json);
                cmdStop.msgType = MessageType.MSG_TYPE_STOP_CAR;
                rs485.SendCarStop(cmdStop);
                break;
            case MessageType.MSG_TYPE_ENABLE_CAR:
                EnableCarCmd cmdEnable = JsonConvert.DeserializeObject<EnableCarCmd>(json);
                cmdEnable.msgType = MessageType.MSG_TYPE_ENABLE_CAR;
                rs485.SendCarEnable(cmdEnable);
                break;
            case MessageType.MSG_TYPE_DISABLE_CAR:
                DisableCarCmd cmdDisable = JsonConvert.DeserializeObject<DisableCarCmd>(json);
                cmdDisable.msgType = MessageType.MSG_TYPE_DISABLE_CAR;
                rs485.SendCarDisable(cmdDisable);
                break;
            case MessageType.MSG_TYPE_CAR_SHOW:
                // Show an image on the car.
                CarShowCmd cmdCarshow = JsonConvert.DeserializeObject<CarShowCmd>(json);
                cmdCarshow.msgType = MessageType.MSG_TYPE_CAR_SHOW;
                rs485.SendCarShowImage(cmdCarshow);
                break;
            default:
                break;
        }
    }
    catch (Exception)
    {
        // Covers invalid base64, invalid JSON, a null deserialization result and send errors.
        ChatHubMsg(topic, decodeFailed);
    }
}
/// <summary>
/// Encodes a car state as an MQTT text message of the form "base64(header).base64(json)".
/// </summary>
/// <param name="topic">Topic used when reporting an encode failure to the hub.</param>
/// <param name="carState">Car state to serialize.</param>
/// <returns>The encoded message, or an empty string when encoding fails.</returns>
string encodeCarStateCmd(string topic , CarStateCmd carState)
{
    try
    {
        string json = JsonConvert.SerializeObject(carState, Formatting.Indented, timeConverter);
        short typeCode = (short)carState.msgType;

        byte[] header =
        {
            0x09,
            0x12,
            1,                                // software version
            (byte)((typeCode >> 8) & 0xff),   // message type, high byte
            (byte)(typeCode & 0xff),          // message type, low byte
            1,                                // encryption type
            1,                                // client
        };

        string encodedBody = Convert.ToBase64String(Encoding.UTF8.GetBytes(json));
        return Convert.ToBase64String(header) + "." + encodedBody;
    }
    catch (Exception)
    {
        // Shown on the UI as "encode failed!".
        ChatHubMsg(topic, "编码失败!");
        return "";
    }
}
/// <summary>
/// Pushes a (topic, message) pair to every hub client through the "ReceiveMessage"
/// method. Does nothing when no hub context is available.
/// </summary>
/// <returns>Always true.</returns>
private bool ChatHubMsg(string topic , string msg)
{
    // The send is started but not awaited.
    hubContext?.Clients.All.SendAsync("ReceiveMessage", topic, msg);
    return true;
}
/// <summary>
/// Subscribes the MQTT client to a topic. Does nothing when the topic is empty or
/// no connected client exists; failures are reported to the hub.
/// </summary>
/// <param name="topic">Topic filter to subscribe to; surrounding whitespace is trimmed.</param>
private async void ClientSubscribeTopic(string topic)
{
    // Everything runs inside the try: an exception escaping an async void method
    // cannot be observed by the caller.
    try
    {
        topic = topic?.Trim();
        if (string.IsNullOrEmpty(topic))
        {
            return;
        }

        // Read the field once. The previous guard let a null client through
        // and then dereferenced it.
        IMqttClient client = mqttClient;
        if (client == null || !client.IsConnected)
        {
            return;
        }

        var subscribeOptions = new MqttClientSubscribeOptionsBuilder()
            .WithTopicFilter(topic)
            .Build();

        await client.SubscribeAsync(
            subscribeOptions,
            System.Threading.CancellationToken.None);
    }
    catch (Exception ex)
    {
        // Report instead of silently swallowing.
        ChatHubMsg(topic, "mqtt subscribe failed: " + ex.Message);
    }
}
/// <summary>
/// Unsubscribes the MQTT client from a topic. Does nothing when the topic is empty
/// or no connected client exists; failures are reported to the hub.
/// </summary>
/// <param name="topic">Topic filter to unsubscribe from; surrounding whitespace is trimmed.</param>
private async void ClientUnsubscribeTopic(string topic)
{
    // Everything runs inside the try: an exception escaping an async void method
    // cannot be observed by the caller.
    try
    {
        topic = topic?.Trim();
        if (string.IsNullOrEmpty(topic))
        {
            return;
        }

        // Read the field once; it is null before the first connect and may be
        // cleared by the connection callbacks.
        IMqttClient client = mqttClient;
        if (client == null || !client.IsConnected)
        {
            return;
        }

        var unsubscribeOptions = new MqttClientUnsubscribeOptionsBuilder()
            .WithTopicFilter(topic)
            .Build();

        await client.UnsubscribeAsync(
            unsubscribeOptions,
            System.Threading.CancellationToken.None);
    }
    catch (Exception ex)
    {
        // Report instead of silently swallowing.
        ChatHubMsg(topic, "mqtt unsubscribe failed: " + ex.Message);
    }
}
/// <summary>
/// Publishes a non-retained text message on a topic. Does nothing when the topic is
/// empty or no connected client exists; failures are reported to the hub.
/// </summary>
/// <param name="topic">Topic to publish on; surrounding whitespace is trimmed.</param>
/// <param name="message">Message text; trimmed, and a null value is sent as an empty payload.</param>
private async void ClientPublish(string topic, string message)
{
    // Everything runs inside the try: an exception escaping an async void method
    // cannot be observed by the caller.
    try
    {
        topic = topic?.Trim();
        message = (message ?? string.Empty).Trim();
        if (string.IsNullOrEmpty(topic))
        {
            // A publish topic is required.
            return;
        }

        // Read the field once so the client cannot change between the
        // connection check and the publish call.
        IMqttClient client = mqttClient;
        if (client == null || !client.IsConnected)
        {
            return;
        }

        var applicationMessage = new MqttApplicationMessageBuilder()
            .WithTopic(topic)
            .WithPayload(message)
            .WithRetainFlag(false) // not retained by the broker
            .Build();

        await client.PublishAsync(applicationMessage);
    }
    catch (Exception ex)
    {
        // Report instead of silently swallowing.
        ChatHubMsg(topic, "mqtt publish failed: " + ex.Message);
    }
}
/// <summary>
/// Releases the RS485 link and shuts the MQTT client down: detaches the callbacks,
/// unsubscribes and disconnects on a best-effort basis, then disposes the client.
/// </summary>
public void Dispose()
{
    if (rs485 != null)
    {
        rs485.Dispose();
    }

    // No client exists when the system was never reset or the link was lost.
    IMqttClient client = mqttClient;
    if (client == null)
    {
        return;
    }

    // Detach first so the disconnect below cannot start a reconnect loop.
    client.DisconnectedAsync -= MqttClient_DisconnectedAsync;
    client.ConnectedAsync -= MqttClient_ConnectedAsync;
    client.ApplicationMessageReceivedAsync -= MqttClient_ApplicationMessageReceivedAsync;

    try
    {
        if (client.IsConnected)
        {
            var unsubscribeOptions = new MqttClientUnsubscribeOptionsBuilder()
                .WithTopicFilter(subscribeTopic)
                .Build();

            // Wait a bounded time so both requests go out before the client is disposed.
            TimeSpan limit = TimeSpan.FromSeconds(5);
            client.UnsubscribeAsync(unsubscribeOptions, System.Threading.CancellationToken.None).Wait(limit);
            client.DisconnectAsync().Wait(limit);
        }
    }
    catch (Exception)
    {
        // Best effort only: Dispose must not throw.
    }
    finally
    {
        client.Dispose();
    }
}
}
/// <summary>
/// Contract of the AMR manager: connection status, system reset, MQTT publishing
/// and message list queries.
/// </summary>
public interface IAmrManager
{
/// <summary>Whether the MQTT link is currently marked as connected.</summary>
bool LinkState { get; }
/// <summary>Number of AMR cars currently online.</summary>
int AmrOnlineCount { get; }
/// <summary>Resets the system; AmrManager rebuilds the MQTT client and restarts the car link.</summary>
/// <returns>True when the MQTT client is connected after the reset.</returns>
bool ResetSystem();
// msg has the same content as the message serialized on MQTT
// bool SendToAmr(string topic,string msg);
// msg has the same content as the message serialized on MQTT
/// <summary>Publishes a message on an MQTT topic.</summary>
/// <returns>True when a connected client accepted the request; false otherwise.</returns>
bool SendToMqtt(string topic, string msg);
/// <summary>Returns the MQ-side messages (AmrManager currently returns one placeholder entry).</summary>
IList GetMqMessages();
/// <summary>Returns the AMR-side messages (AmrManager currently returns one placeholder entry).</summary>
IList GetAmrMessage();
/// <summary>Method-style accessor for <see cref="LinkState"/>.</summary>
bool GetLinkState();
/// <summary>Method-style accessor for <see cref="AmrOnlineCount"/>.</summary>
int GetAmrOnlineCount();
}
}