Cron queue workers should receive queued item creation data

Created on 10 August 2017, over 7 years ago
Updated 28 February 2024, 9 months ago

QueueInterface::claimItem() returns useful data about the queued item in addition to the actual data that was queued. Its documentation says:

   *   If returned, the object will have at least the following properties:
   *   - data: the same as what was passed into createItem().
   *   - item_id: the unique ID returned from createItem().
   *   - created: timestamp when the item was put into the queue.

In particular, the 'created' timestamp is potentially useful, for example to find out how long an item has been waiting in the queue.
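A minimal plain-PHP sketch of that item object, and of using 'created' to compute how long an item has been waiting. `InMemoryQueue` and `item_age()` are hypothetical stand-ins written for illustration, not Drupal classes. There is no leasing, and claimItem() simply returns the oldest item.

```php
<?php

/**
 * Hypothetical in-memory stand-in for a queue (not a Drupal class).
 *
 * It only mimics the documented shape of claimItem()'s return value:
 * an object with data, item_id and created properties.
 */
class InMemoryQueue {

  /** @var array<int, object> Items keyed by item ID. */
  private array $items = [];

  private int $nextId = 1;

  /**
   * Stores $data and returns the new item's ID, like createItem().
   *
   * $now overrides the current time so the example is deterministic.
   */
  public function createItem(mixed $data, ?int $now = NULL): int {
    $id = $this->nextId++;
    $this->items[$id] = (object) [
      'data' => $data,
      'item_id' => $id,
      'created' => $now ?? time(),
    ];
    return $id;
  }

  /**
   * Returns the oldest item, or FALSE if the queue is empty.
   *
   * Unlike a real queue, this does not lease or hide the item.
   */
  public function claimItem(): object|false {
    return $this->items === [] ? FALSE : reset($this->items);
  }

}

// Seconds an item has been waiting, based on its 'created' timestamp.
function item_age(object $item, int $now): int {
  return $now - $item->created;
}

$queue = new InMemoryQueue();
$queue->createItem(['nid' => 42], 1000);
$item = $queue->claimItem();
echo item_age($item, 1600), "\n"; // 600
```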

However, the cron queue processor throws this away and gives us just the data:

        while (time() < $end && ($item = $queue->claimItem($lease_time))) {
          try {
            $queue_worker->processItem($item->data);

This is nice DX, since queue worker plugins don't have to extract the data from the item, and processItem() receives exactly what was passed to createItem(). However, information is lost.

One way to fix this would be for processItem() to receive the complete item as a second parameter.
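Here is a plain-PHP sketch of that proposal. It assumes the second parameter is optional so that existing workers keep working. `WorkerInterface`, `LegacyWorker`, `ItemAwareWorker` and `process_queue()` are hypothetical names used for illustration, not Drupal core API.

Two PHP behaviours make this backward compatible:

- An implementing class may add optional parameters beyond those declared in the interface.
- Extra arguments passed to a user-defined method that does not declare them are silently ignored.

```php
<?php

// Hypothetical stand-in for QueueWorkerInterface: only $data is declared.
interface WorkerInterface {

  public function processItem(mixed $data);

}

// Existing worker: unaware of the second argument, which PHP ignores.
class LegacyWorker implements WorkerInterface {

  public array $seen = [];

  public function processItem(mixed $data) {
    $this->seen[] = $data;
  }

}

// Worker that opts in by declaring an optional second parameter.
class ItemAwareWorker implements WorkerInterface {

  public array $created = [];

  public function processItem(mixed $data, ?object $item = NULL) {
    // $item is NULL when called the old way, so use the nullsafe operator.
    $this->created[] = $item?->created;
  }

}

// Sketch of a cron loop that also passes the whole claimed item.
function process_queue(array $claimed_items, WorkerInterface $worker): void {
  foreach ($claimed_items as $item) {
    $worker->processItem($item->data, $item);
  }
}

$items = [
  (object) ['data' => 'a', 'item_id' => 1, 'created' => 100],
  (object) ['data' => 'b', 'item_id' => 2, 'created' => 200],
];

$legacy = new LegacyWorker();
process_queue($items, $legacy);

$aware = new ItemAwareWorker();
process_queue($items, $aware);
```

Changing the loop alone would therefore not break existing plugins. Only workers that declare the parameter would see the item.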

Feature request
Status: Active
Version: 11.0 🔥
Component: Cron
Last updated about 1 month ago
No maintainer
Created by: joachim (🇬🇧 United Kingdom)

Comments & Activities

Not all content is available!

It's likely this issue predates Contrib.social: some issue and comment data are missing.

  • I agree. I have a use case that could be solved if processItem() received the entire $item as a second parameter.

    I have implemented a MergeDatabaseQueue to avoid duplicate queue items:

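    use Drupal\Core\Queue\DatabaseQueue;
    
    class MergeDatabaseQueue extends DatabaseQueue {
    
      /**
       * {@inheritdoc}
       */
      protected function doCreateItem($data): ?int {
        // This is copied and modified from DatabaseQueue::doCreateItem().
        // Merge instead of insert to prevent duplicates. An item is updated if it
        // already exists.
        $serialized = serialize($data);
        $this->connection->merge(static::TABLE_NAME)
          ->keys([
            'name' => $this->name,
            'data' => $serialized,
          ])
          ->fields([
            'name' => $this->name,
            'data' => $serialized,
            // Note that this will update the 'created' field for an existing item.
            // We cannot rely on REQUEST_TIME because many items might be created
            // by a single request which takes longer than 1 second.
            'created' => \Drupal::time()->getCurrentTime(),
          ])
          ->execute();
        // A merge query returns a status (inserted or updated), not the item ID,
        // so look up the ID of the inserted or updated row explicitly.
        $item_id = $this->connection->select(static::TABLE_NAME, 'q')
          ->fields('q', ['item_id'])
          ->condition('name', $this->name)
          ->condition('data', $serialized)
          ->execute()
          ->fetchField();
        return $item_id === FALSE ? NULL : (int) $item_id;
      }
    
    }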
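The merge behaviour can be modelled without a database. Items are keyed by their serialized data, so creating an item whose data matches an existing one refreshes that item's 'created' time instead of adding a duplicate. `MergingQueue` is a hypothetical in-memory stand-in for the class above, not a real Drupal class, and it models a single queue name only. The last two calls show that putting a timestamp into the data itself makes every item unique.

```php
<?php

/**
 * Hypothetical in-memory model of a merging queue (not a Drupal class).
 *
 * Items are keyed by their serialized data, mirroring the 'data' merge key
 * used by MergeDatabaseQueue for a single queue name.
 */
class MergingQueue {

  /** @var array<string, object> Items keyed by serialized data. */
  private array $items = [];

  private int $nextId = 1;

  public function createItem(mixed $data, int $now): int {
    $key = serialize($data);
    if (isset($this->items[$key])) {
      // Existing item: only the 'created' timestamp is refreshed.
      $this->items[$key]->created = $now;
    }
    else {
      $this->items[$key] = (object) [
        'data' => $data,
        'item_id' => $this->nextId++,
        'created' => $now,
      ];
    }
    return $this->items[$key]->item_id;
  }

  public function numberOfItems(): int {
    return count($this->items);
  }

}

$queue = new MergingQueue();
$first = $queue->createItem(['nid' => 1], 100);
$second = $queue->createItem(['nid' => 1], 200);
// Same data, so there is one item, and its ID is returned both times.
echo $queue->numberOfItems(), "\n"; // 1

// A timestamp inside the data makes otherwise identical items distinct.
$queue->createItem(['nid' => 2, 'queued' => 300], 300);
$queue->createItem(['nid' => 2, 'queued' => 400], 400);
echo $queue->numberOfItems(), "\n"; // 3
```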
    

    However, in one case I also need to know when the item was queued, so that it is only processed once it is older than a week. If I store that time in the serialized data, the item can no longer be merged with an otherwise identical item, because the timestamps differ. What I really need is access to the 'created' field inside the worker's processItem() method, rather than having to include it in the serialized data.
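Assuming the worker did receive the claimed item as a second argument, the one-week check might look like the plain-PHP sketch below. `WeekOldWorker` and `DelayRequeue` are hypothetical names. In Drupal, the natural equivalent of `DelayRequeue` would be `\Drupal\Core\Queue\DelayedRequeueException`, which the ultimate_cron loop later in this issue catches. This sketch uses a stand-in exception so that it runs on its own.

```php
<?php

// Hypothetical stand-in for Drupal's DelayedRequeueException.
class DelayRequeue extends RuntimeException {

  public function __construct(public readonly int $delay) {
    parent::__construct("Requeue in $delay seconds");
  }

}

class WeekOldWorker {

  // Minimum age, in seconds, before an item is processed (one week).
  public const MIN_AGE = 7 * 24 * 60 * 60;

  // Data of the items that were actually processed.
  public array $processed = [];

  // $now overrides the current time so the example is deterministic.
  public function __construct(private ?int $now = NULL) {}

  public function processItem(mixed $data, ?object $item = NULL): void {
    if ($item === NULL) {
      // Called the old way, without the item: there is no 'created' to check.
      throw new LogicException('This worker needs the claimed queue item.');
    }
    $age = ($this->now ?? time()) - $item->created;
    if ($age < self::MIN_AGE) {
      // Too young: ask for a retry once the item is a week old.
      throw new DelayRequeue(self::MIN_AGE - $age);
    }
    $this->processed[] = $data;
  }

}

$worker = new WeekOldWorker(1_000_000);
$old = (object) ['data' => 'old', 'item_id' => 1, 'created' => 1_000_000 - WeekOldWorker::MIN_AGE];
$new = (object) ['data' => 'new', 'item_id' => 2, 'created' => 1_000_000 - 3_600];

$worker->processItem($old->data, $old);
try {
  $worker->processItem($new->data, $new);
}
catch (DelayRequeue $e) {
  echo $e->delay, "\n"; // 601200
}
```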

  • If you use ultimate_cron, this is a hacky workaround:

    namespace Drupal\my_module\Queue;
    
    use Drupal\Core\Config\ConfigFactory;
    use Drupal\Core\Queue\DelayableQueueInterface;
    use Drupal\Core\Queue\DelayedRequeueException;
    use Drupal\Core\Queue\QueueFactory;
    use Drupal\Core\Queue\QueueWorkerManager;
    use Drupal\Core\Queue\RequeueException;
    use Drupal\Core\Queue\SuspendQueueException;
    use Drupal\Core\Utility\Error;
    use Drupal\ultimate_cron\CronJobInterface;
    use Drupal\ultimate_cron\QueueWorker;
    use Psr\Log\LoggerInterface;
    
    /**
     * A modified copy of \Drupal\ultimate_cron\QueueWorker.
     *
     * @hack This class allows workers to access $item by calling processItem() with
     * $item as the second parameter.
     */
    class ItemAwareQueueWorker extends QueueWorker {
    
      /**
       * The logger.
       */
      protected LoggerInterface $logger;
    
      /**
       * The constructor.
       *
       * @param \Drupal\Core\Queue\QueueWorkerManager $plugin_manager_queue_worker
       *   The queue worker manager.
       * @param \Drupal\Core\Queue\QueueFactory $queue
       *   The queue factory.
       * @param \Drupal\Core\Config\ConfigFactory $config_factory
       *   The config factory.
       * @param \Psr\Log\LoggerInterface $logger
       *   The logger.
       */
      public function __construct(QueueWorkerManager $plugin_manager_queue_worker, QueueFactory $queue, ConfigFactory $config_factory, LoggerInterface $logger) {
        parent::__construct($plugin_manager_queue_worker, $queue, $config_factory);
        // @hack The logger was added for using calls to Error::logException.
        $this->logger = $logger;
      }
    
      /**
       * {@inheritdoc}
       */
      public function queueCallback(CronJobInterface $job): void {
        $queue_name = str_replace(CronJobInterface::QUEUE_ID_PREFIX, '', $job->id());
    
        $queue_manager = $this->pluginManagerQueueWorker;
        $queue_factory = $this->queue;
    
        $config = $this->configFactory->get('ultimate_cron.settings');
    
        $info = $queue_manager->getDefinition($queue_name);
    
        // Make sure every queue exists. There is no harm in trying to recreate
        // an existing queue.
        $queue_factory->get($queue_name)->createQueue();
    
        /** @var \Drupal\Core\Queue\QueueWorkerInterface $queue_worker */
        $queue_worker = $queue_manager->createInstance($queue_name);
        // @hack This line was modified for coding standards. The parentheses
        // make the ?? fallback apply to the timeout, not to the sum.
        $end = microtime(TRUE) + ($info['cron']['time'] ?? $config->get('queue.timeouts.time'));
    
        /** @var \Drupal\Core\Queue\QueueInterface $queue */
        $queue = $queue_factory->get($queue_name);
        $items = 0;
        while (microtime(TRUE) < $end) {
          // Check kill signal.
          if ($job->getSignal('kill')) {
            \Drupal::logger('ultimate_cron')->warning('Kill signal received for job @job_id', ['@job_id' => $job->id()]);
            break;
          }
    
          $item = $queue->claimItem($config->get('queue.timeouts.lease_time'));
    
          // If there is no item, check the empty delay setting and wait if
          // configured.
          if (!$item) {
            if ($config->get('queue.delays.empty_delay')) {
              usleep($config->get('queue.delays.empty_delay') * 1000000);
              continue;
            }
            else {
              break;
            }
          }
    
          try {
            // We have an item, check if we need to wait.
            if ($config->get('queue.delays.item_delay')) {
              if ($items == 0) {
                // Move the boundary if using a throttle,
                // to avoid waiting for nothing.
                $end -= $config->get('queue.delays.item_delay');
              }
              else {
                // Sleep before retrieving.
                usleep($config->get('queue.delays.item_delay') * 1000000);
              }
            }
    
            // @hack This line was modified.
            $queue_worker->processItem($item->data, $item);
            $queue->deleteItem($item);
            $items++;
          }
          catch (RequeueException $e) {
            // The worker requested the task be immediately requeued.
            $queue->releaseItem($item);
          }
          catch (DelayedRequeueException $e) {
            if ($queue instanceof DelayableQueueInterface) {
              // This queue can handle a custom delay; use the duration provided
              // by the exception.
              $queue->delayItem($item, $e->getDelay());
            }
          }
          catch (SuspendQueueException $e) {
            // If the worker indicates there is a problem with the whole queue,
            // release the item and skip to the next queue.
            $queue->releaseItem($item);
    
            // @hack This line was modified for coding standards.
            Error::logException($this->logger, $e);
    
            // Rethrow the SuspendQueueException, so that the queue is correctly
            // suspended for the current cron run to avoid infinite loops.
            throw $e;
    
          }
          catch (\Exception $e) {
            // In case of any other kind of exception, log it and leave the item
            // in the queue to be processed again later.
            // @hack This line was modified for coding standards.
            Error::logException($this->logger, $e);
          }
        }
      }
    
    }
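A note on the `$end` timeout calculation. In PHP, `+` binds more tightly than `??`. Without parentheses around the `??` expression, the sum is computed first and `??` is then applied to the sum, which is never NULL, so the configured fallback is never used. The self-contained check below uses made-up values to show the difference.

```php
<?php

$start = 1000.0;
$fallback = 30;
// A queue definition without a 'time' entry under 'cron'.
$info = ['cron' => []];

// Parsed as ($start + $info['cron']['time']) ?? $fallback. The missing key
// triggers a warning (suppressed here with @), and NULL is added as 0.
$unparenthesized = @($start + $info['cron']['time'] ?? $fallback);

// Here ?? applies to the array lookup, so the fallback is used.
$parenthesized = $start + ($info['cron']['time'] ?? $fallback);

var_dump($unparenthesized, $parenthesized); // float(1000) float(1030)
```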
    

    Services entry (in my_module.services.yml):

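      my_module.item_aware_queue_worker:
        class: Drupal\my_module\Queue\ItemAwareQueueWorker
        arguments: ['@plugin.manager.queue_worker', '@queue', '@config.factory', '@logger.channel.my_module']
      # Needed only if the module does not already define this logger channel.
      logger.channel.my_module:
        parent: logger.channel_base
        arguments: ['my_module']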
    