Skip to content

Commit

Permalink
- Finalizing stream support (TorrentStream)
Browse files Browse the repository at this point in the history
- Fixing an issue with 0 length files (fixed in APF)
- [App LibVLCStreaming] Updating with the Stream code
- Updating APF library
  • Loading branch information
SuRGeoNix committed Mar 4, 2021
1 parent c25c55c commit ab29190
Show file tree
Hide file tree
Showing 10 changed files with 192 additions and 124 deletions.
4 changes: 2 additions & 2 deletions BitSwarm (WinForms Demo)/BitSwarm (WinForms Demo).csproj
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@
<Reference Include="BencodeNET, Version=3.1.4.0, Culture=neutral, processorArchitecture=MSIL">
<HintPath>..\packages\BencodeNET.3.1.4\lib\netstandard2.0\BencodeNET.dll</HintPath>
</Reference>
<Reference Include="Partfiles, Version=1.2.0.0, Culture=neutral, processorArchitecture=MSIL">
<HintPath>..\packages\APF.1.2.0\lib\netstandard2.0\Partfiles.dll</HintPath>
<Reference Include="Partfiles, Version=1.3.0.0, Culture=neutral, processorArchitecture=MSIL">
<HintPath>..\packages\APF.1.3.0\lib\netstandard2.0\Partfiles.dll</HintPath>
</Reference>
<Reference Include="System" />
<Reference Include="System.Buffers, Version=4.0.3.0, Culture=neutral, PublicKeyToken=cc7b13ffcd2ddd51, processorArchitecture=MSIL">
Expand Down
2 changes: 1 addition & 1 deletion BitSwarm (WinForms Demo)/packages.config
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
<?xml version="1.0" encoding="utf-8"?>
<packages>
<package id="APF" version="1.2.0" targetFramework="net472" />
<package id="APF" version="1.3.0" targetFramework="net472" />
<package id="BencodeNET" version="3.1.4" targetFramework="net472" />
<package id="System.Buffers" version="4.5.1" targetFramework="net472" />
<package id="System.IO.Pipelines" version="5.0.0" targetFramework="net472" />
Expand Down
14 changes: 8 additions & 6 deletions BitSwarm/BEP/Peer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -725,6 +725,7 @@ private void Receive(int len)
long startedAt = 0;
long diffMs;
int lenAvailable;
bool bufferTimeoutUsed = false;

