Skip to content

Commit

Permalink
Merge pull request #549 from ndinh215/v2_support_import_gzip_json_file
Browse files Browse the repository at this point in the history
Support import gzip json file by command
  • Loading branch information
saimaz committed Jan 14, 2016
2 parents 7769a7b + d4b9a5d commit 8f7e2a0
Show file tree
Hide file tree
Showing 8 changed files with 98 additions and 47 deletions.
15 changes: 14 additions & 1 deletion Command/IndexImportCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,12 @@ protected function configure()
InputOption::VALUE_REQUIRED,
'Set bulk size for import',
1000
)
->addOption(
'gzip',
'z',
InputOption::VALUE_NONE,
'Import a gzip file'
);
}

Expand All @@ -53,13 +59,20 @@ protected function execute(InputInterface $input, OutputInterface $output)
{
$manager = $this->getManager($input->getOption('manager'));

// Initialize options array
$options = [];
if ($input->getOption('gzip')) {
$options['gzip'] = null;
}
$options['bulk-size'] = $input->getOption('bulk-size');

/** @var ImportService $importService */
$importService = $this->getContainer()->get('es.import');
$importService->importIndex(
$manager,
$input->getArgument('filename'),
$output,
$input->getOption('bulk-size')
$options
);

$output->writeln('<info>Data import completed!</info>');
Expand Down
3 changes: 2 additions & 1 deletion Resources/doc/commands.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ Imports data to the selected index. We are using custom `JSON` notation to speci
| Options | Value | What it does |
|:-------------:|:----------------------------:|:--------------------------------------------------------------------------------------:|
| `--manager` | *Manager name. e.g.* `default` | Used to select manager to create index for. If not specified, default manager is used. |
| `--bulk-size` | *Bulk size, default 1000* | The document frequency to flush the index on import.
| `--bulk-size` | *Bulk size, default 1000* | The document frequency to flush the index on import. |
| `--gzip` | *No value (flag)* | Treats the import file as gzip-compressed JSON and decompresses it while reading. |

Here is a simple example of what the data looks like:

Expand Down
19 changes: 12 additions & 7 deletions Service/ImportService.php
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,21 @@ class ImportService
* @param Manager $manager
* @param string $filename
* @param OutputInterface $output
* @param int $bulkSize
* @param array $options
*/
public function importIndex(Manager $manager, $filename, OutputInterface $output, $bulkSize)
{
$reader = $this->getReader($manager, $this->getFilePath($filename), false);
public function importIndex(
Manager $manager,
$filename,
OutputInterface $output,
$options
) {
$reader = $this->getReader($manager, $this->getFilePath($filename), $options);

$progress = new ProgressBar($output, $reader->count());
$progress->setRedrawFrequency(100);
$progress->start();

$bulkSize = $options['bulk-size'];
foreach ($reader as $key => $document) {
$data = $document['_source'];
$data['_id'] = $document['_id'];
Expand Down Expand Up @@ -80,12 +85,12 @@ protected function getFilePath($filename)
*
* @param Manager $manager
* @param string $filename
* @param bool $convertDocuments
* @param array $options
*
* @return JsonReader
*/
protected function getReader($manager, $filename, $convertDocuments)
protected function getReader($manager, $filename, $options)
{
return new JsonReader($manager, $filename, $convertDocuments);
return new JsonReader($manager, $filename, $options);
}
}
51 changes: 13 additions & 38 deletions Service/Json/JsonReader.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
namespace ONGR\ElasticsearchBundle\Service\Json;

use ONGR\ElasticsearchBundle\Service\Manager;
use ONGR\ElasticsearchBundle\Result\Converter;
use Symfony\Component\OptionsResolver\OptionsResolver;

/**
Expand Down Expand Up @@ -45,11 +44,6 @@ class JsonReader implements \Countable, \Iterator
*/
private $metadata;

/**
* @var Converter
*/
private $converter;

/**
* @var Manager
*/
Expand All @@ -61,23 +55,23 @@ class JsonReader implements \Countable, \Iterator
private $optionsResolver;

/**
* @var bool
* @var array
*/
private $convertDocuments;
private $options;

/**
* Constructor.
*
* @param Manager $manager
* @param string $filename
* @param bool $convertDocuments
* @param array $options
*
*/
public function __construct($manager, $filename, $convertDocuments = true)
public function __construct($manager, $filename, $options)
{
$this->manager = $manager;
$this->filename = $filename;
$this->converter = $manager->getConverter();
$this->convertDocuments = $convertDocuments;
$this->options = $options;
}

/**
Expand Down Expand Up @@ -108,7 +102,12 @@ public function getManager()
protected function getFileHandler()
{
if ($this->handle === null) {
$fileHandler = @fopen($this->filename, 'r');
$isGzip = array_key_exists('gzip', $this->options);

$filename = !$isGzip?
$this->filename:
sprintf('compress.zlib://%s', $this->filename);
$fileHandler = @fopen($filename, 'r');

if ($fileHandler === false) {
throw new \LogicException('Can not open file.');
Expand Down Expand Up @@ -167,7 +166,7 @@ protected function readLine()
}

$data = json_decode(rtrim($buffer, ','), true);
$this->currentLine = $this->convertDocument($this->getOptionsResolver()->resolve($data));
$this->currentLine = $this->getOptionsResolver()->resolve($data);
}

/**
Expand Down Expand Up @@ -263,14 +262,6 @@ public function getMetadata()
return $this->metadata;
}

/**
* Returns the converter instance stored on this reader.
*
* @return Converter
*/
protected function getConverter()
{
return $this->converter;
}

/**
* Returns configured options resolver instance.
*
Expand All @@ -285,20 +276,4 @@ private function getOptionsResolver()

return $this->optionsResolver;
}

/**
 * Turns a resolved row into a document, unless conversion is switched off.
 *
 * With the convertDocuments flag unset the row is handed back untouched;
 * otherwise it is passed to the converter together with the manager.
 *
 * @param array $document Row to convert.
 *
 * @return object|array Converted document, or the input row when conversion is disabled.
 */
private function convertDocument($document)
{
    return $this->convertDocuments
        ? $this->getConverter()->convertToDocument($document, $this->getManager())
        : $document;
}
}
57 changes: 57 additions & 0 deletions Tests/Functional/Command/IndexImportCommandTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,20 @@ public function bulkSizeProvider()
];
}

