connect();
        return $this->do_write($handle, $pairs, $commit_batch_size);
    }

    /**
     * Write all pairs $repeat times over a single connection.
     *
     * @param array $pairs             List of array(key, value) pairs.
     * @param mixed $commit_batch_size Passed through unchanged to do_write().
     * @param int   $repeat            Number of passes over $pairs.
     * @return int Sum of the counts returned by do_write().
     */
    public function write(&$pairs, $commit_batch_size, $repeat = 1) {
        $handle = $this->connect();
        $written = 0;
        for ($i = 0; $i < $repeat; $i++) {
            $written += $this->do_write($handle, $pairs, $commit_batch_size);
        }
        return $written;
    }

    /**
     * Store every pair using the given connection handle and return the
     * number of pairs written.
     */
    abstract protected function do_write($handle, $pairs, $commit_batch_size);

    /**
     * Read all pairs $repeat times over a single connection.
     *
     * @param array $pairs  List of array(key, value) pairs.
     * @param int   $repeat Number of passes over $pairs.
     * @return int Sum of the counts returned by do_read().
     */
    public function read(&$pairs, $repeat = 1) {
        $handle = $this->connect();
        $fetched = 0;
        for ($i = 0; $i < $repeat; $i++) {
            $fetched += $this->do_read($handle, $pairs);
        }
        return $fetched;
    }

    /**
     * Fetch every pair using the given connection handle and return the
     * number of pairs fetched.
     */
    abstract protected function do_read($handle, $pairs);

    /** Name under which results for this store are recorded. */
    abstract public function get_name();
}

/**
 * Store backed by the MySQL table `demo_test` (columns c1 = key, c2 = value),
 * accessed through plain SQL over mysqli.
 */
class mysqli_store extends store {
    protected $host;
    protected $user;
    protected $pwd;
    protected $db;
    protected $port;
    protected $socket;

    public function __construct($host, $user, $pwd, $db, $port, $socket) {
        $this->host = $host;
        $this->user = $user;
        $this->pwd = $pwd;
        $this->db = $db;
        $this->port = $port;
        $this->socket = $socket;
    }

    /**
     * Open a new MySQL connection.
     *
     * @return mysqli
     * @throws Exception When the connection attempt fails.
     */
    protected function connect() {
        $mysqli = new mysqli($this->host, $this->user, $this->pwd, $this->db, $this->port, $this->socket);
        if ($mysqli->connect_errno) {
            throw new Exception(sprintf("[%d] %s\n", $mysqli->connect_errno, $mysqli->connect_error));
        }
        return $mysqli;
    }

    /**
     * Remove all rows from demo_test.
     *
     * @throws Exception When the DELETE fails.
     */
    public function flush() {
        $mysqli = $this->connect();
        if (!$mysqli->query("DELETE FROM demo_test")) {
            throw new Exception(sprintf("[%d] %s\n", $mysqli->errno, $mysqli->error));
        }
    }

    /**
     * REPLACE every pair into demo_test, committing every $commit_batch_size
     * rows and once more at the end.
     *
     * @param mysqli $mysqli
     * @param array  $pairs             List of array(key, value) pairs.
     * @param int    $commit_batch_size Rows per transaction; 0 means a single
     *                                  commit after the last row.
     * @return int Number of rows written.
     * @throws Exception When a statement fails.
     */
    protected function do_write($mysqli, $pairs, $commit_batch_size) {
        $inserted = 0;
        $batch = (int) $commit_batch_size;

        // autocommit is a method on mysqli; assigning to a property of that
        // name does not switch the session mode, so call it instead.
        $mysqli->autocommit(false);
        $last = microtime(true);

        foreach ($pairs as $pair) {
            // NOTE(review): values are interpolated unescaped; the generated
            // benchmark keys are alphanumeric only.
            $sql = sprintf("REPLACE INTO demo_test(c1, c2) VALUES ('%s', '%s')", $pair[0], $pair[1]);
            if (!$mysqli->query($sql)) {
                throw new Exception(sprintf("[%d] %s\n", $mysqli->errno, $mysqli->error));
            }
            $inserted++;

            // Guard the modulo: a batch size of 0 would divide by zero.
            if (($batch != 0) && ($inserted % $batch == 0)) {
                $mysqli->commit();
                printf("\t\tSQL COMMIT after %.2fs\n", microtime(true) - $last);
                $last = microtime(true);
            }
        }

        $mysqli->commit();
        $mysqli->autocommit(true);
        return $inserted;
    }