while ((lenAvailable = tcpClient.Client.Available) < len)
{
Expand All @@ -733,15 +734,16 @@ private void Receive(int len)
curLoop++;

// Piece Timeouts
if (status == Status.DOWNLOADING && curLoop % (Beggar.Options.PieceTimeout / 40) == 0)
if (status == Status.DOWNLOADING && (((!Beggar.FocusAreInUse || !Beggar.Options.EnableBuffering) && curLoop % (Beggar.Options.PieceTimeout / 40) == 0) || (Beggar.FocusAreInUse && Beggar.Options.EnableBuffering && curLoop % (Beggar.Options.PieceBufferTimeout / 40) == 0)))
{
PieceTimeouts++;
if (Beggar.FocusAreInUse && Beggar.Options.EnableBuffering) bufferTimeoutUsed = true;

if (PieceTimeouts == 1 && Beggar.Options.PieceRetries > 0 && lastPieces != null && lastPieces.Count > 0) Beggar.ResetRequests(this, lastPieces);
PieceTimeouts++;
if (PieceTimeouts == 1 && ((bufferTimeoutUsed && Beggar.Options.PieceBufferRetries > 0) || (!bufferTimeoutUsed && Beggar.Options.PieceRetries > 0)) && lastPieces != null && lastPieces.Count > 0) Beggar.ResetRequests(this, lastPieces);

if (Beggar.Options.Verbosity > 0) Log(4, $"[TIMEOUT] {PieceTimeouts} ({lenAvailable} < {len} , Requests: {PiecesRequested}, Pieces: {lastPieces.Count}, Timeouts: {PieceTimeouts})");

if (PieceTimeouts > Beggar.Options.PieceRetries)
if ((bufferTimeoutUsed && PieceTimeouts > Beggar.Options.PieceBufferRetries) || (!bufferTimeoutUsed && PieceTimeouts > Beggar.Options.PieceRetries))
{
if (Beggar.Options.Verbosity > 0) Log(4, $"[DROP] Piece Timeout ({lenAvailable} < {len} , Requests: {PiecesRequested}, Pieces: {lastPieces.Count}, Timeouts: {PieceTimeouts})");

Expand All @@ -768,10 +770,10 @@ private void Receive(int len)
throw new Exception("CUSTOM Connection closed");
}

// Drop Choked (after 30 seconds)
// Drop Choked (after 40 seconds with enough downloaders)
else if (!stageYou.unchoked)
{
if (diffMs > 30 * 1000)
if (diffMs > 40 * 1000 && Beggar.Stats.PeersDownloading > Beggar.Options.MaxThreads / 3)
{
if (Beggar.Options.Verbosity > 0) Log(4, $"[DROP] Choked Peer ({lenAvailable} < {len} , Requests: {PiecesRequested}), Pieces: {lastPieces.Count}, Timeouts: {PieceTimeouts})");
status = Status.FAILED2;
Expand Down
206 changes: 109 additions & 97 deletions BitSwarm/BEP/Torrent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System.Runtime.Serialization.Formatters.Binary;
using System.Security.Cryptography;
using System.Text.RegularExpressions;
using System.Threading;
using System.Web;

using BencodeNET.Parsing;
Expand Down Expand Up @@ -89,6 +90,50 @@ public struct TorrentFile
/// List of SHA1 Hashes for all torrent pieces
/// </summary>
public List<byte[]> pieces;

/// <summary>
/// Gets the array index of the specified file
/// </summary>
/// <param name="filename"></param>
/// <returns></returns>
public int GetFileIndex(string filename)
{
for (int i=0; i<paths.Count; i++)
if (paths[i].ToLower() == filename.ToLower()) return i;

return -1;
}

/// <summary>
/// Gets the size of the specified file
/// </summary>
/// <param name="filename"></param>
/// <returns></returns>
public long GetLength(string filename)
{
int ret = GetFileIndex(filename);
if (ret == -1) return -1;

return lengths[ret];
}

/// <summary>
/// Gets file position in the torrent
/// </summary>
/// <param name="filename"></param>
/// <returns></returns>
public long GetStartPosition(string filename)
{
long startPos = 0;

for (int i=0; i<paths.Count; i++)
{
if (paths[i].ToLower() == filename.ToLower()) return startPos;
startPos += lengths[i];
}

return -1;
}
}

/// <summary>
Expand Down Expand Up @@ -252,6 +297,15 @@ public Torrent (BitSwarm bitSwarm)
file.trackers = new List<Uri>();
}

public TorrentStream GetTorrentStream(string filename)
{
int fileIndex = file.GetFileIndex(filename);
if (fileIndex == -1 || data.files[file.GetFileIndex(filename)] == null) return null;


return new TorrentStream(this, data.files[file.GetFileIndex(filename)], file.GetStartPosition(filename));
}

public void FillFromMagnetLink(Uri magnetLink)
{
// TODO: Check v2 Magnet Link
Expand Down Expand Up @@ -350,7 +404,6 @@ public void FillFromInfo(BDictionary bInfo)

Partfiles.Options opt = new Partfiles.Options();
opt.AutoCreate = true;
StreamFiles = new Dictionary<string, TorrentStream>();
long startPos = 0;

if (isMultiFile)
Expand All @@ -373,13 +426,6 @@ public void FillFromInfo(BDictionary bInfo)
{
data.files[i] = new Partfile(file.paths[i], file.pieceLength, file.lengths[i], opt);
data.filesIncludes.Add(file.paths[i]);

string ext = Path.GetExtension(file.paths[i]);
if (MovieExts.Contains(ext.Substring(1,ext.Length-1)))
{
StreamFiles.Add(file.paths[i], new TorrentStream(data.files[i], startPos));
data.files[i].BeforeReading += Torrent_BeforeReading;
}
startPos += file.lengths[i];
}
}
Expand All @@ -397,13 +443,6 @@ public void FillFromInfo(BDictionary bInfo)
opt.PartOverwrite = true;

data.files[0] = new Partfile(Utils.GetValidFileName(file.name), file.pieceLength, file.length, opt);
string ext = Path.GetExtension(file.name);
if (MovieExts.Contains(ext.Substring(1,ext.Length-1)))
{
StreamFiles.Add(file.name, new TorrentStream(data.files[0], 0));
data.files[0].BeforeReading += Torrent_BeforeReading;
}

file.paths = new List<string>() { file.name };
file.lengths = new List<long>() { file.length };

Expand All @@ -430,7 +469,6 @@ public void FillFromInfo(BDictionary bInfo)

public void FillFromSession()
{
StreamFiles = new Dictionary<string, TorrentStream>();
data.files = new Partfile[file.paths == null ? 1 : file.paths.Count];
data.filesIncludes = new List<string>();

Expand All @@ -449,17 +487,8 @@ public void FillFromSession()
for (int i=0; i<file.paths.Count; i++)
{
if (!File.Exists(Path.Combine(data.folder, file.paths[i])) && File.Exists(Path.Combine(data.folderTemp, file.paths[i] + opt.PartExtension)))
{
data.files[i] = new Partfile(Path.Combine(data.folderTemp, file.paths[i] + opt.PartExtension), true, opt);

string ext = Path.GetExtension(file.paths[i]);
if (MovieExts.Contains(ext.Substring(1,ext.Length-1)))
{
StreamFiles.Add(file.paths[i], new TorrentStream(data.files[i], startPos));
data.files[i].BeforeReading += Torrent_BeforeReading;
}
}

data.filesIncludes.Add(file.paths[i]);
startPos += file.lengths[i];
}
Expand All @@ -471,15 +500,7 @@ public void FillFromSession()
string validFilename = Utils.GetValidFileName(file.name);

if (!File.Exists(Path.Combine(bitSwarm.OptionsClone.FolderComplete, validFilename)) && File.Exists(Path.Combine(bitSwarm.OptionsClone.FolderIncomplete, validFilename + opt.PartExtension)))
{
data.files[0] = new Partfile(Path.Combine(bitSwarm.OptionsClone.FolderIncomplete, validFilename + opt.PartExtension), true, opt);
string ext = Path.GetExtension(file.name);
if (MovieExts.Contains(ext.Substring(1,ext.Length-1)))
{
StreamFiles.Add(file.name, new TorrentStream(data.files[0], 0));
data.files[0].BeforeReading += Torrent_BeforeReading;
}
}

data.filesIncludes.Add(file.name);
}
Expand Down Expand Up @@ -625,106 +646,97 @@ protected virtual void Dispose(bool disposing)
foreach (Partfile file in data.files)
file?.Dispose();

if (data.pieceProgress != null) data.pieceProgress.Clear();

// Delete Completed Folder (If Empty)
if (data.folder != null && Directory.Exists(data.folder) && Directory.GetFiles(data.folder, "*", SearchOption.AllDirectories).Length == 0)
Directory.Delete(data.folder, true);

// Delete Temp Folder (If Empty)
if (data.folderTemp != null && Directory.Exists(data.folderTemp) && Directory.GetFiles(data.folderTemp, "*", SearchOption.AllDirectories).Length == 0)
Directory.Delete(data.folderTemp, true);
} catch (Exception) { }


} catch (Exception) { }
}

