Add support for pre-delayed queue items

Created on 18 November 2020, almost 4 years ago
Updated 22 February 2023, over 1 year ago

Problem/Motivation

Queues can store any serializable data (serializability is only required by the non-memory queues, e.g. \Drupal\Core\Queue\DatabaseQueue). Creating an item can look like this:

class QueueItem {
  public function __toString(): string {
    return serialize($this);
  }
}

\Drupal::queue('queue_name')->createItem(new QueueItem());

The problem affects storage-backed queues when the worker's processItem() throws any of the following exceptions:

  1. \Drupal\Core\Queue\DelayedRequeueException
  2. \Drupal\Core\Queue\RequeueException
  3. \Drupal\Core\Queue\SuspendQueueException

In-memory queues do not serialize the QueueItem instance, so the worker always receives the same object: it can be mutated inside processItem() and the mutated state is preserved for the next attempt. Queues such as \Drupal\Core\Queue\DatabaseQueue serialize the QueueItem instance when the item is created and unserialize it each time the item is claimed. The object's state can therefore only be set at instantiation and cannot change across attempts.
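
The difference can be reproduced without Drupal. The following is a minimal sketch in plain PHP, not core code: DemoQueueItem is a hypothetical stand-in for the QueueItem class, and a plain string variable plays the role of the stored data column.

```php
<?php

declare(strict_types=1);

// Hypothetical stand-in for the QueueItem class discussed in this issue.
final class DemoQueueItem {
  public int $processingAttempt = 1;
}

// In-memory behaviour: the worker receives the very same object each time.
$memoryItem = new DemoQueueItem();
$claimed = $memoryItem;
$claimed->processingAttempt++;
// The mutation is visible on the next attempt.
$memoryAttempt = $memoryItem->processingAttempt;

// Storage-backed behaviour: only the string written at creation is kept.
$stored = serialize(new DemoQueueItem());
$firstClaim = unserialize($stored);
$firstClaim->processingAttempt++;
// The stored string was never rewritten, so the next claim starts over.
$secondClaim = unserialize($stored);
$storedAttempt = $secondClaim->processingAttempt;

printf("memory: %d, stored: %d\n", $memoryAttempt, $storedAttempt);
// Prints "memory: 2, stored: 1".
```

The second unserialize() call is what a later claim amounts to: it rebuilds the object from the original string, so the increment made during the first attempt is lost.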

Steps to reproduce

  1. Implement the class that will serve as the queue item.
    namespace Drupal\my_module\Component\Queue;
    
    class QueueItem {
      public $processingAttempt = 1;
    
      public function __toString(): string {
        return serialize($this);
      }
    }
    
  2. Implement the queue worker.
    namespace Drupal\my_module\Plugin\QueueWorker;
    
    use Drupal\Core\Queue\DelayedRequeueException;
    use Drupal\Core\Queue\QueueWorkerBase;
    use Drupal\my_module\Component\Queue\QueueItem;
    
    /**
     * @QueueWorker(
     *   id = "queue_name",
     *   title = @Translation("The Queue Worker"),
     *   cron = {
     *     "time" = 30,
     *   },
     * )
     *
     * @see \Drupal\Core\Cron::processQueues()
     */
    class TheQueueWorker extends QueueWorkerBase {
      public function processItem($item): void {
        assert($item instanceof QueueItem);
        // The default (initial) value is 1.
        if ($item->processingAttempt === 1) {
          // Increment the value so that the next attempt does not throw.
          $item->processingAttempt++;
          throw new DelayedRequeueException(60);
        }
      }
    }
    
  3. Create the queue item.
    \Drupal::queue('queue_name')->createItem(new QueueItem());
    
  4. Run cron to start processing the queues. With an in-memory queue the item is eventually removed, because its mutated state survives between attempts. With a database queue the item stays forever, because the processingAttempt change is never persisted.

Proposed resolution

Update the data field in \Drupal\Core\Queue\DatabaseQueue::releaseItem() and \Drupal\Core\Queue\DatabaseQueue::delayItem(), which are called when one of the exceptions listed above is thrown, so that changes made to the item during processing are persisted.
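
To illustrate the idea only, here is a toy model in plain PHP. It is a sketch under stated assumptions, not a patch for DatabaseQueue: ToyStorageQueue and RetryAwareItem are hypothetical names, and an array plays the role of the queue table. The only point it makes is that releaseItem() writes the serialized data back.

```php
<?php

declare(strict_types=1);

// Hypothetical item class, mirroring the QueueItem from the steps to reproduce.
final class RetryAwareItem {
  public int $processingAttempt = 1;
}

// Toy stand-in for a storage-backed queue: an array plays the database table.
// This is NOT Drupal's DatabaseQueue; all names here are illustrative only.
final class ToyStorageQueue {
  /** @var array<int, array{data: string, expire: int}> */
  private array $rows = [];
  private int $nextId = 1;

  public function createItem(object $data): int {
    $id = $this->nextId++;
    $this->rows[$id] = ['data' => serialize($data), 'expire' => 0];
    return $id;
  }

  public function claimItem(): ?stdClass {
    foreach ($this->rows as $id => $row) {
      if ($row['expire'] === 0) {
        // Mark the row as claimed, then hand out a fresh copy of the data.
        $this->rows[$id]['expire'] = 1;
        $item = new stdClass();
        $item->item_id = $id;
        $item->data = unserialize($row['data']);
        return $item;
      }
    }
    return null;
  }

  // Proposed behaviour: write the (possibly mutated) data back on release.
  public function releaseItem(stdClass $item): bool {
    if (!isset($this->rows[$item->item_id])) {
      return false;
    }
    $this->rows[$item->item_id] = ['data' => serialize($item->data), 'expire' => 0];
    return true;
  }
}

$queue = new ToyStorageQueue();
$queue->createItem(new RetryAwareItem());

// First attempt: mutate the item, then release it back to the queue.
$first = $queue->claimItem();
$first->data->processingAttempt++;
$queue->releaseItem($first);

// Second attempt: the mutation made during the first attempt is visible.
$second = $queue->claimItem();
$attemptAfterRelease = $second->data->processingAttempt;
echo $attemptAfterRelease, "\n";
```

In this model a delayItem() counterpart would do the same write and additionally set a future expiry; it is left out to keep the sketch short.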

Remaining tasks

  1. #3184170: The `releaseItem()` and `delayItem()` of `Drupal\Core\Queue\DatabaseQueue` violates interfaces return type specifications

Feature request

Status: Needs work
Version: 9.5
Component: Cron

Created by

🇺🇦Ukraine BR0kEN Dnipro

  • Needs subsystem maintainer review

  • Needs issue summary update


Comments & Activities

  • 🇬🇧United Kingdom catch

    I think mutable queue items is going a bit too far away from what queue implementations do in general.

    Delayed (re-)queuing is something we already support in the API though, this would be just allowing that to be specified when creating the item.

    Changing the issue title, but we'll also need an issue summary update and patch if we go that way.

    @BR0kEN do you have any use cases that aren't delayed requeueing that you intended to use this for?

  • 🇬🇧United Kingdom james.williams

    I think delayed requeuing works for my needs, but only as long as the number of retries can be tracked. If that's stored as a property on the items, isn't that just the same as making the items mutable anyway, which could still be useful on its own? (Especially as that's how in-memory queue items already behave.)

  • 🇦🇺Australia dpi Perth, Australia

    > but only as long as the number of retries can be tracked.

    I think you'd want to use one of the keyvalue* services for this.

    > I think mutable queue items is going a bit too far away from what queue implementations do in general.

    It does sound a lot like Symfony Envelope Stamps though. However we don't implement anything like it in Drupal. I'd like to see it personally.

    » Add support for pre-delayed queue items

    Which is basically \Symfony\Component\Messenger\Stamp\DelayStamp. In the case of database-backed messages, it is a simple timestamp column on the message table indicating the earliest time at which the message may be executed.