    /**
     * Look up every pair once, in random order, and assert the stored value.
     *
     * @param mysqli $mysqli
     * @param array  $pairs List of array(key, value) pairs (local copy is consumed).
     * @return int Number of rows fetched.
     * @throws Exception When a query fails.
     */
    protected function do_read($mysqli, $pairs) {
        $fetched = 0;
        $num = count($pairs);
        while (count($pairs)) {
            // Draw random indexes until one that has not been read yet is hit.
            // mt_rand() bounds are inclusive, so the highest valid index is $num - 1.
            do {
                $idx = mt_rand(0, $num - 1);
            } while (!isset($pairs[$idx]));

            $sql = "SELECT c2 FROM demo_test WHERE c1='" . $pairs[$idx][0] . "'";
            $res = $mysqli->query($sql);
            if (!$res) {
                throw new Exception(sprintf("[%d] %s\n", $mysqli->errno, $mysqli->error));
            }
            $row = $res->fetch_row();
            $res->free();
            assert($pairs[$idx][1] == $row[0]);

            $fetched++;
            unset($pairs[$idx]);
        }
        return $fetched;
    }

    public function get_name() {
        return __CLASS__;
    }
}

/**
 * Store backed by a memcached server, accessed through the Memcached extension.
 */
class memcache_store extends store {
    protected $host;
    protected $port;
    protected $socket;

    public function __construct($host, $port, $socket) {
        $this->host = $host;
        $this->port = $port;
        $this->socket = $socket;
    }

    /**
     * Create a Memcached client pointing at host:port.
     *
     * @return Memcached
     * @throws Exception When the server cannot be added.
     */
    protected function connect() {
        $memc = new Memcached();
        if (!$memc->addServer($this->host, $this->port)) {
            throw new Exception(sprintf("[%d] Memc connect error\n", $memc->getResultCode()));
        }
        return $memc;
    }

    /** Only prints a note; no records are removed. */
    public function flush() {
        printf("NOTE: memcache store cannot flush records, restart daemon manually!\n");
        return;
    }

    /**
     * set() every pair.
     *
     * @param Memcached $memc
     * @param array     $pairs             List of array(key, value) pairs.
     * @param mixed     $commit_batch_size Unused by this store.
     * @return int Number of pairs written.
     * @throws Exception When a set() fails.
     */
    protected function do_write($memc, $pairs, $commit_batch_size) {
        $inserted = 0;
        foreach ($pairs as $pair) {
            if (!$memc->set($pair[0], $pair[1])) {
                throw new Exception(sprintf("[%d] Memc error\n", $memc->getResultCode()));
            }
            $inserted++;
        }
        return $inserted;
    }

    /**
     * get() every pair once, in random order, and assert the stored value.
     *
     * @param Memcached $memc
     * @param array     $pairs List of array(key, value) pairs (local copy is consumed).
     * @return int Number of values fetched.
     * @throws Exception When a get() fails.
     */
    protected function do_read($memc, $pairs) {
        $fetched = 0;
        $num = count($pairs);
        while (count($pairs)) {
            // mt_rand() bounds are inclusive, so the highest valid index is $num - 1.
            do {
                $idx = mt_rand(0, $num - 1);
            } while (!isset($pairs[$idx]));

            // Strict comparison: a stored "" or "0" is a valid value, not a failure.
            $value = $memc->get($pairs[$idx][0]);
            if (false === $value) {
                throw new Exception(sprintf("[%d] Memc error\n", $memc->getResultCode()));
            }
            assert($pairs[$idx][1] == $value);

            $fetched++;
            unset($pairs[$idx]);
        }
        return $fetched;
    }

