123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279 |
- using AmrControl.workstation;
- using MQTTnet;
- using MQTTnet.Client;
- using MQTTnet.Protocol;
- using Newtonsoft.Json;
- using SixLabors.ImageSharp.ColorSpaces.Companding;
namespace AmrControl.Clients
{
    /// <summary>
    /// Thin wrapper around an MQTTnet client: connects to the configured broker, subscribes to the
    /// workstation topic, forwards received messages to <see cref="MsWorkstation"/> and publishes
    /// outgoing messages.
    /// </summary>
    /// <remarks>
    /// The original author noted that reading the configuration back and forth causes memory
    /// problems, so every setting is copied into a field once, in the constructor, and the
    /// configuration object is not touched again afterwards.
    /// </remarks>
    public class MqttClient
    {
        /// <summary>Pause between losing the connection and attempting to reconnect.</summary>
        private static readonly TimeSpan ReconnectDelay = TimeSpan.FromSeconds(2);

        /// <summary>Service provider used to resolve the workstation that consumes messages.</summary>
        private readonly IServiceProvider _service;

        /// <summary>Guards <see cref="reConnTaskStarted"/> so only one reconnect runs at a time.</summary>
        private readonly object obj = new object();

        /// <summary>Settings captured once from the "Mqtt" configuration section.</summary>
        private readonly string wsid, send_topic, sub_topic, host, portText;

        /// <summary>The current underlying client; null until created or after teardown.</summary>
        private IMqttClient mqttClient;

        /// <summary>Set by the connected callback; cleared when a (re)connect starts or fails.</summary>
        private volatile bool linkedState = false;

        /// <summary>True while a reconnect attempt is in progress.</summary>
        private bool reConnTaskStarted = false;

        /// <summary>
        /// Creates the wrapper and captures the MQTT settings.
        /// </summary>
        /// <param name="configuration">Configuration providing the "Mqtt:*" keys.</param>
        /// <param name="service">Service provider used to resolve <see cref="MsWorkstation"/>.</param>
        public MqttClient(IConfiguration configuration, IServiceProvider service)
        {
            _service = service;
            wsid = configuration["Mqtt:id"];                 // client / workstation id
            send_topic = configuration["Mqtt:send-topic"];   // default publish topic
            sub_topic = configuration["Mqtt:sub-topic"];     // subscription topic template
            host = configuration["Mqtt:url"];                // broker host
            portText = configuration["Mqtt:port"];           // broker port, parsed on connect
        }

        /// <summary>
        /// Builds a new client and connects it to the broker, unless a healthy connection
        /// already exists. Blocks until the connect attempt finishes. Failures are reported
        /// on the console and leave the wrapper in the "not linked" state; nothing is thrown.
        /// </summary>
        public void CreateClient()
        {
            try
            {
                IMqttClient existing = mqttClient;
                if (linkedState && existing != null && existing.IsConnected)
                {
                    // Already up and running: nothing to do.
                    return;
                }

                DisposeClient();

                if (!int.TryParse(portText, out int port))
                {
                    Console.WriteLine($"MQTT connect skipped: invalid port '{portText}' in configuration key Mqtt:port.");
                    linkedState = false;
                    return;
                }

                MqttClientOptions Option = new MqttClientOptionsBuilder()
                    .WithTcpServer(host, port)
                    .WithClientId(wsid) // the client id must be unique on the broker
                    .WithProtocolVersion(MQTTnet.Formatter.MqttProtocolVersion.V311)
                    .WithCleanSession(true)
                    .WithWillQualityOfServiceLevel(MQTTnet.Protocol.MqttQualityOfServiceLevel.AtMostOnce)
                    .WithTimeout(new TimeSpan(0, 0, 10))
                    .WithKeepAlivePeriod(TimeSpan.FromSeconds(20))
                    .Build();

                IMqttClient created = new MqttFactory().CreateMqttClient();
                created.ApplicationMessageReceivedAsync += ReceivedHandle;
                created.ConnectedAsync += ConnectedHandle;
                created.DisconnectedAsync += DisconnectedHandle;
                mqttClient = created;

                // This method is synchronous by contract, so the connect is awaited by blocking.
                // GetAwaiter().GetResult() surfaces the real exception instead of an AggregateException.
                created.ConnectAsync(Option).GetAwaiter().GetResult();

                // Subscribe to the configured topic.
                SubTopics(sub_topic);
            }
            catch (Exception ex)
            {
                Console.WriteLine($"MQTT connect failed. client id: {wsid}, error: {ex.Message}");
                linkedState = false;
            }
        }