/**
 * Data provider for testIndexImportWithGzipOption.
 *
 * Every row has the shape [bulk size, real size, gzip fixture filename],
 * matching the parameters of the consuming test.
 *
 * @return array
 */
public function compressedDataProvider()
{
    // All gzip cases share one bulk size; only the real size and fixture differ.
    $bulkSize = 10;

    $cases = [
        [$bulkSize, 9, 'command_import_9.json.gz'],
        [$bulkSize, 10, 'command_import_10.json.gz'],
        [$bulkSize, 11, 'command_import_11.json.gz'],
    ];

    return $cases;
}

/**
* Test for index import command.
*
Expand Down Expand Up @@ -76,6 +90,49 @@ public function testIndexImport($bulkSize, $realSize, $filename)
$this->assertEquals($data, $ids);
}

/**
* Test for index import command with gzip option.
*
* Runs the `ongr:es:index:import` command with `--gzip` against a fixture
* file, then runs a match-all search on the `AcmeBarBundle:Product`
* repository and checks that the returned ids, with their first three
* characters removed, are exactly 1..$realSize.
*
* @param int $bulkSize Value passed to the command as `--bulk-size`.
* @param int $realSize Search size and upper bound of the expected id range.
* @param string $filename Fixture file name, relative to the fixture data directory.
*
* @dataProvider compressedDataProvider
*/
public function testIndexImportWithGzipOption($bulkSize, $realSize, $filename)
{
// Register the import command on a fresh console application.
$app = new Application();
$app->add($this->getImportCommand());

$command = $app->find('ongr:es:index:import');
$commandTester = new CommandTester($command);
$commandTester->execute(
[
'command' => $command->getName(),
'filename' => __DIR__ . '/../../app/fixture/data/' . $filename,
'--bulk-size' => $bulkSize,
// Passed with a null value; presumably a value-less flag - confirm against the command definition.
'--gzip' => null,
]
);

$manager = $this->getManager();
// NOTE(review): the index is dropped after the import and before the search below.
// Confirm this is intended - it is not obvious how the imported documents are still found.
$manager->dropIndex();
$repo = $manager->getRepository('AcmeBarBundle:Product');
// Match-all search limited to the number of documents passed in as $realSize.
$search = $repo
->createSearch()
->addQuery(new MatchAllQuery())
->setSize($realSize);
$results = $repo->execute($search);

$ids = [];
foreach ($results as $doc) {
// Drop the first three characters of the id; presumably a fixed prefix in the fixtures - TODO confirm.
$ids[] = substr($doc->id, 3);
}
// Sort so the comparison does not depend on the order of the search results.
sort($ids);
$data = range(1, $realSize);
$this->assertEquals($data, $ids);
}

/**
* Returns import index command with assigned container.
*
Expand Down
Binary file added Tests/app/fixture/data/command_import_10.json.gz
Binary file not shown.
Binary file added Tests/app/fixture/data/command_import_11.json.gz
Binary file not shown.
Binary file added Tests/app/fixture/data/command_import_9.json.gz
Binary file not shown.

0 comments on commit 8f7e2a0

Please sign in to comment.