Improve library scan performance

This commit is contained in:
Mbucari 2023-03-06 16:49:52 -07:00
parent cdb27ef712
commit da36f9414d

View File

@ -2,8 +2,9 @@
using System.Collections.Generic; using System.Collections.Generic;
using System.Globalization; using System.Globalization;
using System.Linq; using System.Linq;
using System.Threading; using System.Threading.Channels;
using System.Threading.Tasks; using System.Threading.Tasks;
using System.Diagnostics;
using AudibleApi; using AudibleApi;
using AudibleApi.Common; using AudibleApi.Common;
using Dinah.Core; using Dinah.Core;
@ -19,6 +20,9 @@ namespace AudibleUtilities
{ {
public Api Api { get; private set; } public Api Api { get; private set; }
private const int MaxConcurrency = 10;
private const int BatchSize = 50;
private ApiExtended(Api api) => Api = api; private ApiExtended(Api api) => Api = api;
/// <summary>Get api from existing tokens else login with 'eager' choice. External browser url is provided. Response can be external browser login or continuing with native api callbacks.</summary> /// <summary>Get api from existing tokens else login with 'eager' choice. External browser url is provided. Response can be external browser login or continuing with native api callbacks.</summary>
@ -85,35 +89,49 @@ namespace AudibleUtilities
private async Task<List<Item>> getItemsAsync(LibraryOptions libraryOptions, bool importEpisodes) private async Task<List<Item>> getItemsAsync(LibraryOptions libraryOptions, bool importEpisodes)
{ {
var items = new List<Item>();
Serilog.Log.Logger.Debug("Beginning library scan."); Serilog.Log.Logger.Debug("Beginning library scan.");
List<Task<List<Item>>> getChildEpisodesTasks = new(); var episodeChannel = Channel.CreateBounded<Task<List<Item>>>(
new BoundedChannelOptions(1)
{
SingleReader = true,
SingleWriter = false,
FullMode = BoundedChannelFullMode.Wait
});
int count = 0, maxConcurrentEpisodeScans = 5; int count = 0;
using SemaphoreSlim concurrencySemaphore = new(maxConcurrentEpisodeScans); List<Item> items = new();
List<Item> seriesItems = new();
await foreach (var item in Api.GetLibraryItemAsyncEnumerable(libraryOptions)) var sw = Stopwatch.StartNew();
await foreach (var item in Api.GetLibraryItemAsyncEnumerable(libraryOptions, BatchSize, MaxConcurrency))
{ {
if ((item.IsEpisodes || item.IsSeriesParent) && importEpisodes) if ((item.IsEpisodes || item.IsSeriesParent) && importEpisodes)
{ seriesItems.Add(item);
//Get child episodes asynchronously and await all at the end
getChildEpisodesTasks.Add(getChildEpisodesAsync(concurrencySemaphore, item));
}
else if (!item.IsEpisodes && !item.IsSeriesParent) else if (!item.IsEpisodes && !item.IsSeriesParent)
items.Add(item); items.Add(item);
count++; count++;
} }
Serilog.Log.Logger.Debug("Library scan complete. Found {count} books and series. Waiting on {getChildEpisodesTasksCount} series episode scans to complete.", count, getChildEpisodesTasks.Count); Serilog.Log.Logger.Debug("Library scan complete. Found {count} books and series. Waiting on series episode scans to complete.", count);
Serilog.Log.Logger.Debug("Beginning episode scan.");
//await and add all episodes from all parents count = 0;
foreach (var epList in await Task.WhenAll(getChildEpisodesTasks)) var episodeDlTask = scanAllSeries(seriesItems, episodeChannel.Writer);
items.AddRange(epList);
Serilog.Log.Logger.Debug("Completed library scan."); await foreach (var ep in getAllEpisodesAsync(episodeChannel.Reader, MaxConcurrency))
{
items.Add(ep);
count++;
}
await episodeDlTask;
sw.Stop();
Serilog.Log.Logger.Debug("Episode scan complete. Found {count} episodes and series.", count);
Serilog.Log.Logger.Debug($"Completed library scan in {sw.Elapsed.TotalMilliseconds:F0} ms.");
#if DEBUG #if DEBUG
//// this will not work for multi accounts //// this will not work for multi accounts
@ -146,165 +164,164 @@ namespace AudibleUtilities
#region episodes and podcasts #region episodes and podcasts
private async Task<List<Item>> getChildEpisodesAsync(SemaphoreSlim concurrencySemaphore, Item parent) private async IAsyncEnumerable<Item> getAllEpisodesAsync(ChannelReader<Task<List<Item>>> allEpisodes, int maxConcurrency)
{ {
await concurrencySemaphore.WaitAsync(); List<Task<List<Item>>> concurentGets = new();
for (int i = 0; i < maxConcurrency && await allEpisodes.WaitToReadAsync(); i++)
concurentGets.Add(await allEpisodes.ReadAsync());
while (concurentGets.Count > 0)
{
var completed = await Task.WhenAny(concurentGets);
concurentGets.Remove(completed);
if (await allEpisodes.WaitToReadAsync())
concurentGets.Add(await allEpisodes.ReadAsync());
foreach (var item in completed.Result)
yield return item;
}
}
private async Task scanAllSeries(IEnumerable<Item> seriesItems, ChannelWriter<Task<List<Item>>> allEpisodes)
{
try try
{ {
Serilog.Log.Logger.Debug("Beginning episode scan for {parent}", parent); List<Task> episodeScanTasks = new();
List<Item> children; foreach (var item in seriesItems)
if (parent.IsEpisodes)
{ {
//The 'parent' is a single episode that was added to the library. if (item.IsEpisodes)
//Get the episode's parent and add it to the database. await allEpisodes.WriteAsync(getEpisodeParentAsync(item));
else if (item.IsSeriesParent)
episodeScanTasks.Add(getParentEpisodesAsync(allEpisodes, item));
}
Serilog.Log.Logger.Debug("Supplied Parent is an episode. Beginning parent scan for {parent}", parent); await Task.WhenAll(episodeScanTasks);
}
finally { allEpisodes.Complete(); }
}
children = new() { parent }; private async Task<List<Item>> getEpisodeParentAsync(Item episode)
{
//Item is a single episode that was added to the library.
//Get the episode's parent and add it to the database.
var parentAsins = parent.Relationships Serilog.Log.Logger.Debug("Supplied Parent is an episode. Beginning parent scan for {parent}", episode);
.Where(r => r.RelationshipToProduct == RelationshipToProduct.Parent)
.Select(p => p.Asin);
var seriesParents = await Api.GetCatalogProductsAsync(parentAsins, CatalogOptions.ResponseGroupOptions.ALL_OPTIONS); List<Item> children = new() { episode };
int numSeriesParents = seriesParents.Count(p => p.IsSeriesParent); var parentAsins = episode.Relationships
if (numSeriesParents != 1) .Where(r => r.RelationshipToProduct == RelationshipToProduct.Parent)
{ .Select(p => p.Asin);
//There should only ever be 1 top-level parent per episode. If not, log
//so we can figure out what to do about those special cases, and don't var seriesParents = await Api.GetCatalogProductsAsync(parentAsins, CatalogOptions.ResponseGroupOptions.ALL_OPTIONS);
//import the episode.
JsonSerializerSettings Settings = new() int numSeriesParents = seriesParents.Count(p => p.IsSeriesParent);
{ if (numSeriesParents != 1)
MetadataPropertyHandling = MetadataPropertyHandling.Ignore, {
DateParseHandling = DateParseHandling.None, //There should only ever be 1 top-level parent per episode. If not, log
Converters = //so we can figure out what to do about those special cases, and don't
{ //import the episode.
new IsoDateTimeConverter { DateTimeStyles = DateTimeStyles.AssumeUniversal } JsonSerializerSettings Settings = new()
}, {
}; MetadataPropertyHandling = MetadataPropertyHandling.Ignore,
Serilog.Log.Logger.Error($"Found {numSeriesParents} parents for {parent.Asin}\r\nEpisode Product:\r\n{JsonConvert.SerializeObject(parent, Formatting.None, Settings)}"); DateParseHandling = DateParseHandling.None,
return new List<Item>(); Converters = {
new IsoDateTimeConverter { DateTimeStyles = DateTimeStyles.AssumeUniversal }
} }
};
Serilog.Log.Logger.Error($"Found {numSeriesParents} parents for {episode.Asin}\r\nEpisode Product:\r\n{JsonConvert.SerializeObject(episode, Formatting.None, Settings)}");
return new();
}
var realParent = seriesParents.Single(p => p.IsSeriesParent); var parent = seriesParents.Single(p => p.IsSeriesParent);
realParent.PurchaseDate = parent.PurchaseDate; parent.PurchaseDate = episode.PurchaseDate;
Serilog.Log.Logger.Debug("Completed parent scan for {parent}", parent); setSeries(parent, children);
parent = realParent; children.Add(parent);
}
else Serilog.Log.Logger.Debug("Completed parent scan for {episode}", episode);
return children;
}
/// <summary>
/// Splits a series parent's child-episode ASINs into batches of <see cref="BatchSize"/> and
/// writes one episode-fetch task per batch to <paramref name="channel"/>.
/// </summary>
/// <param name="channel">Channel writer that receives the task returned by getEpisodeBatchAsync for each batch.</param>
/// <param name="parent">The series parent whose <c>Relationships</c> list the child episodes to fetch.</param>
/// <returns>A task that completes once every batch task has been written to the channel.</returns>
/// <remarks>
/// Nothing is written when the parent has no child-episode relationships.
/// Batch numbers start at 0 and are passed through to getEpisodeBatchAsync.
/// </remarks>
private async Task getParentEpisodesAsync(ChannelWriter<Task<List<Item>>> channel, Item parent)
{
    Serilog.Log.Logger.Debug("Beginning episode scan for {parent}", parent);

    // Materialize the ids once. The previous lazy query was re-enumerated by Any(), Take()
    // and an ever-growing Skip() chain on every iteration, re-filtering parent.Relationships
    // from the start for each batch.
    var episodeIds = parent.Relationships
        .Where(r => r.RelationshipToProduct == RelationshipToProduct.Child && r.RelationshipType == RelationshipType.Episode)
        .Select(r => r.Asin)
        .ToList();

    for (int batchNum = 0, offset = 0; offset < episodeIds.Count; batchNum++, offset += BatchSize)
    {
        // GetRange copies the slice, so each batch task owns a stable, already-evaluated id list.
        var batch = episodeIds.GetRange(offset, Math.Min(BatchSize, episodeIds.Count - offset));

        // The fetch task is created before WriteAsync is awaited, matching the original call order.
        await channel.WriteAsync(getEpisodeBatchAsync(batchNum, parent, batch));
    }
}
private async Task<List<Item>> getEpisodeBatchAsync(int batchNum, Item parent, IEnumerable<string> childrenIds)
{
try
{
List<Item> episodeBatch = await Api.GetCatalogProductsAsync(childrenIds, CatalogOptions.ResponseGroupOptions.ALL_OPTIONS);
setSeries(parent, episodeBatch);
if (batchNum == 0)
episodeBatch.Add(parent);
Serilog.Log.Logger.Debug($"Batch {batchNum}: {episodeBatch.Count} results\t({{parent}})", parent);
return episodeBatch;
}
catch (Exception ex)
{
Serilog.Log.Logger.Error(ex, "Error fetching batch of episodes. {@DebugInfo}", new
{ {
children = await getEpisodeChildrenAsync(parent); ParentId = parent.Asin,
if (!children.Any()) ParentTitle = parent.Title,
return new(); BatchNumber = batchNum,
} ChildIdBatch = childrenIds
});
throw;
}
}
//A series parent will always have exactly 1 Series private static void setSeries(Item parent, IEnumerable<Item> children)
parent.Series = new Series[] {
//A series parent will always have exactly 1 Series
parent.Series = new[]
{
new Series
{
Asin = parent.Asin,
Sequence = "-1",
Title = parent.TitleWithSubtitle
}
};
foreach (var child in children)
{
// use parent's 'DateAdded'. DateAdded is just a convenience prop for: PurchaseDate.UtcDateTime
child.PurchaseDate = parent.PurchaseDate;
// parent is essentially a series
child.Series = new[]
{ {
new Series new Series
{ {
Asin = parent.Asin, Asin = parent.Asin,
Sequence = "-1", // This should properly be Single() not FirstOrDefault(), but FirstOrDefault is defensive for malformed data from audible
Sequence = parent.Relationships.FirstOrDefault(r => r.Asin == child.Asin)?.Sort?.ToString() ?? "0",
Title = parent.TitleWithSubtitle Title = parent.TitleWithSubtitle
} }
}; };
foreach (var child in children)
{
// use parent's 'DateAdded'. DateAdded is just a convenience prop for: PurchaseDate.UtcDateTime
child.PurchaseDate = parent.PurchaseDate;
// parent is essentially a series
child.Series = new Series[]
{
new Series
{
Asin = parent.Asin,
// This should properly be Single() not FirstOrDefault(), but FirstOrDefault is defensive for malformed data from audible
Sequence = parent.Relationships.FirstOrDefault(r => r.Asin == child.Asin)?.Sort?.ToString() ?? "0",
Title = parent.TitleWithSubtitle
}
};
}
children.Add(parent);
Serilog.Log.Logger.Debug("Completed episode scan for {parent}", parent);
return children;
} }
finally
{
concurrencySemaphore.Release();
}
}
private async Task<List<Item>> getEpisodeChildrenAsync(Item parent)
{
var childrenIds = parent.Relationships
.Where(r => r.RelationshipToProduct == RelationshipToProduct.Child && r.RelationshipType == RelationshipType.Episode)
.Select(r => r.Asin)
.ToList();
// fetch children in batches
const int batchSize = 20;
var results = new List<Item>();
for (var i = 1; ; i++)
{
var idBatch = childrenIds.Skip((i - 1) * batchSize).Take(batchSize).ToList();
if (!idBatch.Any())
break;
List<Item> childrenBatch;
try
{
childrenBatch = await Api.GetCatalogProductsAsync(idBatch, CatalogOptions.ResponseGroupOptions.ALL_OPTIONS);
#if DEBUG
//var childrenBatchDebug = childrenBatch.Select(i => i.ToJson()).Aggregate((a, b) => $"{a}\r\n\r\n{b}");
//System.IO.File.WriteAllText($"children of {parent.Asin}.json", childrenBatchDebug);
#endif
}
catch (Exception ex)
{
Serilog.Log.Logger.Error(ex, "Error fetching batch of episodes. {@DebugInfo}", new
{
ParentId = parent.Asin,
ParentTitle = parent.Title,
BatchNumber = i,
ChildIdBatch = idBatch
});
throw;
}
Serilog.Log.Logger.Debug($"Batch {i}: {childrenBatch.Count} results\t({{parent}})", parent);
// the service returned no results. probably indicates an error. stop running batches
if (!childrenBatch.Any())
break;
results.AddRange(childrenBatch);
}
Serilog.Log.Logger.Debug("Parent episodes/podcasts series. Children found. {@DebugInfo}", new
{
ParentId = parent.Asin,
ParentTitle = parent.Title,
ChildCount = childrenIds.Count
});
if (childrenIds.Count != results.Count)
{
var ex = new ApplicationException($"Mis-match: Children defined by parent={childrenIds.Count}. Children returned by batches={results.Count}");
Serilog.Log.Logger.Error(ex, "{parent} - Quantity of series episodes defined by parent does not match quantity returned by batch fetching.", parent);
throw ex;
}
return results;
} }
#endregion #endregion
} }