    public function get_name() {
        return __CLASS__;
    }
}

/**
 * Store that reads and writes through the memcached protocol on $memc_port
 * but flushes by deleting from the MySQL table demo_test.
 */
class mysqli_memcache_store extends memcache_store {
    protected $host;
    protected $user;
    protected $pwd;
    protected $db;
    protected $port;
    protected $socket;
    protected $memc_port;
    protected $memc_socket;
    protected $commit_batch_size;
    protected $name;

    public function __construct($host, $user, $pwd, $db, $port, $socket, $memc_port, $memc_socket, $commit_batch_size) {
        $this->host = $host;
        $this->user = $user;
        $this->pwd = $pwd;
        $this->db = $db;
        $this->port = $port;
        $this->socket = $socket;
        $this->memc_port = $memc_port;
        $this->memc_socket = $memc_socket;
        $this->commit_batch_size = $commit_batch_size;
    }

    /**
     * Create a Memcached client pointing at host:memc_port.
     *
     * @return Memcached
     * @throws Exception When the server cannot be added.
     */
    protected function connect() {
        $memc = new Memcached();
        if (!$memc->addServer($this->host, $this->memc_port)) {
            throw new Exception(sprintf("[%d] Memc connect error\n", $memc->getResultCode()));
        }
        return $memc;
    }

    /**
     * Remove all rows from demo_test over a fresh MySQL connection.
     *
     * @throws Exception When connecting or the DELETE fails.
     */
    public function flush() {
        $mysqli = new mysqli($this->host, $this->user, $this->pwd, $this->db, $this->port, $this->socket);
        if ($mysqli->connect_errno) {
            throw new Exception(sprintf("[%d] %s\n", $mysqli->connect_errno, $mysqli->connect_error));
        }
        if (!$mysqli->query("DELETE FROM demo_test")) {
            throw new Exception(sprintf("[%d] %s\n", $mysqli->errno, $mysqli->error));
        }
    }

    /**
     * set() every pair, retrying up to three times when a set fails with
     * result code 14 and a non-zero commit batch size is configured.
     *
     * @param Memcached $memc
     * @param array     $pairs             List of array(key, value) pairs.
     * @param mixed     $commit_batch_size Unused; the constructor value is used instead.
     * @return int Number of pairs processed.
     * @throws Exception When a set fails for another reason or all retries fail.
     */
    protected function do_write($memc, $pairs, $commit_batch_size) {
        $inserted = 0;
        $batch = (int) $this->commit_batch_size;

        foreach ($pairs as $pair) {
            $inserted++;
            if ($memc->set($pair[0], $pair[1])) {
                continue;
            }

            // Position within the current batch; 0 when no batch size is set
            // (a bare modulo would divide by zero here).
            $offset = ($batch != 0) ? ($inserted % $batch) : 0;
            printf("Memcache error %06d (%06d) %d\n", $inserted, $offset, $memc->getResultCode());

            // NOTE(review): 14 is presumably Memcached::RES_NOTSTORED - confirm.
            if (($this->commit_batch_size != 0) && (14 == $memc->getResultCode())) {
                for ($ok = false, $retries = 0; ($retries < 3) && (!$ok); $retries++) {
                    $ok = $memc->set($pair[0], $pair[1]);
                }
                // Decide on the outcome, not the counter: a successful third
                // retry also leaves $retries at 3.
                if (!$ok) {
                    throw new Exception(sprintf("[%d] Memc error, retries %d\n", $memc->getResultCode(), $retries));
                }
            } else {
                throw new Exception(sprintf("[%d] Memc error\n", $memc->getResultCode()));
            }
        }
        return $inserted;
    }

    /** Override the name reported by get_name(). */
    public function set_name($name) {
        $this->name = $name;
    }

    /** Custom name if one was set, otherwise the class name. */
    public function get_name() {
        return ($this->name != "") ? $this->name : __CLASS__;
    }
}

/**
 * Store backed by one or more Redis servers, accessed through phpiredis.
 */
class redis_store extends store {
    protected $servers = array();
    protected $last_server = 0;
    protected $host;
    protected $port;

