A parallel MongoDB client with Perl and fork

Concurrency is hard, and that's just as true in Perl as it is in most languages. While Perl has threads, they aren't lightweight, so they aren't an obvious answer to parallel processing the way they are elsewhere. In Perl, doing concurrent work generally means (a) a non-blocking/asynchronous framework or (b) forking sub-processes as workers.

There is no officially-supported async MongoDB driver for Perl (yet), so this article is about forking.

The problem with forking a MongoDB client object is that forks don't automatically close sockets. And having two (or more) processes trying to use the same socket is a recipe for corruption.

At one point in the design of the MongoDB Perl driver v1.0.0, I had it cache the PID on creation and then check if it had changed before every operation. If so, the socket to the MongoDB server would be closed and re-opened. It was auto-magic!

The problem with this approach is that it incurs overhead on every operation, regardless of whether forks are in use. Even if forks are used, they are rare compared to the frequency of database operations for any non-trivial program.

So I took out that mis-feature. Now, you must manually call the reconnect method on your client objects after you fork (or spawn a thread, too).

Here's a pattern I've found myself using from time to time to do parallel processing with Parallel::ForkManager, adapted to reconnect the MongoDB client object in each child:

use Parallel::ForkManager;

# Pass in a MongoDB::MongoClient object, the number of parallel jobs to
# run, and a code-reference to execute. The code reference is passed
# the client and the iteration number.
sub parallel_mongodb {
    my ( $client, $jobs, $fcn ) = @_;

    my $pm = Parallel::ForkManager->new( $jobs > 1 ? $jobs : 0 );

    local $SIG{INT} = sub {
        warn "Caught SIGINT; Waiting for child processes\n";
        $pm->wait_all_children;
        exit 1;
    };

    for my $i ( 0 .. $jobs - 1 ) {
        $pm->start and next;
        $SIG{INT} = sub { $pm->finish };
        $client->reconnect;
        $fcn->( $i );
        $pm->finish;
    }

    $pm->wait_all_children;
}

To use this subroutine, I partition the input data into the number of jobs to run. Then I call parallel_mongodb with a closure that can find the input data from the job number:

use MongoDB;

# Partition input data into N parts.  Assume each is a document to insert.
my @data = (
   [ { a => 1 },  {b => 2},  ... ],
   [ { m => 11 }, {n => 12}, ... ],
   ...
);
my $number_of_jobs = @data;

my $client = MongoDB->connect;
my $coll = $client->ns("test.dataset");

parallel_mongodb( $client, $number_of_jobs,
  sub {
    $coll->insert_many( $data[ shift ], { ordered => 0 } );
  }
);

Of course, you want to be careful that the job count (i.e. the partition count) is optimal. I find that having it roughly equal to the number of CPUs tends to work pretty well in practice.

What you don't want to do, however, is to call $pm->start more than the number of child tasks you want running in parallel. You don't want a new process for every data item to process, since each fork also has to reconnect to the database, which is slow. That's why you should figure out the partitioning first, and only spawn a process per partition.

This is best for "embarrassingly parallel" problems, where there's no need for communication back from the child processes. And while what I've shown does a manual partition into arrays, you could also do this with a single array, where child workers only processes indices where the index modulo the number of jobs is equal to the job ID. Or you could have child workers pulling from a common task queue over a network, etc.

TIMTOWTDI, and now you can do it in parallel.

This entry was posted in mongodb, perl programming and tagged , , , . Bookmark the permalink. Both comments and trackbacks are currently closed.

© 2009-2017 David Golden All Rights Reserved