I agree. I have a use case that could be solved if I had the entire `$item` as a second parameter.

I have implemented a MergeDatabaseQueue to avoid duplicate queue items:
```php
use Drupal\Core\Queue\DatabaseQueue;

class MergeDatabaseQueue extends DatabaseQueue {

  /**
   * {@inheritdoc}
   */
  protected function doCreateItem($data): ?int {
    // This is copied and modified from DatabaseQueue::doCreateItem().
    // Merge instead of insert to prevent duplicates. An item is updated if it
    // already exists.
    $serialized = serialize($data);
    $query = $this->connection->merge(static::TABLE_NAME)
      ->keys([
        'name' => $this->name,
        'data' => $serialized,
      ])
      ->fields([
        'name' => $this->name,
        'data' => $serialized,
        // Note that this will update the 'created' field for an existing item.
        // We cannot rely on REQUEST_TIME because many items might be created
        // by a single request which takes longer than 1 second.
        'created' => \Drupal::time()->getCurrentTime(),
      ]);
    // Unlike an insert, a merge returns Merge::STATUS_INSERT or
    // Merge::STATUS_UPDATE here, not the item ID.
    return $query->execute();
  }

}
```
However, in one case, I also need to track the time the item was queued, so that the item is only processed if it is older than a week. If I track that as part of the serialized data, the item can no longer be merged with another item whose remaining data is identical. What I really need is to be able to read the `created` field inside the worker's `processItem` function, rather than having to include it in the serialized data.

If you use ultimate_cron, this is a hacky workaround:
```php
namespace Drupal\my_module\Queue;

use Drupal\Core\Config\ConfigFactory;
use Drupal\Core\Queue\DelayableQueueInterface;
use Drupal\Core\Queue\DelayedRequeueException;
use Drupal\Core\Queue\QueueFactory;
use Drupal\Core\Queue\QueueWorkerManager;
use Drupal\Core\Queue\RequeueException;
use Drupal\Core\Queue\SuspendQueueException;
use Drupal\Core\Utility\Error;
use Drupal\ultimate_cron\CronJobInterface;
use Drupal\ultimate_cron\QueueWorker;
use Psr\Log\LoggerInterface;

/**
 * A modified copy of \Drupal\ultimate_cron\QueueWorker.
 *
 * @hack This class allows workers to access $item by calling processItem() with
 * $item as the second parameter.
 */
class ItemAwareQueueWorker extends QueueWorker {

  /**
   * The logger.
   */
  protected LoggerInterface $logger;

  /**
   * The constructor.
   *
   * @param \Drupal\Core\Queue\QueueWorkerManager $plugin_manager_queue_worker
   *   The queue worker manager.
   * @param \Drupal\Core\Queue\QueueFactory $queue
   *   The queue factory.
   * @param \Drupal\Core\Config\ConfigFactory $config_factory
   *   The config factory.
   * @param \Psr\Log\LoggerInterface $logger
   *   The logger.
   */
  public function __construct(QueueWorkerManager $plugin_manager_queue_worker, QueueFactory $queue, ConfigFactory $config_factory, LoggerInterface $logger) {
    parent::__construct($plugin_manager_queue_worker, $queue, $config_factory);
    // @hack The logger was added for using calls to Error::logException.
    $this->logger = $logger;
  }

  /**
   * {@inheritdoc}
   */
  public function queueCallback(CronJobInterface $job): void {
    $queue_name = str_replace(CronJobInterface::QUEUE_ID_PREFIX, '', $job->id());

    $queue_manager = $this->pluginManagerQueueWorker;
    $queue_factory = $this->queue;

    $config = $this->configFactory->get('ultimate_cron.settings');

    $info = $queue_manager->getDefinition($queue_name);

    // Make sure every queue exists. There is no harm in trying to recreate
    // an existing queue.
    $queue_factory->get($queue_name)->createQueue();

    /** @var \Drupal\Core\Queue\QueueWorkerInterface $queue_worker */
    $queue_worker = $queue_manager->createInstance($queue_name);
    // @hack This line was modified for coding standards. The parentheses are
    // required because + binds more tightly than ??.
    $end = microtime(TRUE) + ($info['cron']['time'] ?? $config->get('queue.timeouts.time'));
    /** @var \Drupal\Core\Queue\QueueInterface $queue */
    $queue = $queue_factory->get($queue_name);
    $items = 0;
    while (microtime(TRUE) < $end) {
      // Check kill signal.
      if ($job->getSignal('kill')) {
        \Drupal::logger('ultimate_cron')->warning('Kill signal received for job @job_id', ['@job_id' => $job->id()]);
        break;
      }

      $item = $queue->claimItem($config->get('queue.timeouts.lease_time'));

      // If there is no item, check the empty delay setting and wait if
      // configured.
      if (!$item) {
        if ($config->get('queue.delays.empty_delay')) {
          usleep($config->get('queue.delays.empty_delay') * 1000000);
          continue;
        }
        else {
          break;
        }
      }

      try {
        // We have an item, check if we need to wait.
        if ($config->get('queue.delays.item_delay')) {
          if ($items == 0) {
            // Move the boundary if using a throttle,
            // to avoid waiting for nothing.
            $end -= $config->get('queue.delays.item_delay');
          }
          else {
            // Sleep before retrieving.
            usleep($config->get('queue.delays.item_delay') * 1000000);
          }
        }

        // @hack This line was modified.
        $queue_worker->processItem($item->data, $item);
        $queue->deleteItem($item);
        $items++;
      }
      catch (RequeueException $e) {
        // The worker requested the task be immediately requeued.
        $queue->releaseItem($item);
      }
      catch (DelayedRequeueException $e) {
        if ($queue instanceof DelayableQueueInterface) {
          // This queue can handle a custom delay; use the duration provided
          // by the exception.
          $queue->delayItem($item, $e->getDelay());
        }
      }
      catch (SuspendQueueException $e) {
        // If the worker indicates there is a problem with the whole queue,
        // release the item and skip to the next queue.
        $queue->releaseItem($item);

        // @hack This line was modified for coding standards.
        Error::logException($this->logger, $e);

        // Rethrow the SuspendQueueException, so that the queue is correctly
        // suspended for the current cron run to avoid infinite loops.
        throw $e;
      }
      catch (\Exception $e) {
        // In case of any other kind of exception, log it and leave the item
        // in the queue to be processed again later.
        // @hack This line was modified for coding standards.
        Error::logException($this->logger, $e);
      }
    }
  }

}
```
Services entry:
```yaml
my_module.item_aware_queue_worker:
  class: Drupal\my_module\Queue\ItemAwareQueueWorker
  arguments: ['@plugin.manager.queue_worker', '@queue', '@config.factory', '@logger.channel.my_module']
```
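For the "only process items older than a week" case described above, the age check itself can be kept as a small pure function. This is only a sketch: it assumes the claimed `$item` exposes `created` as a Unix timestamp (as items claimed from the database queue do), and the function name `my_module_seconds_until_old_enough()` is hypothetical.

```php
<?php

declare(strict_types=1);

/**
 * Returns how many seconds remain before a queue item reaches a minimum age.
 *
 * Hypothetical helper, not part of Drupal core or ultimate_cron. $item is
 * assumed to be the object returned by the queue's claimItem(), carrying a
 * 'created' Unix timestamp.
 */
function my_module_seconds_until_old_enough(object $item, int $now, int $min_age): int {
  // Never return a negative value: 0 means the item is old enough.
  return max(0, (int) $item->created + $min_age - $now);
}

// Stand-in for a claimed item; a real one comes from $queue->claimItem().
$item = (object) ['item_id' => 1, 'data' => ['nid' => 42], 'created' => 1700000000];
$week = 7 * 24 * 60 * 60;

// Three days after creation: prints 345600 (four days left to wait).
echo my_module_seconds_until_old_enough($item, 1700000000 + 3 * 86400, $week), "\n";
// Eight days after creation: prints 0 (ready to process).
echo my_module_seconds_until_old_enough($item, 1700000000 + 8 * 86400, $week), "\n";
```

A worker that declares `processItem($data, $item = NULL)` could call this with `\Drupal::time()->getCurrentTime()` and, when the result is greater than zero, throw `new DelayedRequeueException($remaining)`. The loop in ItemAwareQueueWorker would then call `delayItem()` instead of deleting the item, and `created` never has to be part of the serialized data.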