Node.js: batch process records in parallel from a MySQL table

Objective: Assume a MySQL table contains millions of rows that need to be processed with Node.js. Each row has an is_processed field indicating whether it has been processed, and a field called evenNum that must be set to double the row's id.

Solution: Divide the total number of records into N batches, each containing X records, small enough not to exhaust memory. The batches run one after another, while the records within a batch are updated in parallel, capped at a configurable concurrency limit. async.series keeps the batches in order, and async.parallelLimit bounds the concurrency within each batch.

var mysql = require('mysql');
var async = require('async');

var dbpool = mysql.createPool({
	"host": "localhost",
	"database": "test",
	"user": "your_username",
	"password": "your_password",
	"port": 3306
});

//Third function, used by batch(): returns a task that updates the record with the provided id.
//Named processRecord to avoid shadowing Node's global process object.
function processRecord(id) {
	var updateQuery = mysql.format('update test set is_processed=1, evenNum=id*2 where id=?', [id]);
	return function (cb) {
		dbpool.query(updateQuery, function (err, result) {
			if (err) {
				console.log("Error3: ", err);
				cb(err);
			} else {
				cb();
			}
		});
	};
}

//Second function, returned by the init function run(): fetches up to batchSize unprocessed
//records, then updates them in parallel via processRecord(). Because processed rows no longer
//match is_processed=0, each successive batch automatically picks up the next chunk of work.
//{number} batchSize, the number of records to process per batch
//{number} parallelLimit, the max number of updates to run in parallel
function batch(batchSize, parallelLimit) {
	var selectQuery = mysql.format('select id from test where is_processed=0 limit ?', [batchSize]);
	var statusQuery = 'select count(*) as count from test where is_processed=1';
	return function (cb) {
		var toDo = [];
		dbpool.query(selectQuery, function (err, result) {
			if (err) {
				console.log("Error1: ", err);
				cb(err);
			} else {
				//pushing each process onto the toDo list
				for(var i=0; i<result.length; i++) {
					toDo.push(processRecord(result[i].id));
				}

				//run the updates from the toDo list in parallel, at most parallelLimit at a time
				async.parallelLimit(toDo, parallelLimit, function (err, result) {
					if (err) {
						console.log("Error2", err);
						cb(err);
					} else {
						//Report the cumulative number of processed records
						dbpool.query(statusQuery, function (err, total) {
							if (err) return cb(err);
							console.log(total[0].count + " rows processed!");
							cb();
						});
					}
				});
			}
		});
	}
}

//Init function: divides the total number of records into batches of batchSize records each,
//puts each batch on a toDo list, and runs the batches one after another with async.series.
//{number} batchSize, required, the max number of records in a batch
//{number} parallelLimit, the max number of updates to run in parallel within a batch
//{number} total, optional, the total number of records to process; if not given, all
//unprocessed records in the table are processed
function run(batchSize, parallelLimit, total) {
	function startBatches(total) {
		var toDo = [];
		console.log(total, " rows to process.");
		for(var i=0; i<total; i+=batchSize) {
			//pushing each batch onto the toDo list
			toDo.push(batch(batchSize, parallelLimit));
		}

		//run each batch from the toDo list in series
		async.series(toDo, function (err, results) {
			if (err) console.log("Error: ", err);
			else console.log("Done!");
			dbpool.end();
			console.log(new Date());
		});		
	}

	if (typeof total === 'number') {
		startBatches(total);
	} else {
		dbpool.query("select count(*) as count from test where is_processed=0", function (err, result) {
			if (err) {
				console.log("Count error: ", err);
				dbpool.end();
			} else {
				startBatches(result[0].count);
			}
		});
	}
}
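//Example: run(500, 50, 10000) would process at most the first 10,000 unprocessed
//rows in twenty batches of 500, with no more than 50 updates in flight at once.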

console.log(new Date());
run(1000, 100);
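
For comparison, below is a minimal sketch of the same pattern, batches in series and records in parallel, written with async/await on top of the promise API of the mysql2 package rather than the async library. The helper name runWithPromises is made up for illustration; adapt the connection settings to your setup.

const mysql2 = require('mysql2/promise');

async function runWithPromises(batchSize, parallelLimit) {
	const pool = mysql2.createPool({
		host: 'localhost',
		database: 'test',
		user: 'your_username',
		password: 'your_password',
		port: 3306
	});

	while (true) {
		//Fetch the next batch of unprocessed ids.
		const [rows] = await pool.query('select id from test where is_processed=0 limit ?', [batchSize]);
		if (rows.length === 0) break;

		//Update the batch in parallel, at most parallelLimit queries at a time.
		for (let i = 0; i < rows.length; i += parallelLimit) {
			const chunk = rows.slice(i, i + parallelLimit);
			await Promise.all(chunk.map(function (row) {
				return pool.query('update test set is_processed=1, evenNum=id*2 where id=?', [row.id]);
			}));
		}
	}
	await pool.end();
}

runWithPromises(1000, 100).catch(console.error);

Promise.all over fixed-size chunks is a simpler, slightly coarser substitute for async.parallelLimit: each chunk waits for its slowest query before the next chunk starts.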

MySQL test table

CREATE TABLE `test` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `is_processed` tinyint(1) DEFAULT NULL,
  `evenNum` int(11) DEFAULT NULL,
  `last_update` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
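
With millions of rows, the repeated where is_processed=0 scans are much cheaper if that column is indexed. The schema above does not define one; a possible addition (the index name is arbitrary):

-- Optional: index the filter column so each batch's select doesn't scan the whole table.
ALTER TABLE test ADD INDEX idx_is_processed (is_processed);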

Load some data into the table:

INSERT INTO test () VALUES ();
INSERT INTO test () VALUES ();
INSERT INTO test () VALUES ();
-- ...

Or create a file with a number on each line and load it into the table:

LOAD DATA LOCAL INFILE '/path/to/test/data/test.csv'
INTO TABLE test
LINES TERMINATED BY '\n'
(id);
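
If needed, such a file can be generated on Linux or macOS with something like seq 1 1000000 > /path/to/test/data/test.csv.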