        /// <summary>
        /// Subscribes the current client to <paramref name="topic"/>. The topic is treated as a
        /// format string whose <c>{0}</c> placeholder is replaced with the workstation id.
        /// Does nothing when the topic is blank or the client is not connected. Fire-and-forget:
        /// failures are written to the console and never thrown.
        /// </summary>
        /// <param name="topic">Topic filter (optionally containing <c>{0}</c>).</param>
        public async void SubTopics(string topic)
        {
            // Validate before touching the string: an exception escaping an async void method
            // cannot be caught by the caller.
            if (string.IsNullOrWhiteSpace(topic))
            {
                return;
            }

            // Snapshot the field so a concurrent reconnect cannot null it between check and use.
            IMqttClient client = mqttClient;
            if (client == null || !client.IsConnected)
            {
                return;
            }

            string filter = topic.Trim();
            try
            {
                filter = string.Format(filter, wsid);

                var subscribeOptions = new MqttClientSubscribeOptionsBuilder()
                    .WithTopicFilter(filter, MqttQualityOfServiceLevel.ExactlyOnce)
                    .Build();

                await client.SubscribeAsync(subscribeOptions, System.Threading.CancellationToken.None);
                Console.WriteLine("监听主题:" + filter);
            }
            catch (Exception e)
            {
                Console.WriteLine($"消息监听失败.监听主题:{filter}, 失败原因:{e.Message}");
            }
        }

        /// <summary>
        /// Callback for incoming messages: decodes the payload as UTF-8 and hands it, together
        /// with its topic, to the registered <see cref="MsWorkstation"/>.
        /// </summary>
        /// <param name="arg">Event data carrying the received application message.</param>
        /// <returns>A completed task.</returns>
        private Task ReceivedHandle(MqttApplicationMessageReceivedEventArgs arg)
        {
            // Guard against a null payload (presumably a message published with no body);
            // GetString throws on null.
            byte[] payload = arg.ApplicationMessage.Payload ?? System.Array.Empty<byte>();
            string content = System.Text.Encoding.UTF8.GetString(payload);

            // GetRequiredService throws when the service is missing, so no null check is needed.
            MsWorkstation ws = _service.GetRequiredService<MsWorkstation>();
            ws.PushMqMessage(arg.ApplicationMessage.Topic, content);
            return Task.CompletedTask;
        }

        /// <summary>
        /// Callback for a lost connection: waits <see cref="ReconnectDelay"/> and then starts a
        /// reconnect, unless the connection has been re-established in the meantime.
        /// </summary>
        /// <param name="arg">Event data describing the disconnect.</param>
        /// <returns>A task that completes once the reconnect attempt has finished.</returns>
        private async Task DisconnectedHandle(MqttClientDisconnectedEventArgs arg)
        {
            try
            {
                // Asynchronous wait: does not block the thread that raised the event.
                await Task.Delay(ReconnectDelay);

                IMqttClient current = mqttClient;
                if (current != null && current.IsConnected)
                {
                    // Someone else already reconnected; tearing that connection down would be harmful.
                    return;
                }

                reConnect();
            }
            catch (Exception ex)
            {
                Console.WriteLine($"MQTT reconnect failed. client id: {wsid}, error: {ex.Message}");
            }
        }

        /// <summary>
        /// Drops the current client and connects a fresh one. Calls made while another reconnect
        /// is still running return immediately.
        /// </summary>
        public void reConnect()
        {
            lock (obj)
            {
                if (reConnTaskStarted)
                {
                    return;
                }

                // Prevent several threads from reconnecting at once.
                reConnTaskStarted = true;
            }

            try
            {
                linkedState = false;
                DisposeClient();
                CreateClient();
            }
            catch (Exception ex)
            {
                Console.WriteLine($"MQTT reconnect failed. client id: {wsid}, error: {ex.Message}");
            }
            finally
            {
                // Always release the guard, otherwise one failure would disable reconnecting forever.
                lock (obj)
                {
                    reConnTaskStarted = false;
                }
            }
        }

