using AmrControl.workstation;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Protocol;
using Newtonsoft.Json;
using SixLabors.ImageSharp.ColorSpaces.Companding;
namespace AmrControl.Clients
{
///
/// 来回读配置文件会导致内存异常,这是目前dotnet的问题,因此不要直接引用配置文件的内容
///
public class MqttClient
{
///
/// client客户端
///
private IMqttClient mqttClient;
///
/// 连接状态
///
private bool linkedState = false;
///
/// 配置信息
///
private readonly IConfiguration _configuration;
///
/// 服务获取
///
private readonly IServiceProvider _service;
// protected readonly Microsoft.Extensions.Logging.ILogger _logger;
private bool reConnTaskStarted = false;
private object obj = new object();
string wsid, send_topic, sub_topic;
///
/// 构造函数
///
///
///
public MqttClient(IConfiguration configuration, IServiceProvider service)
{
this._configuration = configuration;
this._service = service;
wsid = _configuration["Mqtt:id"]; //用户标识ID
send_topic = _configuration["Mqtt:send-topic"]; //用户标识ID
sub_topic = _configuration["Mqtt:sub-topic"]; //用户标识ID
}
///
/// 构建客户端
///
public void CreateClient()
{
try
{
if (linkedState && mqttClient != null && mqttClient.IsConnected)
{
//启动并且连接成功的服务不再考虑进行连接
return;
}
if(mqttClient != null)
{
mqttClient.Dispose();
}
MqttClientOptions Option = new MqttClientOptionsBuilder().WithTcpServer(_configuration["Mqtt:url"], int.Parse(_configuration["Mqtt:port"]))
.WithClientId(wsid) //客户端标识Id要唯一。
//.WithCredentials(_configuration["Mqtt:username"], _configuration["Mqtt:password"]) //用户名,密码
.WithProtocolVersion(MQTTnet.Formatter.MqttProtocolVersion.V311)
.WithCleanSession(true)
.WithWillQualityOfServiceLevel(MQTTnet.Protocol.MqttQualityOfServiceLevel.AtMostOnce)
.WithTimeout(new TimeSpan(0, 0, 10))
.WithKeepAlivePeriod(TimeSpan.FromSeconds(20))
.Build();
mqttClient = new MqttFactory().CreateMqttClient();
mqttClient.ApplicationMessageReceivedAsync += ReceivedHandle;
mqttClient.ConnectedAsync += ConnectedHandle;
mqttClient.DisconnectedAsync += DisconnectedHandle;
//调用异步方法连接到服务端
mqttClient.ConnectAsync(Option).Wait();
//订阅主题
SubTopics(sub_topic);
}
catch (Exception ex)
{
// _logger.LogError($"mqttclient connect failed. the cId {CId}, the error: {ex.Message}");
linkedState = false;
}
}
///
/// 订阅主题
///
public async void SubTopics(string topic)
{
topic = topic.Trim();
if (string.IsNullOrEmpty(topic))
{
// _logger.LogWarning("订阅主题不能为空!");
return;
}
topic = string.Format(topic, wsid);
// 判断客户端是否连接
if (mqttClient == null || !mqttClient.IsConnected)
{
//_logger.LogWarning("MQTT 客户端尚未连接!");
return;
}
// 设置订阅参数
var subscribeOptions = new MqttClientSubscribeOptionsBuilder()
.WithTopicFilter(topic, MqttQualityOfServiceLevel.ExactlyOnce)
.Build();
try
{
// 订阅
await mqttClient.SubscribeAsync(
subscribeOptions,
System.Threading.CancellationToken.None);
// _logger.LogInformation("监听主题:" + topic);
Console.WriteLine("监听主题:" + topic);
}
catch (Exception e)
{
// _logger.LogError($"消息监听失败.监听主题:{topic}, 失败原因:{e.Message}");
Console.WriteLine($"消息监听失败.监听主题:{topic}, 失败原因:{e.Message}");
}
}
///
/// 处理接收到的消息
///
///
///
private Task ReceivedHandle(MqttApplicationMessageReceivedEventArgs arg)
{
///收到消息
string content = System.Text.Encoding.UTF8.GetString(arg.ApplicationMessage.Payload);
//_logger.LogDebug("接收消息:" + content);
MsWorkstation ws = _service.GetRequiredService();
if(ws != null)
{
ws.PushMqMessage(arg.ApplicationMessage.Topic, content);
}
return Task.CompletedTask;
}
///
/// 连接丢失处理
///
///
///
private Task DisconnectedHandle(MqttClientDisconnectedEventArgs arg)
{
//_logger.LogWarning("MQTT发生断线重联!");
Thread.Sleep(2000);
reConnect();
return Task.CompletedTask;
}
public async void reConnect()
{
lock (obj)
{
if (reConnTaskStarted)
return ;
//防止多个线程执行
reConnTaskStarted = true;
if (mqttClient != null)
{
mqttClient.Dispose();
}
mqttClient = null;
}
try
{
linkedState = false;
//重连服务
CreateClient();
reConnTaskStarted = false;
}
catch
{
}
return ;
}
///
/// 连接处理
///
///
///
private Task ConnectedHandle(MqttClientConnectedEventArgs arg)
{
linkedState = true;
return Task.CompletedTask;
}
public bool SendToMqtt(string msg)
{
return SendToMqtt(send_topic, msg);
}
public bool SendToMqtt(string topic, string msg)
{
try
{
if (mqttClient == null)
return false;
if (linkedState || mqttClient.IsConnected)
{
//发布消息
ClientPublish(topic, msg);
return true;
}
else
{
return false;
}
}
catch
{
return false;
}
}
///
/// 发送消息
///
///
///
private async void ClientPublish(string topic, string message)
{
topic = topic.Trim();
message = message.Trim();
if (string.IsNullOrEmpty(topic))
{
// _logger.LogWarning("发送主题不能为空!");
return;
}
// 判断客户端是否连接
if (!linkedState || mqttClient == null || !mqttClient.IsConnected)
{
//_logger.LogWarning("MQTT 客户端尚未连接!");
return;
}
// _logger.LogDebug("发送消息:" + message);
// 填充消息
var applicationMessage = new MqttApplicationMessageBuilder()
.WithTopic(topic) // 主题
.WithPayload(message) // 消息
.WithRetainFlag(false) // 持久化为空,不持久化
.WithQualityOfServiceLevel(MqttQualityOfServiceLevel.ExactlyOnce)
.Build();
try
{
await mqttClient.PublishAsync(applicationMessage);
}
catch (Exception)
{
}
}
}
}