using AmrControl.workstation;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Protocol;
using Newtonsoft.Json;
using SixLabors.ImageSharp.ColorSpaces.Companding;

namespace AmrControl.Clients
{
    /// <summary>
    /// Thin wrapper around an MQTTnet client: connects to the configured broker,
    /// subscribes to the workstation topic, forwards received messages to
    /// <see cref="MsWorkstation"/> and publishes outgoing messages.
    /// </summary>
    /// <remarks>
    /// Original author's note: repeatedly reading the configuration file was observed to
    /// cause memory problems, so configuration values must not be read on hot paths.
    /// For that reason every setting is read once in the constructor and cached.
    /// </remarks>
    public class MqttClient
    {
        /// <summary>Delay between a disconnect notification and the reconnect attempt.</summary>
        private const int ReconnectDelayMilliseconds = 2000;

        /// <summary>The underlying MQTT client; replaced on every (re)connect, null while reconnecting.</summary>
        private IMqttClient mqttClient;

        /// <summary>Connection state as reported by the connected/disconnected callbacks.</summary>
        private volatile bool linkedState = false;

        /// <summary>Application configuration (only read in the constructor).</summary>
        private readonly IConfiguration _configuration;

        /// <summary>Service provider used to resolve the message consumer.</summary>
        private readonly IServiceProvider _service;

        /// <summary>True while a reconnect attempt is running; prevents concurrent attempts.</summary>
        private volatile bool reConnTaskStarted = false;

        /// <summary>Guards the check-and-set of <see cref="reConnTaskStarted"/>.</summary>
        private readonly object obj = new object();

        // Cached configuration values (see the class remarks for why they are cached).
        private readonly string wsid, send_topic, sub_topic;
        private readonly string serverUrl, serverPort;

        /// <summary>
        /// Creates the wrapper and caches all MQTT settings from configuration.
        /// No connection is made until <see cref="CreateClient"/> is called.
        /// </summary>
        /// <param name="configuration">Source of the <c>Mqtt:*</c> settings.</param>
        /// <param name="service">Provider used to resolve <see cref="MsWorkstation"/> when a message arrives.</param>
        public MqttClient(IConfiguration configuration, IServiceProvider service)
        {
            this._configuration = configuration;
            this._service = service;

            wsid = _configuration["Mqtt:id"];               // client id, also substituted into the subscribe topic
            send_topic = _configuration["Mqtt:send-topic"]; // default topic for outgoing messages
            sub_topic = _configuration["Mqtt:sub-topic"];   // topic (format string) to subscribe to
            serverUrl = _configuration["Mqtt:url"];         // broker host
            serverPort = _configuration["Mqtt:port"];       // broker port, parsed on connect
        }

        /// <summary>
        /// Builds a new client, connects to the broker (blocking until the attempt finishes)
        /// and then starts the subscription to the configured topic.
        /// Does nothing when a connected client already exists. Failures are logged to the
        /// console and leave the wrapper in the "not connected" state; nothing is thrown.
        /// </summary>
        public void CreateClient()
        {
            try
            {
                if (linkedState && mqttClient != null && mqttClient.IsConnected)
                {
                    // An already connected client is left untouched.
                    return;
                }

                // Release the previous (disconnected) client before replacing it.
                mqttClient?.Dispose();

                MqttClientOptions options = new MqttClientOptionsBuilder()
                    .WithTcpServer(serverUrl, int.Parse(serverPort))
                    .WithClientId(wsid) // the client id must be unique per broker
                    // Enable when the broker requires authentication:
                    //.WithCredentials(username, password)
                    .WithProtocolVersion(MQTTnet.Formatter.MqttProtocolVersion.V311)
                    .WithCleanSession(true)
                    .WithWillQualityOfServiceLevel(MQTTnet.Protocol.MqttQualityOfServiceLevel.AtMostOnce)
                    .WithTimeout(new TimeSpan(0, 0, 10))
                    .WithKeepAlivePeriod(TimeSpan.FromSeconds(20))
                    .Build();

                IMqttClient client = new MqttFactory().CreateMqttClient();
                client.ApplicationMessageReceivedAsync += ReceivedHandle;
                client.ConnectedAsync += ConnectedHandle;
                client.DisconnectedAsync += DisconnectedHandle;
                mqttClient = client;

                // Block until connected. GetAwaiter().GetResult() surfaces the real exception
                // instead of an AggregateException wrapper, so the logged message is meaningful.
                client.ConnectAsync(options).GetAwaiter().GetResult();

                // Subscribe to the configured topic (runs in the background).
                SubTopics(sub_topic);
            }
            catch (Exception ex)
            {
                linkedState = false;
                Console.WriteLine($"MQTT connect failed: {ex.Message}");
            }
        }

        /// <summary>
        /// Starts subscribing to <paramref name="topic"/>. The call returns as soon as the
        /// request has been issued; success or failure is reported on the console.
        /// </summary>
        /// <param name="topic">
        /// Topic filter; used as a format string with the client id as argument <c>{0}</c>.
        /// Null, empty or whitespace-only values are ignored.
        /// </param>
        public void SubTopics(string topic)
        {
            // The helper handles every exception itself, so discarding the task is safe.
            _ = SubscribeTopicAsync(topic);
        }