    /**
     * @param array $servers List of array('host' => ..., 'port' => ...) entries.
     */
    public function __construct($servers) {
        $this->servers = $servers;
    }

    /**
     * Connect to the given server, or - when no host is given - to the
     * configured server selected by the current process id.
     *
     * @return mixed phpiredis connection handle.
     * @throws Exception When the connection attempt fails.
     */
    protected function connect($host = NULL, $port = NULL) {
        if ($host == NULL) {
            $this->last_server++;
            // Spread worker processes over the servers by pid.
            $which = $this->servers[getmypid() % count($this->servers)];
            if (!($redis = phpiredis_connect($which['host'], $which['port']))) {
                throw new Exception("Redis connect failed!");
            }
            printf("\t\t%d Connecting to Redis on port %d\n", getmypid(), $which['port']);
        } else {
            if (!($redis = phpiredis_connect($host, $port))) {
                throw new Exception("Redis connect failed!");
            }
        }
        return $redis;
    }

    /**
     * Write all pairs to every configured server.
     *
     * @return int Total number of pairs written across all servers.
     */
    public function load(&$pairs, $commit_batch_size) {
        $written = 0;
        foreach ($this->servers as $server) {
            $redis = $this->connect($server['host'], $server['port']);
            $written += $this->do_write($redis, $pairs, $commit_batch_size);
        }
        return $written;
    }

    /**
     * Issue FLUSHALL on every configured server.
     *
     * @return bool True only if no server reported a failure.
     */
    public function flush() {
        $ok = true;
        foreach ($this->servers as $server) {
            $redis = $this->connect($server['host'], $server['port']);
            // A bitwise AND with the command reply does not yield a usable
            // success flag; test the reply for failure instead.
            if (false == phpiredis_command($redis, 'FLUSHALL')) {
                $ok = false;
            }
        }
        return $ok;
    }

    /**
     * SET every pair.
     *
     * @param mixed $redis             phpiredis connection handle.
     * @param array $pairs             List of array(key, value) pairs.
     * @param mixed $commit_batch_size Unused by this store.
     * @return int Number of pairs written.
     * @throws Exception When a SET fails.
     */
    protected function do_write($redis, $pairs, $commit_batch_size) {
        $inserted = 0;
        foreach ($pairs as $pair) {
            // The command is a space-separated string, so keys and values
            // must not contain whitespace.
            if (false == phpiredis_command($redis, "SET " . $pair[0] . " " . $pair[1])) {
                throw new Exception("Redis SET failed");
            }
            $inserted++;
        }
        return $inserted;
    }

    /**
     * GET every pair once, in random order, and assert the stored value.
     *
     * @param mixed $redis phpiredis connection handle.
     * @param array $pairs List of array(key, value) pairs (local copy is consumed).
     * @return int Number of values fetched.
     * @throws Exception When a GET fails.
     */
    protected function do_read($redis, $pairs) {
        $fetched = 0;
        $num = count($pairs);
        while (count($pairs)) {
            // mt_rand() bounds are inclusive, so the highest valid index is $num - 1.
            do {
                $idx = mt_rand(0, $num - 1);
            } while (!isset($pairs[$idx]));

            if (false == ($value = phpiredis_command($redis, "GET " . $pairs[$idx][0]))) {
                throw new Exception("Redis GET failed");
            }
            assert($pairs[$idx][1] == $value);

            $fetched++;
            unset($pairs[$idx]);
        }
        return $fetched;
    }

    public function get_name() {
        return __CLASS__;
    }
}

/**
 * Benchmark driver: generates random key/value pairs, runs read/write tests
 * against the registered stores in forked worker processes, and records the
 * timings in the MySQL table php_bench.
 */