        /// <summary>
        /// Callback for an established connection: marks the wrapper as linked.
        /// </summary>
        /// <param name="arg">Event data describing the connect result.</param>
        /// <returns>A completed task.</returns>
        private Task ConnectedHandle(MqttClientConnectedEventArgs arg)
        {
            linkedState = true;
            return Task.CompletedTask;
        }

        /// <summary>
        /// Publishes <paramref name="msg"/> to the configured default send topic.
        /// </summary>
        /// <param name="msg">Message text.</param>
        /// <returns>See <see cref="SendToMqtt(string, string)"/>.</returns>
        public bool SendToMqtt(string msg)
        {
            return SendToMqtt(send_topic, msg);
        }

        /// <summary>
        /// Publishes <paramref name="msg"/> to <paramref name="topic"/> without waiting for the
        /// broker's acknowledgement.
        /// </summary>
        /// <param name="topic">Target topic.</param>
        /// <param name="msg">Message text; surrounding whitespace is trimmed, null is sent as empty.</param>
        /// <returns>
        /// <c>true</c> when the message was handed to a connected client; <c>false</c> when the
        /// topic is blank or the client is not connected. <c>true</c> does not guarantee delivery.
        /// </returns>
        public bool SendToMqtt(string topic, string msg)
        {
            try
            {
                // Same preconditions as ClientPublish, so "true" is never returned for a
                // message that would be dropped straight away.
                if (string.IsNullOrWhiteSpace(topic) || !TryGetConnectedClient(out _))
                {
                    return false;
                }

                // Fire-and-forget: ClientPublish handles its own failures, so the task never faults.
                _ = ClientPublish(topic, msg);
                return true;
            }
            catch (Exception)
            {
                return false;
            }
        }

        /// <summary>
        /// Publishes a message with QoS "exactly once" and no retain flag. Silently returns when
        /// the topic is blank or the client is not connected; publish errors are written to the
        /// console and never thrown.
        /// </summary>
        /// <param name="topic">Target topic.</param>
        /// <param name="message">Message text.</param>
        /// <returns>A task that completes when the publish call has finished or failed.</returns>
        private async Task ClientPublish(string topic, string message)
        {
            if (string.IsNullOrWhiteSpace(topic))
            {
                return;
            }

            if (!TryGetConnectedClient(out IMqttClient client))
            {
                return;
            }

            try
            {
                var applicationMessage = new MqttApplicationMessageBuilder()
                    .WithTopic(topic.Trim())
                    .WithPayload(message?.Trim() ?? string.Empty)
                    .WithRetainFlag(false) // not retained on the broker
                    .WithQualityOfServiceLevel(MqttQualityOfServiceLevel.ExactlyOnce)
                    .Build();

                await client.PublishAsync(applicationMessage);
            }
            catch (Exception ex)
            {
                Console.WriteLine($"MQTT publish failed. topic: {topic}, error: {ex.Message}");
            }
        }

        /// <summary>
        /// Returns the current client when the wrapper is linked and the client reports itself
        /// as connected.
        /// </summary>
        /// <param name="client">Snapshot of the current client (may be null when false is returned).</param>
        /// <returns><c>true</c> when <paramref name="client"/> is usable for publishing.</returns>
        private bool TryGetConnectedClient(out IMqttClient client)
        {
            client = mqttClient;
            return linkedState && client != null && client.IsConnected;
        }

        /// <summary>
        /// Detaches this wrapper's callbacks from the current client and disposes it, so a late
        /// event from the discarded instance cannot trigger another reconnect.
        /// </summary>
        private void DisposeClient()
        {
            IMqttClient old = mqttClient;
            mqttClient = null;
            if (old == null)
            {
                return;
            }

            old.ApplicationMessageReceivedAsync -= ReceivedHandle;
            old.ConnectedAsync -= ConnectedHandle;
            old.DisconnectedAsync -= DisconnectedHandle;
            old.Dispose();
        }
    }
}
|