// MqttTopicImporter.cs
using System;
using IoTSharp.Handlers;
using IoTSharp.Services;
using Microsoft.Extensions.Logging;
using MQTTnet;
using MQTTnet.Client.Options;
using MQTTnet.Extensions.ManagedClient;

namespace IoTSharp.MQTT
{
    /// <summary>
    /// Connects a managed MQTT client to the broker described by the supplied
    /// <see cref="MqttImportTopicParameters"/>, subscribes to the configured topic and
    /// republishes every message received there through the local <see cref="MQTTService"/>.
    /// </summary>
    public class MqttTopicImporter
    {
        private readonly MqttImportTopicParameters _parameters;
        private readonly MQTTService _mqttService;
        private readonly ILogger _logger;

        // Created by Start() and released by Stop(); null while the importer is not running.
        private IManagedMqttClient _mqttClient;

        /// <summary>
        /// Creates an importer. No connection is opened until <see cref="Start"/> is called.
        /// </summary>
        /// <param name="parameters">Broker address, credentials, topic and QoS to import from.</param>
        /// <param name="mqttService">Service that received messages are republished through.</param>
        /// <param name="logger">Logger kept for diagnostics.</param>
        /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
        public MqttTopicImporter(MqttImportTopicParameters parameters, MQTTService mqttService, ILogger logger)
        {
            _parameters = parameters ?? throw new ArgumentNullException(nameof(parameters));
            _mqttService = mqttService ?? throw new ArgumentNullException(nameof(mqttService));
            _logger = logger ?? throw new ArgumentNullException(nameof(logger));
        }

        /// <summary>
        /// Builds the client options, subscribes to the configured topic, registers the
        /// message handler and starts the managed client. Blocks until the start call returns.
        /// </summary>
        /// <remarks>
        /// If the importer is already running, the previous client is stopped and disposed
        /// first so that it is not leaked.
        /// </remarks>
        public void Start()
        {
            // A second Start() without Stop() would otherwise overwrite the field and
            // leave the previous client running with nobody able to dispose it.
            Stop();

            var optionsBuilder = new ManagedMqttClientOptionsBuilder()
                .WithClientOptions(o =>
                {
                    o.WithTcpServer(_parameters.Server, _parameters.Port)
                        .WithCredentials(_parameters.Username, _parameters.Password)
                        .WithTls(new MqttClientOptionsBuilderTlsParameters
                        {
                            UseTls = _parameters.UseTls
                        });

                    // Only override the client id when one is configured. Setting it
                    // unconditionally would push a null/empty id into the options and
                    // replace whatever default the client library provides.
                    if (!string.IsNullOrEmpty(_parameters.ClientId))
                    {
                        o.WithClientId(_parameters.ClientId);
                    }
                });

            var options = optionsBuilder.Build();

            _mqttClient = new MqttFactory().CreateManagedMqttClient();

            // Subscription and handler are registered before the client is started.
            _mqttClient.SubscribeAsync(_parameters.Topic, _parameters.QualityOfServiceLevel).GetAwaiter().GetResult();
            _mqttClient.UseApplicationMessageReceivedHandler(e => OnApplicationMessageReceived(e));
            _mqttClient.StartAsync(options).GetAwaiter().GetResult();
        }

        /// <summary>
        /// Stops and disposes the managed client, if there is one. Safe to call when the
        /// importer was never started or has already been stopped.
        /// </summary>
        public void Stop()
        {
            var client = _mqttClient;
            if (client == null)
            {
                return;
            }

            // Clear the field first so a repeated Stop() never touches a disposed client.
            _mqttClient = null;

            try
            {
                client.StopAsync().GetAwaiter().GetResult();
            }
            finally
            {
                // Dispose even when stopping fails, so the client is always released.
                client.Dispose();
            }
        }

        /// <summary>
        /// Republishes a received message through the local MQTT service, copying topic,
        /// payload, quality-of-service level and retain flag from the incoming message.
        /// </summary>
        /// <param name="e">Event data carrying the received application message.</param>
        private void OnApplicationMessageReceived(MqttApplicationMessageReceivedEventArgs e)
        {
            _mqttService.Publish(new MqttPublishParameters
            {
                Topic = e.ApplicationMessage.Topic,
                Payload = e.ApplicationMessage.Payload,
                QualityOfServiceLevel = e.ApplicationMessage.QualityOfServiceLevel,
                Retain = e.ApplicationMessage.Retain
            });
        }
    }
}