Use new AudibleApi methods
This commit is contained in:
parent
eed42bd108
commit
ec7dd1b54a
@ -218,7 +218,7 @@ namespace ApplicationServices
|
||||
{
|
||||
{ "Account", account.MaskedLogEntry },
|
||||
{ "ScannedDateTime", DateTime.Now.ToString("u") },
|
||||
{ "Items", await Task.Run(() => JArray.FromObject(dtoItems)) }
|
||||
{ "Items", await Task.Run(() => JArray.FromObject(dtoItems.Select(i => i.SourceJson))) }
|
||||
};
|
||||
|
||||
await archiver.AddFileAsync(fileName, scanFile);
|
||||
|
||||
@ -1,6 +1,5 @@
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Globalization;
|
||||
using System.Linq;
|
||||
using System.Threading.Channels;
|
||||
using System.Threading.Tasks;
|
||||
@ -8,10 +7,9 @@ using System.Diagnostics;
|
||||
using AudibleApi;
|
||||
using AudibleApi.Common;
|
||||
using Dinah.Core;
|
||||
using Newtonsoft.Json;
|
||||
using Newtonsoft.Json.Converters;
|
||||
using Polly;
|
||||
using Polly.Retry;
|
||||
using System.Threading;
|
||||
|
||||
namespace AudibleUtilities
|
||||
{
|
||||
@ -91,49 +89,73 @@ namespace AudibleUtilities
|
||||
{
|
||||
Serilog.Log.Logger.Debug("Beginning library scan.");
|
||||
|
||||
int count = 0;
|
||||
List<Item> items = new();
|
||||
List<Item> seriesItems = new();
|
||||
|
||||
var sw = Stopwatch.StartNew();
|
||||
var totalTime = TimeSpan.Zero;
|
||||
using var semaphore = new SemaphoreSlim(MaxConcurrency);
|
||||
|
||||
//Scan the library for all added books, and add any episode-type items to seriesItems to be scanned for episodes/parents
|
||||
await foreach (var item in Api.GetLibraryItemAsyncEnumerable(libraryOptions, BatchSize, MaxConcurrency))
|
||||
var episodeChannel = Channel.CreateUnbounded<string>(new UnboundedChannelOptions { SingleReader = true, SingleWriter = true });
|
||||
var batchReaderTask = readAllAsinsAsync(episodeChannel.Reader, semaphore);
|
||||
|
||||
//Scan the library for all added books.
|
||||
//Get relationship asins from episode-type items and write them to episodeChannel where they will be batched and queried.
|
||||
await foreach (var item in Api.GetLibraryItemsPagesAsync(libraryOptions, BatchSize, semaphore))
|
||||
{
|
||||
if ((item.IsEpisodes || item.IsSeriesParent) && importEpisodes)
|
||||
seriesItems.Add(item);
|
||||
else if (!item.IsEpisodes && !item.IsSeriesParent)
|
||||
items.Add(item);
|
||||
if (importEpisodes)
|
||||
{
|
||||
var episodes = item.Where(i => i.IsEpisodes).ToList();
|
||||
var series = item.Where(i => i.IsSeriesParent).ToList();
|
||||
|
||||
count++;
|
||||
var parentAsins = episodes
|
||||
.SelectMany(i => i.Relationships)
|
||||
.Where(r => r.RelationshipToProduct == RelationshipToProduct.Parent)
|
||||
.Select(r => r.Asin);
|
||||
|
||||
var episodeAsins = series
|
||||
.SelectMany(i => i.Relationships)
|
||||
.Where(r => r.RelationshipToProduct == RelationshipToProduct.Child && r.RelationshipType == RelationshipType.Episode)
|
||||
.Select(r => r.Asin);
|
||||
|
||||
foreach (var asin in parentAsins.Concat(episodeAsins))
|
||||
episodeChannel.Writer.TryWrite(asin);
|
||||
|
||||
items.AddRange(episodes);
|
||||
items.AddRange(series);
|
||||
}
|
||||
|
||||
items.AddRange(item.Where(i => !i.IsSeriesParent && !i.IsEpisodes));
|
||||
}
|
||||
|
||||
Serilog.Log.Logger.Debug("Library scan complete. Found {count} books and series. Waiting on series episode scans to complete.", count);
|
||||
Serilog.Log.Logger.Debug("Beginning episode scan.");
|
||||
|
||||
count = 0;
|
||||
|
||||
//'get' Tasks are activated when they are written to the channel. To avoid more concurrency than is desired, the
|
||||
//channel is bounded with a capacity of 1. Channel write operations are blocked until the current item is read
|
||||
var episodeChannel = Channel.CreateBounded<Task<List<Item>>>(new BoundedChannelOptions(1) { SingleReader = true });
|
||||
|
||||
//Start scanning for all episodes. Episode batch 'get' Tasks are written to the channel.
|
||||
var scanAllSeriesTask = scanAllSeries(seriesItems, episodeChannel.Writer);
|
||||
|
||||
//Read all episodes from the channel and add them to the import items.
|
||||
//This method blocks until episodeChannel.Writer is closed by scanAllSeries()
|
||||
await foreach (var ep in getAllEpisodesAsync(episodeChannel.Reader))
|
||||
{
|
||||
items.AddRange(ep);
|
||||
count += ep.Count;
|
||||
}
|
||||
|
||||
//Be sure to await the scanAllSeries Task so that any exceptions are thrown
|
||||
await scanAllSeriesTask;
|
||||
|
||||
sw.Stop();
|
||||
Serilog.Log.Logger.Debug("Episode scan complete. Found {count} episodes and series.", count);
|
||||
Serilog.Log.Logger.Debug($"Completed library scan in {sw.Elapsed.TotalMilliseconds:F0} ms.");
|
||||
totalTime += sw.Elapsed;
|
||||
Serilog.Log.Logger.Debug("Library scan complete after {elappsed_ms} ms. Found {count} books and series. Waiting on series episode scans to complete.", sw.ElapsedMilliseconds, items.Count);
|
||||
sw.Restart();
|
||||
|
||||
//Signal that we're done adding asins
|
||||
episodeChannel.Writer.Complete();
|
||||
|
||||
//Wait for all episodes/parents to be retrived
|
||||
var allEps = await batchReaderTask;
|
||||
|
||||
sw.Stop();
|
||||
totalTime += sw.Elapsed;
|
||||
Serilog.Log.Logger.Debug("Episode scan complete after {elappsed_ms} ms. Found {count} episodes and series .", sw.ElapsedMilliseconds, allEps.Count);
|
||||
sw.Restart();
|
||||
|
||||
Serilog.Log.Logger.Debug("Begin indexing series episodes");
|
||||
items.AddRange(allEps);
|
||||
|
||||
//Set the Item.Series info for episodes and parents.
|
||||
foreach (var parent in items.Where(i => i.IsSeriesParent))
|
||||
{
|
||||
var children = items.Where(i => i.IsEpisodes && i.Relationships.Any(r => r.Asin == parent.Asin));
|
||||
setSeries(parent, children);
|
||||
}
|
||||
|
||||
sw.Stop();
|
||||
totalTime += sw.Elapsed;
|
||||
Serilog.Log.Logger.Information("Completed indexing series episodes after {elappsed_ms} ms.", sw.ElapsedMilliseconds);
|
||||
Serilog.Log.Logger.Information($"Completed library scan in {totalTime.TotalMilliseconds:F0} ms.");
|
||||
|
||||
var validators = new List<IValidator>();
|
||||
validators.AddRange(getValidators());
|
||||
@ -159,146 +181,55 @@ namespace AudibleUtilities
|
||||
#region episodes and podcasts
|
||||
|
||||
/// <summary>
|
||||
/// Read get tasks from the <paramref name="channel"/> and await results. This method maintains
|
||||
/// a list of up to <see cref="MaxConcurrency"/> get tasks. When any of the get tasks completes,
|
||||
/// the Items are yielded, that task is removed from the list, and a new get task is read from
|
||||
/// the channel.
|
||||
/// Read asins from the channel and request catalog item info in batches of <see cref="BatchSize"/>. Blocks until <paramref name="channelReader"/> is closed.
|
||||
/// </summary>
|
||||
private async IAsyncEnumerable<List<Item>> getAllEpisodesAsync(ChannelReader<Task<List<Item>>> channel)
|
||||
/// <param name="channelReader">Input asins to batch</param>
|
||||
/// <param name="semaphore">Shared semaphore to limit concurrency</param>
|
||||
/// <returns>All <see cref="Item"/>s of asins written to the channel.</returns>
|
||||
private async Task<List<Item>> readAllAsinsAsync(ChannelReader<string> channelReader, SemaphoreSlim semaphore)
|
||||
{
|
||||
List<Task<List<Item>>> concurentGets = new();
|
||||
int batchNum = 1;
|
||||
List<Task<List<Item>>> getTasks = new();
|
||||
|
||||
for (int i = 0; i < MaxConcurrency && await channel.WaitToReadAsync(); i++)
|
||||
concurentGets.Add(await channel.ReadAsync());
|
||||
|
||||
while (concurentGets.Count > 0)
|
||||
while (await channelReader.WaitToReadAsync())
|
||||
{
|
||||
var completed = await Task.WhenAny(concurentGets);
|
||||
concurentGets.Remove(completed);
|
||||
List<string> asins = new();
|
||||
|
||||
if (await channel.WaitToReadAsync())
|
||||
concurentGets.Add(await channel.ReadAsync());
|
||||
|
||||
yield return completed.Result;
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Gets all child episodes and episode parents belonging to <paramref name="seriesItems"/> in batches and
|
||||
/// writes the get tasks to <paramref name="channel"/>.
|
||||
/// </summary>
|
||||
private async Task scanAllSeries(IEnumerable<Item> seriesItems, ChannelWriter<Task<List<Item>>> channel)
|
||||
{
|
||||
try
|
||||
{
|
||||
List<Task> episodeScanTasks = new();
|
||||
|
||||
foreach (var item in seriesItems)
|
||||
while (asins.Count < BatchSize && await channelReader.WaitToReadAsync())
|
||||
{
|
||||
if (item.IsEpisodes)
|
||||
await channel.WriteAsync(getEpisodeParentAsync(item));
|
||||
else if (item.IsSeriesParent)
|
||||
episodeScanTasks.Add(getParentEpisodesAsync(item, channel));
|
||||
var asin = await channelReader.ReadAsync();
|
||||
|
||||
if (!asins.Contains(asin))
|
||||
asins.Add(asin);
|
||||
}
|
||||
|
||||
//episodeScanTasks complete only after all episode batch 'gets' have been written to the channel
|
||||
await Task.WhenAll(episodeScanTasks);
|
||||
}
|
||||
finally { channel.Complete(); }
|
||||
}
|
||||
|
||||
private async Task<List<Item>> getEpisodeParentAsync(Item episode)
|
||||
{
|
||||
//Item is a single episode that was added to the library.
|
||||
//Get the episode's parent and add it to the database.
|
||||
|
||||
Serilog.Log.Logger.Debug("Supplied Parent is an episode. Beginning parent scan for {parent}", episode);
|
||||
|
||||
List<Item> children = new() { episode };
|
||||
|
||||
var parentAsins = episode.Relationships
|
||||
.Where(r => r.RelationshipToProduct == RelationshipToProduct.Parent)
|
||||
.Select(p => p.Asin);
|
||||
|
||||
var seriesParents = await Api.GetCatalogProductsAsync(parentAsins, CatalogOptions.ResponseGroupOptions.ALL_OPTIONS);
|
||||
|
||||
int numSeriesParents = seriesParents.Count(p => p.IsSeriesParent);
|
||||
if (numSeriesParents != 1)
|
||||
{
|
||||
//There should only ever be 1 top-level parent per episode. If not, log
|
||||
//so we can figure out what to do about those special cases, and don't
|
||||
//import the episode.
|
||||
JsonSerializerSettings Settings = new()
|
||||
{
|
||||
MetadataPropertyHandling = MetadataPropertyHandling.Ignore,
|
||||
DateParseHandling = DateParseHandling.None,
|
||||
Converters = {
|
||||
new IsoDateTimeConverter { DateTimeStyles = DateTimeStyles.AssumeUniversal }
|
||||
}
|
||||
};
|
||||
Serilog.Log.Logger.Error($"Found {numSeriesParents} parents for {episode.Asin}\r\nEpisode Product:\r\n{JsonConvert.SerializeObject(episode, Formatting.None, Settings)}");
|
||||
return new();
|
||||
await semaphore.WaitAsync();
|
||||
getTasks.Add(getProductsAsync(batchNum++, asins, semaphore));
|
||||
}
|
||||
|
||||
var parent = seriesParents.Single(p => p.IsSeriesParent);
|
||||
parent.PurchaseDate = episode.PurchaseDate;
|
||||
|
||||
setSeries(parent, children);
|
||||
children.Add(parent);
|
||||
|
||||
Serilog.Log.Logger.Debug("Completed parent scan for {episode}", episode);
|
||||
|
||||
return children;
|
||||
var completed = await Task.WhenAll(getTasks);
|
||||
//We only want Series parents and Series episodes. Explude other relationship types (e.g. 'season')
|
||||
return completed.SelectMany(l => l).Where(i => i.IsSeriesParent || i.IsEpisodes).ToList();
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Gets all episodes belonging to <paramref name="parent"/> in batches of <see cref="BatchSize"/> and writes the batch get tasks to <paramref name="channel"/>
|
||||
/// This method only completes after all episode batch 'gets' have been written to the channel
|
||||
/// </summary>
|
||||
private async Task getParentEpisodesAsync(Item parent, ChannelWriter<Task<List<Item>>> channel)
|
||||
{
|
||||
Serilog.Log.Logger.Debug("Beginning episode scan for {parent}", parent);
|
||||
|
||||
var episodeIds = parent.Relationships
|
||||
.Where(r => r.RelationshipToProduct == RelationshipToProduct.Child && r.RelationshipType == RelationshipType.Episode)
|
||||
.Select(r => r.Asin);
|
||||
|
||||
for (int batchNum = 0; episodeIds.Any(); batchNum++)
|
||||
{
|
||||
var batch = episodeIds.Take(BatchSize);
|
||||
|
||||
await channel.WriteAsync(getEpisodeBatchAsync(batchNum, parent, batch));
|
||||
|
||||
episodeIds = episodeIds.Skip(BatchSize);
|
||||
}
|
||||
}
|
||||
|
||||
private async Task<List<Item>> getEpisodeBatchAsync(int batchNum, Item parent, IEnumerable<string> childrenIds)
|
||||
private async Task<List<Item>> getProductsAsync(int batchNum, List<string> asins, SemaphoreSlim semaphore)
|
||||
{
|
||||
Serilog.Log.Logger.Debug($"Batch {batchNum} Begin: Fetching {asins.Count} asins");
|
||||
try
|
||||
{
|
||||
List<Item> episodeBatch = await Api.GetCatalogProductsAsync(childrenIds, CatalogOptions.ResponseGroupOptions.ALL_OPTIONS);
|
||||
var sw = Stopwatch.StartNew();
|
||||
var items = await Api.GetCatalogProductsAsync(asins, CatalogOptions.ResponseGroupOptions.ALL_OPTIONS);
|
||||
sw.Stop();
|
||||
|
||||
setSeries(parent, episodeBatch);
|
||||
Serilog.Log.Logger.Debug($"Batch {batchNum} End: Retrieved {items.Count} items in {sw.ElapsedMilliseconds} ms");
|
||||
|
||||
if (batchNum == 0)
|
||||
episodeBatch.Add(parent);
|
||||
|
||||
Serilog.Log.Logger.Debug($"Batch {batchNum}: {episodeBatch.Count} results\t({{parent}})", parent);
|
||||
|
||||
return episodeBatch;
|
||||
return items;
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
Serilog.Log.Logger.Error(ex, "Error fetching batch of episodes. {@DebugInfo}", new
|
||||
{
|
||||
ParentId = parent.Asin,
|
||||
ParentTitle = parent.Title,
|
||||
BatchNumber = batchNum,
|
||||
ChildIdBatch = childrenIds
|
||||
});
|
||||
Serilog.Log.Logger.Error(ex, "Error fetching batch of episodes. {@DebugInfo}", new { asins });
|
||||
throw;
|
||||
}
|
||||
finally { semaphore.Release(); }
|
||||
}
|
||||
|
||||
private static void setSeries(Item parent, IEnumerable<Item> children)
|
||||
@ -314,6 +245,9 @@ namespace AudibleUtilities
|
||||
}
|
||||
};
|
||||
|
||||
if (parent.PurchaseDate == default)
|
||||
parent.PurchaseDate = children.Select(c => c.PurchaseDate).Order().First();
|
||||
|
||||
foreach (var child in children)
|
||||
{
|
||||
// use parent's 'DateAdded'. DateAdded is just a convenience prop for: PurchaseDate.UtcDateTime
|
||||
@ -333,4 +267,4 @@ namespace AudibleUtilities
|
||||
}
|
||||
#endregion
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -5,7 +5,7 @@
|
||||
</PropertyGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<PackageReference Include="AudibleApi" Version="8.1.0.1" />
|
||||
<PackageReference Include="AudibleApi" Version="8.2.0.1" />
|
||||
</ItemGroup>
|
||||
|
||||
<ItemGroup>
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user