Thomas Bruederli
2014-06-04 3412e50b54e3daac8745234e21ab6e72be0ed165
program/lib/Roundcube/rcube_imap_search.php
@@ -5,7 +5,7 @@
 | This file is part of the Roundcube Webmail client                     |
 |                                                                       |
 | Copyright (C) 2013, The Roundcube Dev Team                            |
 | Copyright (C) 2013, Kolab Systems AG                                  |
 | Copyright (C) 2014, Kolab Systems AG                                  |
 |                                                                       |
 | Licensed under the GNU General Public License version 3 or            |
 | any later version with exceptions for skins & plugins.                |
@@ -18,15 +18,8 @@
 +-----------------------------------------------------------------------+
*/
// create classes defined by the pthreads module if that isn't installed
if (!defined('PTHREADS_INHERIT_ALL')) {
    class Worker { }
    class Stackable { }
}
/**
 * Class to control search jobs on multiple IMAP folders.
 * This implement a simple threads pool using the pthreads extension.
 *
 * @package    Framework
 * @subpackage Storage
@@ -36,12 +29,10 @@
{
    public $options = array();
    private $size = 10;
    private $next = 0;
    private $workers = array();
    private $states = array();
    private $jobs = array();
    private $conn;
    protected $jobs = array();
    protected $timelimit = 0;
    protected $results;
    protected $conn;
    /**
     * Default constructor
@@ -63,27 +54,32 @@
     */
    public function exec($folders, $str, $charset = null, $sort_field = null, $threading=null)
    {
        $pthreads = defined('PTHREADS_INHERIT_ALL');
        $start = floor(microtime(true));
        $results = new rcube_result_multifolder($folders);
        // start a search job for every folder to search in
        foreach ($folders as $folder) {
            $job = new rcube_imap_search_job($folder, $str, $charset, $sort_field, $threading);
            if ($pthreads && $this->submit($job)) {
                $this->jobs[] = $job;
            // a complete result for this folder already exists
            $result = $this->results ? $this->results->get_set($folder) : false;
            if ($result && !$result->incomplete) {
                $results->add($result);
            }
            else {
                $job = new rcube_imap_search_job($folder, $str, $charset, $sort_field, $threading);
                $job->worker = $this;
                $job->run();
                $this->jobs[] = $job;
            }
        }
        // wait for all workers to be done
        $this->shutdown();
        // gather results
        $results = new rcube_result_multifolder;
        // execute jobs and gather results
        foreach ($this->jobs as $job) {
            // only run search if within the configured time limit
            // TODO: try to estimate the required time based on folder size and previous search performance
            if (!$this->timelimit || floor(microtime(true)) - $start < $this->timelimit) {
                $job->run();
            }
            // add result (may have ->incomplete flag set)
            $results->add($job->get_result());
        }
@@ -91,51 +87,21 @@
    }
    /**
     * Assign the given job object to one of the worker threads for execution
     * Setter for timelimt property
     */
    public function submit(Stackable $job)
    public function set_timelimit($seconds)
    {
        if (count($this->workers) < $this->size) {
            $id = count($this->workers);
            $this->workers[$id] = new rcube_imap_search_worker($id, $this->options);
            $this->workers[$id]->start(PTHREADS_INHERIT_ALL);
            if ($this->workers[$id]->stack($job)) {
                return $job;
            }
            else {
                // trigger_error(sprintf("Failed to push Stackable onto %s", $id), E_USER_WARNING);
            }
        }
        if (($worker = $this->workers[$this->next])) {
            $this->next = ($this->next+1) % $this->size;
            if ($worker->stack($job)) {
                return $job;
            }
            else {
                // trigger_error(sprintf("Failed to stack onto selected worker %s", $worker->id), E_USER_WARNING);
            }
        }
        else {
            // trigger_error(sprintf("Failed to select a worker for Stackable"), E_USER_WARNING);
        }
        return false;
        $this->timelimit = $seconds;
    }
    /**
     * Shutdown the pool of threads cleanly, retaining exit status locally
     * Setter for previous (potentially incomplete) search results
     */
    public function shutdown()
    public function set_results($res)
    {
        foreach ($this->workers as $worker) {
            $this->states[$worker->getThreadId()] = $worker->shutdown();
            $worker->close();
        }
        # console('shutdown', $this->states);
        $this->results = $res;
    }
    /**
     * Get connection to the IMAP server
     * (used for single-thread mode)
@@ -150,7 +116,7 @@
/**
 * Stackable item to run the search on a specific IMAP folder
 */
class rcube_imap_search_job extends Stackable
class rcube_imap_search_job /* extends Stackable */
{
    private $folder;
    private $search;
@@ -168,13 +134,14 @@
        $this->charset = $charset;
        $this->sort_field = $sort_field;
        $this->threading = $threading;
        $this->result = new rcube_result_index($folder);
        $this->result->incomplete = true;
    }
    public function run()
    {
        // trigger_error("Start search $this->folder", E_USER_NOTICE);
        $this->result = $this->search_index();
        // trigger_error("End search $this->folder: " . $this->result->count(), E_USER_NOTICE);
    }
    /**
@@ -182,7 +149,6 @@
     */
    protected function search_index()
    {
        $pthreads = defined('PTHREADS_INHERIT_ALL');
        $criteria = $this->search;
        $charset = $this->charset;
@@ -192,10 +158,10 @@
            trigger_error("No IMAP connection for $this->folder", E_USER_WARNING);
            if ($this->threading) {
                return new rcube_result_thread();
                return new rcube_result_thread($this->folder);
            }
            else {
                return new rcube_result_index();
                return new rcube_result_index($this->folder);
            }
        }
@@ -218,10 +184,6 @@
                $threads = $imap->thread($this->folder, $this->threading,
                    rcube_imap::convert_criteria($criteria, $charset), true, 'US-ASCII');
            }
            // close IMAP connection again
            if ($pthreads)
                $imap->closeConnection();
            return $threads;
        }
@@ -248,10 +210,6 @@
            }
        }
        // close IMAP connection again
        if ($pthreads)
            $imap->closeConnection();
        return $messages;
    }
@@ -270,67 +228,7 @@
    {
        return $this->result;
    }
}
/**
 * Worker thread to run search jobs while maintaining a common context
 */
class rcube_imap_search_worker extends Worker
{
    public $id;
    public $options;
    private $conn;
    private $counts = 0;
    /**
     * Default constructor
     */
    public function __construct($id, $options)
    {
        $this->id = $id;
        $this->options = $options;
    }
    /**
     * Get a dedicated connection to the IMAP server
     */
    public function get_imap()
    {
        // TODO: make this connection persistent for several jobs
        // This doesn't seem to work. Socket connections don't survive serialization which is used in pthreads
        $conn = new rcube_imap_generic();
        # $conn->setDebug(true, function($conn, $message){ trigger_error($message, E_USER_NOTICE); });
        if ($this->options['user'] && $this->options['password']) {
            $this->options['ident']['command'] = 'search-' . $this->id . 't' . ++$this->counts;
            $conn->connect($this->options['host'], $this->options['user'], $this->options['password'], $this->options);
        }
        if ($conn->error)
            trigger_error($conn->error, E_USER_WARNING);
        return $conn;
    }
    /**
     * @override
     */
    public function run()
    {
    }
    /**
     * Close IMAP connection
     */
    public function close()
    {
        if ($this->conn) {
            $this->conn->close();
        }
    }
}