Node.js: batch process records in parallel from a MySQL table
Objective: Assume there are millions of rows in a MySQL table that need to be processed with Node.js. Each row has an is_processed field indicating whether it has been processed, and an evenNum field that must be set to double the id of the same row.
Solution: Divide the total number of records into N batches, each containing X records, small enough not to exhaust memory. The batches run one after another, while the records within a batch are updated in parallel, capped at a configurable limit. async.series keeps the batches in order, and async.parallelLimit bounds the concurrency inside each batch. A minimal sketch of this control flow follows; the full implementation comes after it.
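To make the control flow concrete, here is a minimal, self-contained sketch of the same series-of-batches pattern. No database is involved; the record ids and the setTimeout delay are made up for illustration:

var async = require('async');

// one fake "record update" task; setTimeout stands in for a query
function makeTask(id) {
  return function (cb) {
    setTimeout(function () {
      console.log('processed record', id);
      cb();
    }, 10);
  };
}

// one batch: run its tasks in parallel, at most 2 at a time
function makeBatch(ids) {
  return function (cb) {
    async.parallelLimit(ids.map(makeTask), 2, cb);
  };
}

// batches run strictly one after another
async.series([makeBatch([1, 2, 3]), makeBatch([4, 5, 6])], function (err) {
  console.log(err ? 'failed' : 'all batches done');
});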
var mysql = require('mysql');
var async = require('async');

var dbpool = mysql.createPool({
  "host": "localhost",
  "database": "test",
  "user": "your_username",
  "password": "your_password",
  "port": 3306
});

// Innermost function, triggered by batch():
// returns a task that updates the record with the given id.
function processRecord(id) {
  var updateQuery = mysql.format('update test set is_processed=1, evenNum=id*2 where id=?', [id]);
  return function (cb) {
    dbpool.query(updateQuery, function (err, result) {
      if (err) {
        console.log("Error3: ", err);
        cb(err);
      } else {
        cb();
      }
    });
  };
}

// Second function, triggered by the init function run():
// fetches batchSize unprocessed records and updates them in parallel via processRecord().
// {number} batchSize, the number of records to process in this batch
// {number} parallelLimit, the max number of updates to run in parallel
function batch(batchSize, parallelLimit) {
  var selectQuery = mysql.format('select * from test where is_processed=0 limit ?', [batchSize]);
  var statusQuery = 'select count(*) as count from test where is_processed=1';
  return function (cb) {
    var toDo = [];
    dbpool.query(selectQuery, function (err, result) {
      if (err) {
        console.log("Error1: ", err);
        cb(err);
      } else {
        // push an update task for each record onto the toDo list
        for (var i = 0; i < result.length; i++) {
          toDo.push(processRecord(result[i].id));
        }
        // run the tasks in parallel, at most parallelLimit at a time
        async.parallelLimit(toDo, parallelLimit, function (err, result) {
          if (err) {
            console.log("Error2: ", err);
            cb(err);
          } else {
            // report the number of processed records so far
            dbpool.query(statusQuery, function (err, total) {
              if (err) {
                console.log("Error4: ", err);
                cb(err);
              } else {
                console.log(total[0].count + " rows processed!");
                cb();
              }
            });
          }
        });
      }
    });
  };
}

// Init function: divide the total number of records into N batches of batchSize records each,
// push each batch onto a toDo list, and run them one after another with async.series.
// {number} batchSize, required, the max number of records in a batch
// {number} parallelLimit, the max number of updates to run in parallel within a batch
// {number} total, optional, the total number of records to process; if undefined,
//          all unprocessed records in the table are processed
function run(batchSize, parallelLimit, total) {
  function startBatches(total) {
    var toDo = [];
    console.log(total, " rows to process.");
    for (var i = 0; i < total; i += batchSize) {
      // push each batch onto the toDo list
      toDo.push(batch(batchSize, parallelLimit));
    }
    // run the batches from the toDo list in series
    async.series(toDo, function (err, results) {
      if (err) console.log("Done! Error: ", err);
      else console.log("Done!");
      dbpool.end();
      console.log(new Date());
    });
  }

  if (typeof total === 'number') {
    startBatches(total);
  } else {
    dbpool.query("select count(*) as count from test where is_processed=0", function (err, result) {
      if (err) {
        console.log("Error1: ", err);
      } else {
        startBatches(result[0].count);
      }
    });
  }
}

console.log(new Date());
run(1000, 100);
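On newer Node.js versions the same series-of-batches idea can also be expressed with async/await. The sketch below assumes the mysql2 package (a different driver than the mysql package used above) and replaces async.parallelLimit with a simple chunked Promise.all; it illustrates the pattern, it is not a drop-in replacement:

const mysql = require('mysql2/promise');

async function run(batchSize, parallelLimit) {
  const pool = mysql.createPool({
    host: 'localhost', database: 'test',
    user: 'your_username', password: 'your_password'
  });
  while (true) {
    // fetch the next batch of unprocessed rows
    const [rows] = await pool.query('select id from test where is_processed=0 limit ?', [batchSize]);
    if (rows.length === 0) break; // nothing left to process
    // update the batch in chunks of at most parallelLimit concurrent queries
    for (let i = 0; i < rows.length; i += parallelLimit) {
      await Promise.all(rows.slice(i, i + parallelLimit).map(r =>
        pool.query('update test set is_processed=1, evenNum=id*2 where id=?', [r.id])
      ));
    }
  }
  await pool.end();
}

run(1000, 100).catch(console.error);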
MySQL test table
CREATE TABLE `test` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `is_processed` tinyint(1) DEFAULT NULL,
  `evenNum` int(11) DEFAULT NULL,
  `last_update` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
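One assumption worth making explicit: with millions of rows, every batch re-runs select ... where is_processed=0 limit ?, so an index on is_processed keeps those scans cheap. This is an optional addition, not part of the original schema:

ALTER TABLE `test` ADD INDEX `idx_is_processed` (`is_processed`);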
Load some data into the table:
INSERT INTO test() VALUE ();
INSERT INTO test() VALUE ();
INSERT INTO test() VALUE ();
-- ...
Or create a file with a number on each line and load it into the table:
LOAD DATA LOCAL INFILE '/path/to/test/data/test.csv' INTO TABLE test LINES TERMINATED BY '\n' (id);
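If you don't have such a file handy, a one-off Node.js script can generate it. The path is the placeholder from the LOAD DATA statement above and the row count is arbitrary; note also that LOAD DATA LOCAL INFILE requires the local_infile option to be enabled on both server and client:

// writes 1..1000000, one id per line, for LOAD DATA LOCAL INFILE
var fs = require('fs');
var out = fs.createWriteStream('/path/to/test/data/test.csv');
for (var i = 1; i <= 1000000; i++) {
  out.write(i + '\n');
}
out.end();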