Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sync AWS Things with portal #2133

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,16 @@ public AWSDeviceThingProfile()
.ForMember(dest => dest.ThingName, opts => opts.MapFrom(src => src.DeviceName))
.ForMember(dest => dest.Payload, opts => opts.MapFrom(src => EmptyPayload()))
.ReverseMap();

_ = CreateMap<DescribeThingResponse, Device>()
.ForMember(dest => dest.Id, opts => opts.MapFrom(src => src.ThingId))
.ForMember(dest => dest.Name, opts => opts.MapFrom(src => src.ThingName))
.ForMember(dest => dest.Version, opts => opts.MapFrom(src => src.Version))
.ForMember(dest => dest.Tags, opts => opts.MapFrom(src => src.Attributes.Select(att => new DeviceTagValue
{
Name = att.Key,
Value = att.Value
})));
}

private static MemoryStream EmptyPayload()
Expand Down
3 changes: 2 additions & 1 deletion src/AzureIoTHub.Portal.Domain/Entities/Device.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

namespace AzureIoTHub.Portal.Domain.Entities
{
using System.Collections.ObjectModel;
using AzureIoTHub.Portal.Domain.Base;

public class Device : EntityBase
Expand Down Expand Up @@ -50,6 +51,6 @@ public class Device : EntityBase
/// <summary>
/// List of custom device tags and their values.
/// </summary>
public ICollection<DeviceTagValue> Tags { get; set; } = default!;
public ICollection<DeviceTagValue> Tags { get; set; } = new Collection<DeviceTagValue>();
}
}
185 changes: 185 additions & 0 deletions src/AzureIoTHub.Portal.Infrastructure/Jobs/AWS/SyncThingsJob.cs
delager marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
// Copyright (c) CGI France. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

