diff --git a/Source/AudibleUtilities/ApiExtended.cs b/Source/AudibleUtilities/ApiExtended.cs
index 376bde2c..52585b00 100644
--- a/Source/AudibleUtilities/ApiExtended.cs
+++ b/Source/AudibleUtilities/ApiExtended.cs
@@ -2,8 +2,9 @@
using System.Collections.Generic;
using System.Globalization;
using System.Linq;
-using System.Threading;
+using System.Threading.Channels;
using System.Threading.Tasks;
+using System.Diagnostics;
using AudibleApi;
using AudibleApi.Common;
using Dinah.Core;
@@ -19,6 +20,9 @@ namespace AudibleUtilities
{
public Api Api { get; private set; }
+ private const int MaxConcurrency = 10;
+ private const int BatchSize = 50;
+
private ApiExtended(Api api) => Api = api;
/// Get api from existing tokens else login with 'eager' choice. External browser url is provided. Response can be external browser login or continuing with native api callbacks.
@@ -85,35 +89,49 @@ namespace AudibleUtilities
private async Task<List<Item>> getItemsAsync(LibraryOptions libraryOptions, bool importEpisodes)
{
- var items = new List<Item>();
-
Serilog.Log.Logger.Debug("Beginning library scan.");
- List<Task<List<Item>>> getChildEpisodesTasks = new();
+ var episodeChannel = Channel.CreateBounded<Task<List<Item>>>(
+ new BoundedChannelOptions(1)
+ {
+ SingleReader = true,
+ SingleWriter = false,
+ FullMode = BoundedChannelFullMode.Wait
+ });
- int count = 0, maxConcurrentEpisodeScans = 5;
- using SemaphoreSlim concurrencySemaphore = new(maxConcurrentEpisodeScans);
+ int count = 0;
+ List<Item> items = new();
+ List<Item> seriesItems = new();
- await foreach (var item in Api.GetLibraryItemAsyncEnumerable(libraryOptions))
+ var sw = Stopwatch.StartNew();
+
+ await foreach (var item in Api.GetLibraryItemAsyncEnumerable(libraryOptions, BatchSize, MaxConcurrency))
{
if ((item.IsEpisodes || item.IsSeriesParent) && importEpisodes)
- {
- //Get child episodes asynchronously and await all at the end
- getChildEpisodesTasks.Add(getChildEpisodesAsync(concurrencySemaphore, item));
- }
+ seriesItems.Add(item);
else if (!item.IsEpisodes && !item.IsSeriesParent)
items.Add(item);
count++;
}
- Serilog.Log.Logger.Debug("Library scan complete. Found {count} books and series. Waiting on {getChildEpisodesTasksCount} series episode scans to complete.", count, getChildEpisodesTasks.Count);
+ Serilog.Log.Logger.Debug("Library scan complete. Found {count} books and series. Waiting on series episode scans to complete.", count);
+ Serilog.Log.Logger.Debug("Beginning episode scan.");
- //await and add all episodes from all parents
- foreach (var epList in await Task.WhenAll(getChildEpisodesTasks))
- items.AddRange(epList);
+ count = 0;
+ var episodeDlTask = scanAllSeries(seriesItems, episodeChannel.Writer);
- Serilog.Log.Logger.Debug("Completed library scan.");
+ await foreach (var ep in getAllEpisodesAsync(episodeChannel.Reader, MaxConcurrency))
+ {
+ items.Add(ep);
+ count++;
+ }
+
+ await episodeDlTask;
+
+ sw.Stop();
+ Serilog.Log.Logger.Debug("Episode scan complete. Found {count} episodes and series.", count);
+ Serilog.Log.Logger.Debug($"Completed library scan in {sw.Elapsed.TotalMilliseconds:F0} ms.");
#if DEBUG
//// this will not work for multi accounts
@@ -146,165 +164,164 @@ namespace AudibleUtilities
#region episodes and podcasts
- private async Task<List<Item>> getChildEpisodesAsync(SemaphoreSlim concurrencySemaphore, Item parent)
+ private async IAsyncEnumerable<Item> getAllEpisodesAsync(ChannelReader<Task<List<Item>>> allEpisodes, int maxConcurrency)
{
- await concurrencySemaphore.WaitAsync();
+ List<Task<List<Item>>> concurentGets = new();
+ for (int i = 0; i < maxConcurrency && await allEpisodes.WaitToReadAsync(); i++)
+ concurentGets.Add(await allEpisodes.ReadAsync());
+
+ while (concurentGets.Count > 0)
+ {
+ var completed = await Task.WhenAny(concurentGets);
+ concurentGets.Remove(completed);
+
+ if (await allEpisodes.WaitToReadAsync())
+ concurentGets.Add(await allEpisodes.ReadAsync());
+
+ foreach (var item in completed.Result)
+ yield return item;
+ }
+ }
+
+ private async Task scanAllSeries(IEnumerable<Item> seriesItems, ChannelWriter<Task<List<Item>>> allEpisodes)
+ {
try
{
- Serilog.Log.Logger.Debug("Beginning episode scan for {parent}", parent);
+ List<Task> episodeScanTasks = new();
- List<Item> children;
-
- if (parent.IsEpisodes)
+ foreach (var item in seriesItems)
{
- //The 'parent' is a single episode that was added to the library.
- //Get the episode's parent and add it to the database.
+ if (item.IsEpisodes)
+ await allEpisodes.WriteAsync(getEpisodeParentAsync(item));
+ else if (item.IsSeriesParent)
+ episodeScanTasks.Add(getParentEpisodesAsync(allEpisodes, item));
+ }
- Serilog.Log.Logger.Debug("Supplied Parent is an episode. Beginning parent scan for {parent}", parent);
+ await Task.WhenAll(episodeScanTasks);
+ }
+ finally { allEpisodes.Complete(); }
+ }
- children = new() { parent };
+ private async Task<List<Item>> getEpisodeParentAsync(Item episode)
+ {
+ //Item is a single episode that was added to the library.
+ //Get the episode's parent and add it to the database.
- var parentAsins = parent.Relationships
- .Where(r => r.RelationshipToProduct == RelationshipToProduct.Parent)
- .Select(p => p.Asin);
+ Serilog.Log.Logger.Debug("Supplied Parent is an episode. Beginning parent scan for {parent}", episode);
- var seriesParents = await Api.GetCatalogProductsAsync(parentAsins, CatalogOptions.ResponseGroupOptions.ALL_OPTIONS);
+ List<Item> children = new() { episode };
- int numSeriesParents = seriesParents.Count(p => p.IsSeriesParent);
- if (numSeriesParents != 1)
- {
- //There should only ever be 1 top-level parent per episode. If not, log
- //so we can figure out what to do about those special cases, and don't
- //import the episode.
- JsonSerializerSettings Settings = new()
- {
- MetadataPropertyHandling = MetadataPropertyHandling.Ignore,
- DateParseHandling = DateParseHandling.None,
- Converters =
- {
- new IsoDateTimeConverter { DateTimeStyles = DateTimeStyles.AssumeUniversal }
- },
- };
- Serilog.Log.Logger.Error($"Found {numSeriesParents} parents for {parent.Asin}\r\nEpisode Product:\r\n{JsonConvert.SerializeObject(parent, Formatting.None, Settings)}");
- return new List<Item>();
+ var parentAsins = episode.Relationships
+ .Where(r => r.RelationshipToProduct == RelationshipToProduct.Parent)
+ .Select(p => p.Asin);
+
+ var seriesParents = await Api.GetCatalogProductsAsync(parentAsins, CatalogOptions.ResponseGroupOptions.ALL_OPTIONS);
+
+ int numSeriesParents = seriesParents.Count(p => p.IsSeriesParent);
+ if (numSeriesParents != 1)
+ {
+ //There should only ever be 1 top-level parent per episode. If not, log
+ //so we can figure out what to do about those special cases, and don't
+ //import the episode.
+ JsonSerializerSettings Settings = new()
+ {
+ MetadataPropertyHandling = MetadataPropertyHandling.Ignore,
+ DateParseHandling = DateParseHandling.None,
+ Converters = {
+ new IsoDateTimeConverter { DateTimeStyles = DateTimeStyles.AssumeUniversal }
}
+ };
+ Serilog.Log.Logger.Error($"Found {numSeriesParents} parents for {episode.Asin}\r\nEpisode Product:\r\n{JsonConvert.SerializeObject(episode, Formatting.None, Settings)}");
+ return new();
+ }
- var realParent = seriesParents.Single(p => p.IsSeriesParent);
- realParent.PurchaseDate = parent.PurchaseDate;
+ var parent = seriesParents.Single(p => p.IsSeriesParent);
+ parent.PurchaseDate = episode.PurchaseDate;
- Serilog.Log.Logger.Debug("Completed parent scan for {parent}", parent);
- parent = realParent;
- }
- else
+ setSeries(parent, children);
+ children.Add(parent);
+
+ Serilog.Log.Logger.Debug("Completed parent scan for {episode}", episode);
+
+ return children;
+ }
+
+ private async Task getParentEpisodesAsync(ChannelWriter<Task<List<Item>>> channel, Item parent)
+ {
+ Serilog.Log.Logger.Debug("Beginning episode scan for {parent}", parent);
+
+ var episodeIds = parent.Relationships
+ .Where(r => r.RelationshipToProduct == RelationshipToProduct.Child && r.RelationshipType == RelationshipType.Episode)
+ .Select(r => r.Asin);
+
+ for (int batchNum = 0; episodeIds.Any(); batchNum++)
+ {
+ var batch = episodeIds.Take(BatchSize);
+
+ await channel.WriteAsync(getEpisodeBatchAsync(batchNum, parent, batch));
+
+ episodeIds = episodeIds.Skip(BatchSize);
+ }
+ }
+
+ private async Task<List<Item>> getEpisodeBatchAsync(int batchNum, Item parent, IEnumerable<string> childrenIds)
+ {
+ try
+ {
+ List<Item> episodeBatch = await Api.GetCatalogProductsAsync(childrenIds, CatalogOptions.ResponseGroupOptions.ALL_OPTIONS);
+
+ setSeries(parent, episodeBatch);
+
+ if (batchNum == 0)
+ episodeBatch.Add(parent);
+
+ Serilog.Log.Logger.Debug($"Batch {batchNum}: {episodeBatch.Count} results\t({{parent}})", parent);
+
+ return episodeBatch;
+ }
+ catch (Exception ex)
+ {
+ Serilog.Log.Logger.Error(ex, "Error fetching batch of episodes. {@DebugInfo}", new
{
- children = await getEpisodeChildrenAsync(parent);
- if (!children.Any())
- return new();
- }
+ ParentId = parent.Asin,
+ ParentTitle = parent.Title,
+ BatchNumber = batchNum,
+ ChildIdBatch = childrenIds
+ });
+ throw;
+ }
+ }
- //A series parent will always have exactly 1 Series
- parent.Series = new Series[]
+ private static void setSeries(Item parent, IEnumerable<Item> children)
+ {
+ //A series parent will always have exactly 1 Series
+ parent.Series = new[]
+ {
+ new Series
+ {
+ Asin = parent.Asin,
+ Sequence = "-1",
+ Title = parent.TitleWithSubtitle
+ }
+ };
+
+ foreach (var child in children)
+ {
+ // use parent's 'DateAdded'. DateAdded is just a convenience prop for: PurchaseDate.UtcDateTime
+ child.PurchaseDate = parent.PurchaseDate;
+ // parent is essentially a series
+ child.Series = new[]
{
new Series
{
Asin = parent.Asin,
- Sequence = "-1",
+ // This should properly be Single() not FirstOrDefault(), but FirstOrDefault is defensive for malformed data from audible
+ Sequence = parent.Relationships.FirstOrDefault(r => r.Asin == child.Asin)?.Sort?.ToString() ?? "0",
Title = parent.TitleWithSubtitle
}
};
-
- foreach (var child in children)
- {
- // use parent's 'DateAdded'. DateAdded is just a convenience prop for: PurchaseDate.UtcDateTime
- child.PurchaseDate = parent.PurchaseDate;
- // parent is essentially a series
- child.Series = new Series[]
- {
- new Series
- {
- Asin = parent.Asin,
- // This should properly be Single() not FirstOrDefault(), but FirstOrDefault is defensive for malformed data from audible
- Sequence = parent.Relationships.FirstOrDefault(r => r.Asin == child.Asin)?.Sort?.ToString() ?? "0",
- Title = parent.TitleWithSubtitle
- }
- };
- }
-
- children.Add(parent);
-
- Serilog.Log.Logger.Debug("Completed episode scan for {parent}", parent);
-
- return children;
}
- finally
- {
- concurrencySemaphore.Release();
- }
- }
-
- private async Task<List<Item>> getEpisodeChildrenAsync(Item parent)
- {
- var childrenIds = parent.Relationships
- .Where(r => r.RelationshipToProduct == RelationshipToProduct.Child && r.RelationshipType == RelationshipType.Episode)
- .Select(r => r.Asin)
- .ToList();
-
- // fetch children in batches
- const int batchSize = 20;
-
- var results = new List<Item>();
-
- for (var i = 1; ; i++)
- {
- var idBatch = childrenIds.Skip((i - 1) * batchSize).Take(batchSize).ToList();
- if (!idBatch.Any())
- break;
-
- List<Item> childrenBatch;
- try
- {
- childrenBatch = await Api.GetCatalogProductsAsync(idBatch, CatalogOptions.ResponseGroupOptions.ALL_OPTIONS);
-#if DEBUG
- //var childrenBatchDebug = childrenBatch.Select(i => i.ToJson()).Aggregate((a, b) => $"{a}\r\n\r\n{b}");
- //System.IO.File.WriteAllText($"children of {parent.Asin}.json", childrenBatchDebug);
-#endif
- }
- catch (Exception ex)
- {
- Serilog.Log.Logger.Error(ex, "Error fetching batch of episodes. {@DebugInfo}", new
- {
- ParentId = parent.Asin,
- ParentTitle = parent.Title,
- BatchNumber = i,
- ChildIdBatch = idBatch
- });
- throw;
- }
-
- Serilog.Log.Logger.Debug($"Batch {i}: {childrenBatch.Count} results\t({{parent}})", parent);
- // the service returned no results. probably indicates an error. stop running batches
- if (!childrenBatch.Any())
- break;
-
- results.AddRange(childrenBatch);
- }
-
- Serilog.Log.Logger.Debug("Parent episodes/podcasts series. Children found. {@DebugInfo}", new
- {
- ParentId = parent.Asin,
- ParentTitle = parent.Title,
- ChildCount = childrenIds.Count
- });
-
- if (childrenIds.Count != results.Count)
- {
- var ex = new ApplicationException($"Mis-match: Children defined by parent={childrenIds.Count}. Children returned by batches={results.Count}");
- Serilog.Log.Logger.Error(ex, "{parent} - Quantity of series episodes defined by parent does not match quantity returned by batch fetching.", parent);
- throw ex;
- }
-
- return results;
}
#endregion
}