class runner {
    /** Defaults; only keys listed here can be overridden via the constructor. */
    protected $settings = array(
        'num_values' => 10000,
        'key_len' => 100,
        'value_len' => 400,
        'rest_time' => 6,
        'read_worker' => 2,
        'read_repeat' => 10,
        'write_worker' => 2,
        'write_repeat' => 10,
        'write_batch_size' => 1,
    );
    protected $pairs;
    // Initialised so the execute_*() loops are safe before any add_store() call.
    protected $stores = array();
    protected $times;
    protected $run_id;
    protected $host;
    protected $user;
    protected $pwd;
    protected $db;
    protected $port;
    protected $socket;

    /**
     * Merge settings, print them, generate the test data and make sure the
     * result table exists.
     *
     * @param array $settings Overrides for the default settings; unknown keys are ignored.
     * @throws Exception When the result table cannot be created.
     */
    public function __construct($settings, $host, $user, $pwd, $db, $port, $socket) {
        foreach ($settings as $k => $v) {
            if (isset($this->settings[$k])) {
                $this->settings[$k] = $v;
            }
        }

        printf("\nSettings:\n");
        foreach ($this->settings as $k => $v) {
            printf("%s = %s, ", $k, $v);
        }
        printf("\n");

        $this->host = $host;
        $this->user = $user;
        $this->pwd = $pwd;
        $this->db = $db;
        $this->port = $port;
        $this->socket = $socket;

        $this->generate_pairs($this->settings['num_values'], $this->settings['key_len'], $this->settings['value_len']);
        $this->setup_db();
        $this->times = array();
    }

    /**
     * Open a new MySQL connection for storing and reading results.
     *
     * @return mysqli
     * @throws Exception When the connection attempt fails.
     */
    protected function connect() {
        $mysqli = new mysqli($this->host, $this->user, $this->pwd, $this->db, $this->port, $this->socket);
        if ($mysqli->connect_errno) {
            throw new Exception(sprintf("[%d] %s\n", $mysqli->connect_errno, $mysqli->connect_error));
        }
        return $mysqli;
    }

    /**
     * Create the php_bench result table if it does not exist yet.
     *
     * @throws Exception When the CREATE TABLE statement fails.
     */
    public function setup_db() {
        $mysqli = $this->connect();
        $sql = "CREATE TABLE IF NOT EXISTS `php_bench` ( "
            . "`run_id` int(11) NOT NULL, "
            . "`pid` int(11) unsigned NOT NULL, "
            . "`store` varchar(255) NOT NULL, "
            . "`label` varchar(255) NOT NULL, "
            . "`runtime` decimal(10,6) unsigned NOT NULL, "
            . "`num_op` int(10) unsigned NOT NULL, "
            . "`ops` int(10) unsigned NOT NULL, "
            . "`settings` BLOB, "
            . "`modified` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, "
            . "PRIMARY KEY (`run_id`,`label`, `store`, `pid`) "
            . ") ENGINE=InnoDB";
        if (!$mysqli->query($sql)) {
            throw new Exception(sprintf("[%d] %s\n", $mysqli->errno, $mysqli->error));
        }
    }

    /**
     * Fill $this->pairs with $num random array(key, value) entries.
     *
     * Keys are $key_len random characters from a fixed alphanumeric alphabet.
     * Each value is key . reversed key . key . reversed key, i.e. four times
     * the key length.
     *
     * NOTE(review): $value_len is accepted but not used; the value length
     * only matches it when value_len == 4 * key_len.
     *
     * @return int Number of pairs generated.
     */
    public function generate_pairs($num, $key_len, $value_len) {
        $this->pairs = array();
        // NOTE(review): the alphabet has no upper-case 'U' or 'V'.
        $anum = "0123456789ABCDEFGHIJKLMNOPQRSTWXYZabcdefghijklmnopqrstuvwxyz";
        $anum_len = strlen($anum) - 1;

        for ($i = 0; $i < $num; $i++) {
            $key = "";
            for ($j = 0; $j < $key_len; $j++) {
                $key .= substr($anum, mt_rand(0, $anum_len), 1);
            }
            $value = $key . strrev($key) . $key . strrev($key);
            $this->pairs[] = array($key, $value);
        }
        return count($this->pairs);
    }

    /** Register a store; a store with the same name replaces the earlier one. */
    public function add_store(store $store) {
        $this->stores[$store->get_name()] = $store;
    }