namespace AzureIoTHub.Portal.Infrastructure.Jobs.AWS
{
using System.Linq;
using System.Net;
using System.Threading.Tasks;
using Amazon.IoT;
using Amazon.IoT.Model;
using Amazon.IotData;
using Amazon.IotData.Model;
using AutoMapper;
using AzureIoTHub.Portal.Domain;
using AzureIoTHub.Portal.Domain.Entities;
using AzureIoTHub.Portal.Domain.Repositories;
using Microsoft.Extensions.Logging;
using Quartz;
using Quartz.Util;

[DisallowConcurrentExecution]
public class SyncThingsJob : IJob
{

private readonly ILogger<SyncThingsJob> logger;
private readonly IMapper mapper;
private readonly IUnitOfWork unitOfWork;
private readonly IDeviceRepository deviceRepository;
private readonly IDeviceModelRepository deviceModelRepository;
private readonly IDeviceTagValueRepository deviceTagValueRepository;
private readonly IAmazonIoT amazonIoTClient;
private readonly IAmazonIotData amazonIoTDataClient;

public SyncThingsJob(
ILogger<SyncThingsJob> logger,
IMapper mapper,
IUnitOfWork unitOfWork,
IDeviceRepository deviceRepository,
IDeviceModelRepository deviceModelRepository,
IDeviceTagValueRepository deviceTagValueRepository,
IAmazonIoT amazonIoTClient,
IAmazonIotData amazonIoTDataClient)
{
this.mapper = mapper;
this.unitOfWork = unitOfWork;
this.deviceRepository = deviceRepository;
this.deviceModelRepository = deviceModelRepository;
this.deviceTagValueRepository = deviceTagValueRepository;
this.amazonIoTClient = amazonIoTClient;
this.amazonIoTDataClient = amazonIoTDataClient;
this.logger = logger;
}


public async Task Execute(IJobExecutionContext context)
{
try
{
this.logger.LogInformation("Start of sync Things job");

await SyncThingsAsDevices();

this.logger.LogInformation("End of sync Things job");
}
catch (Exception e)
{
this.logger.LogError(e, "Sync Things job has failed");
}
Comment on lines +65 to +68

Check notice

Code scanning / CodeQL

Generic catch clause

Generic catch clause.
}

private async Task SyncThingsAsDevices()
{
var things = await GetAllThings();

foreach (var thing in things)
{
//Thing error
if (thing.HttpStatusCode != HttpStatusCode.OK)
{
this.logger.LogWarning($"Cannot import device '{thing.ThingName}' due to an error in the Amazon IoT API : {thing.HttpStatusCode}");
continue;
}

//ThingType not specified
if (thing.ThingTypeName.IsNullOrWhiteSpace())
{
this.logger.LogInformation($"Cannot import device '{thing.ThingName}' since it doesn't have related thing type.");
continue;
}

//ThingType not find in DB
var deviceModel = this.deviceModelRepository.GetByName(thing.ThingTypeName);
if (deviceModel == null)
{
this.logger.LogWarning($"Cannot import device '{thing.ThingName}'. The ThingType '{thing.ThingTypeName}' doesn't exist");
continue;
}

//ThingShadow not specified
var thingShadowRequest = new GetThingShadowRequest()
{
ThingName = thing.ThingName
};
try
{
var thingShadow = await this.amazonIoTDataClient.GetThingShadowAsync(thingShadowRequest);
if (thingShadow.HttpStatusCode != HttpStatusCode.OK)
{
if (thingShadow.HttpStatusCode.Equals(HttpStatusCode.NotFound))
this.logger.LogInformation($"Cannot import device '{thing.ThingName}' since it doesn't have related classic thing shadow");
else
this.logger.LogWarning($"Cannot import device '{thing.ThingName}' due to an error retrieving thing shadow in the Amazon IoT API : {thingShadow.HttpStatusCode}");
continue;
}
}
catch (AmazonIotDataException e)
{
this.logger.LogWarning($"Cannot import device '{thing.ThingName}' due to an error retrieving thing shadow in the Amazon IoT Data API.", e);
continue;
}

//Create or update the thing
await CreateOrUpdateThing(thing, deviceModel);
}

foreach (var item in (await this.deviceRepository.GetAllAsync(
device => !things.Select(x => x.ThingId).Contains(device.Id),
default,
d => d.Tags,
d => d.Labels
)))
{
this.deviceRepository.Delete(item.Id);
}

await this.unitOfWork.SaveAsync();
}

private async Task<List<DescribeThingResponse>> GetAllThings()
{
var things = new List<DescribeThingResponse>();

var response = await amazonIoTClient.ListThingsAsync();

foreach (var requestDescribeThing in response.Things.Select(thing => new DescribeThingRequest { ThingName = thing.ThingName }))
{
try
{
things.Add(await this.amazonIoTClient.DescribeThingAsync(requestDescribeThing));
}
catch (AmazonIoTException e)
{
this.logger.LogWarning($"Cannot import device '{requestDescribeThing.ThingName}' due to an error in the Amazon IoT API.", e);
continue;
}
}

return things;
}

private async Task CreateOrUpdateThing(DescribeThingResponse thing, DeviceModel deviceModel)
{
var device = this.mapper.Map<Device>(thing);
var deviceEntity = await this.deviceRepository.GetByIdAsync(device.Id, d => d.Tags);
device.DeviceModelId = deviceModel.Id;

if (deviceEntity == null)
{
await this.deviceRepository.InsertAsync(device);
}
else
{
if (deviceEntity.Version >= device.Version) return;

foreach (var deviceTagEntity in deviceEntity.Tags)
{
this.deviceTagValueRepository.Delete(deviceTagEntity.Id);
}

_ = this.mapper.Map(device, deviceEntity);
this.deviceRepository.Update(deviceEntity);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,15 @@ private static IServiceCollection ConfigureAWSSyncJobs(this IServiceCollection s
.AddTrigger(t => t
.WithIdentity($"{nameof(SyncThingTypesJob)}")
.ForJob(nameof(SyncThingTypesJob))
.WithSimpleSchedule(s => s
.WithSimpleSchedule(s => s
.WithIntervalInMinutes(configuration.SyncDatabaseJobRefreshIntervalInMinutes)
.RepeatForever()));

_ = q.AddJob<SyncThingsJob>(j => j.WithIdentity(nameof(SyncThingsJob)))
.AddTrigger(t => t
.WithIdentity($"{nameof(SyncThingsJob)}")
.ForJob(nameof(SyncThingsJob))
.WithSimpleSchedule(s => s
.WithIntervalInMinutes(configuration.SyncDatabaseJobRefreshIntervalInMinutes)
.RepeatForever()));

Expand Down
Loading