At the end of my recent post on building a cron service for your Zend Framework application, I mentioned a couple of weaknesses in the approach I took. Most notably, my cron service lacked any kind of locking mechanism to prevent cron runs from overlapping. When I wrote that, I was planning to add a service-level locking mechanism as described in Abhinav Singh's blog last month; however, Greg's comment on my post gave me a different idea.

Thing is, some cron tasks take forever, and some are over in seconds. If I lock at the service layer and the longest task takes longer than the cron interval, all the other shorter tasks get locked out too, which means they don't run nearly as often as they should.

Commenter Greg suggested the solution: "You could get around this by having each task maintain a lock file and spawning child processes to run the tasks in parallel." With each task maintaining its own lock, long tasks can take forever without keeping shorter tasks from running next time cron is called. Also, by running each task in a separate process, we can let a lot of those shorter, less intensive tasks run concurrently so there's less chance of an overlap for any one of them.

So how do we go about doing this? Well, there are a few main things we need to change about the previous approach:

  1. Task classes must implement some kind of locking mechanism. For starters, we'll just put this into the interface to enforce it; however, we'll also provide an abstract cron task that implements a sensible default locking mechanism.
  2. Tasks need to be run in parallel processes using pcntl_fork().
  3. The cron service that spawns all those processes needs to be able to collect any error messages they produce and aggregate them into the final output.

I'm busy

First off, we'll need to make some slight adjustments to our Blahg_Plugin_Cron_CronInterface interface to ensure that all cron task plugins have a locking mechanism. The interface changes are pretty simple:

<?php
interface Blahg_Plugin_Cron_CronInterface
{
    public function __construct($args = null);
 
    /**
     * Lock
     * @return integer pid of this process
     * @throws Blahg_Plugin_Cron_Exception if already locked
     */
    public function lock();
 
    /**
     * Unlock
     * @return boolean true if successful
     * @throws Blahg_Plugin_Cron_Exception if an error occurs
     */
    public function unlock();
 
    /**
     * Is locked
     * @return integer|boolean pid of existing process or false if there isn't one
     */
    public function isLocked();
 
    public function run();
}

Cron tasks will now be required to implement basic locking. We should also provide a sensible default for how this locking is to be done by defining an abstract task class. In this example, we'll use temporary files to manage the locks:

<?php
abstract class Blahg_Plugin_Cron_CronAbstract implements Blahg_Plugin_Cron_CronInterface
{
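    public function lock()
    {
        if ($pid = $this->isLocked()) {
            throw new Blahg_Plugin_Cron_Exception('This task is already locked.');
        }

        $pid = getmypid();
        // Mode 'x' fails if the file already exists, so two processes that
        // both passed the isLocked() check can't both obtain the lock.
        $handle = @fopen($this->_getLockFile(), 'x');
        if (false === $handle) {
            throw new Blahg_Plugin_Cron_Exception('A lock could not be obtained.');
        }
        fwrite($handle, (string) $pid);
        fclose($handle);

        return $pid;
    }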
 
    public function unlock()
    {   
        if (!file_exists($this->_getLockFile())) {
            throw new Blahg_Plugin_Cron_Exception('This task is not locked.');
        }
 
        if (!unlink($this->_getLockFile())) {
            throw new Blahg_Plugin_Cron_Exception('The lock could not be deleted.');
        }
 
        return true;
    }
 
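    public function isLocked()
    {
        $lockFile = $this->_getLockFile();
        if (!file_exists($lockFile)) {
            return false;
        }

        // The lock file holds the pid written by lock(); return it, as the
        // interface promises. If the pid can't be read yet (the file exists
        // but is still empty), still report the task as locked.
        $pid = (int) @file_get_contents($lockFile);
        return $pid > 0 ? $pid : true;
    }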
 
    protected function _getLockFile()
    {   
        $fileName = 'cron.' . get_class($this) . '.lock';
        $lockFile = realpath(APPLICATION_PATH . '/../tmp/') . '/' . $fileName;
        return $lockFile;
    }
}

Pretty self-explanatory; if the lock file is present, the task is considered locked.
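
The file-exists check has one weak spot worth knowing about: if a task dies without ever calling unlock() (a fatal error, a killed process), its lock file stays behind and the task is skipped until someone deletes the file. A different technique, flock(), avoids this because the operating system drops the lock when the process holding it goes away. What follows is only a sketch of that alternative, not what the abstract class above does; acquireTaskLock() and releaseTaskLock() are hypothetical helper names, and I'm assuming the lock file lives on a filesystem where flock() works.

```php
<?php
// Sketch of an flock()-based lock. acquireTaskLock() and releaseTaskLock()
// are hypothetical helpers, not part of the Blahg classes.

/**
 * @param  string $lockFile path to the lock file
 * @return resource|false open handle holding the lock, or false if the
 *                        lock is already held through another handle
 */
function acquireTaskLock($lockFile)
{
    // 'c' opens for writing and creates the file if needed, without truncating.
    $handle = fopen($lockFile, 'c');
    if (false === $handle) {
        return false;
    }

    // LOCK_NB makes flock() return right away instead of waiting its turn.
    if (!flock($handle, LOCK_EX | LOCK_NB)) {
        fclose($handle);
        return false;
    }

    // Record the pid for anyone inspecting the file; the lock itself is the
    // flock, not the file's contents.
    ftruncate($handle, 0);
    fwrite($handle, (string) getmypid());
    fflush($handle);

    return $handle;
}

/**
 * @param  resource $handle handle returned by acquireTaskLock()
 * @return boolean true if the handle was closed successfully
 */
function releaseTaskLock($handle)
{
    flock($handle, LOCK_UN);
    return fclose($handle);
}
```

A task would hold on to the returned handle for as long as it runs and hand it to releaseTaskLock() when it's done. The lock file itself can stay on disk between runs; only the flock matters.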

In the event of an emergency

Our original approach to error handling was pretty simple; the service layer wrapped each action's run() call in a try...catch block, and stored the error messages for any exception it caught in the Blahg_Service_Cron::$_errors array for later output.

This method works great when you're running all your actions in a row as part of the same process. However, when we start using pcntl_fork() to run each task in a child process, the parent process's $_errors array won't contain anything added by its children. To fix this, we're going to need to store our error messages somewhere other than the current process's memory.

To do this, we first remove the existing error methods (addError() and friends) and their associated member variables. We're going to replace them with something even simpler: a Zend_Log instance tied to the parent process. As long as we initialize the log instance before we fork, every child process inherits a handle to the exact same log file as the parent; therefore any messages they write to it can be easily retrieved by the parent later on. Here's the code for setting up the log; we'll get to the actual usage in a bit:

<?php
    protected $_log;
 
    public function getLogFile()
    {   
        return realpath(APPLICATION_PATH . '/../log/') . '/cron.' . getmypid() . '.log';
    }
 
    public function getLog()
    {   
        if (null === $this->_log) {
            $writer = new Zend_Log_Writer_Stream($this->getLogFile());
            $formatter = new Zend_Log_Formatter_Simple('%timestamp% %priorityName% (%priority%): %message%' . PHP_EOL);
            $writer->setFormatter($formatter);
 
            $log = new Zend_Log();
            $log->addWriter($writer);
            $this->setLog($log);
        }
        return $this->_log;
    }
 
    public function setLog(Zend_Log $log)
    {   
        if (null !== $this->_log) {
            // Letting the log be set and re-set by various processes could result in child processes
            // using a different log file than the parent process; we can't have that.
            throw new Blahg_Service_Exception('The log has already been established; it cannot be set again.');
        }
        $this->_log = $log;
        return $this;
    }

Now, when it comes time to re-work our error collection code, we'll have somewhere consistent to send the errors. So far so good.

Cron spawn

Remember the foreach loop in our original Blahg_Service_Cron::run() implementation? Essentially, it looped through an array of task class names, instantiated each of them, called their run() methods, and collected any exceptions thrown in its own error array. In this edition, we'll want to modify that loop such that it runs each task in a separate child process. The new method looks like this:

<?php
    public function run()
    {
        // Initialize the log before we fork; that way child processes will
        // have a reference to the same log as the parent process.
        $log = $this->getLog();
        $children = array();
        foreach ($this->_actions as $key => $action) {
            $class = $this->getLoader()->load($action);
            if (null !== $this->_actionsArgs[$key]) {
                $action = new $class($this->_actionsArgs[$key]);
            } else {
                $action = new $class;
            }
 
            if (!($action instanceof Blahg_Plugin_Cron_CronInterface)) {
                throw new Blahg_Service_Exception('One of the specified actions is not the right kind of class.');
            }
 
            // Check to see if this task is locked (currently running,
            // probably due to an earlier cron run); if it is, don't run
            // it again.
            if ($action->isLocked()) {
                continue;
            }
 
            $pid = pcntl_fork();
            if ($pid == -1) {
                $log->err('Could not fork.');
                continue;
            } else if ($pid == 0) {
                // This is the child.
                $mypid = getmypid();
 
                unset($children);
                $locked = false;
                try {
                    $action->lock();
                    $locked = true;
                    $action->run();
                } catch (Blahg_Plugin_Cron_Exception $e) {
                    $log->err('[' . $mypid . '] ' . $e->getMessage());
                } catch (Exception $e) {
                    if (APPLICATION_ENV == 'development') {
                        $log->err('[' . $mypid . '] [DEV]: ' . $e->getMessage());
                    } else {
                        $log->err('[' . $mypid . '] An undefined error occurred.');
                    }
                }

                // Unlock regardless of how run() went, but only if this
                // process actually obtained the lock; otherwise we'd delete
                // a lock that belongs to another process.
                if ($locked) {
                    try {
                        $action->unlock();
                    } catch (Exception $e) {
                        if (APPLICATION_ENV == 'development') {
                            $log->err('[' . $mypid . '] [DEV]: ' . $e->getMessage());
                        } else {
                            $log->err('[' . $mypid . '] An unlocking error occurred.');
                        }
                    }
                }
 
                // Child process doesn't need to continue; it's done its job.
                exit;
            } else {
                // This is the parent.
                $children[] = $pid;
            }
        }
 
        // Now that we've started all the actions, we just need to wait
        // for them to finish and clean everything up.  The following
        // gets rid of the zombie processes leftover when the child
        // processes die.
        foreach ($children as $child) {
            pcntl_waitpid($child, $status);
        }
 
        // At this point all the child processes should be finished; we can
        // output the log.  Save a copy as "cron.latest.log" so we can look
        // it over if necessary, but in general if there's any output it'll
        // be emailed to the cron runner user anyway.
        $output = file_get_contents($this->getLogFile());
        rename($this->getLogFile(), realpath(APPLICATION_PATH . '/../log/') . '/cron.latest.log');
        return $output;
    }

This may look like a whole lot of new code, but most of it's pretty similar to what we had before; the main difference is the pcntl_fork() magic. If you're not familiar with pcntl_fork(), I highly recommend Frans-Jan v. Steenbeek's thorough look at it; I copied a lot of his basic structure when I set the above code up. Essentially, pcntl_fork() creates a child process which, up to the fork point, is absolutely identical to its parent, right down to its open file handles. That's why it works to set up our log in the parent; as long as we set it up before forking, the child will have access to it.
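
If you'd like to see the inherited-handle trick in isolation, here's a stripped-down sketch with no Zend classes involved. It is not the service code: runForkDemo() and its $messages parameter are made-up names for illustration, it uses a plain fopen() handle where the service uses Zend_Log, and it assumes the pcntl extension is loaded (it returns null if it isn't).

```php
<?php
// Sketch only: fork one child per message, have each child write through a
// file handle the parent opened before forking, then read the file back.
// runForkDemo() is a hypothetical name, not part of Blahg_Service_Cron.
function runForkDemo(array $messages)
{
    if (!function_exists('pcntl_fork')) {
        return null; // pcntl isn't available; nothing to demonstrate
    }

    $logFile = tempnam(sys_get_temp_dir(), 'forkdemo');
    // Open in append mode *before* forking so every child inherits the handle.
    $log = fopen($logFile, 'a');

    $children = array();
    foreach ($messages as $message) {
        $pid = pcntl_fork();
        if ($pid == -1) {
            continue; // could not fork; skip this message
        } elseif ($pid == 0) {
            // Child: write one line through the inherited handle and stop.
            fwrite($log, $message . PHP_EOL);
            fclose($log);
            exit(0);
        }
        // Parent: remember the child so we can wait for it below.
        $children[] = $pid;
    }

    // Wait for every child so none are left behind as zombies.
    foreach ($children as $child) {
        pcntl_waitpid($child, $status);
    }
    fclose($log);

    $lines = file($logFile, FILE_IGNORE_NEW_LINES);
    unlink($logFile);
    sort($lines); // children finish in no particular order
    return $lines;
}
```

Calling runForkDemo(array('alpha', 'beta')) from the command line should hand back both lines, even though neither was written by the process that reads them.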

Also, see how the error handling works? I replaced all my previous edition's addError() calls with simple Zend_Log::err() calls instead; subsequently, we can simply output the log file so that the cron daemon can send it on to the appropriate user in the event that something goes wrong. Actually a bit cleaner than the previous approach, I think.

One side effect

In my original post I recommended calling your cron service over HTTP with curl. This won't work for the new version, since the pcntl extension is meant for command-line use and is typically unavailable when PHP runs as an Apache module; so, we'll need to move our cron entry point out of the webroot, run it with the PHP command-line binary, and adjust our crontab accordingly. Easy enough to do, and probably more secure anyway.
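
Since the new entry point only makes sense on the command line, it may be worth having it check its surroundings before it does anything else. Here's one way that guard might look; cronEnvironmentProblems() is a hypothetical helper of my own naming, and it takes the SAPI name and extension list as arguments so it's easy to try out.

```php
<?php
// Sketch of a guard for a command-line cron entry point.
// cronEnvironmentProblems() is a hypothetical helper, not part of the
// Blahg classes.

/**
 * @param  string $sapi             value of php_sapi_name()
 * @param  array  $loadedExtensions value of get_loaded_extensions()
 * @return array  list of problems; empty if the environment looks usable
 */
function cronEnvironmentProblems($sapi, array $loadedExtensions)
{
    $problems = array();

    if ('cli' !== $sapi) {
        $problems[] = 'Cron must be run from the command line, not via "' . $sapi . '".';
    }

    if (!in_array('pcntl', $loadedExtensions)) {
        $problems[] = 'The pcntl extension is required for pcntl_fork().';
    }

    return $problems;
}

// In an entry script you might call it like this and bail out early:
// $problems = cronEnvironmentProblems(php_sapi_name(), get_loaded_extensions());
// if ($problems) {
//     fwrite(STDERR, implode(PHP_EOL, $problems) . PHP_EOL);
//     exit(1);
// }
```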

Hopefully this helps someone; special thanks again to Greg for making the suggestion, and to Abhinav Singh for getting me interested in the subject in the first place.