    /**
     * Flush and load every store, then read the data back from each store
     * using $worker forked processes.
     *
     * @param int|null $worker Number of worker processes; defaults to the
     *                         'read_worker' setting.
     * @return int Run id under which the timings were stored.
     */
    public function execute_read($worker = NULL) {
        if (NULL == $worker) {
            $worker = $this->settings['read_worker'];
        }

        printf("\tREAD test\n");
        foreach ($this->stores as $store) {
            $this->timer('flush', $store->get_name());
            $store->flush();
            $this->timer('flush', $store->get_name(), 1);

            $this->timer('load', $store->get_name());
            $store->load($this->pairs, $this->settings['write_batch_size']);
            $this->timer('load', $store->get_name(), count($this->pairs));
            printf("\tFlushed and loaded data into '%s' store\n", $store->get_name());
        }
        $run_id = $this->persist_times();

        foreach ($this->stores as $store) {
            printf("\tResting for %d seconds before testing store '%s'...\n", $this->settings['rest_time'], $store->get_name());
            sleep($this->settings['rest_time']);

            $pids = array();
            for ($fetch_worker = 1; $fetch_worker <= $worker; $fetch_worker++) {
                $pid = pcntl_fork();
                if ($pid == -1) {
                    printf("Fork failed!\n");
                } elseif ($pid == 0) {
                    // Child: run the test, store its own timings and exit.
                    $this->timer('read', $store->get_name());
                    $op = $store->read($this->pairs, $this->settings['read_repeat']);
                    $this->timer('read', $store->get_name(), $op);
                    printf("\tPID %d, finished read test for '%s' store\n", getmypid(), $store->get_name());
                    $this->persist_times($run_id);
                    exit(0);
                } else {
                    printf("\t\tParent has created worker [%d] (pid = %d)\n", $fetch_worker, $pid);
                    $pids[] = $pid;
                    // Non-blocking check; the blocking wait happens below.
                    pcntl_waitpid($pid, $status, WNOHANG);
                }
            }

            // Wait for all workers of this store before moving on.
            foreach ($pids as $pid) {
                pcntl_waitpid($pid, $status);
            }
        }
        return $run_id;
    }

    /**
     * Flush every store, then write the data to each store using $worker
     * forked processes.
     *
     * @param int|null $worker Number of worker processes; defaults to the
     *                         'write_worker' setting.
     * @return int Run id under which the timings were stored.
     */
    public function execute_write($worker = NULL) {
        if ($worker == NULL) {
            $worker = $this->settings['write_worker'];
        }

        printf("\tWRITE test\n");
        foreach ($this->stores as $store) {
            $this->timer('flush', $store->get_name());
            $store->flush();
            $this->timer('flush', $store->get_name(), 1);
            printf("\tFlushed data from store '%s'\n", $store->get_name());
        }
        $run_id = $this->persist_times();

        foreach ($this->stores as $store) {
            printf("\tResting for %d seconds before testing store '%s'...\n", $this->settings['rest_time'], $store->get_name());
            sleep($this->settings['rest_time']);

            $pids = array();
            for ($fetch_worker = 1; $fetch_worker <= $worker; $fetch_worker++) {
                $pid = pcntl_fork();
                if ($pid == -1) {
                    printf("Fork failed!\n");
                } elseif ($pid == 0) {
                    // Child: run the test, store its own timings and exit.
                    $this->timer('write', $store->get_name());
                    $op = $store->write($this->pairs, $this->settings['write_batch_size'], $this->settings['write_repeat']);
                    $this->timer('write', $store->get_name(), $op);
                    printf("\tPID %d, finished write test for '%s' store\n", getmypid(), $store->get_name());
                    $this->persist_times($run_id);
                    exit(0);
                } else {
                    printf("\t\tParent has created worker [%d] (pid = %d)\n", $fetch_worker, $pid);
                    $pids[] = $pid;
                    // Non-blocking check; the blocking wait happens below.
                    pcntl_waitpid($pid, $status, WNOHANG);
                }
            }

            // Wait for all workers of this store before moving on.
            foreach ($pids as $pid) {
                pcntl_waitpid($pid, $status);
            }
        }
        return $run_id;
    }

