Improved NetworkFileStream asynchronous operation.

This commit is contained in:
Michael Bucari-Tovo 2021-08-13 22:53:17 -06:00
parent a44c46333f
commit 7bdcf4eef0
2 changed files with 55 additions and 72 deletions

View File

@ -112,7 +112,6 @@ namespace AaxDecrypter
{
nfsPersister = NewNetworkFilePersister();
}
nfsPersister.NetworkFileStream.BeginDownloading();
aaxFile = new AaxFile(nfsPersister.NetworkFileStream);

View File

@ -6,7 +6,6 @@ using System.IO;
using System.Linq;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
namespace AaxDecrypter
{
@ -27,7 +26,7 @@ namespace AaxDecrypter
public CookieCollection GetCookies()
{
return base.GetCookies(Uri);
return GetCookies(Uri);
}
}
@ -79,15 +78,14 @@ namespace AaxDecrypter
#endregion
#region Private Properties
private HttpWebRequest HttpRequest { get; set; }
private FileStream _writeFile { get; }
private FileStream _readFile { get; }
private Stream _networkStream { get; set; }
private bool hasBegunDownloading { get; set; }
private bool isCancelled { get; set; }
private bool finishedDownloading { get; set; }
private Action downloadThreadCompleteCallback { get; set; }
private EventWaitHandle downloadEnded { get; set; }
private EventWaitHandle downloadedPiece { get; set; }
#endregion
@ -147,7 +145,7 @@ namespace AaxDecrypter
private void Update()
{
RequestHeaders = HttpRequest.Headers;
Updated?.Invoke(this, new EventArgs());
Updated?.Invoke(this, EventArgs.Empty);
}
/// <summary>
@ -160,8 +158,8 @@ namespace AaxDecrypter
if (uriToSameFile.Host != Uri.Host)
throw new ArgumentException($"New uri to the same file must have the same host.\r\n Old Host :{Uri.Host}\r\nNew Host: {uriToSameFile.Host}");
if (hasBegunDownloading && !finishedDownloading)
throw new Exception("Cannot change Uri during a download operation.");
if (hasBegunDownloading)
throw new InvalidOperationException("Cannot change Uri after download has started.");
Uri = uriToSameFile;
HttpRequest = WebRequest.CreateHttp(Uri);
@ -176,25 +174,27 @@ namespace AaxDecrypter
/// <summary>
/// Begins downloading <see cref="Uri"/> to <see cref="SaveFilePath"/> in a background thread.
/// </summary>
public void BeginDownloading()
private void BeginDownloading()
{
downloadEnded = new EventWaitHandle(false, EventResetMode.ManualReset);
if (ContentLength != 0 && WritePosition == ContentLength)
{
hasBegunDownloading = true;
finishedDownloading = true;
downloadEnded.Set();
return;
}
if (ContentLength != 0 && WritePosition > ContentLength)
throw new Exception($"Specified write position (0x{WritePosition:X10}) is larger than the file size.");
throw new WebException($"Specified write position (0x{WritePosition:X10}) is larger than {nameof(ContentLength)} (0x{ContentLength:X10}).");
var response = HttpRequest.GetResponse() as HttpWebResponse;
if (response.StatusCode != HttpStatusCode.PartialContent)
throw new Exception($"Server at {Uri.Host} responded with unexpected status code: {response.StatusCode}.");
throw new WebException($"Server at {Uri.Host} responded with unexpected status code: {response.StatusCode}.");
if (response.Headers.GetValues("Accept-Ranges").FirstOrDefault(r => r.EqualsInsensitive("bytes")) is null)
throw new Exception($"Server at {Uri.Host} does not support Http ranges");
throw new WebException($"Server at {Uri.Host} does not support Http ranges");
//Content length is the length of the range request, and it is only equal
//to the complete file length if requesting Range: bytes=0-
@ -202,10 +202,12 @@ namespace AaxDecrypter
ContentLength = response.ContentLength;
_networkStream = response.GetResponseStream();
downloadedPiece = new EventWaitHandle(false, EventResetMode.AutoReset);
//Download the file in the background.
Thread downloadThread = new Thread(() => DownloadFile()) { IsBackground = true };
downloadThread.Start();
new Thread(() => DownloadFile())
{ IsBackground = true }
.Start();
hasBegunDownloading = true;
return;
@ -216,13 +218,13 @@ namespace AaxDecrypter
/// </summary>
private void DownloadFile()
{
long downloadPosition = WritePosition;
long nextFlush = downloadPosition + DATA_FLUSH_SZ;
var downloadPosition = WritePosition;
var nextFlush = downloadPosition + DATA_FLUSH_SZ;
byte[] buff = new byte[DOWNLOAD_BUFF_SZ];
var buff = new byte[DOWNLOAD_BUFF_SZ];
do
{
int bytesRead = _networkStream.Read(buff, 0, DOWNLOAD_BUFF_SZ);
var bytesRead = _networkStream.Read(buff, 0, DOWNLOAD_BUFF_SZ);
_writeFile.Write(buff, 0, bytesRead);
downloadPosition += bytesRead;
@ -233,6 +235,7 @@ namespace AaxDecrypter
WritePosition = downloadPosition;
Update();
nextFlush = downloadPosition + DATA_FLUSH_SZ;
downloadedPiece.Set();
}
} while (downloadPosition < ContentLength && !isCancelled);
@ -243,13 +246,12 @@ namespace AaxDecrypter
_networkStream.Close();
if (!isCancelled && WritePosition < ContentLength)
throw new Exception("File download ended before finishing.");
throw new WebException($"Downloaded size (0x{WritePosition:X10}) is less than {nameof(ContentLength)} (0x{ContentLength:X10}).");
if (WritePosition > ContentLength)
throw new Exception("Downloaded file is larger than expected.");
throw new WebException($"Downloaded size (0x{WritePosition:X10}) is greater than {nameof(ContentLength)} (0x{ContentLength:X10}).");
finishedDownloading = true;
downloadThreadCompleteCallback?.Invoke();
downloadEnded.Set();
}
#endregion
@ -330,9 +332,7 @@ namespace AaxDecrypter
var result = new WebHeaderCollection();
foreach (var kvp in jObj)
{
result.Add(kvp.Key, kvp.Value.Value<string>());
}
return result;
}
@ -341,8 +341,8 @@ namespace AaxDecrypter
public override void WriteJson(JsonWriter writer, object value, JsonSerializer serializer)
{
JObject jObj = new JObject();
Type type = value.GetType();
var jObj = new JObject();
var type = value.GetType();
var headers = value as WebHeaderCollection;
var jHeaders = headers.AllKeys.Select(k => new JProperty(k, headers[k]));
jObj.Add(jHeaders);
@ -364,13 +364,21 @@ namespace AaxDecrypter
public override bool CanWrite => false;
[JsonIgnore]
public override long Length => ContentLength;
public override long Length
{
get
{
if (!hasBegunDownloading)
BeginDownloading();
return ContentLength;
}
}
[JsonIgnore]
public override long Position { get => _readFile.Position; set => Seek(value, SeekOrigin.Begin); }
[JsonIgnore]
public override bool CanTimeout => base.CanTimeout;
public override bool CanTimeout => false;
[JsonIgnore]
public override int ReadTimeout { get => base.ReadTimeout; set => base.ReadTimeout = value; }
@ -387,63 +395,39 @@ namespace AaxDecrypter
if (!hasBegunDownloading)
BeginDownloading();
long toRead = Math.Min(count, Length - Position);
long requiredPosition = Position + toRead;
//read operation will block until file contains enough data
//to fulfil the request, or until cancelled.
while (requiredPosition > WritePosition && !isCancelled)
Thread.Sleep(2);
var toRead = Math.Min(count, Length - Position);
WaitToPosition(Position + toRead);
return _readFile.Read(buffer, offset, count);
}
public override long Seek(long offset, SeekOrigin origin)
{
long newPosition;
var newPosition = origin switch
{
SeekOrigin.Current => Position + offset,
SeekOrigin.End => ContentLength + offset,
_ => offset,
};
switch (origin)
{
case SeekOrigin.Current:
newPosition = Position + offset;
break;
case SeekOrigin.End:
newPosition = ContentLength + offset;
break;
default:
newPosition = offset;
break;
}
ReadToPosition(newPosition);
_readFile.Position = newPosition;
return newPosition;
WaitToPosition(newPosition);
return _readFile.Position = newPosition;
}
/// <summary>
/// Ensures that the file has downloaded to at least <paramref name="neededPosition"/>, then returns.
/// Blocks until the file has downloaded to at least <paramref name="requiredPosition"/>, then returns.
/// </summary>
/// <param name="neededPosition">The minimum required data length in <see cref="SaveFilePath"/>.</param>
private void ReadToPosition(long neededPosition)
{
byte[] buff = new byte[DOWNLOAD_BUFF_SZ];
do
{
Read(buff, 0, DOWNLOAD_BUFF_SZ);
} while (neededPosition > WritePosition);
/// <param name="requiredPosition">The minimum required flished data length in <see cref="SaveFilePath"/>.</param>
private void WaitToPosition(long requiredPosition)
{
while (requiredPosition > WritePosition && !isCancelled && hasBegunDownloading && !downloadedPiece.WaitOne(1000)) ;
}
public override void Close()
{
isCancelled = true;
downloadThreadCompleteCallback = CloseAction;
//ensure that close will run even if called after callback was fired.
if (finishedDownloading)
CloseAction();
while (downloadEnded is not null && !downloadEnded.WaitOne(1000)) ;
}
private void CloseAction()
{
_readFile.Close();
_writeFile.Close();
_networkStream?.Close();