=== false ) {
			// The query failed to select anything from the table, so there must be an issue reading from it.
			return new \WP_Error( 'custom_table_unable_to_read', 'Jetpack Sync Custom table: Unable to read from table' );
		}

		if ( $wpdb->last_error ) {
			// There was an error reading, that's not necessarily failing the query.
			// TODO check if we need this error check.
			// TODO add more information about the error in the return value.
			return new \WP_Error( 'custom_table_unable_to_read_sql_error', 'Jetpack Sync Custom table: Unable to read from table - SQL error' );
		}

		// Check if we can write in the table
		if ( ! $this->insert_item( 'test', 'test' ) ) {
			return new \WP_Error( 'custom_table_unable_to_writeread', 'Jetpack Sync Custom table: Unable to write into table' );
		}

		// See if we can read the item back
		$items = $this->fetch_items_by_ids( array( 'test' ) );
		if ( empty( $items ) || ! is_object( $items[0] ) || $items[0]->value !== 'test' ) {
			return new \WP_Error( 'custom_table_unable_to_writeread', 'Jetpack Sync Custom table: Unable to read item after writing' );
		}

		// Delete the test item that was just written and read back.
		$this->delete_items_by_ids( array( 'test' ) );

		// Try to fetch the item back. It should not exist.
		$items = $this->fetch_items_by_ids( array( 'test' ) );
		if ( ! empty( $items ) ) {
			return new \WP_Error( 'custom_table_unable_to_writeread', 'Jetpack Sync Custom table: Unable to delete from table' );
		}

		return true;
	}

	/**
	 * Drop the custom table as part of cleanup.
	 *
	 * @return bool True if the DROP TABLE query succeeded. False if the query failed
	 *              or if the table did not exist, so there was nothing to drop.
	 */
	public function drop_table() {
		global $wpdb;

		if ( ! $this->custom_table_exists() ) {
			// Nothing to drop. Return an explicit boolean instead of falling through with null.
			return false;
		}

		// Ignoring the linting warning, as there's still no placeholder replacement for DB field name.
		// phpcs:ignore WordPress.DB.DirectDatabaseQuery.DirectQuery,WordPress.DB.DirectDatabaseQuery.NoCaching,WordPress.DB.PreparedSQL.InterpolatedNotPrepared,WordPress.DB.DirectDatabaseQuery.SchemaChange
		return (bool) $wpdb->query( "DROP TABLE {$this->table_name}" );
	}

	/**
	 * Queue API implementation
	 */

	/**
	 * Insert an item in the queue.
	 *
	 * @param string $item_id The item ID.
	 * @param string $item    Serialized item data.
	 *
	 * @return bool If the item was added.
	 */
	public function insert_item( $item_id, $item ) {
		global $wpdb;

		// phpcs:ignore WordPress.DB.DirectDatabaseQuery.DirectQuery,WordPress.DB.DirectDatabaseQuery.NoCaching,WordPress.DB.PreparedSQL.InterpolatedNotPrepared
		$rows_added = $wpdb->query(
			$wpdb->prepare(
				/**
				 * Ignoring the linting warning, as there's still no placeholder replacement for DB field name,
				 * in this case this is `$this->table_name`
				 */
				// phpcs:ignore WordPress.DB.PreparedSQL.InterpolatedNotPrepared
				"INSERT INTO {$this->table_name} (queue_id, event_id, event_payload) VALUES (%s, %s,%s)",
				$this->queue_id,
				$item_id,
				$item
			)
		);

		// `$wpdb->query()` returns false on SQL error and the affected row count otherwise.
		// Both false and 0 mean that nothing was inserted, so neither may be reported as success.
		return (bool) $rows_added;
	}

	/**
	 * Fetch items from the queue, ordered by event ID.
	 *
	 * @param int|null $item_count How many items to fetch from the queue.
	 *                             The parameter is null-able, if no limit on the amount of items.
	 *
	 * @return object[]|null Rows exposing `id` and `value` properties.
	 */
	public function fetch_items( $item_count ) {
		global $wpdb;

		/**
		 * Ignoring the linting warning, as there's still no placeholder replacement for DB field name,
		 * in this case this is `$this->table_name`
		 *
		 * The queue is matched with `=` (not `LIKE`), the same way every other query in this class does,
		 * so that `_` and `%` characters in the queue ID are not treated as wildcards.
		 */
		$query  = "SELECT event_id AS id, event_payload AS value FROM {$this->table_name} WHERE queue_id = %s ORDER BY event_id ASC";
		$values = array( $this->queue_id );

		// Only limit the result set when a (truthy) item count was requested.
		if ( $item_count ) {
			$query   .= ' LIMIT %d';
			$values[] = $item_count;
		}

		// phpcs:ignore WordPress.DB.DirectDatabaseQuery.DirectQuery,WordPress.DB.DirectDatabaseQuery.NoCaching,WordPress.DB.PreparedSQL.InterpolatedNotPrepared,WordPress.DB.PreparedSQL.NotPrepared
		return $wpdb->get_results( $wpdb->prepare( $query, $values ) );
	}

	/**
	 * Fetches items with specific IDs from the Queue.
	 *
	 * @param array $items_ids Items IDs to fetch from the queue.
	 *
	 * @return object[]|null Rows exposing `id` and `value` properties. Empty array if no IDs were given.
	 */
	public function fetch_items_by_ids( $items_ids ) {
		global $wpdb;

		// return early if $items_ids is empty or not an array.
		if ( empty( $items_ids ) || ! is_array( $items_ids ) ) {
			return array();
		}

		// One `%s` placeholder per requested ID.
		$ids_placeholders        = implode( ', ', array_fill( 0, count( $items_ids ), '%s' ) );
		$query_with_placeholders = "SELECT event_id AS id, event_payload AS value
				FROM {$this->table_name}
				WHERE queue_id = %s AND event_id IN ( $ids_placeholders )";

		// The queue ID fills the first placeholder, the item IDs fill the rest.
		$replacement_values = array_merge( array( $this->queue_id ), $items_ids );

		// phpcs:ignore WordPress.DB.DirectDatabaseQuery.DirectQuery,WordPress.DB.DirectDatabaseQuery.NoCaching,WordPress.DB.PreparedSQL.InterpolatedNotPrepared
		$items = $wpdb->get_results(
			$wpdb->prepare(
				$query_with_placeholders, // phpcs:ignore WordPress.DB.PreparedSQL.NotPrepared
				$replacement_values
			),
			OBJECT
		);

		return $items;
	}

	/**
	 * Check how many items are in the queue.
	 *
	 * If the query fails because the table does not exist, the custom queue table setting is
	 * switched off and a storage error action is sent. A transient keeps this from repeating
	 * for six hours.
	 *
	 * @return int
	 */
	public function get_item_count() {
		global $wpdb;

		// phpcs:ignore WordPress.DB.DirectDatabaseQuery.DirectQuery,WordPress.DB.DirectDatabaseQuery.NoCaching,WordPress.DB.PreparedSQL.InterpolatedNotPrepared
		$items_count = (int) $wpdb->get_var(
			$wpdb->prepare(
				/**
				 * Ignoring the linting warning, as there's still no placeholder replacement for DB field name,
				 * in this case this is `$this->table_name`
				 */
				// phpcs:ignore WordPress.DB.PreparedSQL.InterpolatedNotPrepared
				"SELECT COUNT(*) FROM {$this->table_name} WHERE queue_id = %s",
				$this->queue_id
			)
		);

		// If the table does not exist, disable the custom queue table and send an error.
		if (
			! empty( $wpdb->last_error )
			&& str_contains( $wpdb->last_error, $this->table_name_no_prefix . "' doesn't exist" )
			&& ! get_transient( self::CUSTOM_QUEUE_TABLE_DISABLE_WPDB_ERROR_NOT_EXIST_FLAG )
		) {
			set_transient( self::CUSTOM_QUEUE_TABLE_DISABLE_WPDB_ERROR_NOT_EXIST_FLAG, true, 6 * HOUR_IN_SECONDS );
			Settings::update_settings( array( 'custom_queue_table_enabled' => 0 ) );

			$data = array(
				'timestamp' => microtime( true ),
				'error'     => $wpdb->last_error,
			);

			$sender = Sender::get_instance();
			$sender->send_action( 'jetpack_sync_storage_error_custom_table_not_exist', $data );
		}

		return $items_count;
	}

	/**
	 * Clear out the queue by deleting every row that belongs to it.
	 *
	 * @return bool|int|\mysqli_result|resource|null
	 */
	public function clear_queue() {
		global $wpdb;

		// phpcs:ignore WordPress.DB.DirectDatabaseQuery.DirectQuery,WordPress.DB.DirectDatabaseQuery.NoCaching,WordPress.DB.PreparedSQL.InterpolatedNotPrepared
		return $wpdb->query(
			$wpdb->prepare(
				/**
				 * Ignoring the linting warning, as there's still no placeholder replacement for DB field name,
				 * in this case this is `$this->table_name`
				 */
				// phpcs:ignore WordPress.DB.PreparedSQL.InterpolatedNotPrepared
				"DELETE FROM {$this->table_name} WHERE queue_id = %s",
				$this->queue_id
			)
		);
	}

	/**
	 * Return the lag amount for the queue.
	 *
	 * The lag is the difference between `$now` and the timestamp embedded in the ID of the
	 * first item of the queue (IDs look like `jpsq_{queue_id}-{timestamp}-...`).
	 *
	 * @param float|int|null $now A timestamp to use as starting point when calculating the lag.
	 *
	 * @return float|int The lag amount. 0 if the queue is empty or the first ID has no timestamp.
	 */
	public function get_lag( $now = null ) {
		global $wpdb;

		// TODO replace with peek and a flag to fetch only the name.
		// phpcs:ignore WordPress.DB.DirectDatabaseQuery.DirectQuery,WordPress.DB.DirectDatabaseQuery.NoCaching,WordPress.DB.PreparedSQL.InterpolatedNotPrepared
		$first_item_name = $wpdb->get_var(
			$wpdb->prepare(
				/**
				 * Ignoring the linting warning, as there's still no placeholder replacement for DB field name,
				 * in this case this is `$this->table_name`
				 */
				// phpcs:ignore WordPress.DB.PreparedSQL.InterpolatedNotPrepared
				"SELECT event_id FROM {$this->table_name} WHERE queue_id = %s ORDER BY event_id ASC LIMIT 1",
				$this->queue_id
			)
		);

		if ( ! $first_item_name ) {
			return 0;
		}

		if ( null === $now ) {
			$now = microtime( true );
		}

		// Break apart the item name to get the timestamp.
		// The queue ID is quoted so that regex metacharacters in it are matched literally.
		$matches = null;
		$pattern = '/^jpsq_' . preg_quote( $this->queue_id, '/' ) . '-(\d+\.\d+)-/';
		if ( preg_match( $pattern, $first_item_name, $matches ) ) {
			return $now - (float) $matches[1];
		}

		return 0;
	}

	/**
	 * Add multiple items to the queue at once.
	 *
	 * Empty items, and items that cannot be serialized, are skipped. Each stored item gets
	 * the event ID `{$id_prefix}-{index}`, where index is its position in `$items`.
	 *
	 * @param array  $items     Array of items to add.
	 * @param string $id_prefix Prefix to use for all the items.
	 *
	 * @return bool|int|\mysqli_result|resource|null Result of the INSERT query, or 0 if there was nothing to insert.
	 */
	public function add_all( $items, $id_prefix ) {
		global $wpdb;

		$query = "INSERT INTO {$this->table_name} (queue_id, event_id, event_payload ) VALUES ";

		$rows        = array();
		$count_items = count( $items );
		$queue_id    = esc_sql( $this->queue_id );

		for ( $i = 0; $i < $count_items; ++$i ) {
			// skip empty items.
			if ( empty( $items[ $i ] ) ) {
				continue;
			}
			try {
				$event_id      = esc_sql( $id_prefix . '-' . $i );
				$event_payload = esc_sql( serialize( $items[ $i ] ) ); // phpcs:ignore WordPress.PHP.DiscouragedPHPFunctions.serialize_serialize
				$rows[]        = "('{$queue_id}', '$event_id','$event_payload')";
			} catch ( \Exception $e ) {
				// Item cannot be serialized so skip.
				continue;
			}
		}

		if ( empty( $rows ) ) {
			// Every item was skipped. A dangling `VALUES` clause is invalid SQL, so don't run it.
			return 0;
		}

		$rows_added = $wpdb->query( $query . implode( ',', $rows ) ); // phpcs:ignore WordPress.DB.PreparedSQL.NotPrepared, WordPress.DB.DirectDatabaseQuery.DirectQuery, WordPress.DB.DirectDatabaseQuery.NoCaching

		return $rows_added;
	}

	/**
	 * Return $max_count items from the queue, including their value string length.
	 *
	 * @param int $max_count How many items to fetch from the queue.
	 *
	 * @return object[]|null Rows exposing `id` and `value_size` properties.
	 */
	public function get_items_ids_with_size( $max_count ) {
		global $wpdb;

		// TODO optimize the fetch to happen by queue name not by the IDs as it can be issue cross-queues.
		// phpcs:ignore WordPress.DB.DirectDatabaseQuery.DirectQuery,WordPress.DB.DirectDatabaseQuery.NoCaching,WordPress.DB.PreparedSQL.InterpolatedNotPrepared
		return $wpdb->get_results(
			$wpdb->prepare(
				/**
				 * Ignoring the linting warning, as there's still no placeholder replacement for DB field name,
				 * in this case this is `$this->table_name`
				 */
				// phpcs:ignore WordPress.DB.PreparedSQL.InterpolatedNotPrepared
				"SELECT event_id AS id, LENGTH( event_payload ) AS value_size FROM {$this->table_name} WHERE queue_id = %s ORDER BY event_id ASC LIMIT %d",
				$this->queue_id,
				$max_count
			),
			OBJECT
		);
	}

	/**
	 * Delete items with specific IDs from the queue.
	 *
	 * @param array $ids IDs of the items to remove from the queue.
	 *
	 * @return bool|int|\mysqli_result|resource|null Result of the DELETE query, or 0 if no IDs were given.
	 */
	public function delete_items_by_ids( $ids ) {
		global $wpdb;

		// An empty list would produce `IN ( )`, which is invalid SQL. There is nothing to delete anyway.
		if ( empty( $ids ) || ! is_array( $ids ) ) {
			return 0;
		}

		// One `%s` placeholder per ID.
		$ids_placeholders = implode( ', ', array_fill( 0, count( $ids ), '%s' ) );

		// phpcs:ignore WordPress.DB.DirectDatabaseQuery.DirectQuery,WordPress.DB.DirectDatabaseQuery.NoCaching,WordPress.DB.PreparedSQL.InterpolatedNotPrepared
		return $wpdb->query(
			$wpdb->prepare(
				/**
				 * Ignoring the linting warning, as there's still no placeholder replacement for DB field name,
				 * in this case this is `$this->table_name`
				 */
				// phpcs:ignore WordPress.DB.PreparedSQL.InterpolatedNotPrepared
				"DELETE FROM {$this->table_name} WHERE queue_id = %s AND event_id IN ( $ids_placeholders )",
				array_merge( array( $this->queue_id ), $ids )
			)
		);
	}

	/**
	 * Table initialization
	 *
	 * Creates the custom table if it does not exist yet, then runs the health check on it.
	 *
	 * @return mixed Whatever `is_custom_table_healthy()` returns.
	 */
	public static function initialize_custom_sync_table() {
		/**
		 * Initialize an instance of the class with a test name, so we can use table prefix and then test if the table is healthy.
		 */
		$custom_table_instance = new Queue_Storage_Table( 'test_queue' );

		// Check if the table exists
		if ( ! $custom_table_instance->custom_table_exists() ) {
			$custom_table_instance->create_table();
		}

		return $custom_table_instance->is_custom_table_healthy();
	}

	/**
	 * Migrates the existing Sync events from the options table to the Custom table
	 *
	 * Events are copied in batches of 100 and then removed from the options table.
	 *
	 * @return bool
	 */
	public static function migrate_from_options_table_to_custom_table() {
		global $wpdb;

		// phpcs:ignore WordPress.DB.DirectDatabaseQuery.DirectQuery,WordPress.DB.DirectDatabaseQuery.NoCaching
		$count_result = $wpdb->get_row(
			"
				SELECT
					COUNT(*) as item_count
				FROM
					{$wpdb->options}
				WHERE
					option_name LIKE 'jpsq_%'
			"
		);

		// `get_row()` returns null when the query fails. Treat that as "nothing counted".
		$item_count = $count_result ? (int) $count_result->item_count : 0;

		$limit  = 100;
		$offset = 0;

		// Instantiate table storage, so we can get the table name. Queue ID is just a placeholder here.
		$queue_table_storage = new Queue_Storage_Table( 'test_queue' );

		do {
			// get a batch of records from the options table
			$query = "
				SELECT
					option_name as event_id,
					option_value as event_payload
				FROM
					{$wpdb->options}
				WHERE
					option_name LIKE 'jpsq_%'
				ORDER BY
					option_name ASC
				LIMIT $offset, $limit
			";

			// phpcs:ignore WordPress.DB.DirectDatabaseQuery.DirectQuery,WordPress.DB.DirectDatabaseQuery.NoCaching,WordPress.DB.PreparedSQL.NotPrepared
			$rows = $wpdb->get_results( $query );

			$insert_rows = array();

			foreach ( $rows as $event ) {
				$event_id = $event->event_id;

				// Parse the event: option names look like `jpsq_{queue_id}-{timestamp}-...`.
				if (
					preg_match(
						'!jpsq_(?P<queue_id>[^-]+)-(?P<timestamp>[^-]+)-.+!',
						$event_id,
						$events_matches
					)
				) {
					$queue_id  = $events_matches['queue_id'];
					$timestamp = $events_matches['timestamp'];

					$insert_rows[] = $wpdb->prepare(
						'(%s, %s, %s, %s)',
						array(
							$queue_id,
							$event_id,
							$event->event_payload,
							(int) $timestamp,
						)
					);
				}
			}

			if ( ! empty( $insert_rows ) ) {
				$insert_query = 'INSERT INTO ' . $queue_table_storage->table_name . ' (queue_id, event_id, event_payload, timestamp) VALUES ';

				$insert_query .= implode( ',', $insert_rows );

				// phpcs:ignore WordPress.DB.DirectDatabaseQuery.DirectQuery,WordPress.DB.DirectDatabaseQuery.NoCaching,WordPress.DB.PreparedSQL.NotPrepared
				$wpdb->query( $insert_query );
			}

			$offset += $limit;
		} while ( $offset < $item_count );

		// Clear out the options queue
		// phpcs:ignore WordPress.DB.DirectDatabaseQuery.DirectQuery,WordPress.DB.DirectDatabaseQuery.NoCaching,WordPress.DB.PreparedSQL.NotPrepared
		$wpdb->query(
			$wpdb->prepare(
				"DELETE FROM $wpdb->options WHERE option_name LIKE %s",
				'jpsq_%-%'
			)
		);

		return true;
	}

	/**
	 * Migrates the existing Sync events from the Custom table to the Options table
	 *
	 * Events are copied in batches of 100 as non-autoloaded options and then removed from
	 * the custom table. Nothing is done if the custom table cannot be read.
	 *
	 * @return void
	 */
	public static function migrate_from_custom_table_to_options_table() {
		global $wpdb;

		// Instantiate table storage, so we can get the table name. Queue ID is just a placeholder here.
		$queue_table_storage = new Queue_Storage_Table( 'test_queue' );
		$custom_table_name   = $queue_table_storage->table_name;

		// phpcs:ignore WordPress.DB.DirectDatabaseQuery.DirectQuery,WordPress.DB.DirectDatabaseQuery.NoCaching,WordPress.DB.PreparedSQL.InterpolatedNotPrepared
		$count_result = $wpdb->get_row( "SELECT COUNT(*) as item_count FROM {$custom_table_name}" );

		if ( $wpdb->last_error ) {
			return;
		}

		// `get_row()` returns null when there is no result. Treat that as "nothing counted".
		$item_count = $count_result ? (int) $count_result->item_count : 0;

		$limit  = 100;
		$offset = 0;

		do {
			// get a batch of records from the custom table
			$query = "
				SELECT
					event_id,
					event_payload
				FROM
					{$custom_table_name}
				ORDER BY
					event_id ASC
				LIMIT $offset, $limit
			";

			// phpcs:ignore WordPress.DB.DirectDatabaseQuery.DirectQuery,WordPress.DB.DirectDatabaseQuery.NoCaching,WordPress.DB.PreparedSQL.NotPrepared
			$rows = $wpdb->get_results( $query );

			$insert_rows = array();

			foreach ( $rows as $event ) {
				// Each event becomes a non-autoloaded option named after its event ID.
				$insert_rows[] = $wpdb->prepare(
					'(%s, %s, "no")',
					array(
						$event->event_id,
						$event->event_payload,
					)
				);
			}

			if ( ! empty( $insert_rows ) ) {
				$insert_query = "INSERT INTO {$wpdb->options} (option_name, option_value, autoload) VALUES ";

				$insert_query .= implode( ',', $insert_rows );

				// phpcs:ignore WordPress.DB.DirectDatabaseQuery.DirectQuery,WordPress.DB.DirectDatabaseQuery.NoCaching,WordPress.DB.PreparedSQL.NotPrepared
				$wpdb->query( $insert_query );
			}

			$offset += $limit;
		} while ( $offset < $item_count );

		// Clear the custom table
		// phpcs:ignore WordPress.DB.DirectDatabaseQuery.DirectQuery,WordPress.DB.DirectDatabaseQuery.NoCaching,WordPress.DB.PreparedSQL.InterpolatedNotPrepared
		$wpdb->query( "DELETE FROM {$custom_table_name}" );

		// TODO should we drop the table here instead?
	}
}
Fatal error: Uncaught Error: Class "Automattic\Jetpack\Sync\Queue\Queue_Storage_Table" not found in /htdocs/wp-content/plugins/jetpack/jetpack_vendor/automattic/jetpack-sync/src/class-queue.php:65 Stack trace: #0 /htdocs/wp-content/plugins/jetpack/jetpack_vendor/automattic/jetpack-sync/src/class-listener.php(467): Automattic\Jetpack\Sync\Queue->__construct('sync') #1 /htdocs/wp-content/plugins/jetpack/jetpack_vendor/automattic/jetpack-sync/src/class-listener.php(75): Automattic\Jetpack\Sync\Listener->set_defaults() #2 /htdocs/wp-content/plugins/jetpack/jetpack_vendor/automattic/jetpack-sync/src/class-listener.php(63): Automattic\Jetpack\Sync\Listener->__construct() #3 /htdocs/wp-content/plugins/jetpack/jetpack_vendor/automattic/jetpack-sync/src/class-actions.php(759): Automattic\Jetpack\Sync\Listener::get_instance() #4 /htdocs/wp-content/plugins/jetpack/jetpack_vendor/automattic/jetpack-sync/src/class-actions.php(146): Automattic\Jetpack\Sync\Actions::initialize_listener() #5 /htdocs/wp-content/plugins/jetpack/jetpack_vendor/automattic/jetpack-sync/src/class-main.php(111): Automattic\Jetpack\Sync\Actions::init() #6 /htdocs/wp-includes/class-wp-hook.php(324): Automattic\Jetpack\Sync\Main::on_plugins_loaded_late('') #7 /htdocs/wp-includes/class-wp-hook.php(348): WP_Hook->apply_filters(NULL, Array) #8 /htdocs/wp-includes/plugin.php(517): WP_Hook->do_action(Array) #9 /htdocs/wp-settings.php(559): do_action('plugins_loaded') #10 /htdocs/wp-config.php(85): require_once('/htdocs/wp-sett...') #11 /htdocs/wp-load.php(50): require_once('/htdocs/wp-conf...') #12 /htdocs/wp-blog-header.php(13): require_once('/htdocs/wp-load...') #13 /htdocs/index.php(17): require('/htdocs/wp-blog...') #14 {main} thrown in /htdocs/wp-content/plugins/jetpack/jetpack_vendor/automattic/jetpack-sync/src/class-queue.php on line 65