    /**
     * Start or stop the timer for (label, store, current pid).
     *
     * The first call records the start time; the next call for the same
     * triple replaces it with the elapsed time and the given operation count.
     *
     * @param string $label Test phase, e.g. 'flush', 'load', 'read', 'write'.
     * @param string $store Store name.
     * @param int    $op    Number of operations performed.
     * @return array array('runtime' => float, 'op' => int) as currently stored.
     */
    public function timer($label, $store, $op = 0) {
        if (!isset($this->times[$label])) {
            $this->times[$label] = array();
        }
        if (!isset($this->times[$label][$store])) {
            $this->times[$label][$store] = array();
        }

        $pid = getmypid();
        if (!isset($this->times[$label][$store][$pid])) {
            $this->times[$label][$store][$pid] = array(
                'runtime' => microtime(true),
                'op' => $op
            );
        } else {
            $this->times[$label][$store][$pid] = array(
                'runtime' => microtime(true) - $this->times[$label][$store][$pid]['runtime'],
                'op' => $op
            );
        }
        return $this->times[$label][$store][$pid];
    }

    /**
     * Insert all collected timings into php_bench in one transaction and
     * clear them.
     *
     * @param int|null $run_id Run to attach the timings to; when null a new
     *                         id (max(run_id) + 1) is allocated and remembered.
     * @return int The run id the timings were stored under.
     * @throws Exception When a statement fails.
     */
    public function persist_times($run_id = NULL) {
        $mysqli = $this->connect();
        // autocommit is a method on mysqli; assigning to a property of that
        // name does not switch the session mode, so call it instead.
        $mysqli->autocommit(false);

        if (is_null($run_id)) {
            $sql = 'SELECT max(run_id) as _run_id FROM php_bench';
            if (!($res = $mysqli->query($sql))) {
                throw new Exception(sprintf("[%d] %s\n", $mysqli->errno, $mysqli->error));
            }
            $row = $res->fetch_assoc();
            $run_id = $row['_run_id'] + 1;
            $this->run_id = $run_id;
        }

        $settings = "";
        foreach ($this->settings as $k => $v) {
            $settings .= sprintf('%s = %s, ', $k, $v);
        }
        // Drop the trailing ", " before escaping.
        $settings = $mysqli->real_escape_string(substr($settings, 0, -2));

        foreach ($this->times as $label => $details) {
            foreach ($details as $store => $proc_details) {
                foreach ($proc_details as $pid => $entry) {
                    // Store names can be set freely (set_name), so escape them.
                    $sql = sprintf('INSERT INTO php_bench(run_id, pid, store, label, runtime, num_op, ops, settings) VALUES (%d, %d, "%s", "%s", %4.6f, %d, %d, "%s")',
                        $run_id,
                        $pid,
                        $mysqli->real_escape_string((string) $store),
                        $mysqli->real_escape_string((string) $label),
                        $entry['runtime'],
                        $entry['op'],
                        ($entry['runtime'] == 0) ? 0 : ($entry['op'] / $entry['runtime']),
                        $settings
                    );
                    if (!$mysqli->query($sql)) {
                        throw new Exception(sprintf("[%d] %s\n", $mysqli->errno, $mysqli->error));
                    }
                }
            }
        }

        $mysqli->commit();
        $mysqli->autocommit(true);
        $this->times = array();

        // Return the id actually used, also when it was passed in by the caller.
        return $run_id;
    }

