node.js batch process records in parallel from mysql table

Objective: Assume there are millions of rows in a MySQL table that need to be processed using Node.js. Each row has an is_processed field to indicate whether it has been processed, and a field called evenNum, which has to be set to double the id of the same row.

Solution: Divide the total number of records into N batches, each batch containing X records so that memory is not exhausted. The batches are processed one after another — async.series is the key to keeping them in order — while the records within a batch are updated in parallel, capped at a configurable number of concurrent queries by async.parallelLimit.
[code language=”javascript”]
// Dependencies: the mysql driver and the async control-flow library.
// (Bug fix: the original used curly quotes — ‘mysql’ — which are a
// WordPress formatting artifact and do not parse as JavaScript.)
var mysql = require('mysql');
var async = require('async');

// Connection pool so the parallel updates can share connections
// instead of opening a new connection per query.
var dbpool = mysql.createPool(
{
    "host": "localhost",
    "database": "test",
    "user": "your_username",
    "password": "your_password",
    "port": 3306
});

//Third function triggered by batch()
//update the record of the provided id
//Third function, triggered by batch().
//Builds an async-style task that marks the row with the given id as
//processed and sets evenNum to double the id (the query computes id*2
//server-side, so no value round-trips through Node).
//NOTE(review): the name shadows Node's global `process` object inside
//this function; kept as-is so existing callers are unaffected.
//{number} id, primary key of the row to update
//returns {function} task of the form (cb) expected by async.parallelLimit
function process(id) {
    // Bug fix: straight quotes — the original's curly quotes do not parse.
    var updateQuery = mysql.format('update test set is_processed=1, evenNum=id*2 where id=?', [id]);
    return function (cb) {
        dbpool.query(updateQuery, function (err, result) {
            if (err) {
                console.log("Error3: ", err);
            }
            // cb(err) covers both paths: err is null/undefined on success,
            // which async treats the same as cb().
            cb(err);
        });
    };
}

//Seond function triggered by the init function run(), get batchSize records and then parallelLimit processes in parallel by calling the function process()
//{number} batchSize, the number of records need to be processed
//{number} parallelLimit, the max number of processes run in parallel
//Second function, triggered by the init function run(). Fetches up to
//batchSize unprocessed rows, then updates them in parallel (at most
//parallelLimit concurrent queries) via the task builder process().
//{number} batchSize, the number of records to fetch and process
//{number} parallelLimit, the max number of updates run in parallel
//returns {function} task of the form (cb) expected by async.series
function batch(batchSize, parallelLimit) {
    // Only the id column is used below; selecting just id instead of *
    // avoids pulling entire rows into memory. (Also: straight quotes —
    // the original's curly quotes do not parse.)
    var selectQuery = mysql.format('select id from test where is_processed=0 limit ?', [batchSize]);
    var statusQuery = 'select count(*) as count from test where is_processed=1';
    return function (cb) {
        dbpool.query(selectQuery, function (err, result) {
            if (err) {
                console.log("Error1: ", err);
                cb(err);
                return;
            }

            //build one update task per fetched row
            var toDo = [];
            for (var i = 0; i < result.length; i++) {
                toDo.push(process(result[i].id));
            }

            //run the update tasks with at most parallelLimit in flight
            //(the original comment said "in series", which was wrong)
            async.parallelLimit(toDo, parallelLimit, function (err) {
                if (err) {
                    console.log("Error2", err);
                    cb(err);
                    return;
                }
                //Progress check: how many rows are processed so far.
                dbpool.query(statusQuery, function (err, total) {
                    //Bug fix: the original silently ignored errors from
                    //this query; now they are logged and propagated.
                    if (err) {
                        console.log("Error4: ", err);
                    }
                    //console.log(total[0].count + " rows processed!");
                    cb(err);
                });
            });
        });
    };
}

//Init function, divide the total number of records into N batches of each batch containing batchSize records
//put each batch in an array of toDo list, and each one is processed in series in the async.series
//{number} batchSize, required param, the max number of records can have in a batch
//{number} parallelLimit, the max number of processes run in parallel
//{number} total, optional param, the total number of record need to be processed, if is not defined, it will process all the unprocessed records in the table
//Init function: divides the total number of records into N batches of
//batchSize records each, pushes one batch task per slice onto a toDo
//list, and runs them one after another with async.series.
//{number} batchSize, required, the max number of records in one batch
//{number} parallelLimit, the max number of updates run in parallel inside a batch
//{number} total, optional, the total number of records to process; when
//         omitted, every unprocessed row in the table is handled
function run(batchSize, parallelLimit, total) {
    function startBatches(total) {
        console.log(total, " rows to process.");
        //one batch task per batchSize-sized slice of the total
        var toDo = [];
        for (var i = 0; i < total; i += batchSize) {
            toDo.push(batch(batchSize, parallelLimit));
        }

        //run each batch from the toDo list in series
        async.series(toDo, function (err, results) {
            if (err) console.log("Done! Error: ", err);
            //console.log("Done!");
            dbpool.end();
            console.log(new Date());
        });
    }

    // Bug fix: straight quotes — the original's curly quotes do not parse.
    if (typeof total === 'number') {
        startBatches(total);
    } else {
        //No explicit total: count the unprocessed rows first.
        dbpool.query("select count(*) as count from test where is_processed=0", function (err, result) {
            if (err) {
                console.log("Error1: ", err);
                //Bug fix: close the pool on failure so the node process
                //can exit instead of hanging on open connections.
                dbpool.end();
            } else {
                startBatches(result[0].count);
            }
        });
    }
}

//Record the start time, then kick off processing: batches of 1000 rows,
//with up to 100 concurrent updates within each batch.
console.log(new Date());
run(1000, 100);
[/code]

mysql test table
[code language=”sql”]
-- Test table for the batch-processing script:
--   is_processed  flags completion (the script selects rows where it is 0)
--   evenNum       is set to id*2 by the update query
--   last_update   auto-updated timestamp, useful for watching progress
CREATE TABLE `test` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`is_processed` tinyint(1) DEFAULT NULL,
`evenNum` int(11) DEFAULT NULL,
`last_update` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
[/code]

load some data in the table
[code language=”sql”]
-- Insert empty rows; AUTO_INCREMENT supplies the ids.
-- (Fixes: VALUES is the standard keyword; the original's em-dash "—" is
-- not a valid SQL comment marker, "--" is.)
INSERT INTO test() VALUES ();
INSERT INTO test() VALUES ();
INSERT INTO test() VALUES ();
-- ...
[/code]

or create a file with a number in each line and load it into the table.
[code language=”sql”]
-- Bulk-load ids from a one-number-per-line file.
-- (Bug fix: the original used curly quotes around the path and '\n',
-- which MySQL would reject; string literals need straight quotes.)
LOAD DATA LOCAL INFILE '/path/to/test/data/test.csv'
INTO TABLE test
LINES TERMINATED BY '\n'
(id);
[/code]

Search within Codexpedia

Custom Search

Search the entire web

Custom Search