disposedValue = true;
}
}
public void Dispose() { Dispose(true); }
#endregion
}

#region Preparing Stream Support
[NonSerialized]
public static List<string> MovieExts = new List<string>() { "mp4", "m4v", "m4e", "mkv", "mpg", "mpeg" , "mpv", "mp4p", "mpe" , "m1v", "m2ts", "m2p", "m2v", "movhd", "moov", "movie", "movx", "mjp", "mjpeg", "mjpg", "amv" , "asf", "m4v", "3gp", "ogm", "ogg", "vob", "ts", "rm", "3gp", "3gp2", "3gpp", "3g2", "f4v", "f4a", "f4p", "f4b", "mts", "m2ts", "gifv", "avi", "mov", "flv", "wmv", "qt", "avchd", "swf", "cam", "nsv", "ram", "rm", "x264", "xvid", "wmx", "wvx", "wx", "video", "viv", "vivo", "vid", "dat", "bik", "bix", "dmf", "divx" };
public class TorrentStream : PartStream
{
public long StartPos { get; set; }
public long EndPos { get; set; }
public int LastPiece { get; set; }

[NonSerialized]
public Dictionary<string, TorrentStream> StreamFiles = new Dictionary<string, TorrentStream>();
Torrent torrent;
bool cancel;

public class TorrentStream
public TorrentStream(Torrent torrent, Partfile pf, long distance) : base(pf)
{
public Partfile Stream { get; set; }
public long StartPos { get; set; }
public long EndPos { get; set; }

public TorrentStream(Partfile pf, long distance) { Stream = pf; StartPos = distance; EndPos = StartPos + pf.Size; }
this.torrent = torrent;
StartPos = torrent.file.GetStartPosition(pf.Filename);
EndPos = StartPos + Length;
LastPiece = FilePosToPiece(Length);
}
public void PrepareStreamFiles()
{
/* TODO
*
* 1. Alphanumeric sorting
* 2. Add FirstFilePiece / LastFilePiece ?
*/
long startPos = 0;
if (StreamFiles == null) StreamFiles = new Dictionary<string, TorrentStream>();

for (int i=0; i<file.paths.Count; i++)
{
string ext = Path.GetExtension(file.paths[i]);
if (ext == null || ext.Trim() == "") { startPos += file.lengths[i]; continue; }
public void Cancel() { cancel = true; }

if (MovieExts.Contains(ext.Substring(1,ext.Length-1)))
{
StreamFiles.Add(file.paths[i], new TorrentStream(data.files[i], startPos));
data.files[i].BeforeReading += Torrent_BeforeReading;
}
/// <summary>
/// Reads the specified actual bytes from the part file until they are available or cancel
/// </summary>
/// <param name="buffer"></param>
/// <param name="offset"></param>
/// <param name="count"></param>
/// <returns>Bytes read size | -2 on Error | -1 on Cancel</returns>
public override int Read(byte[] buffer, int offset, int count)
{
int startPiece = FilePosToPiece(Position);
int endPiece = FilePosToPiece(Position + count);

startPos += file.lengths[i];
}
}
if (!torrent.bitSwarm.FocusAreInUse)
torrent.bitSwarm.FocusArea = new Tuple<int, int>(startPiece, LastPiece); // Set every time to follow the decoder?

private void Torrent_BeforeReading(Partfile pf, Partfile.BeforeReadingEventArgs e)
{
/* TODO
*
* 1. Open/Seek Piece Timeouts for faster buffering
* 2. Cancellation support?
* 3. Review insist on Focus Area
* 4. Open/Seek timeouts | When FA changes to different areas we should use a different timeout (for few reads?)
*/
TorrentStream curStream = StreamFiles[pf.Filename];

int startPiece = FilePosToPiece(curStream, e.Position);
int endPiece = FilePosToPiece(curStream, e.Position + e.Count);
int lastPiece = FilePosToPiece(curStream, pf.Size);

bitSwarm.FocusArea = new Tuple<int, int>(startPiece, lastPiece); // Set in every seek?

if (data.progress.GetFirst0(startPiece, endPiece) != -1)
if (torrent.data.progress.GetFirst0(startPiece, endPiece) != -1)
{
try
lock(torrent) // Only one FA can be active
{
Console.WriteLine($"[FA: {startPiece} - {endPiece}] Buffering");
while (data.progress.GetFirst0(startPiece, endPiece) != -1)
{
bitSwarm.FocusArea = new Tuple<int, int>(startPiece, lastPiece);
cancel = false;
torrent.bitSwarm.FocusAreInUse = true;
torrent.bitSwarm.Log($"[FOCUS {startPiece} - {endPiece}] Buffering Started");

System.Threading.Thread.Sleep(25);
while (torrent.data.progress.GetFirst0(startPiece, endPiece) != -1 && !cancel)
{
torrent.bitSwarm.FocusArea = new Tuple<int, int>(startPiece, LastPiece);
Thread.Sleep(25);
}

Console.WriteLine($"[FA: {startPiece} - {endPiece}] Done");
torrent.bitSwarm.FocusAreInUse = false;

if (cancel)
{
torrent.bitSwarm.Log($"[FOCUS {startPiece} - {endPiece}] Buffering Cancel");
return -1;
}
else
torrent.bitSwarm.Log($"[FOCUS {startPiece} - {endPiece}] Buffering Done");
}
catch (Exception e2) { Console.WriteLine("[Torrent] Error " + e2.Message); }
}

return base.Read(buffer, offset, count);
}

private int FilePosToPiece(TorrentStream ts, long filePos)
public int FilePosToPiece(long filePos)
{
int piece = (int)((ts.StartPos + filePos) / file.pieceLength);
if (piece >= data.pieces) piece = data.pieces - 1;
int piece = (int)((StartPos + filePos) / torrent.file.pieceLength);
if (piece >= torrent.data.pieces) piece = torrent.data.pieces - 1;

return piece;
}
#endregion

}
#pragma warning restore CS1591 // Missing XML comment for publicly visible type or member
}
4 changes: 3 additions & 1 deletion BitSwarm/BitSwarm.cs
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,8 @@ public StatsUpdatedArgs(Stats stats)
public bool isStopped => status == Status.STOPPED;
public bool isDHTRunning => dht.status == DHT.Status.RUNNING;
public string LoadedSessionFile { get; set; }

public bool FocusAreInUse { get; set; }
public Tuple<int, int> FocusArea { get { lock (lockerTorrent) return focusArea; } set { lock (lockerTorrent) focusArea = value; } }

internal Tuple<int, int> focusArea;
Expand All @@ -149,7 +151,7 @@ public StatsUpdatedArgs(Stats stats)
readonly object lockerMetadata = new object();

// Generators (Hash / Random)
private static SHA1 sha1 = new SHA1Managed();
private SHA1 sha1 = new SHA1Managed();
private static Random rnd = new Random();
internal byte[] peerID;

Expand Down
Loading

0 comments on commit ab29190

Please sign in to comment.