    /**
     * Print per-label, per-store statistics for one run.
     *
     * @param int  $run_id  Run to summarise.
     * @param bool $details Also print operation totals, min/max runtime and
     *                      worker count.
     * @throws Exception When a query fails.
     */
    public function print_summary($run_id, $details = false) {
        $mysqli = $this->connect();

        printf("\n%s\n", str_repeat('=', 80));
        printf("Summary of run %d\n", $run_id);

        if (!($res = $mysqli->query(sprintf("SELECT DISTINCT(label) AS _label FROM php_bench WHERE run_id = %d ORDER BY label ASC", $run_id)))) {
            throw new Exception(sprintf("[%d] %s\n", $mysqli->errno, $mysqli->error));
        }
        $labels = array();
        while ($row = $res->fetch_assoc()) {
            $labels[$row['_label']] = $row['_label'];
        }

        if (!($res = $mysqli->query(sprintf("SELECT DISTINCT(store) AS _store FROM php_bench WHERE run_id = %d ORDER BY store ASC", $run_id)))) {
            throw new Exception(sprintf("[%d] %s\n", $mysqli->errno, $mysqli->error));
        }
        $stores = array();
        while ($row = $res->fetch_assoc()) {
            $stores[$row['_store']] = $row['_store'];
        }

        printf("Data stores tested:\n");
        foreach ($stores as $store) {
            printf("\t%s\n", $store);
        }
        printf("\n");

        foreach ($labels as $label) {
            printf("\n%s\n%s\n", $label, str_repeat('-', 80));

            $sql = sprintf(
                "SELECT store, COUNT(pid) AS _num, AVG(ops) AS _ops, SUM(num_op) AS _total_ops, MIN(runtime) AS _min_runtime, MAX(runtime) AS _max_runtime FROM php_bench WHERE run_id = %d AND label = '%s' GROUP BY store ORDER BY _ops DESC",
                $run_id,
                $mysqli->real_escape_string((string) $label)
            );
            if (!($res = $mysqli->query($sql))) {
                throw new Exception(sprintf("[%d] %s\n", $mysqli->errno, $mysqli->error));
            }

            while ($row = $res->fetch_assoc()) {
                printf("%-40s\n", $row['store']);
                printf("\t%-20s: %d\n", "operations/s", $row['_ops']);
                if ($details) {
                    printf("\t%-20s: %d\n", "# operations", $row['_total_ops']);
                    printf("\t%-20s: %4.2fs\n", "min runtime", $row['_min_runtime']);
                    printf("\t%-20s: %4.2fs\n", "max runtime", $row['_max_runtime']);
                    printf("\t%-20s: %d\n", "# worker", $row['_num']);
                    printf("\n");
                }
            }
        }
        printf("\n");
    }
}

// Run the read benchmark once per worker count, with a pause between runs.
$num_workers = array(1, 2, 4, 8, 16, 32);
foreach ($num_workers as $worker) {
    $settings = array(
        'num_values' => 25000,
        'key_len' => 100,
        'value_len' => 400,
        'rest_time' => 5,
        'read_worker' => $worker,
        'read_repeat' => 20,
        'write_worker' => $worker,
        'write_repeat' => 2,
        'write_batch_size' => 3000,
    );

    $r = new runner($settings, "127.0.0.1", "root", "", "test", 3307, NULL);

    //$r->add_store(new mysqli_store("127.0.0.1", "root", "", "test", 3307, NULL));
    //$r->add_store(new memcache_store("127.0.0.1", 11212, NULL));
    //$mysqli_memcache_store_57 = new mysqli_memcache_store("127.0.0.1", "root", "", "test", 3307, NULL, 11211, NULL, $settings['write_batch_size']);
    //$mysqli_memcache_store_57->set_name("MySQL 5.7 Memcached");
    //$mysqli_memcache_store_56 = new mysqli_memcache_store("127.0.0.1", "root", "", "test", 3308, NULL, 11213, NULL, $settings['write_batch_size']);
    //$mysqli_memcache_store_56->set_name("MySQL 5.6 Memcached");
    //$r->add_store($mysqli_memcache_store_57);
    //$r->add_store($mysqli_memcache_store_56);
    $r->add_store(new redis_store(array(
        array('host' => "127.0.0.1", 'port' => 6379),
        array('host' => "127.0.0.1", "port" => 7777),
        array('host' => "127.0.0.1", "port" => 8888)
    )));

    $r->print_summary($r->execute_read(), true);
    // $r->print_summary($r->execute_write(), true);

    unset($r);
    sleep(10);
}