emitIntermediate($value, $type);
     * };
     *
     * $reducer = function ($numbers, $type, $mr) {
     *     $mr->emit(array_unique($numbers), $type);
     * };
     * $results = new MapReduce($data, $mapper, $reducer);
     * ```
     *
     * The previous example will generate the following result:
     *
     * ```
     * ['odd' => [1, 3, 5], 'even' => [2, 4]]
     * ```
     *
     * @param \Traversable $data the original data to be processed
     * @param callable $mapper the mapper callback. This function will receive 3 arguments.
     * The first one is the current value, the second one is the current results key and
     * the third one is this class instance, so you can call the result emitters.
     * @param callable|null $reducer the reducer callback. This function will receive 3 arguments.
     * The first one is the list of values inside a bucket, the second one is the name
     * of the bucket that was created during the mapping phase and the third one is an
     * instance of this class.
     */
    public function __construct(Traversable $data, callable $mapper, callable $reducer = null)
    {
        // Only stores the collaborators; no work happens until the result is requested.
        $this->_reducer = $reducer;
        $this->_mapper = $mapper;
        $this->_data = $data;
    }

    /**
     * Returns an iterator over the end result of running the Map and Reduce
     * phases on the original data. The phases are run on the first call only;
     * later calls reuse the stored result.
     *
     * @return \ArrayIterator
     */
    public function getIterator()
    {
        if ($this->_executed) {
            return new ArrayIterator($this->_result);
        }

        $this->_execute();

        return new ArrayIterator($this->_result);
    }

    /**
     * Appends a new record to the bucket labelled with $bucket, usually as a result
     * of mapping a single record from the original data.
     *
     * @param mixed $val The record itself to store in the bucket
     * @param string $bucket the name of the bucket where to put the record
     * @return void
     */
    public function emitIntermediate($val, $bucket)
    {
        $this->_intermediate[$bucket][] = $val;
    }

    /**
     * Appends a new record to the final list of results and optionally assigns a key
     * for this record.
*
     * @param mixed $val The value to be appended to the final list of results
     * @param string|null $key an optional key to assign to the value
     * @return void
     */
    public function emit($val, $key = null)
    {
        $slot = $key;
        if ($slot === null) {
            // No explicit key: fall back to the running numeric counter.
            $slot = $this->_counter;
        }

        $this->_result[$slot] = $val;
        // The counter advances on every emit, including those with an explicit key.
        $this->_counter++;
    }

    /**
     * Runs the actual Map-Reduce algorithm. It iterates the original data and
     * calls the mapper function for each record; then, for each intermediate
     * bucket created during the Map phase, it calls the reducer function.
     *
     * @return void
     * @throws \LogicException if emitIntermediate was called but no reducer function
     * was provided
     */
    protected function _execute()
    {
        $map = $this->_mapper;
        foreach ($this->_data as $index => $record) {
            $map($record, $index, $this);
        }
        // The source data is not needed once the map phase is done.
        $this->_data = null;

        $hasBuckets = !empty($this->_intermediate);
        if ($hasBuckets && empty($this->_reducer)) {
            throw new LogicException('No reducer function was provided');
        }

        /** @var callable $reduce */
        $reduce = $this->_reducer;
        foreach ($this->_intermediate as $bucket => $values) {
            $reduce($values, $bucket, $this);
        }

        $this->_intermediate = [];
        $this->_executed = true;
    }
}