Skip to content

Commit

Permalink
Merge pull request #2585 from 10up/feature/dynamic-indexing-4.x.x
Browse files Browse the repository at this point in the history
Dynamic Indexing (for 4.0)
  • Loading branch information
felipeelia authored Mar 4, 2022
2 parents aaf2a33 + 78298d1 commit 424ba22
Show file tree
Hide file tree
Showing 3 changed files with 277 additions and 22 deletions.
6 changes: 5 additions & 1 deletion includes/classes/Command.php
Original file line number Diff line number Diff line change
Expand Up @@ -646,6 +646,9 @@ public function delete_transient_on_int( $signal_no ) {
* [--nobulk]
* : Disable bulk indexing
*
* [--static-bulk]
* : Do not use dynamic bulk requests, i.e., send only one request per batch of documents.
*
* [--show-errors]
* : Show all errors
*
Expand Down Expand Up @@ -741,13 +744,14 @@ public function index( $args, $assoc_args ) {

$index_args = [
'method' => 'cli',
'total_attempts' => 3,
'total_attempts' => 1,
'indexables' => $indexables,
'put_mapping' => ! empty( $setup_option ),
'output_method' => [ $this, 'index_output' ],
'network_wide' => ( ! empty( $assoc_args['network-wide'] ) ) ? $assoc_args['network-wide'] : null,
'nobulk' => $no_bulk,
'offset' => ( ! empty( $assoc_args['offset'] ) ) ? absint( $assoc_args['offset'] ) : 0,
'static_bulk' => ( ! empty( $assoc_args['static-bulk'] ) ) ? $assoc_args['static-bulk'] : null,
];

if ( isset( $assoc_args['show-errors'] ) || ( isset( $assoc_args['show-bulk-errors'] ) && ! $no_bulk ) || ( isset( $assoc_args['show-nobulk-errors'] ) && $no_bulk ) ) {
Expand Down
61 changes: 40 additions & 21 deletions includes/classes/IndexHelper.php
Original file line number Diff line number Diff line change
Expand Up @@ -566,6 +566,8 @@ protected function index_next_batch() {
*/
do_action( 'ep_index_batch_new_attempt', $attempts, $total_attempts );

$should_retry = false;

if ( $nobulk ) {
$object_id = reset( $queued_items_ids );
$return = $indexable->index( $object_id, true );
Expand Down Expand Up @@ -602,30 +604,47 @@ protected function index_next_batch() {
$failed_objects[ $object->ID ] = null;
}
}

if ( is_wp_error( $return ) ) {
$should_retry = true;
}
} else {
$return = $indexable->bulk_index( $queued_items_ids );
if ( ! empty( $this->args['static_bulk'] ) ) {
$bulk_requests = [ $indexable->bulk_index( $queued_items_ids ) ];
} else {
$bulk_requests = $indexable->bulk_index_dynamically( $queued_items_ids );
}

/**
* Fires after bulk indexing
*
* @hook ep_cli_{indexable_slug}_bulk_index
* @param {array} $objects Objects being indexed
* @param {array} response Elasticsearch bulk index response
*/
do_action( "ep_cli_{$indexable->slug}_bulk_index", $queued_items, $return );

if ( is_array( $return ) && isset( $return['errors'] ) && true === $return['errors'] ) {
$failed_objects = array_filter(
$return['items'],
function( $item ) {
return ! empty( $item['index']['error'] );
}
);
$failed_objects = [];
foreach ( $bulk_requests as $return ) {
/**
* Fires after bulk indexing
*
* @hook ep_cli_{indexable_slug}_bulk_index
* @param {array} $objects Objects being indexed
* @param {array} response Elasticsearch bulk index response
*/
do_action( "ep_cli_{$indexable->slug}_bulk_index", $queued_items, $return );

if ( is_wp_error( $return ) ) {
$should_retry = true;
}
if ( is_array( $return ) && isset( $return['errors'] ) && true === $return['errors'] ) {
$failed_objects = array_merge(
$failed_objects,
array_filter(
$return['items'],
function( $item ) {
return ! empty( $item['index']['error'] );
}
)
);
}
}
}

// Things worked, we don't need to try again.
if ( ! is_wp_error( $return ) && ! count( $failed_objects ) ) {
if ( ! $should_retry && ! count( $failed_objects ) ) {
break;
}
}
Expand Down Expand Up @@ -839,9 +858,9 @@ protected function create_network_alias() {
* Output a message.
*
* @since 4.0.0
* @param string $message_text Message to be outputted
* @param string $type Type of message
* @param string $context Context of the output
* @param string|array $message_text Message to be outputted
* @param string $type Type of message
* @param string $context Context of the output
* @return void
*/
protected function output( $message_text, $type = 'info', $context = '' ) {
Expand Down
232 changes: 232 additions & 0 deletions includes/classes/Indexable.php
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,238 @@ public function bulk_index( $object_ids ) {
return $result;
}

/**
 * Bulk index a set of objects, letting the request layer split the payload
 * into dynamically-sized bulk requests.
 *
 * Each object is rendered as one NDJSON entry (action metadata line, then the
 * document JSON with literal newlines escaped, then a blank line) and the whole
 * list is handed to send_bulk_index_request(), which decides how many requests
 * to actually issue.
 *
 * @since 4.0.0
 * @param array $object_ids Array of object IDs.
 * @return array[WP_Error|array] The return of each request made.
 */
public function bulk_index_dynamically( $object_ids ) {
	$payloads = array_map(
		function( $object_id ) {
			$action_args = [
				'index' => [
					'_id' => absint( $object_id ),
				],
			];

			$document = $this->prepare_document( $object_id );

			/**
			 * Filter the bulk action arguments used for a specific object
			 *
			 * @hook ep_bulk_index_action_args
			 * @param {array} $action_args Bulk action arguments
			 * @param {array} $document Document to index
			 * @since 3.0
			 * @return {array} New action args
			 */
			$action_args = apply_filters( 'ep_bulk_index_action_args', $action_args, $document );

			// NDJSON entry: action line, escaped document line, trailing blank line.
			return wp_json_encode( $action_args ) . "\n"
				. addcslashes( wp_json_encode( $document ), "\n" )
				. "\n\n";
		},
		$object_ids
	);

	$results = $this->send_bulk_index_request( $payloads );

	/**
	 * Perform actions after a dynamic bulk indexing is completed
	 *
	 * @hook ep_after_bulk_index_dynamically
	 * @since 4.0.0
	 * @param {array} $object_ids List of object ids attempted to be indexed
	 * @param {string} $slug Current indexable slug
	 * @param {array|bool} $result Result of the Elasticsearch query. False on error.
	 */
	do_action( 'ep_after_bulk_index_dynamically', $object_ids, $this->slug, $results );

	return $results;
}

/**
 * Bulk index documents through several requests with dynamic size.
 *
 * Starts from a small buffer and grows it after successful requests; on
 * "413 Request Entity Too Large" errors it shrinks the buffer and re-queues
 * documents. Buffer sizes are kept in static locals, so tuning persists
 * across calls within the same PHP process.
 *
 * @since 4.0.0
 * @param array $documents The documents to be sent to Elasticsearch (already formatted.)
 * @return array[WP_Error|array] One entry per request attempted (or per skipped document.)
 */
protected function send_bulk_index_request( $documents ) {
	// Static on purpose: the buffer sizing learned here carries over to later batches.
	static $min_buffer_size, $max_buffer_size, $current_buffer_size, $incremental_step;

	if ( ! $min_buffer_size ) {
		/**
		 * Filter the minimum buffer size for dynamic bulk index requests.
		 *
		 * @hook ep_dynamic_bulk_min_buffer_size
		 * @since 4.0.0
		 * @param {int} $min_buffer_size Min buffer size for dynamic bulk index (in bytes.)
		 * @return {int} New size.
		 */
		$min_buffer_size = apply_filters( 'ep_dynamic_bulk_min_buffer_size', MB_IN_BYTES / 2 );
	}

	if ( ! $max_buffer_size ) {
		/**
		 * Filter the max buffer size for dynamic bulk index requests.
		 *
		 * @hook ep_dynamic_bulk_max_buffer_size
		 * @since 4.0.0
		 * @param {int} $max_buffer_size Max buffer size for dynamic bulk index (in bytes.)
		 * @return {int} New size.
		 */
		$max_buffer_size = apply_filters( 'ep_dynamic_bulk_max_buffer_size', 150 * MB_IN_BYTES );
	}

	if ( ! $incremental_step ) {
		/**
		 * Filter the number of bytes the current buffer size should be incremented in case of success.
		 *
		 * @hook ep_dynamic_bulk_incremental_step
		 * @since 4.0.0
		 * @param {int} $incremental_step Number of bytes to add to the current buffer size.
		 * @return {int} New incremental step.
		 */
		$incremental_step = apply_filters( 'ep_dynamic_bulk_incremental_step', MB_IN_BYTES / 2 );
	}

	/**
	 * Perform actions before a new batch of documents is processed.
	 *
	 * @hook ep_before_send_dynamic_bulk_requests
	 * @since 4.0.0
	 * @param {array} $documents Array of documents to be sent to Elasticsearch.
	 */
	do_action( 'ep_before_send_dynamic_bulk_requests', $documents );

	if ( ! $current_buffer_size ) {
		$current_buffer_size = $min_buffer_size;
	}

	$results = [];

	$body = [];

	$requests = 0;

	/*
	 * This script will use two main arrays: $body and $documents, being $body the
	 * documents to be sent in the next request and $documents the list of docs to be indexed.
	 * The do-while loop will stop if all documents are sent or if a request fails even sending
	 * a buffer as small as possible.
	 *
	 * NOTE(review): sizes here are compared with mb_strlen(), which counts characters,
	 * not bytes, while the limits are byte-denominated — for multi-byte content the
	 * real request can exceed the configured limits. Confirm this is acceptable.
	 *
	 * NOTE(review): this is a do-while, so the first iteration runs even when $documents
	 * is empty — array_shift() would return null below. Callers appear to always pass a
	 * non-empty list; confirm.
	 */
	do {
		$next_document = array_shift( $documents );

		// If the next document alone takes the entire current buffer size,
		// let's add it back to the pipe and send what we have first
		if ( mb_strlen( $next_document ) > $current_buffer_size && count( $body ) > 0 ) {
			array_unshift( $documents, $next_document );
		} else {
			if ( mb_strlen( $next_document ) > $max_buffer_size ) {
				/**
				 * Perform actions when a post is bigger than the max buffer size.
				 *
				 * @hook ep_dynamic_bulk_post_too_big
				 * @since 4.0.0
				 * @param {string} $document JSON string of the post detected as too big.
				 */
				do_action( 'ep_dynamic_bulk_post_too_big', $next_document );
				$results[] = new \WP_Error( 'ep_too_big_request_skipped', 'Indexable too big. Request not sent.' );
				// NOTE(review): if this skipped document is the LAST one, `continue`
				// reaches the loop condition with $documents empty and a possibly
				// non-empty $body that is never sent — confirm upstream whether
				// those documents are silently dropped.
				continue;
			}
			$body[] = $next_document;
			// Keep accumulating until the buffer target is reached (or we run out of docs).
			if ( mb_strlen( implode( '', $body ) ) < $current_buffer_size && ! empty( $documents ) ) {
				continue;
			}
			if ( mb_strlen( implode( '', $body ) ) > $max_buffer_size ) {
				// The last document added to body made it too big, so let's give it back.
				array_unshift( $documents, array_pop( $body ) );
			}
		}

		// Try the request.
		timer_start();
		$result = Elasticsearch::factory()->bulk_index( $this->get_index_name(), $this->slug, implode( '', $body ) );
		$request_time = timer_stop();
		$requests++;

		/**
		 * Perform actions after a bulk request is sent.
		 *
		 * @hook ep_after_send_dynamic_bulk_request
		 * @since 4.0.0
		 * @param {WP_Error|array} $result Result of the request.
		 * @param {array} $body Array of documents sent to Elasticsearch.
		 * @param {array} $documents Array of documents to be sent to Elasticsearch.
		 * @param {int} $min_buffer_size Min buffer size for dynamic bulk index (in bytes.)
		 * @param {int} $max_buffer_size Max buffer size for dynamic bulk index (in bytes.)
		 * @param {int} $current_buffer_size Current buffer size for dynamic bulk index (in bytes.)
		 * @param {int} $request_time Total time of the request.
		 */
		do_action( 'ep_after_send_dynamic_bulk_request', $result, $body, $documents, $min_buffer_size, $max_buffer_size, $current_buffer_size, $request_time );

		// It failed, possibly adjust the buffer size and try again.
		if ( is_wp_error( $result ) ) {
			// Too many requests (HTTP 429): back off before the next request.
			// NOTE(review): strict int comparison — assumes the WP_Error code is the
			// integer HTTP status; confirm against Elasticsearch::bulk_index().
			// The failed $body is NOT cleared here, so its documents are only
			// re-sent if more documents remain to trigger another request.
			if ( 429 === $result->get_error_code() ) {
				sleep( 2 );
			}

			// If the error is not a "Request too big" then we really fail this batch of documents.
			if ( 413 !== $result->get_error_code() ) {
				$results[] = $result;
				continue;
			}

			// 413 with a single document: it alone exceeds the server limit; record
			// the failure, drop it, and cap the max buffer at its size.
			if ( count( $body ) === 1 ) {
				$max_buffer_size = min( $max_buffer_size, mb_strlen( implode( '', $body ) ) );
				$results[] = $result;
				$body = [];
				continue;
			}

			// As the buffer is as small as possible, return the error.
			// NOTE(review): strict equality — a body already smaller than
			// $min_buffer_size falls through to the shrink logic below instead.
			if ( mb_strlen( implode( '', $body ) ) === $min_buffer_size ) {
				$results[] = $result;
				continue;
			}

			// We have a too big buffer. Remove one doc from the body, and set both max and current as its size.
			array_unshift( $documents, array_pop( $body ) );

			$max_buffer_size = count( $body ) ?
				max( $min_buffer_size, mb_strlen( implode( '', $body ) ) ) :
				$min_buffer_size;

			$current_buffer_size = $max_buffer_size;
			continue;
		}

		// Things worked so we can try to bump the buffer size.
		if ( $current_buffer_size < $max_buffer_size && mb_strlen( implode( '', $body ) ) > $current_buffer_size ) {
			$current_buffer_size = min( ( $current_buffer_size + $incremental_step ), $max_buffer_size );
		}

		$results[] = $result;

		$body = [];
	} while ( ! empty( $documents ) );

	/**
	 * Perform actions after a batch of documents was processed.
	 *
	 * @hook ep_after_send_dynamic_bulk_requests
	 * @since 4.0.0
	 * @param {array} $results Array of results sent.
	 * @param {int} $requests Number of all requests sent.
	 */
	do_action( 'ep_after_send_dynamic_bulk_requests', $results, $requests );

	return $results;
}

/**
* Query Elasticsearch for documents
*
Expand Down

0 comments on commit 424ba22

Please sign in to comment.