diff --git a/Source/AudibleUtilities/ApiExtended.cs b/Source/AudibleUtilities/ApiExtended.cs index 52585b00..1f89f511 100644 --- a/Source/AudibleUtilities/ApiExtended.cs +++ b/Source/AudibleUtilities/ApiExtended.cs @@ -91,20 +91,13 @@ namespace AudibleUtilities { Serilog.Log.Logger.Debug("Beginning library scan."); - var episodeChannel = Channel.CreateBounded>>( - new BoundedChannelOptions(1) - { - SingleReader = true, - SingleWriter = false, - FullMode = BoundedChannelFullMode.Wait - }); - int count = 0; List items = new(); List seriesItems = new(); var sw = Stopwatch.StartNew(); + //Scan the library for all added books, and add ay episode-type items to seriesItems to be scanned for episodes/parents await foreach (var item in Api.GetLibraryItemAsyncEnumerable(libraryOptions, BatchSize, MaxConcurrency)) { if ((item.IsEpisodes || item.IsSeriesParent) && importEpisodes) @@ -119,15 +112,24 @@ namespace AudibleUtilities Serilog.Log.Logger.Debug("Beginning episode scan."); count = 0; - var episodeDlTask = scanAllSeries(seriesItems, episodeChannel.Writer); - await foreach (var ep in getAllEpisodesAsync(episodeChannel.Reader, MaxConcurrency)) + //'get' Tasks are activated when they are written to the channel. To avoid more concurrency than is desired, the + //channel is bounded with a capacity of 1. Channel write operations are blocked until the current item is read + var episodeChannel = Channel.CreateBounded>>(new BoundedChannelOptions(1) { SingleReader = true }); + + //Start scanning for all episodes. Episode batch 'get' Tasks are written to the channel. + var scanAllSeriesTask = scanAllSeries(seriesItems, episodeChannel.Writer); + + //Read all episodes from the channel and add them to the import items. + //This method blocks until episodeChannel.Writer is closed by scanAllSeries() + await foreach (var ep in getAllEpisodesAsync(episodeChannel.Reader)) { items.Add(ep); count++; } - await episodeDlTask; + //Be sure to await the scanAllSeries Task so that any exceptions are thrown + await scanAllSeriesTask; sw.Stop(); Serilog.Log.Logger.Debug("Episode scan complete. Found {count} episodes and series.", count); @@ -164,27 +166,37 @@ namespace AudibleUtilities #region episodes and podcasts - private async IAsyncEnumerable getAllEpisodesAsync(ChannelReader>> allEpisodes, int maxConcurrency) + /// + /// Read get tasks from the and await results. This method maintains + /// a list of up to get tasks. When any of the get tasks completes, + /// the Items are yielded, that task is removed from the list, and a new get task is read from + /// the channel. + /// + private async IAsyncEnumerable getAllEpisodesAsync(ChannelReader>> channel) { List>> concurentGets = new(); - for (int i = 0; i < maxConcurrency && await allEpisodes.WaitToReadAsync(); i++) - concurentGets.Add(await allEpisodes.ReadAsync()); + for (int i = 0; i < MaxConcurrency && await channel.WaitToReadAsync(); i++) + concurentGets.Add(await channel.ReadAsync()); while (concurentGets.Count > 0) { var completed = await Task.WhenAny(concurentGets); concurentGets.Remove(completed); - if (await allEpisodes.WaitToReadAsync()) - concurentGets.Add(await allEpisodes.ReadAsync()); + if (await channel.WaitToReadAsync()) + concurentGets.Add(await channel.ReadAsync()); foreach (var item in completed.Result) yield return item; } } - private async Task scanAllSeries(IEnumerable seriesItems, ChannelWriter>> allEpisodes) + /// + /// Gets all child episodes and episode parents belonging to in batches and + /// writes the get tasks to . + /// + private async Task scanAllSeries(IEnumerable seriesItems, ChannelWriter>> channel) { try { @@ -193,14 +205,15 @@ namespace AudibleUtilities foreach (var item in seriesItems) { if (item.IsEpisodes) - await allEpisodes.WriteAsync(getEpisodeParentAsync(item)); + await channel.WriteAsync(getEpisodeParentAsync(item)); else if (item.IsSeriesParent) - episodeScanTasks.Add(getParentEpisodesAsync(allEpisodes, item)); + episodeScanTasks.Add(getParentEpisodesAsync(item, channel)); } + //episodeScanTasks complete only after all episode batch 'gets' have been written to the channel await Task.WhenAll(episodeScanTasks); } - finally { allEpisodes.Complete(); } + finally { channel.Complete(); } } private async Task> getEpisodeParentAsync(Item episode) @@ -247,7 +260,11 @@ namespace AudibleUtilities return children; } - private async Task getParentEpisodesAsync(ChannelWriter>> channel, Item parent) + /// + /// Gets all episodes belonging to in batches of and writes the batch get tasks to + /// This method only completes after all episode batch 'gets' have been written to the channel + /// + private async Task getParentEpisodesAsync(Item parent, ChannelWriter>> channel) { Serilog.Log.Logger.Debug("Beginning episode scan for {parent}", parent);