| | |
| | | | This file is part of the Roundcube Webmail client | |
| | | | | |
| | | | Copyright (C) 2013, The Roundcube Dev Team | |
| | | | Copyright (C) 2013, Kolab Systems AG | |
| | | | Copyright (C) 2014, Kolab Systems AG | |
| | | | | |
| | | | Licensed under the GNU General Public License version 3 or | |
| | | | any later version with exceptions for skins & plugins. | |
| | |
| | | +-----------------------------------------------------------------------+ |
| | | */ |
| | | |
| | | // create classes defined by the pthreads module if that isn't installed |
| | | if (!defined('PTHREADS_INHERIT_ALL')) { |
| | | class Worker { } |
| | | class Stackable { } |
| | | } |
| | | |
| | | /** |
| | | * Class to control search jobs on multiple IMAP folders. |
| | | * This implement a simple threads pool using the pthreads extension. |
| | | * |
| | | * @package Framework |
| | | * @subpackage Storage |
| | |
| | | { |
| | | public $options = array(); |
| | | |
| | | private $size = 10; |
| | | private $next = 0; |
| | | private $workers = array(); |
| | | private $states = array(); |
| | | private $jobs = array(); |
| | | private $conn; |
| | | protected $jobs = array(); |
| | | protected $timelimit = 0; |
| | | protected $results; |
| | | protected $conn; |
| | | |
| | | /** |
| | | * Default constructor |
| | |
| | | public function __construct($options, $conn) |
| | | { |
| | | $this->options = $options; |
| | | $this->conn = $conn; |
| | | $this->conn = $conn; |
| | | } |
| | | |
| | | /** |
| | |
| | | */ |
| | | public function exec($folders, $str, $charset = null, $sort_field = null, $threading=null) |
| | | { |
| | | $pthreads = defined('PTHREADS_INHERIT_ALL'); |
| | | $start = floor(microtime(true)); |
| | | $results = new rcube_result_multifolder($folders); |
| | | |
| | | // start a search job for every folder to search in |
| | | foreach ($folders as $folder) { |
| | | $job = new rcube_imap_search_job($folder, $str, $charset, $sort_field, $threading); |
| | | if ($pthreads && $this->submit($job)) { |
| | | $this->jobs[] = $job; |
| | | // a complete result for this folder already exists |
| | | $result = $this->results ? $this->results->get_set($folder) : false; |
| | | if ($result && !$result->incomplete) { |
| | | $results->add($result); |
| | | } |
| | | else { |
| | | $search = is_array($str) && $str[$folder] ? $str[$folder] : $str; |
| | | $job = new rcube_imap_search_job($folder, $search, $charset, $sort_field, $threading); |
| | | $job->worker = $this; |
| | | $job->run(); |
| | | $this->jobs[] = $job; |
| | | } |
| | | } |
| | | |
| | | // wait for all workers to be done |
| | | $this->shutdown(); |
| | | |
| | | // gather results |
| | | $results = new rcube_result_multifolder; |
| | | // execute jobs and gather results |
| | | foreach ($this->jobs as $job) { |
| | | // only run search if within the configured time limit |
| | | // TODO: try to estimate the required time based on folder size and previous search performance |
| | | if (!$this->timelimit || floor(microtime(true)) - $start < $this->timelimit) { |
| | | $job->run(); |
| | | } |
| | | |
| | | // add result (may have ->incomplete flag set) |
| | | $results->add($job->get_result()); |
| | | } |
| | | |
| | |
| | | } |
| | | |
| | | /** |
| | | * Assign the given job object to one of the worker threads for execution |
| | | * Setter for timelimt property |
| | | */ |
| | | public function submit(Stackable $job) |
| | | public function set_timelimit($seconds) |
| | | { |
| | | if (count($this->workers) < $this->size) { |
| | | $id = count($this->workers); |
| | | $this->workers[$id] = new rcube_imap_search_worker($id, $this->options); |
| | | $this->workers[$id]->start(PTHREADS_INHERIT_ALL); |
| | | |
| | | if ($this->workers[$id]->stack($job)) { |
| | | return $job; |
| | | } |
| | | else { |
| | | // trigger_error(sprintf("Failed to push Stackable onto %s", $id), E_USER_WARNING); |
| | | } |
| | | } |
| | | if (($worker = $this->workers[$this->next])) { |
| | | $this->next = ($this->next+1) % $this->size; |
| | | if ($worker->stack($job)) { |
| | | return $job; |
| | | } |
| | | else { |
| | | // trigger_error(sprintf("Failed to stack onto selected worker %s", $worker->id), E_USER_WARNING); |
| | | } |
| | | } |
| | | else { |
| | | // trigger_error(sprintf("Failed to select a worker for Stackable"), E_USER_WARNING); |
| | | } |
| | | |
| | | return false; |
| | | $this->timelimit = $seconds; |
| | | } |
| | | |
| | | /** |
| | | * Shutdown the pool of threads cleanly, retaining exit status locally |
| | | * Setter for previous (potentially incomplete) search results |
| | | */ |
| | | public function shutdown() |
| | | public function set_results($res) |
| | | { |
| | | foreach ($this->workers as $worker) { |
| | | $this->states[$worker->getThreadId()] = $worker->shutdown(); |
| | | $worker->close(); |
| | | } |
| | | |
| | | # console('shutdown', $this->states); |
| | | $this->results = $res; |
| | | } |
| | | |
| | | |
| | | /** |
| | | * Get connection to the IMAP server |
| | | * (used for single-thread mode) |
| | |
| | | /** |
| | | * Stackable item to run the search on a specific IMAP folder |
| | | */ |
| | | class rcube_imap_search_job extends Stackable |
| | | class rcube_imap_search_job /* extends Stackable */ |
| | | { |
| | | private $folder; |
| | | private $search; |
| | | private $charset; |
| | | private $sort_field; |
| | | private $threading; |
| | | private $searchset; |
| | | private $result; |
| | | private $pagesize = 100; |
| | | |
| | | public function __construct($folder, $str, $charset = null, $sort_field = null, $threading=false) |
| | | { |
| | | $this->folder = $folder; |
| | | $this->search = $str; |
| | | $this->charset = $charset; |
| | | $this->folder = $folder; |
| | | $this->search = $str; |
| | | $this->charset = $charset; |
| | | $this->sort_field = $sort_field; |
| | | $this->threading = $threading; |
| | | $this->threading = $threading; |
| | | |
| | | $this->result = new rcube_result_index($folder); |
| | | $this->result->incomplete = true; |
| | | } |
| | | |
| | | public function run() |
| | | { |
| | | // trigger_error("Start search $this->folder", E_USER_NOTICE); |
| | | $this->result = $this->search_index(); |
| | | // trigger_error("End search $this->folder: " . $this->result->count(), E_USER_NOTICE); |
| | | } |
| | | |
| | | /** |
| | |
| | | */ |
| | | protected function search_index() |
| | | { |
| | | $pthreads = defined('PTHREADS_INHERIT_ALL'); |
| | | $criteria = $this->search; |
| | | $charset = $this->charset; |
| | | |
| | | $imap = $this->worker->get_imap(); |
| | | $charset = $this->charset; |
| | | $imap = $this->worker->get_imap(); |
| | | |
| | | if (!$imap->connected()) { |
| | | trigger_error("No IMAP connection for $this->folder", E_USER_WARNING); |
| | | |
| | | if ($this->threading) { |
| | | return new rcube_result_thread(); |
| | | return new rcube_result_thread($this->folder); |
| | | } |
| | | else { |
| | | return new rcube_result_index(); |
| | | return new rcube_result_index($this->folder); |
| | | } |
| | | } |
| | | |
| | |
| | | $threads = $imap->thread($this->folder, $this->threading, |
| | | rcube_imap::convert_criteria($criteria, $charset), true, 'US-ASCII'); |
| | | } |
| | | |
| | | // close IMAP connection again |
| | | if ($pthreads) |
| | | $imap->closeConnection(); |
| | | |
| | | return $threads; |
| | | } |
| | |
| | | } |
| | | } |
| | | |
| | | // close IMAP connection again |
| | | if ($pthreads) |
| | | $imap->closeConnection(); |
| | | |
| | | return $messages; |
| | | } |
| | | |
| | |
| | | return $this->result; |
| | | } |
| | | } |
| | | |
| | | |
| | | /** |
| | | * Worker thread to run search jobs while maintaining a common context |
| | | */ |
| | | class rcube_imap_search_worker extends Worker |
| | | { |
| | | public $id; |
| | | public $options; |
| | | |
| | | private $conn; |
| | | |
| | | /** |
| | | * Default constructor |
| | | */ |
| | | public function __construct($id, $options) |
| | | { |
| | | $options['ident']['command'] = 'search-'.$id; |
| | | |
| | | $this->id = $id; |
| | | $this->options = $options; |
| | | } |
| | | |
| | | /** |
| | | * Get a dedicated connection to the IMAP server |
| | | */ |
| | | public function get_imap() |
| | | { |
| | | // TODO: make this connection persistent for several jobs |
| | | #if ($this->conn) |
| | | # return $this->conn; |
| | | |
| | | $conn = new rcube_imap_generic(); |
| | | # $conn->setDebug(true, function($conn, $message){ trigger_error($message, E_USER_NOTICE); }); |
| | | |
| | | if ($this->options['user'] && $this->options['password']) { |
| | | // TODO: do this synchronized to avoid warnings like "Only one Id allowed in non-authenticated state" |
| | | $conn->connect($this->options['host'], $this->options['user'], $this->options['password'], $this->options); |
| | | } |
| | | |
| | | if ($conn->error) |
| | | trigger_error($conn->error, E_USER_WARNING); |
| | | |
| | | #$this->conn = $conn; |
| | | return $conn; |
| | | } |
| | | |
| | | /** |
| | | * @override |
| | | */ |
| | | public function run() |
| | | { |
| | | |
| | | } |
| | | |
| | | /** |
| | | * Close IMAP connection |
| | | */ |
| | | public function close() |
| | | { |
| | | if ($this->conn) { |
| | | $this->conn->close(); |
| | | } |
| | | } |
| | | } |
| | | |