diff --git a/Source/ApplicationServices/LibraryCommands.cs b/Source/ApplicationServices/LibraryCommands.cs index cfa9551a..60e55c94 100644 --- a/Source/ApplicationServices/LibraryCommands.cs +++ b/Source/ApplicationServices/LibraryCommands.cs @@ -218,7 +218,7 @@ namespace ApplicationServices { { "Account", account.MaskedLogEntry }, { "ScannedDateTime", DateTime.Now.ToString("u") }, - { "Items", await Task.Run(() => JArray.FromObject(dtoItems)) } + { "Items", await Task.Run(() => JArray.FromObject(dtoItems.Select(i => i.SourceJson))) } }; await archiver.AddFileAsync(fileName, scanFile); diff --git a/Source/AudibleUtilities/ApiExtended.cs b/Source/AudibleUtilities/ApiExtended.cs index 1615f20a..76c17f15 100644 --- a/Source/AudibleUtilities/ApiExtended.cs +++ b/Source/AudibleUtilities/ApiExtended.cs @@ -1,6 +1,5 @@ using System; using System.Collections.Generic; -using System.Globalization; using System.Linq; using System.Threading.Channels; using System.Threading.Tasks; @@ -8,10 +7,9 @@ using System.Diagnostics; using AudibleApi; using AudibleApi.Common; using Dinah.Core; -using Newtonsoft.Json; -using Newtonsoft.Json.Converters; using Polly; using Polly.Retry; +using System.Threading; namespace AudibleUtilities { @@ -91,49 +89,73 @@ namespace AudibleUtilities { Serilog.Log.Logger.Debug("Beginning library scan."); - int count = 0; List items = new(); - List seriesItems = new(); - var sw = Stopwatch.StartNew(); + var totalTime = TimeSpan.Zero; + using var semaphore = new SemaphoreSlim(MaxConcurrency); - //Scan the library for all added books, and add any episode-type items to seriesItems to be scanned for episodes/parents - await foreach (var item in Api.GetLibraryItemAsyncEnumerable(libraryOptions, BatchSize, MaxConcurrency)) + var episodeChannel = Channel.CreateUnbounded(new UnboundedChannelOptions { SingleReader = true, SingleWriter = true }); + var batchReaderTask = readAllAsinsAsync(episodeChannel.Reader, semaphore); + + //Scan the library for all added books. 
+ //Get relationship asins from episode-type items and write them to episodeChannel where they will be batched and queried. + await foreach (var item in Api.GetLibraryItemsPagesAsync(libraryOptions, BatchSize, semaphore)) { - if ((item.IsEpisodes || item.IsSeriesParent) && importEpisodes) - seriesItems.Add(item); - else if (!item.IsEpisodes && !item.IsSeriesParent) - items.Add(item); + if (importEpisodes) + { + var episodes = item.Where(i => i.IsEpisodes).ToList(); + var series = item.Where(i => i.IsSeriesParent).ToList(); - count++; + var parentAsins = episodes + .SelectMany(i => i.Relationships) + .Where(r => r.RelationshipToProduct == RelationshipToProduct.Parent) + .Select(r => r.Asin); + + var episodeAsins = series + .SelectMany(i => i.Relationships) + .Where(r => r.RelationshipToProduct == RelationshipToProduct.Child && r.RelationshipType == RelationshipType.Episode) + .Select(r => r.Asin); + + foreach (var asin in parentAsins.Concat(episodeAsins)) + episodeChannel.Writer.TryWrite(asin); + + items.AddRange(episodes); + items.AddRange(series); + } + + items.AddRange(item.Where(i => !i.IsSeriesParent && !i.IsEpisodes)); } - Serilog.Log.Logger.Debug("Library scan complete. Found {count} books and series. Waiting on series episode scans to complete.", count); - Serilog.Log.Logger.Debug("Beginning episode scan."); - - count = 0; - - //'get' Tasks are activated when they are written to the channel. To avoid more concurrency than is desired, the - //channel is bounded with a capacity of 1. Channel write operations are blocked until the current item is read - var episodeChannel = Channel.CreateBounded>>(new BoundedChannelOptions(1) { SingleReader = true }); - - //Start scanning for all episodes. Episode batch 'get' Tasks are written to the channel. - var scanAllSeriesTask = scanAllSeries(seriesItems, episodeChannel.Writer); - - //Read all episodes from the channel and add them to the import items. 
- //This method blocks until episodeChannel.Writer is closed by scanAllSeries() - await foreach (var ep in getAllEpisodesAsync(episodeChannel.Reader)) - { - items.AddRange(ep); - count += ep.Count; - } - - //Be sure to await the scanAllSeries Task so that any exceptions are thrown - await scanAllSeriesTask; - sw.Stop(); - Serilog.Log.Logger.Debug("Episode scan complete. Found {count} episodes and series.", count); - Serilog.Log.Logger.Debug($"Completed library scan in {sw.Elapsed.TotalMilliseconds:F0} ms."); + totalTime += sw.Elapsed; + Serilog.Log.Logger.Debug("Library scan complete after {elappsed_ms} ms. Found {count} books and series. Waiting on series episode scans to complete.", sw.ElapsedMilliseconds, items.Count); + sw.Restart(); + + //Signal that we're done adding asins + episodeChannel.Writer.Complete(); + + //Wait for all episodes/parents to be retrieved + var allEps = await batchReaderTask; + + sw.Stop(); + totalTime += sw.Elapsed; + Serilog.Log.Logger.Debug("Episode scan complete after {elappsed_ms} ms. Found {count} episodes and series .", sw.ElapsedMilliseconds, allEps.Count); + sw.Restart(); + + Serilog.Log.Logger.Debug("Begin indexing series episodes"); + items.AddRange(allEps); + + //Set the Item.Series info for episodes and parents. + foreach (var parent in items.Where(i => i.IsSeriesParent)) + { + var children = items.Where(i => i.IsEpisodes && i.Relationships.Any(r => r.Asin == parent.Asin)); + setSeries(parent, children); + } + + sw.Stop(); + totalTime += sw.Elapsed; + Serilog.Log.Logger.Information("Completed indexing series episodes after {elappsed_ms} ms.", sw.ElapsedMilliseconds); + Serilog.Log.Logger.Information($"Completed library scan in {totalTime.TotalMilliseconds:F0} ms."); var validators = new List(); validators.AddRange(getValidators()); @@ -159,146 +181,55 @@ namespace AudibleUtilities #region episodes and podcasts /// - /// Read get tasks from the and await results. This method maintains - /// a list of up to get tasks. 
When any of the get tasks completes, - /// the Items are yielded, that task is removed from the list, and a new get task is read from - /// the channel. + /// Read asins from the channel and request catalog item info in batches of BatchSize. Blocks until channelReader is closed. /// - private async IAsyncEnumerable> getAllEpisodesAsync(ChannelReader>> channel) + /// Input asins to batch + /// Shared semaphore to limit concurrency + /// All Items of asins written to the channel. + private async Task> readAllAsinsAsync(ChannelReader channelReader, SemaphoreSlim semaphore) { - List>> concurentGets = new(); + int batchNum = 1; + List>> getTasks = new(); - for (int i = 0; i < MaxConcurrency && await channel.WaitToReadAsync(); i++) - concurentGets.Add(await channel.ReadAsync()); - - while (concurentGets.Count > 0) + while (await channelReader.WaitToReadAsync()) { - var completed = await Task.WhenAny(concurentGets); - concurentGets.Remove(completed); + List asins = new(); - if (await channel.WaitToReadAsync()) - concurentGets.Add(await channel.ReadAsync()); - - yield return completed.Result; - } - } - - /// - /// Gets all child episodes and episode parents belonging to in batches and - /// writes the get tasks to . 
- /// - private async Task scanAllSeries(IEnumerable seriesItems, ChannelWriter>> channel) - { - try - { - List episodeScanTasks = new(); - - foreach (var item in seriesItems) + while (asins.Count < BatchSize && await channelReader.WaitToReadAsync()) { - if (item.IsEpisodes) - await channel.WriteAsync(getEpisodeParentAsync(item)); - else if (item.IsSeriesParent) - episodeScanTasks.Add(getParentEpisodesAsync(item, channel)); + var asin = await channelReader.ReadAsync(); + + if (!asins.Contains(asin)) + asins.Add(asin); } - - //episodeScanTasks complete only after all episode batch 'gets' have been written to the channel - await Task.WhenAll(episodeScanTasks); - } - finally { channel.Complete(); } - } - - private async Task> getEpisodeParentAsync(Item episode) - { - //Item is a single episode that was added to the library. - //Get the episode's parent and add it to the database. - - Serilog.Log.Logger.Debug("Supplied Parent is an episode. Beginning parent scan for {parent}", episode); - - List children = new() { episode }; - - var parentAsins = episode.Relationships - .Where(r => r.RelationshipToProduct == RelationshipToProduct.Parent) - .Select(p => p.Asin); - - var seriesParents = await Api.GetCatalogProductsAsync(parentAsins, CatalogOptions.ResponseGroupOptions.ALL_OPTIONS); - - int numSeriesParents = seriesParents.Count(p => p.IsSeriesParent); - if (numSeriesParents != 1) - { - //There should only ever be 1 top-level parent per episode. If not, log - //so we can figure out what to do about those special cases, and don't - //import the episode. 
- JsonSerializerSettings Settings = new() - { - MetadataPropertyHandling = MetadataPropertyHandling.Ignore, - DateParseHandling = DateParseHandling.None, - Converters = { - new IsoDateTimeConverter { DateTimeStyles = DateTimeStyles.AssumeUniversal } - } - }; - Serilog.Log.Logger.Error($"Found {numSeriesParents} parents for {episode.Asin}\r\nEpisode Product:\r\n{JsonConvert.SerializeObject(episode, Formatting.None, Settings)}"); - return new(); + await semaphore.WaitAsync(); + getTasks.Add(getProductsAsync(batchNum++, asins, semaphore)); } - var parent = seriesParents.Single(p => p.IsSeriesParent); - parent.PurchaseDate = episode.PurchaseDate; - - setSeries(parent, children); - children.Add(parent); - - Serilog.Log.Logger.Debug("Completed parent scan for {episode}", episode); - - return children; + var completed = await Task.WhenAll(getTasks); + //We only want Series parents and Series episodes. Exclude other relationship types (e.g. 'season') + return completed.SelectMany(l => l).Where(i => i.IsSeriesParent || i.IsEpisodes).ToList(); } - /// - /// Gets all episodes belonging to in batches of and writes the batch get tasks to - /// This method only completes after all episode batch 'gets' have been written to the channel - /// - private async Task getParentEpisodesAsync(Item parent, ChannelWriter>> channel) - { - Serilog.Log.Logger.Debug("Beginning episode scan for {parent}", parent); - - var episodeIds = parent.Relationships - .Where(r => r.RelationshipToProduct == RelationshipToProduct.Child && r.RelationshipType == RelationshipType.Episode) - .Select(r => r.Asin); - - for (int batchNum = 0; episodeIds.Any(); batchNum++) - { - var batch = episodeIds.Take(BatchSize); - - await channel.WriteAsync(getEpisodeBatchAsync(batchNum, parent, batch)); - - episodeIds = episodeIds.Skip(BatchSize); - } - } - - private async Task> getEpisodeBatchAsync(int batchNum, Item parent, IEnumerable childrenIds) + private async Task> getProductsAsync(int batchNum, List asins, 
SemaphoreSlim semaphore) { + Serilog.Log.Logger.Debug($"Batch {batchNum} Begin: Fetching {asins.Count} asins"); try { - List episodeBatch = await Api.GetCatalogProductsAsync(childrenIds, CatalogOptions.ResponseGroupOptions.ALL_OPTIONS); + var sw = Stopwatch.StartNew(); + var items = await Api.GetCatalogProductsAsync(asins, CatalogOptions.ResponseGroupOptions.ALL_OPTIONS); + sw.Stop(); - setSeries(parent, episodeBatch); + Serilog.Log.Logger.Debug($"Batch {batchNum} End: Retrieved {items.Count} items in {sw.ElapsedMilliseconds} ms"); - if (batchNum == 0) - episodeBatch.Add(parent); - - Serilog.Log.Logger.Debug($"Batch {batchNum}: {episodeBatch.Count} results\t({{parent}})", parent); - - return episodeBatch; + return items; } catch (Exception ex) { - Serilog.Log.Logger.Error(ex, "Error fetching batch of episodes. {@DebugInfo}", new - { - ParentId = parent.Asin, - ParentTitle = parent.Title, - BatchNumber = batchNum, - ChildIdBatch = childrenIds - }); + Serilog.Log.Logger.Error(ex, "Error fetching batch of episodes. {@DebugInfo}", new { asins }); throw; } + finally { semaphore.Release(); } } private static void setSeries(Item parent, IEnumerable children) @@ -314,6 +245,9 @@ namespace AudibleUtilities } }; + if (parent.PurchaseDate == default) + parent.PurchaseDate = children.Select(c => c.PurchaseDate).Order().First(); + foreach (var child in children) { // use parent's 'DateAdded'. DateAdded is just a convenience prop for: PurchaseDate.UtcDateTime @@ -333,4 +267,4 @@ namespace AudibleUtilities } #endregion } -} +} \ No newline at end of file diff --git a/Source/AudibleUtilities/AudibleUtilities.csproj b/Source/AudibleUtilities/AudibleUtilities.csproj index 48394512..d7a3bfb8 100644 --- a/Source/AudibleUtilities/AudibleUtilities.csproj +++ b/Source/AudibleUtilities/AudibleUtilities.csproj @@ -5,7 +5,7 @@ - +