From da36f9414dae11bd475e62cb53cb910983d390a5 Mon Sep 17 00:00:00 2001 From: Mbucari Date: Mon, 6 Mar 2023 16:49:52 -0700 Subject: [PATCH] Improve library scan performance --- Source/AudibleUtilities/ApiExtended.cs | 319 +++++++++++++------------ 1 file changed, 168 insertions(+), 151 deletions(-) diff --git a/Source/AudibleUtilities/ApiExtended.cs b/Source/AudibleUtilities/ApiExtended.cs index 376bde2c..52585b00 100644 --- a/Source/AudibleUtilities/ApiExtended.cs +++ b/Source/AudibleUtilities/ApiExtended.cs @@ -2,8 +2,9 @@ using System.Collections.Generic; using System.Globalization; using System.Linq; -using System.Threading; +using System.Threading.Channels; using System.Threading.Tasks; +using System.Diagnostics; using AudibleApi; using AudibleApi.Common; using Dinah.Core; @@ -19,6 +20,9 @@ namespace AudibleUtilities { public Api Api { get; private set; } + private const int MaxConcurrency = 10; + private const int BatchSize = 50; + private ApiExtended(Api api) => Api = api; /// Get api from existing tokens else login with 'eager' choice. External browser url is provided. Response can be external browser login or continuing with native api callbacks. @@ -85,35 +89,49 @@ namespace AudibleUtilities private async Task> getItemsAsync(LibraryOptions libraryOptions, bool importEpisodes) { - var items = new List(); - Serilog.Log.Logger.Debug("Beginning library scan."); - List>> getChildEpisodesTasks = new(); + var episodeChannel = Channel.CreateBounded>>( + new BoundedChannelOptions(1) + { + SingleReader = true, + SingleWriter = false, + FullMode = BoundedChannelFullMode.Wait + }); - int count = 0, maxConcurrentEpisodeScans = 5; - using SemaphoreSlim concurrencySemaphore = new(maxConcurrentEpisodeScans); + int count = 0; + List items = new(); + List seriesItems = new(); - await foreach (var item in Api.GetLibraryItemAsyncEnumerable(libraryOptions)) + var sw = Stopwatch.StartNew(); + + await foreach (var item in Api.GetLibraryItemAsyncEnumerable(libraryOptions, BatchSize, MaxConcurrency)) { if ((item.IsEpisodes || item.IsSeriesParent) && importEpisodes) - { - //Get child episodes asynchronously and await all at the end - getChildEpisodesTasks.Add(getChildEpisodesAsync(concurrencySemaphore, item)); - } + seriesItems.Add(item); else if (!item.IsEpisodes && !item.IsSeriesParent) items.Add(item); count++; } - Serilog.Log.Logger.Debug("Library scan complete. Found {count} books and series. Waiting on {getChildEpisodesTasksCount} series episode scans to complete.", count, getChildEpisodesTasks.Count); + Serilog.Log.Logger.Debug("Library scan complete. Found {count} books and series. Waiting on series episode scans to complete.", count); + Serilog.Log.Logger.Debug("Beginning episode scan."); - //await and add all episodes from all parents - foreach (var epList in await Task.WhenAll(getChildEpisodesTasks)) - items.AddRange(epList); + count = 0; + var episodeDlTask = scanAllSeries(seriesItems, episodeChannel.Writer); - Serilog.Log.Logger.Debug("Completed library scan."); + await foreach (var ep in getAllEpisodesAsync(episodeChannel.Reader, MaxConcurrency)) + { + items.Add(ep); + count++; + } + + await episodeDlTask; + + sw.Stop(); + Serilog.Log.Logger.Debug("Episode scan complete. Found {count} episodes and series.", count); + Serilog.Log.Logger.Debug($"Completed library scan in {sw.Elapsed.TotalMilliseconds:F0} ms."); #if DEBUG //// this will not work for multi accounts @@ -146,165 +164,164 @@ namespace AudibleUtilities #region episodes and podcasts - private async Task> getChildEpisodesAsync(SemaphoreSlim concurrencySemaphore, Item parent) + private async IAsyncEnumerable getAllEpisodesAsync(ChannelReader>> allEpisodes, int maxConcurrency) { - await concurrencySemaphore.WaitAsync(); + List>> concurentGets = new(); + for (int i = 0; i < maxConcurrency && await allEpisodes.WaitToReadAsync(); i++) + concurentGets.Add(await allEpisodes.ReadAsync()); + + while (concurentGets.Count > 0) + { + var completed = await Task.WhenAny(concurentGets); + concurentGets.Remove(completed); + + if (await allEpisodes.WaitToReadAsync()) + concurentGets.Add(await allEpisodes.ReadAsync()); + + foreach (var item in completed.Result) + yield return item; + } + } + + private async Task scanAllSeries(IEnumerable seriesItems, ChannelWriter>> allEpisodes) + { try { - Serilog.Log.Logger.Debug("Beginning episode scan for {parent}", parent); + List episodeScanTasks = new(); - List children; - - if (parent.IsEpisodes) + foreach (var item in seriesItems) { - //The 'parent' is a single episode that was added to the library. - //Get the episode's parent and add it to the database. + if (item.IsEpisodes) + await allEpisodes.WriteAsync(getEpisodeParentAsync(item)); + else if (item.IsSeriesParent) + episodeScanTasks.Add(getParentEpisodesAsync(allEpisodes, item)); + } - Serilog.Log.Logger.Debug("Supplied Parent is an episode. Beginning parent scan for {parent}", parent); + await Task.WhenAll(episodeScanTasks); + } + finally { allEpisodes.Complete(); } + } - children = new() { parent }; + private async Task> getEpisodeParentAsync(Item episode) + { + //Item is a single episode that was added to the library. + //Get the episode's parent and add it to the database. - var parentAsins = parent.Relationships - .Where(r => r.RelationshipToProduct == RelationshipToProduct.Parent) - .Select(p => p.Asin); + Serilog.Log.Logger.Debug("Supplied Parent is an episode. Beginning parent scan for {parent}", episode); - var seriesParents = await Api.GetCatalogProductsAsync(parentAsins, CatalogOptions.ResponseGroupOptions.ALL_OPTIONS); + List children = new() { episode }; - int numSeriesParents = seriesParents.Count(p => p.IsSeriesParent); - if (numSeriesParents != 1) - { - //There should only ever be 1 top-level parent per episode. If not, log - //so we can figure out what to do about those special cases, and don't - //import the episode. - JsonSerializerSettings Settings = new() - { - MetadataPropertyHandling = MetadataPropertyHandling.Ignore, - DateParseHandling = DateParseHandling.None, - Converters = - { - new IsoDateTimeConverter { DateTimeStyles = DateTimeStyles.AssumeUniversal } - }, - }; - Serilog.Log.Logger.Error($"Found {numSeriesParents} parents for {parent.Asin}\r\nEpisode Product:\r\n{JsonConvert.SerializeObject(parent, Formatting.None, Settings)}"); - return new List(); + var parentAsins = episode.Relationships + .Where(r => r.RelationshipToProduct == RelationshipToProduct.Parent) + .Select(p => p.Asin); + + var seriesParents = await Api.GetCatalogProductsAsync(parentAsins, CatalogOptions.ResponseGroupOptions.ALL_OPTIONS); + + int numSeriesParents = seriesParents.Count(p => p.IsSeriesParent); + if (numSeriesParents != 1) + { + //There should only ever be 1 top-level parent per episode. If not, log + //so we can figure out what to do about those special cases, and don't + //import the episode. + JsonSerializerSettings Settings = new() + { + MetadataPropertyHandling = MetadataPropertyHandling.Ignore, + DateParseHandling = DateParseHandling.None, + Converters = { + new IsoDateTimeConverter { DateTimeStyles = DateTimeStyles.AssumeUniversal } } + }; + Serilog.Log.Logger.Error($"Found {numSeriesParents} parents for {episode.Asin}\r\nEpisode Product:\r\n{JsonConvert.SerializeObject(episode, Formatting.None, Settings)}"); + return new(); + } - var realParent = seriesParents.Single(p => p.IsSeriesParent); - realParent.PurchaseDate = parent.PurchaseDate; + var parent = seriesParents.Single(p => p.IsSeriesParent); + parent.PurchaseDate = episode.PurchaseDate; - Serilog.Log.Logger.Debug("Completed parent scan for {parent}", parent); - parent = realParent; - } - else + setSeries(parent, children); + children.Add(parent); + + Serilog.Log.Logger.Debug("Completed parent scan for {episode}", episode); + + return children; + } + + private async Task getParentEpisodesAsync(ChannelWriter>> channel, Item parent) + { + Serilog.Log.Logger.Debug("Beginning episode scan for {parent}", parent); + + var episodeIds = parent.Relationships + .Where(r => r.RelationshipToProduct == RelationshipToProduct.Child && r.RelationshipType == RelationshipType.Episode) + .Select(r => r.Asin); + + for (int batchNum = 0; episodeIds.Any(); batchNum++) + { + var batch = episodeIds.Take(BatchSize); + + await channel.WriteAsync(getEpisodeBatchAsync(batchNum, parent, batch)); + + episodeIds = episodeIds.Skip(BatchSize); + } + } + + private async Task> getEpisodeBatchAsync(int batchNum, Item parent, IEnumerable childrenIds) + { + try + { + List episodeBatch = await Api.GetCatalogProductsAsync(childrenIds, CatalogOptions.ResponseGroupOptions.ALL_OPTIONS); + + setSeries(parent, episodeBatch); + + if (batchNum == 0) + episodeBatch.Add(parent); + + Serilog.Log.Logger.Debug($"Batch {batchNum}: {episodeBatch.Count} results\t({{parent}})", parent); + + return episodeBatch; + } + catch (Exception ex) + { + Serilog.Log.Logger.Error(ex, "Error fetching batch of episodes. {@DebugInfo}", new { - children = await getEpisodeChildrenAsync(parent); - if (!children.Any()) - return new(); - } + ParentId = parent.Asin, + ParentTitle = parent.Title, + BatchNumber = batchNum, + ChildIdBatch = childrenIds + }); + throw; + } + } - //A series parent will always have exactly 1 Series - parent.Series = new Series[] + private static void setSeries(Item parent, IEnumerable children) + { + //A series parent will always have exactly 1 Series + parent.Series = new[] + { + new Series + { + Asin = parent.Asin, + Sequence = "-1", + Title = parent.TitleWithSubtitle + } + }; + + foreach (var child in children) + { + // use parent's 'DateAdded'. DateAdded is just a convenience prop for: PurchaseDate.UtcDateTime + child.PurchaseDate = parent.PurchaseDate; + // parent is essentially a series + child.Series = new[] { new Series { Asin = parent.Asin, - Sequence = "-1", + // This should properly be Single() not FirstOrDefault(), but FirstOrDefault is defensive for malformed data from audible + Sequence = parent.Relationships.FirstOrDefault(r => r.Asin == child.Asin)?.Sort?.ToString() ?? "0", Title = parent.TitleWithSubtitle } }; - - foreach (var child in children) - { - // use parent's 'DateAdded'. DateAdded is just a convenience prop for: PurchaseDate.UtcDateTime - child.PurchaseDate = parent.PurchaseDate; - // parent is essentially a series - child.Series = new Series[] - { - new Series - { - Asin = parent.Asin, - // This should properly be Single() not FirstOrDefault(), but FirstOrDefault is defensive for malformed data from audible - Sequence = parent.Relationships.FirstOrDefault(r => r.Asin == child.Asin)?.Sort?.ToString() ?? "0", - Title = parent.TitleWithSubtitle - } - }; - } - - children.Add(parent); - - Serilog.Log.Logger.Debug("Completed episode scan for {parent}", parent); - - return children; } - finally - { - concurrencySemaphore.Release(); - } - } - - private async Task> getEpisodeChildrenAsync(Item parent) - { - var childrenIds = parent.Relationships - .Where(r => r.RelationshipToProduct == RelationshipToProduct.Child && r.RelationshipType == RelationshipType.Episode) - .Select(r => r.Asin) - .ToList(); - - // fetch children in batches - const int batchSize = 20; - - var results = new List(); - - for (var i = 1; ; i++) - { - var idBatch = childrenIds.Skip((i - 1) * batchSize).Take(batchSize).ToList(); - if (!idBatch.Any()) - break; - - List childrenBatch; - try - { - childrenBatch = await Api.GetCatalogProductsAsync(idBatch, CatalogOptions.ResponseGroupOptions.ALL_OPTIONS); -#if DEBUG - //var childrenBatchDebug = childrenBatch.Select(i => i.ToJson()).Aggregate((a, b) => $"{a}\r\n\r\n{b}"); - //System.IO.File.WriteAllText($"children of {parent.Asin}.json", childrenBatchDebug); -#endif - } - catch (Exception ex) - { - Serilog.Log.Logger.Error(ex, "Error fetching batch of episodes. {@DebugInfo}", new - { - ParentId = parent.Asin, - ParentTitle = parent.Title, - BatchNumber = i, - ChildIdBatch = idBatch - }); - throw; - } - - Serilog.Log.Logger.Debug($"Batch {i}: {childrenBatch.Count} results\t({{parent}})", parent); - // the service returned no results. probably indicates an error. stop running batches - if (!childrenBatch.Any()) - break; - - results.AddRange(childrenBatch); - } - - Serilog.Log.Logger.Debug("Parent episodes/podcasts series. Children found. {@DebugInfo}", new - { - ParentId = parent.Asin, - ParentTitle = parent.Title, - ChildCount = childrenIds.Count - }); - - if (childrenIds.Count != results.Count) - { - var ex = new ApplicationException($"Mis-match: Children defined by parent={childrenIds.Count}. Children returned by batches={results.Count}"); - Serilog.Log.Logger.Error(ex, "{parent} - Quantity of series episodes defined by parent does not match quantity returned by batch fetching.", parent); - throw ex; - } - - return results; } #endregion }