Improve library scan performance

This commit is contained in:
MBucari 2023-03-06 18:39:48 -07:00 committed by Mbucari
parent 4140722a6d
commit bd49db83e4

View File

@ -91,20 +91,13 @@ namespace AudibleUtilities
{ {
Serilog.Log.Logger.Debug("Beginning library scan."); Serilog.Log.Logger.Debug("Beginning library scan.");
var episodeChannel = Channel.CreateBounded<Task<List<Item>>>(
new BoundedChannelOptions(1)
{
SingleReader = true,
SingleWriter = false,
FullMode = BoundedChannelFullMode.Wait
});
int count = 0; int count = 0;
List<Item> items = new(); List<Item> items = new();
List<Item> seriesItems = new(); List<Item> seriesItems = new();
var sw = Stopwatch.StartNew(); var sw = Stopwatch.StartNew();
//Scan the library for all added books, and add ay episode-type items to seriesItems to be scanned for episodes/parents
await foreach (var item in Api.GetLibraryItemAsyncEnumerable(libraryOptions, BatchSize, MaxConcurrency)) await foreach (var item in Api.GetLibraryItemAsyncEnumerable(libraryOptions, BatchSize, MaxConcurrency))
{ {
if ((item.IsEpisodes || item.IsSeriesParent) && importEpisodes) if ((item.IsEpisodes || item.IsSeriesParent) && importEpisodes)
@ -119,15 +112,24 @@ namespace AudibleUtilities
Serilog.Log.Logger.Debug("Beginning episode scan."); Serilog.Log.Logger.Debug("Beginning episode scan.");
count = 0; count = 0;
var episodeDlTask = scanAllSeries(seriesItems, episodeChannel.Writer);
await foreach (var ep in getAllEpisodesAsync(episodeChannel.Reader, MaxConcurrency)) //'get' Tasks are activated when they are written to the channel. To avoid more concurrency than is desired, the
//channel is bounded with a capacity of 1. Channel write operations are blocked until the current item is read
var episodeChannel = Channel.CreateBounded<Task<List<Item>>>(new BoundedChannelOptions(1) { SingleReader = true });
//Start scanning for all episodes. Episode batch 'get' Tasks are written to the channel.
var scanAllSeriesTask = scanAllSeries(seriesItems, episodeChannel.Writer);
//Read all episodes from the channel and add them to the import items.
//This method blocks until episodeChannel.Writer is closed by scanAllSeries()
await foreach (var ep in getAllEpisodesAsync(episodeChannel.Reader))
{ {
items.Add(ep); items.Add(ep);
count++; count++;
} }
await episodeDlTask; //Be sure to await the scanAllSeries Task so that any exceptions are thrown
await scanAllSeriesTask;
sw.Stop(); sw.Stop();
Serilog.Log.Logger.Debug("Episode scan complete. Found {count} episodes and series.", count); Serilog.Log.Logger.Debug("Episode scan complete. Found {count} episodes and series.", count);
@ -164,27 +166,37 @@ namespace AudibleUtilities
#region episodes and podcasts #region episodes and podcasts
private async IAsyncEnumerable<Item> getAllEpisodesAsync(ChannelReader<Task<List<Item>>> allEpisodes, int maxConcurrency) /// <summary>
/// Read get tasks from the <paramref name="channel"/> and await results. This method maintains
/// a list of up to <see cref="MaxConcurrency"/> get tasks. When any of the get tasks completes,
/// the Items are yielded, that task is removed from the list, and a new get task is read from
/// the channel.
/// </summary>
private async IAsyncEnumerable<Item> getAllEpisodesAsync(ChannelReader<Task<List<Item>>> channel)
{ {
List<Task<List<Item>>> concurentGets = new(); List<Task<List<Item>>> concurentGets = new();
for (int i = 0; i < maxConcurrency && await allEpisodes.WaitToReadAsync(); i++) for (int i = 0; i < MaxConcurrency && await channel.WaitToReadAsync(); i++)
concurentGets.Add(await allEpisodes.ReadAsync()); concurentGets.Add(await channel.ReadAsync());
while (concurentGets.Count > 0) while (concurentGets.Count > 0)
{ {
var completed = await Task.WhenAny(concurentGets); var completed = await Task.WhenAny(concurentGets);
concurentGets.Remove(completed); concurentGets.Remove(completed);
if (await allEpisodes.WaitToReadAsync()) if (await channel.WaitToReadAsync())
concurentGets.Add(await allEpisodes.ReadAsync()); concurentGets.Add(await channel.ReadAsync());
foreach (var item in completed.Result) foreach (var item in completed.Result)
yield return item; yield return item;
} }
} }
private async Task scanAllSeries(IEnumerable<Item> seriesItems, ChannelWriter<Task<List<Item>>> allEpisodes) /// <summary>
/// Gets all child episodes and episode parents belonging to <paramref name="seriesItems"/> in batches and
/// writes the get tasks to <paramref name="channel"/>.
/// </summary>
private async Task scanAllSeries(IEnumerable<Item> seriesItems, ChannelWriter<Task<List<Item>>> channel)
{ {
try try
{ {
@ -193,14 +205,15 @@ namespace AudibleUtilities
foreach (var item in seriesItems) foreach (var item in seriesItems)
{ {
if (item.IsEpisodes) if (item.IsEpisodes)
await allEpisodes.WriteAsync(getEpisodeParentAsync(item)); await channel.WriteAsync(getEpisodeParentAsync(item));
else if (item.IsSeriesParent) else if (item.IsSeriesParent)
episodeScanTasks.Add(getParentEpisodesAsync(allEpisodes, item)); episodeScanTasks.Add(getParentEpisodesAsync(item, channel));
} }
//episodeScanTasks complete only after all episode batch 'gets' have been written to the channel
await Task.WhenAll(episodeScanTasks); await Task.WhenAll(episodeScanTasks);
} }
finally { allEpisodes.Complete(); } finally { channel.Complete(); }
} }
private async Task<List<Item>> getEpisodeParentAsync(Item episode) private async Task<List<Item>> getEpisodeParentAsync(Item episode)
@ -247,7 +260,11 @@ namespace AudibleUtilities
return children; return children;
} }
private async Task getParentEpisodesAsync(ChannelWriter<Task<List<Item>>> channel, Item parent) /// <summary>
/// Gets all episodes belonging to <paramref name="parent"/> in batches of <see cref="BatchSize"/> and writes the batch get tasks to <paramref name="channel"/>
/// This method only completes after all episode batch 'gets' have been written to the channel
/// </summary>
private async Task getParentEpisodesAsync(Item parent, ChannelWriter<Task<List<Item>>> channel)
{ {
Serilog.Log.Logger.Debug("Beginning episode scan for {parent}", parent); Serilog.Log.Logger.Debug("Beginning episode scan for {parent}", parent);