- last update
over 1 year ago Patch Failed to Apply - 🇻🇳Vietnam tannguyenhn
I created a patch to process a limited amount of the dataset per request.
- last update
over 1 year ago Patch Failed to Apply
- last update
over 1 year ago 47 pass, 4 fail
The last submitted patch, 20: timeout-processing-large-file-3260509-20.patch, failed testing. View results →
- Status changed to Needs work
over 1 year ago 2:46pm 20 April 2023 - last update
over 1 year ago 57 pass - Status changed to Needs review
over 1 year ago 2:55pm 20 April 2023 - last update
over 1 year ago Patch Failed to Apply - 🇵🇰Pakistan jibran Sydney, Australia
I have moved the queuing bug to 🐛 Fix the queuing logic for indexing Closed: duplicate and created 🐛 Do not loop over the generator in Dataset::data(), instead use the valid data iterator Closed: duplicate for the looping-over-the-generator issue.
For 🐛 Fix the queuing logic for indexing Closed: duplicate , let's move the patch from #23 there.
For 🐛 Do not loop over the generator in Dataset::data(), instead use the valid data iterator Closed: duplicate , let's move the patch from #15 there. - 🇵🇰Pakistan jibran Sydney, Australia
We'll come back to this once the child issues are fixed and see if this is still a bug.
- last update
over 1 year ago 62 pass - 🇦🇺Australia larowlan 🇦🇺🏝.au GMT+10
Here's a patch that combines both approaches, retains the queue, and limits the number of times we seek over the data or call ->extractDataFromSource.
Works locally for large datasets and for those with tricky validation.
No interdiff, as it's a redo from scratch copying ideas from both approaches.
With this patch we:
* queue items in chunks
* process the queue
* validation happens during processing
* invalid items are skipped
Open to making the chunk size configurable per @tannguyenhn's patch, if that's desired
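For anyone following along, the chunked queueing described in the bullets above can be sketched roughly like this (a sketch only; the function name, parameters, and variables are illustrative, not the patch's actual code):

```php
<?php

// Illustrative sketch: queue dataset rows in fixed-size chunks so a
// single request never buffers the whole dataset, only one chunk at a
// time. Each queue item holds one chunk of rows.
function queueInChunks(iterable $rows, \Drupal\Core\Queue\QueueInterface $queue, int $chunk_size = 1000): int {
  $buffer = [];
  $queued = 0;
  foreach ($rows as $row) {
    $buffer[] = $row;
    if (count($buffer) === $chunk_size) {
      $queue->createItem($buffer);
      $queued += count($buffer);
      $buffer = [];
    }
  }
  // Flush the final, partially-filled chunk.
  if ($buffer) {
    $queue->createItem($buffer);
    $queued += count($buffer);
  }
  return $queued;
}
```

A queue worker then claims one item (one chunk) at a time, which is what lets validation happen during processing and keeps memory bounded by the chunk size.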
- 🇦🇺Australia larowlan 🇦🇺🏝.au GMT+10
We could even make the chunk size a field on the dataset, so smaller datasets (less complex rows) could use a large number for speed, and big datasets (lots of data in each row) could use a smaller number for memory overhead
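If we went that way, the base field could look something like this sketch (the field name 'chunk_size', the option list, and the default are all assumptions, mirroring the values in the update hook below):

```php
<?php

// Sketch only: a per-dataset chunk-size field with preset sizes, so
// small/simple datasets can pick a large chunk for speed and big/heavy
// datasets a small chunk for memory. Not the committed code.
use Drupal\Core\Field\BaseFieldDefinition;

$fields['chunk_size'] = BaseFieldDefinition::create('list_integer')
  ->setLabel(t('Chunk size'))
  ->setDescription(t('Smaller chunks use less memory per request; larger chunks index faster.'))
  ->setSetting('allowed_values', [
    10 => 10,
    100 => 100,
    1000 => 1000,
    10000 => 10000,
    100000 => 100000,
  ])
  ->setDefaultValue(1000);
```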
- 🇦🇺Australia larowlan 🇦🇺🏝.au GMT+10
-
+++ b/src/Entity/DatasetInterface.php
@@ -16,8 +19,12 @@ interface DatasetInterface extends ContentEntityInterface {
+  const VALIDATION_QUEUE_PREFIX = 'data_pipelines_validate:';
this is dead code and can be removed
-
+++ b/src/Plugin/QueueWorker/ProcessValidation.php
@@ -0,0 +1,75 @@
+
+declare(strict_types=1);
+
+namespace Drupal\data_pipelines\Plugin\QueueWorker;
+
+use Drupal\Core\Entity\EntityTypeManagerInterface;
+use Drupal\Core\Plugin\ContainerFactoryPluginInterface;
+use Drupal\Core\Queue\QueueWorkerBase;
+use Drupal\data_pipelines\DatasetData;
+use Drupal\data_pipelines\Entity\DatasetInterface;
+use Symfony\Component\DependencyInjection\ContainerInterface;
+use Symfony\Component\Validator\ConstraintViolation;
+
+/**
+ * Defines a class for processing validation.
+ *
+ * @QueueWorker(
+ *   id = "data_pipelines_validate",
+ *   title = @Translation("Validation"),
+ *   deriver = \Drupal\data_pipelines\Plugin\Derivative\DatasetQueueWorkerDeriver::class,
+ * )
+ */
+final class ProcessValidation extends QueueWorkerBase implements ContainerFactoryPluginInterface {
this is dead code now and can be removed
-
+++ b/src/TransformValidDataIterator.php
@@ -0,0 +1,67 @@
+    $this->position++;
We probably need an implementation of ::rewind here to reset the pointer, I'll add a test and a fix for that
-
- 🇦🇺Australia larowlan 🇦🇺🏝.au GMT+10
We probably need an implementation of ::rewind here to reset the pointer, I'll add a test and a fix for that
This isn't needed, because you can't rewind a generator (the inner iterator), so we're good.
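For reference, PHP refuses to rewind a generator once it has advanced, which is why delegating rewind to the inner iterator could never work here:

```php
<?php

// Generators are forward-only: once advanced, rewind() throws.
function rows(): \Generator {
  yield 'a';
  yield 'b';
}

$gen = rows();
$gen->next();
try {
  $gen->rewind();
}
catch (\Exception $e) {
  // "Cannot rewind a generator that was already run"
  echo $e->getMessage();
}
```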
- 🇦🇺Australia larowlan 🇦🇺🏝.au GMT+10
Worth noting that the use of queues here means that if, for whatever reason, the batch process dies part-way through indexing, you can just use
drush queue-run
with the queue name to pick up where it left off - last update
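For example, assuming a dataset with ID my_dataset and assuming the queue name prefix from the patch resolves to data_pipelines: (both assumptions, for illustration only), resuming would look something like:

```shell
# Hypothetical queue name: QUEUE_NAME_PREFIX plus the dataset ID.
drush queue-run data_pipelines:my_dataset
```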
over 1 year ago 63 pass - 🇦🇺Australia larowlan 🇦🇺🏝.au GMT+10
- 🇦🇺Australia larowlan 🇦🇺🏝.au GMT+10
Screenshot of new field
- 🇵🇰Pakistan jibran Sydney, Australia
The patch looks really good overall. Just some minor feedback.
-
+++ b/data_pipelines.install
@@ -266,3 +266,31 @@ function data_pipelines_update_8013() {
+      10 => 10,
+      100 => 100,
+      1000 => 1000,
+      10000 => 10000,
+      100000 => 100000,
Can we add ',' on the values side for sanity? :D
-
+++ b/src/Entity/Dataset.php
@@ -223,13 +250,71 @@ class Dataset extends ContentEntityBase implements DatasetInterface {
+  public function queueProcessing(QueueInterface $queue, int $from = 0, int $limit = 1000): int {
This is too convoluted for my understanding; can we please add more docs around the logic?
-
+++ b/src/Entity/Dataset.php
@@ -223,13 +250,71 @@ class Dataset extends ContentEntityBase implements DatasetInterface {
+    foreach ($destinations as $destination_id => $destination) {
Do we have a test to check it works with multiple destinations?
-
+++ b/src/Entity/Dataset.php
@@ -248,15 +334,12 @@ class Dataset extends ContentEntityBase implements DatasetInterface {
+  public function getDataIterator(int $seek_from = 0): TransformValidDataIterator {
How does seeking work? Can we add a comment?
-
+++ b/src/Form/DatasetBatchOperations.php
@@ -108,50 +86,24 @@ class DatasetBatchOperations implements ContainerInjectionInterface {
+    $indexed = $dataset->queueProcessing(\Drupal::queue(DatasetInterface::QUEUE_NAME_PREFIX . $dataset->id()), $context['sandbox']['from'], $chunk_size);
Let's use the dataset machine name instead of the ID for queues. Do we need a follow-up for that?
-
- 🇦🇺Australia mortim07
@larowlan Great work, looks solid! Just a few comments.
@@ -223,13 +250,71 @@ class Dataset extends ContentEntityBase implements DatasetInterface {
+    $destinations = [];
+    $chunk_sizes = [];
+    $buffers = [];
+    foreach ($this->get('destinations') as $destination) {
+      if (!$entity = $destination->entity) {
+        continue;
+      }
You can just use the method `getDestinations()`.
@@ -105,35 +103,23 @@ final class DestinationWorker extends QueueWorkerBase implements ContainerFactor
-      $this->logger->error("Unknown destination request of type %type", ['%type' => get_class($data)]);
+      $result = $data->getOperation()->process($destinationPlugin, $dataset, $data->getChunk());
Just pass the data and let the operation determine if it needs the chunk.
@@ -0,0 +1,67 @@
+    $errors = $this->pipeline->validate($this->getInnerIterator()->current());
+    if ($errors->count() > 0) {
+      foreach ($errors as $validation_error) {
+        assert($validation_error instanceof ConstraintViolation);
+        $this->dataset->addLogMessage(sprintf('Validation error in row %d: %s', $this->position, (string) $validation_error->getMessage()), FALSE);
+      }
+      $this->dataset->save();
+      return FALSE;
+    }
Is it possible to encapsulate this within the valid method as part of the filter iterator?
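Something like this, perhaps, if the iterator extended \FilterIterator (a rough sketch; the class name, constructor shape, and property types here are assumptions, not the committed code):

```php
<?php

// Rough sketch: push validation into \FilterIterator::accept() so
// invalid rows are skipped by the iterator itself. The pipeline and
// dataset collaborators are assumed to match the patch's behaviour.
final class ValidatingRowIterator extends \FilterIterator {

  public function __construct(
    \Iterator $inner,
    private object $pipeline,
    private object $dataset,
  ) {
    parent::__construct($inner);
  }

  public function accept(): bool {
    $errors = $this->pipeline->validate($this->getInnerIterator()->current());
    if ($errors->count() === 0) {
      return TRUE;
    }
    // Log each violation against the dataset, then reject the row.
    foreach ($errors as $validation_error) {
      $this->dataset->addLogMessage(sprintf('Validation error in row %d: %s', $this->key(), (string) $validation_error->getMessage()), FALSE);
    }
    $this->dataset->save();
    return FALSE;
  }

}
```

With accept() doing the filtering, foreach over the iterator only ever sees valid rows, so the calling code needs no validation branch at all.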
- last update
over 1 year ago 64 pass - 🇦🇺Australia larowlan 🇦🇺🏝.au GMT+10
- last update
over 1 year ago 64 pass - Status changed to RTBC
over 1 year ago 1:15am 30 May 2023 - 🇵🇰Pakistan jibran Sydney, Australia
The docs update looks good. Happy to mark it RTBC.
- Status changed to Fixed
over 1 year ago 1:23am 30 May 2023 - 🇦🇺Australia larowlan 🇦🇺🏝.au GMT+10
Committed to 1.x - thanks!
-
larowlan →
committed 8b960a99 on 1.x
Issue #3316432 by jibran, tannguyenhn, larowlan, gordon, mortim07:...
Automatically closed - issue fixed for 2 weeks with no activity.