Could you please explain why, when launching the indexing process using the command $this->bus->dispatch(new ProductIndexMessage($productIds))
(with hundreds of such commands being executed at once), they are evenly distributed across all transports and queues? According to the design, they should only be launched on the searchIndex
queue.
PHP: 8.4.5, Symfony 7.2
/usr/local/bin/php -f /var/www/html/app/bin/console messenger:stats
---------------- -------
Transport Count
---------------- -------
async 40
searchIndex 40
genImageThumbs 40
failed 6
---------------- -------
File:
<?php
declare(strict_types=1);
namespace AppMessageSearch;
use AppMessageSearchIndexMessageInterface;
use AppUtilsUtils;
final readonly class ProductIndexMessage implements SearchIndexMessageInterface
{
/**
* @var int[]
*/
public array $productIds;
public function __construct(int|array $productIds)
{
$this->productIds = Utils::asPosAndNotZeroNotDupIntegerArray($productIds);
}
}
There is a configured messanger.yaml
:
framework:
messenger:
failure_transport: failed
transports:
async:
dsn: '%env(MESSENGER_TRANSPORT_DSN)%/async'
retry_strategy:
max_retries: 20
multiplier: 2
searchIndex:
dsn: '%env(MESSENGER_TRANSPORT_DSN)%/searchIndex'
options:
exchange:
name: 'searchIndex'
retry_strategy:
max_retries: 20
multiplier: 2
genImageThumbs:
dsn: '%env(MESSENGER_TRANSPORT_DSN)%/genImageThumbs'
options:
exchange:
name: 'genImageThumbs'
retry_strategy:
max_retries: 1
multiplier: 2
failed: 'doctrine://default?queue_name=failed'
# sync: 'sync://'
routing:
SymfonyComponentMailerMessengerSendEmailMessage: async
SymfonyComponentNotifierMessageChatMessage: async
SymfonyComponentNotifierMessageSmsMessage: async
# Route your messages to the transports
AppMessageAsyncMessageInterface: async
AppMessageSearchIndexMessageInterface: searchIndex
AppMessageImageGenThumbsMessageInterface: genImageThumbs
Commands to start workers:
/usr/local/bin/php /var/www/html/app/bin/console messenger:consume genImageThumbs --limit=1500 --time-limit=86400
/usr/local/bin/php /var/www/html/app/bin/console messenger:consume searchIndex --limit=1500 --time-limit=86400
/usr/local/bin/php /var/www/html/app/bin/console messenger:consume async --limit=1500 --time-limit=86400