Well, to give you some context. We have a system that captures telemetry data in real time. The broker we hire sends telemetry data in real time from the device they place inside our cars. This operation has been running for two years, but we’ve been trying to troubleshoot and improve it for two years.
It turns out that after fixing numerous flaws, we started comparing what leaves their broker with what arrives at our database. We noticed that some packets simply don’t arrive, either in the data lake (raw message) or in the processed database. So we added more logs and started investigating. I noticed recurring Queue Error: Broken Pipe Or Closed Connection errors. But at the same time, when we see this error and go to the database, packets still arrive, so the consumer doesn’t crash and stop acquiring packets, losing that one. We set no_ack to true because we use the consumer in the Laravel schedule, running with cron 8x daily. And setting no_ack to false would trigger a race condition for the packet that ended up giving an error.
I’d like to know what’s the best context for the consumer? Should it run in the supervisor, with auto-restart policies and running 100% in the background?
$schedule->command('pad:cron')->weekdays()->daily();
$schedule->command('x:consumir')->cron('00 04 * * *');
$schedule->command('x:consumir')->cron('00 07 * * *');
$schedule->command('x:consumir')->cron('00 10 * * *');
$schedule->command('x:consumir')->cron('01 12 * * *');
$schedule->command('x:consumir')->cron('0 15 * * *');
$schedule->command('x:consumir')->cron('0 17 * * *');
$schedule->command('x:consumir')->cron('00 20 * * *');
$schedule->command('x:consumir')->cron('00 00 * * *');
if run command on server – px aux | grep “x:consumir”
1195277 0.0 0.0 7124 3200 ? S 00:00 0:00 sh -c '/usr/bin/php' 'artisan' x:consumir > '/dev/null' 2>&1
1195278 0.5 0.4 277348 66736 ? S 00:00 3:56 /usr/bin/php artisan x:consumir
1199299 0.0 0.0 7124 3328 ? S 04:00 0:00 sh -c '/usr/bin/php' 'artisan' x:consumir > '/dev/null' 2>&1
1199300 0.7 0.4 277352 67100 ? S 04:00 3:38 /usr/bin/php artisan x:consumir
1203839 0.0 0.0 7124 3328 ? S 10:00 0:00 sh -c '/usr/bin/php' 'artisan' x:consumir > '/dev/null' 2>&1
1203840 1.2 0.4 277360 66964 ? S 10:00 1:45 /usr/bin/php artisan x:consumir
1310719 0.0 0.0 7124 3328 ? S 12:01 0:00 sh -c '/usr/bin/php' 'artisan' x:consumir > '/dev/null' 2>&1
1310720 0.9 0.4 277360 64436 ? S 12:01 0:11 /usr/bin/php artisan x:consumir
1332376 0.0 0.0 6436 2432 pts/1 S+ 12:21 0:00 grep --color=auto x:consumir
And this is the method to create the consumer:
/**
* Execute the console command.
*
* @return mixed
*/
public function handle(){
try{
$connection = new AMQPStreamConnection('xxx.xxx.xxx', xxxx, env('user_rabbit'), env('password_rabbit'),'/', env('parameter1_rabbit'), 'AMQPLAIN', null, 'en_US', 3.0, 30.0, null, false, 60, 0);
$channel = $connection->channel();
echo " [*] Waiting for messages. To exit press CTRL+Cn";
$callback = function ($msg) {
$retorno = json_decode($msg->body);
//dd($retorno);
$this->processar($retorno);
};
$channel->basic_consume(env('user_rabbit'), '', false, true, false, false, $callback);
//consumer_tag =
//no_local=
//no_ack = this is true
//exclusive =
//nowait =
while ($channel->is_consuming()) {
$channel->wait();
}
$channel->close();
//$connection->close();
return true;
}catch(Throwable $e){
Log::debug('QUEUE ERROR: ' . $e->getMessage());
Log::debug($e->getTrace());
return true;
//dd("parar2");
}
}
What do you say? Where else can I look? I must say the accuracy of the data is about 80%. But we need to get to at least 95%.