Add ability to throttle disk reads
Useful for limiting disk load, but could also be a crude way to limit CPU usage
animetosho committed Jan 19, 2025
1 parent ee72171 commit 5a83691
Showing 7 changed files with 394 additions and 50 deletions.
35 changes: 35 additions & 0 deletions bin/parpar.js
@@ -20,6 +20,31 @@ var print_json = function(type, obj) {
};
var arg_parser = require('../lib/arg_parser.js');

var throttleParseFunc = function(prop, v) {
if(!v || v == '0') return null;
var m = (''+v).match(/^(([0-9.]*)([bkmgtper])\/)?([0-9.]*)(m?s|[mhdw])$/i);
if(!m) error('Invalid format for `--'+prop+'-read-throttle`: ' + v);
if(m[4] === '') m[4] = '1';
if(m[1]) {
if(m[2] === '') m[2] = '1';
if(m[3] == 'r' || m[3] == 'R') return {
mode: 'reqrate',
count: m[2],
time: arg_parser.parseTime(m[4] + m[5])
};
else return {
mode: 'rate',
size: arg_parser.parseSize(m[2] + m[3]),
time: arg_parser.parseTime(m[4] + m[5])
};
} else {
return {
mode: 'delay',
time: arg_parser.parseTime(m[4] + m[5])
};
}
};
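// Illustrative results for the documented throttle formats (a sketch, assuming
// arg_parser.parseSize/parseTime return bytes using binary multipliers and
// milliseconds respectively):
//   throttleParseFunc('seq', '2.5s')   -> {mode: 'delay', time: 2500}
//   throttleParseFunc('seq', '2.5M/s') -> {mode: 'rate', size: 2621440, time: 1000}
//   throttleParseFunc('seq', '5r/2s')  -> {mode: 'reqrate', count: '5', time: 2000}
//   throttleParseFunc('seq', '0')      -> null  (throttling disabled)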

var opts = {
'input-slices': {
alias: 's',
@@ -173,6 +198,16 @@ var opts = {
type: 'int',
map: 'chunkReadThreads'
},
'seq-read-throttle': {
type: 'string',
map: 'seqReadThrottle',
fn: throttleParseFunc.bind(null, 'seq')
},
'chunk-read-throttle': {
type: 'string',
map: 'chunkReadThrottle',
fn: throttleParseFunc.bind(null, 'chunk')
},
'read-buffers': {
type: 'int',
map: 'readBuffers'
7 changes: 7 additions & 0 deletions help-full.txt
@@ -223,6 +223,13 @@ I/O Tuning Options:
Default `2`
--read-buffers Maximum number of read buffers to read into and
send to processing backend. Default `8`
--seq-read-throttle Throttle sequential read speed. The following
examples demonstrate permitted values:
`2.5s`: delay each read request by 2.5 seconds
`2.5M/s`: limit to 2.5MB per second
`5r/2s`: limit of 5 requests per 2 seconds
--chunk-read-throttle This is the same as `--seq-read-throttle` but
applies to chunked reading.
--read-hash-queue Number of read buffers to queue up for hashing
before reading from a different file. Lower
values may be more optimal on disks with faster
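As an illustration, a hypothetical invocation combining both options (the slice/recovery settings and file names here are only examples, following parpar's usual CLI usage) might look like:

parpar -s 1M -r 64 -o my_recovery.par2 --seq-read-throttle 4M/s --chunk-read-throttle 10r/s *.mkv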
37 changes: 23 additions & 14 deletions lib/filechunkreader.js
@@ -3,11 +3,13 @@
var fs = require('fs');
var async = require('async');
var ProcQueue = require('./procqueue');
var ThrottleQueue = require('./throttlequeue');
var bufferSlice = Buffer.prototype.readBigInt64BE ? Buffer.prototype.subarray : Buffer.prototype.slice;

function FileChunkReader(files, sliceSize, chunkSize, chunkOffset, bufPool, concurrency, cbChunk, cb) {
function FileChunkReader(files, sliceSize, chunkSize, chunkOffset, bufPool, concurrency, throttle, cbChunk, cb) {
var readQ = new ProcQueue(concurrency);
var readErr = null;
if(!throttle) throttle = new ThrottleQueue.NoThrottle();
async.eachSeries(files, function(file, cb) {
if(file.size == 0 || file.size <= chunkOffset) return cb();
fs.open(file.name, 'r', function(err, fd) {
@@ -20,26 +22,33 @@ function FileChunkReader(files, sliceSize, chunkSize, chunkOffset, bufPool, conc
bufPool.get(function(buffer) {
readQ.run(function(readDone) {
if(readErr) return cb(readErr);
fs.read(fd, buffer, 0, chunkSize, filePos, function(err, bytesRead) {
if(err) readErr = err;
else cbChunk(file, bufferSlice.call(buffer, 0, bytesRead), sliceNum, bufPool.put.bind(bufPool, buffer));

if(--chunksLeft == 0) {
// all chunks read from this file, so close it
fs.close(fd, function(err) {
if(err) readErr = err;
throttle.pass(chunkSize, function(cancelled, throttleReadDone) {
if(cancelled) return cb();
fs.read(fd, buffer, 0, chunkSize, filePos, function(err, bytesRead) {
throttleReadDone();
if(err) readErr = err;
else cbChunk(file, bufferSlice.call(buffer, 0, bytesRead), sliceNum, bufPool.put.bind(bufPool, buffer));

if(--chunksLeft == 0) {
// all chunks read from this file, so close it
fs.close(fd, function(err) {
if(err) readErr = err;
readDone();
});
} else
readDone();
});
} else
readDone();
});
cb();
});
cb();
});
});
}, cb);
});
}, function(err) {
if(err) return cb(err);
if(err) {
throttle.cancel();
return cb(err);
}
readQ.end(function() {
cb(readErr);
});
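lib/throttlequeue.js itself is not expanded on this page. Inferred from the call sites above — throttle.pass(cost, cb) invoking cb(cancelled, done) once a request may proceed, plus throttle.cancel() — a minimal pass-through NoThrottle could look roughly like this sketch (not the actual implementation):

function NoThrottle() {}
NoThrottle.prototype.pass = function(cost, cb) {
	// admit the request immediately: not cancelled, and the completion callback is a no-op
	cb(false, function() {});
};
NoThrottle.prototype.cancel = function() {}; // nothing is ever queued, so nothing to cancel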
75 changes: 41 additions & 34 deletions lib/fileseqreader.js
@@ -1,6 +1,7 @@
"use strict";

var fs = require('fs');
var ThrottleQueue = require('./throttlequeue');
var allocBuffer = (Buffer.allocUnsafe || Buffer);

function FileReaderData(file, buffer, len, pos, parent) {
@@ -25,14 +26,15 @@ FileReaderData.prototype = {
}
};

function FileSeqReader(files, readSize, readBuffers) {
function FileSeqReader(files, readSize, readBuffers, throttleQ) {
this.fileQueue = files.filter(function(file) {
return file.size > 0;
});
this.buf = [];
this.readSize = readSize;
this.maxBufs = readBuffers;
this.openFiles = [];
this.throttleQ = throttleQ || (new ThrottleQueue.NoThrottle());
}

FileSeqReader.prototype = {
@@ -46,6 +48,7 @@ FileSeqReader.prototype = {
cb: null,
finishCb: null,
_isReading: false,
throttleQ: null,

// when doing sequential read with chunker, caller requires the first chunkLen bytes of every slice, so ensure that this always arrives as one piece
reqSliceLen: 0,
Expand Down Expand Up @@ -110,40 +113,44 @@ FileSeqReader.prototype = {
_doRead: function(file, buffer) {
var self = this;
var readSize = this._readSize(file.pos, file.info.size);
fs.read(file.fd, buffer, 0, readSize[0], null, function(err, bytesRead) {
if(err) return self.cb(err);

// file position/EOF tracking
var newPos = file.pos + bytesRead;
if(newPos > file.info.size)
return self.cb(new Error('Read past expected end of file - latest position (' + newPos + ') exceeds size (' + file.info.size + ')'));

var eof = (newPos == file.info.size);
if(!eof && bytesRead != readSize[0])
return self.cb(new Error("Read failure - expected " + readSize[0] + " bytes, got " + bytesRead + " bytes instead."));

// increase hashing count and wait for other end to signal when done
var ret = new FileReaderData(file, buffer, bytesRead, file.pos, self);
if(readSize[1])
ret.chunks = readSize[1];
file.hashQueue++;
file.pos += bytesRead;
self.cb(null, ret);

if(eof) {
// remove from openFiles
for(var i=0; i<self.openFiles.length; i++)
if(self.openFiles[i].fd == file.fd) {
self.openFiles.splice(i, 1);
break;
}
this.throttleQ.pass(readSize[0], function(cancelled, readDone) {
if(cancelled) return; // this should never happen because we only read once at a time
fs.read(file.fd, buffer, 0, readSize[0], null, function(err, bytesRead) {
readDone();
if(err) return self.cb(err);

fs.close(file.fd, function(err) {
if(err) self.cb(err);
else self.readNext();
});
} else
self.readNext();
// file position/EOF tracking
var newPos = file.pos + bytesRead;
if(newPos > file.info.size)
return self.cb(new Error('Read past expected end of file - latest position (' + newPos + ') exceeds size (' + file.info.size + ')'));

var eof = (newPos == file.info.size);
if(!eof && bytesRead != readSize[0])
return self.cb(new Error("Read failure - expected " + readSize[0] + " bytes, got " + bytesRead + " bytes instead."));

// increase hashing count and wait for other end to signal when done
var ret = new FileReaderData(file, buffer, bytesRead, file.pos, self);
if(readSize[1])
ret.chunks = readSize[1];
file.hashQueue++;
file.pos += bytesRead;
self.cb(null, ret);

if(eof) {
// remove from openFiles
for(var i=0; i<self.openFiles.length; i++)
if(self.openFiles[i].fd == file.fd) {
self.openFiles.splice(i, 1);
break;
}

fs.close(file.fd, function(err) {
if(err) self.cb(err);
else self.readNext();
});
} else
self.readNext();
});
});
},

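The sequential reader drives the same interface via this.throttleQ.pass(bytes, cb). For the `delay` mode described in help-full.txt (pause before every read request), a plausible shape — again a sketch based on the pass(cost, cb(cancelled, done)) / cancel() contract seen above, not the actual lib/throttlequeue.js — is:

function DelayThrottle(time) {
	this.time = time;          // milliseconds to wait before each request
	this.cancelled = false;
}
DelayThrottle.prototype.pass = function(cost, cb) {
	var self = this;
	if(this.cancelled) return cb(true, function() {});
	setTimeout(function() {
		// report whether a cancel arrived during the wait; done() is a no-op in this sketch
		cb(self.cancelled, function() {});
	}, this.time);
};
DelayThrottle.prototype.cancel = function() {
	this.cancelled = true;
};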
42 changes: 40 additions & 2 deletions lib/par2gen.js
@@ -7,6 +7,7 @@ var fs = require('fs');
var path = require('path');
var FileSeqReader = require('./fileseqreader');
var FileChunkReader = require('./filechunkreader');
var ThrottleQueue = require('./throttlequeue');
var BufferPool = require('./bufferpool');
var PAR2OutFile = require('./par2outfile');
var bufferSlice = Buffer.prototype.readBigInt64BE ? Buffer.prototype.subarray : Buffer.prototype.slice;
@@ -187,6 +188,34 @@ function calcNumRecoverySlices(spec, sliceSize, inSlices, files) {
}, 0);
}

function createThrottle(opts) {
var throttleTime = opts.time|0;
switch(opts.mode) {
case 'delay':
if(throttleTime <= 0)
throw new Error('Invalid throttle delay (' + throttleTime + ')');
return new ThrottleQueue.DelayThrottle(throttleTime);
case 'rate':
if(!throttleTime) throttleTime = 1000;
if(throttleTime <= 0)
throw new Error('Invalid throttle time (' + throttleTime + ')');
var throttleSize = opts.size|0;
if(throttleSize <= 0)
throw new Error('Invalid throttle size (' + throttleSize + ')');
return new ThrottleQueue.RateThrottle(throttleSize, throttleTime);
case 'reqrate':
if(!throttleTime) throttleTime = 1000;
if(throttleTime <= 0)
throw new Error('Invalid throttle time (' + throttleTime + ')');
var throttleCount = opts.count|0;
if(throttleCount <= 0 || throttleCount != +opts.count)
throw new Error('Invalid throttle count (' + throttleCount + ')');
return new ThrottleQueue.ReqRateThrottle(throttleCount, throttleTime);
default:
throw new Error('Unknown throttle mode (' + opts.mode + ')');
}
}

// use negative value for sliceSize to indicate exact number of input blocks
function PAR2Gen(fileInfo, sliceSize, opts) {
if(!(this instanceof PAR2Gen))
@@ -240,6 +269,8 @@ function PAR2Gen(fileInfo, sliceSize, opts) {
displayNameBase: '.', // base path, only used if displayNameFormat is 'path'
seqReadSize: 4*1048576, // 4MB
chunkReadThreads: 2,
seqReadThrottle: null, // no throttle, otherwise format is {mode: ('delay'|'rate'|'reqrate') [,time: <ms>] [,size: <bytes>] [,count: <num>]}
chunkReadThrottle: null, // same as above
readBuffers: 8,
readHashQueue: 5,
numThreads: null, // null => number of processors
@@ -715,6 +746,11 @@ function PAR2Gen(fileInfo, sliceSize, opts) {
throw new Error('Minimum chunk size (' + friendlySize(o.minChunkSize) + ') cannot exceed read buffer size (' + friendlySize(o.seqReadSize) + ')');
}

if(o.seqReadThrottle)
this.seqReadThrottle = createThrottle(o.seqReadThrottle);
if(o.chunkReadThrottle)
this.chunkReadThrottle = createThrottle(o.chunkReadThrottle);

if(o.memoryLimit) {
var cpuMinChunk = Math.ceil(cpuRatio * o.minChunkSize /2) *2;
if(o.minChunkSize && (o.recDataSize+1) * o.minChunkSize > o.memoryLimit)
@@ -1014,6 +1050,8 @@ PAR2Gen.prototype = {
chunkOffset: 0,
readSize: 0,
_buf: null,
seqReadThrottle: null,
chunkReadThrottle: null,

_rfPush: function(numSlices, sliceOffsetOrExponents, critPackets, creator) {
var packets, recvSize = 0, critTotalSize = sumSize(critPackets);
@@ -1306,15 +1344,15 @@ PAR2Gen.prototype = {
if(seeking) {
if(!self._chunker) return cb(new Error('Trying to perform chunked reads without a chunker'));
var bufPool = new BufferPool(this._buf, chunkSize, this.opts.readBuffers);
FileChunkReader(this.files, this.opts.sliceSize, chunkSize, this.chunkOffset, bufPool, this.opts.chunkReadThreads, function(file, buffer, sliceNum, cb) {
FileChunkReader(this.files, this.opts.sliceSize, chunkSize, this.chunkOffset, bufPool, this.opts.chunkReadThreads, this.chunkReadThrottle, function(file, buffer, sliceNum, cb) {
if(cbProgress) cbProgress('processing_slice', file, sliceNum);
self._chunker.processData(file.sliceOffset+sliceNum, buffer, cb);
}, function(err) {
if(err) cb(err);
else bufPool.end(cb);
});
} else {
var reader = new FileSeqReader(this.files, this.readSize, this.opts.readBuffers);
var reader = new FileSeqReader(this.files, this.readSize, this.opts.readBuffers, this.seqReadThrottle);
reader.setBuffers(this._buf);
reader.maxQueuePerFile = this.opts.readHashQueue;

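createThrottle maps the option objects documented above (e.g. seqReadThrottle: {mode: 'rate', size: 4*1048576, time: 1000} for roughly 4MiB per second) onto throttle classes from lib/throttlequeue.js, which is not expanded on this page. A byte-rate variant consistent with the pass(cost, cb(cancelled, done)) / cancel() contract used by the readers might be sketched as follows (illustrative only):

function RateThrottle(size, time) {
	this.size = size;        // bytes admitted per window
	this.time = time;        // window length in milliseconds
	this.cancelled = false;
	this.nextSlot = 0;       // earliest timestamp at which the next request may start
}
RateThrottle.prototype.pass = function(cost, cb) {
	if(this.cancelled) return cb(true, function() {});
	var self = this;
	var now = Date.now();
	var wait = Math.max(0, this.nextSlot - now);
	// charge this request against the rate: cost bytes occupy (cost/size)*time milliseconds
	this.nextSlot = Math.max(now, this.nextSlot) + (cost / this.size) * this.time;
	setTimeout(function() {
		cb(self.cancelled, function() {}); // done() could release queued requests in a fuller design
	}, wait);
};
RateThrottle.prototype.cancel = function() {
	this.cancelled = true;
};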