        /// <summary>Performs the subscription for <see cref="SubTopics"/>; never throws.</summary>
        private async Task SubscribeTopicAsync(string topic)
        {
            topic = topic?.Trim();
            if (string.IsNullOrEmpty(topic))
            {
                return;
            }

            // Snapshot the field: reConnect() may null it from another thread.
            IMqttClient client = mqttClient;
            if (client == null || !client.IsConnected)
            {
                return;
            }

            try
            {
                topic = string.Format(topic, wsid);

                var subscribeOptions = new MqttClientSubscribeOptionsBuilder()
                    .WithTopicFilter(topic, MqttQualityOfServiceLevel.ExactlyOnce)
                    .Build();

                await client.SubscribeAsync(subscribeOptions, System.Threading.CancellationToken.None);
                Console.WriteLine("监听主题:" + topic);
            }
            catch (Exception e)
            {
                Console.WriteLine($"消息监听失败.监听主题:{topic}, 失败原因:{e.Message}");
            }
        }

        /// <summary>
        /// Callback for incoming messages: decodes the payload as UTF-8 and hands it,
        /// together with its topic, to <see cref="MsWorkstation.PushMqMessage"/>.
        /// </summary>
        /// <param name="arg">Event data carrying the received application message.</param>
        private Task ReceivedHandle(MqttApplicationMessageReceivedEventArgs arg)
        {
            // A message without payload is forwarded as an empty string instead of throwing.
            var payload = arg.ApplicationMessage.Payload;
            string content = payload == null
                ? string.Empty
                : System.Text.Encoding.UTF8.GetString(payload);

            MsWorkstation ws = _service.GetRequiredService<MsWorkstation>();
            ws?.PushMqMessage(arg.ApplicationMessage.Topic, content);

            return Task.CompletedTask;
        }

        /// <summary>
        /// Callback for a lost connection: marks the wrapper as disconnected, waits
        /// <see cref="ReconnectDelayMilliseconds"/> and then triggers a reconnect.
        /// </summary>
        /// <param name="arg">Event data describing the disconnect (not inspected).</param>
        private async Task DisconnectedHandle(MqttClientDisconnectedEventArgs arg)
        {
            linkedState = false;

            // Non-blocking delay so the callback does not hold a thread while waiting.
            await Task.Delay(ReconnectDelayMilliseconds);
            reConnect();
        }

        /// <summary>
        /// Disposes the current client and runs <see cref="CreateClient"/> again.
        /// Only one reconnect runs at a time; calls made while one is in progress return immediately.
        /// </summary>
        public void reConnect()
        {
            lock (obj)
            {
                if (reConnTaskStarted)
                {
                    return; // another thread is already reconnecting
                }
                reConnTaskStarted = true;
            }

            try
            {
                linkedState = false;

                mqttClient?.Dispose();
                mqttClient = null;

                CreateClient();
            }
            catch (Exception ex)
            {
                Console.WriteLine($"MQTT reconnect failed: {ex.Message}");
            }
            finally
            {
                // Always release the guard, otherwise one failure would block all later reconnects.
                reConnTaskStarted = false;
            }
        }

        /// <summary>Callback for a successful connection: records the connected state.</summary>
        /// <param name="arg">Event data for the connection (not inspected).</param>
        private Task ConnectedHandle(MqttClientConnectedEventArgs arg)
        {
            linkedState = true;
            return Task.CompletedTask;
        }

        /// <summary>Publishes <paramref name="msg"/> on the configured default send topic.</summary>
        /// <param name="msg">Message text; null is sent as an empty message.</param>
        /// <returns>True when a publish was started, false otherwise.</returns>
        public bool SendToMqtt(string msg)
        {
            return SendToMqtt(send_topic, msg);
        }

        /// <summary>
        /// Publishes <paramref name="msg"/> on <paramref name="topic"/> without waiting for
        /// the broker's acknowledgement.
        /// </summary>
        /// <param name="topic">Target topic; surrounding whitespace is trimmed.</param>
        /// <param name="msg">Message text; trimmed, null is sent as an empty message.</param>
        /// <returns>
        /// True when the client was connected and a publish was started; false when the topic
        /// is blank, the client is missing or disconnected, or an error occurred. A true
        /// result does not guarantee delivery; publish errors are reported on the console.
        /// </returns>
        public bool SendToMqtt(string topic, string msg)
        {
            try
            {
                topic = topic?.Trim();
                if (string.IsNullOrEmpty(topic))
                {
                    return false;
                }

                // Snapshot the field: reConnect() may null it from another thread.
                IMqttClient client = mqttClient;
                if (client == null || !linkedState || !client.IsConnected)
                {
                    return false;
                }

                // Fire and forget; the helper handles its own exceptions.
                _ = ClientPublishAsync(client, topic, (msg ?? string.Empty).Trim());
                return true;
            }
            catch
            {
                return false;
            }
        }

        /// <summary>
        /// Builds the application message (QoS "exactly once", not retained) and publishes it.
        /// Never throws; failures are written to the console.
        /// </summary>
        /// <param name="client">Connected client to publish with.</param>
        /// <param name="topic">Non-empty, already trimmed topic.</param>
        /// <param name="message">Already trimmed message text.</param>
        private async Task ClientPublishAsync(IMqttClient client, string topic, string message)
        {
            try
            {
                var applicationMessage = new MqttApplicationMessageBuilder()
                    .WithTopic(topic)
                    .WithPayload(message)
                    .WithRetainFlag(false) // do not retain the message on the broker
                    .WithQualityOfServiceLevel(MqttQualityOfServiceLevel.ExactlyOnce)
                    .Build();

                await client.PublishAsync(applicationMessage);
            }
            catch (Exception ex)
            {
                Console.WriteLine($"MQTT publish failed. topic: {topic}, error: {ex.Message}");
            }
        }
    }
}