I am running this worker:
php -d memory_limit=-1 bin/console messenger:consume sqs_channel_manager -vv
And worker fails:
14:29:32 INFO [messenger] Received message AppDomainEventChannelManagerChannelManagerEventHasReceived ["class" => "AppDomainEventChannelManagerChannelManagerEventHasReceived"]
14:29:32 WARNING [messenger] Error thrown while handling message AppDomainEventChannelManagerChannelManagerEventHasReceived. Sending for retry #1 using 922 ms delay. Error: "Handling "AppDomainEventChannelManagerChannelManagerEventHasReceived" failed: " ["class" => "AppDomainEventChannelManagerChannelManagerEventHasReceived","message_id" => null,"retryCount" => 1,"delay" => 922,"error" => "Handling "AppDomainEventChannelManagerChannelManagerEventHasReceived" failed: ","exception" => SymfonyComponentMessengerExceptionHandlerFailedException^ { …}]
14:29:37 INFO [messenger] Received message AppDomainEventChannelManagerChannelManagerEventHasReceived ["class" => "AppDomainEventChannelManagerChannelManagerEventHasReceived"]
14:29:37 WARNING [messenger] Error thrown while handling message AppDomainEventChannelManagerChannelManagerEventHasReceived. Sending for retry #3 using 3676 ms delay. Error: "Handling "AppDomainEventChannelManagerChannelManagerEventHasReceived" failed: " ["class" => "AppDomainEventChannelManagerChannelManagerEventHasReceived","message_id" => null,"retryCount" => 3,"delay" => 3676,"error" => "Handling "AppDomainEventChannelManagerChannelManagerEventHasReceived" failed: ","exception" => SymfonyComponentMessengerExceptionHandlerFailedException^ { …}]
14:30:08 INFO [messenger] Received message AppDomainEventChannelManagerChannelManagerEventHasReceived ["class" => "AppDomainEventChannelManagerChannelManagerEventHasReceived"]
14:30:08 CRITICAL [messenger] Error thrown while handling message AppDomainEventChannelManagerChannelManagerEventHasReceived. Removing from transport after 5 retries. Error: "Handling "AppDomainEventChannelManagerChannelManagerEventHasReceived" failed: " ["class" => "AppDomainEventChannelManagerChannelManagerEventHasReceived","message_id" => null,"retryCount" => 5,"error" => "Handling "AppDomainEventChannelManagerChannelManagerEventHasReceived" failed: ","exception" => SymfonyComponentMessengerExceptionHandlerFailedException^ { …}]
14:30:08 INFO [messenger] Rejected message AppDomainEventChannelManagerChannelManagerEventHasReceived will be sent to the failure transport SymfonyComponentMessengerBridgeDoctrineTransportDoctrineTransport. ["class" => "AppDomainEventChannelManagerChannelManagerEventHasReceived","transport" => "SymfonyComponentMessengerBridgeDoctrineTransportDoctrineTransport"]
The transports sqs_channel_manager
is configured like this:
framework:
messenger:
failure_transport: failed
transports:
failed: 'doctrine://default?table_name=failed_messages'
sqs_channel_manager:
dsn: 'https://sqs.eu-north-1.amazonaws.com/XXXXXX/sqs_channel_manager_test'
serializer: AppInfrastructureMessengerChannelManagerSerializer
options:
access_key: '%env(AWS_ACCESS_KEY_ID)%'
secret_key: '%env(AWS_SECRET_ACCESS_KEY)%'
region: '%env(AWS_REGION)%'
queue_name: '%env(CHANNEL_MANAGER_QUEUE_NAME)%'
And is an SQS queue The queue has a redrive policy:
aws sqs get-queue-attributes --queue-url https://sqs.eu-north-1.amazonaws.com/XXXXXX/sqs_channel_manager_test --attribute-names RedrivePolicy
{
"Attributes": {
"RedrivePolicy": "{"deadLetterTargetArn":"arn:aws:sqs:eu-north-1:XXXXX:sqs_channel_manager_test_dlq","maxReceiveCount":1}"
}
}
But upon failure message is not placed upon Deal letter queue:
aws sqs get-queue-attributes --queue-url https://sqs.eu-north-1.amazonaws.com/XXXXXXXX/sqs_channel_manager_test_dlq --attribute-names ApproximateNumberOfMessages
{
"Attributes": {
"ApproximateNumberOfMessages": "0"
}
}
Does failure_transport
prevents the message to be placed upon DLQ? Setting failure_transport
has no effect:
sqs_channel_manager:
failure_transport: null
dsn: '%env(SQS_CHANNEL_MANAGER_TRANSPORT_DSN)%'
serializer: AppInfrastructureMessengerChannelManagerSerializer
options:
access_key: '%env(AWS_ACCESS_KEY_ID)%'
secret_key: '%env(AWS_SECRET_ACCESS_KEY)%'
region: '%env(AWS_REGION)%'
queue_name: '%env(CHANNEL_MANAGER_QUEUE_NAME)%'
Also I am using custom serializer:
<?php
declare(strict_types=1);
namespace AppInfrastructureMessenger;
use AppDomainEventChannelManagerChannelManagerEventHasReceived;
use SymfonyComponentMessengerEnvelope;
use SymfonyComponentMessengerStampRedeliveryStamp;
use SymfonyComponentMessengerStampTransportMessageIdStamp;
use SymfonyComponentMessengerTransportSerializationSerializerInterface;
class ChannelManagerSerializer implements SerializerInterface
{
public function decode(array $encodedEnvelope): Envelope
{
$data = json_decode($encodedEnvelope['body'], true);
if (json_last_error() !== JSON_ERROR_NONE) {
throw new InvalidArgumentException(
'Invalid JSON received from SQS: ' . json_last_error_msg()
);
}
$envelope = new Envelope(new ChannelManagerEventHasReceived($data));
if (isset($encodedEnvelope['headers']['stamps'])) {
$stamps = unserialize(base64_decode($encodedEnvelope['headers']['stamps']));
foreach ($stamps as $stamp) {
$envelope = $envelope->with($stamp);
}
}
return $envelope;
}
public function encode(Envelope $envelope): array
{
$event = $envelope->getMessage();
$stampsToSerialize = [];
foreach ($envelope->all() as $stampArray) {
foreach ($stampArray as $stamp) {
if ($stamp instanceof RedeliveryStamp || $stamp instanceof TransportMessageIdStamp) {
$stampsToSerialize[] = $stamp;
}
}
}
return [
'body' => json_encode($event->channelManagerData, JSON_THROW_ON_ERROR),
'headers' => [
'stamps' => base64_encode(serialize($stampsToSerialize)),
],
];
}
}
Does it affect a message entering DLQ? I am using Symfony 7.2, it this a known issue?