i have a problem with this cronjob function its related to a chatbot The problem is when user start broadcast campaigns other users have to wait until next one can start
public function subscriber_broadcaster($api_key="") // braodcast_message
{
// $this->api_key_check($api_key);
$broadcaster_number_of_message_to_be_sent_in_try=$this->config->item("broadcaster_number_of_message_to_be_sent_in_try");
if($broadcaster_number_of_message_to_be_sent_in_try==0) $broadcaster_number_of_message_to_be_sent_in_try="";
$broadcaster_update_report_after_time=$this->config->item("broadcaster_update_report_after_time");
if($broadcaster_update_report_after_time=="" || $broadcaster_update_report_after_time==0) $broadcaster_update_report_after_time=10;
$number_of_campaign_to_be_processed = 1; // max number of campaign that can be processed by this cron job
// $number_of_message_tob_be_sent = 50000; // max number of message that can be sent in an hour
$subscriber_broadcaster_hold_after_number_of_errors=$this->config->item("subscriber_broadcaster_hold_after_number_of_errors");
if($subscriber_broadcaster_hold_after_number_of_errors=="" || $subscriber_broadcaster_hold_after_number_of_errors==0)
$subscriber_broadcaster_hold_after_number_of_errors=30; // default 10
/****** Get all campaign from database where status=0 means pending ******/
$where_str = " (posting_status='0' OR is_try_again='1') AND posting_status!='3'";
$this->db->where($where_str);
$join = array('users'=>'messenger_bot_broadcast_serial.user_id=users.id,left');
$campaign_info= $this->basic->get_data("messenger_bot_broadcast_serial",$where='',$select=array("messenger_bot_broadcast_serial.*","users.deleted as user_deleted","users.status as user_status","users.user_type as user_type"),$join,$limit=50, $start=0, $order_by='schedule_time ASC');
$page_ids_names=array();
$access_token_database_database=array();
$facebook_rx_fb_user_info_id_database=array();
$campaign_id_array=array(); // all selected campaign id array
$campaign_info_fildered = array(); // valid for process, campign info array
$valid_campaign_count = 1;
foreach($campaign_info as $info)
{
if($this->is_demo=='1' && $info['user_type']=="Admin")
{
$this->db->where("id",$info['id']);
$this->db->update("messenger_bot_broadcast_serial",array("posting_status"=>"1","is_try_again"=>"0"));
continue;
}
if($info['user_deleted'] == '1' || $info['user_status']=="0")
{
$this->db->where("id",$info['id']);
$this->db->update("messenger_bot_broadcast_serial",array("posting_status"=>"1","is_try_again"=>"0"));
continue;
}
$campaign_id= $info['id'];
$time_zone= $info['timezone'];
$schedule_time= $info['schedule_time'];
$total_thread = $info["total_thread"];
$page_id =$info["page_id"]; // auto ids
$fb_page_id =$info["fb_page_id"];
$user_id = $info["user_id"];
if($time_zone) date_default_timezone_set($time_zone);
$now_time = date("Y-m-d H:i:s");
if((strtotime($now_time) < strtotime($schedule_time)) && $time_zone!="") continue;
if($valid_campaign_count > $number_of_campaign_to_be_processed) break;
// get access token and fb user id
$token_info = $this->basic->get_data('facebook_rx_fb_page_info',array("where"=>array('id'=>$page_id,'user_id'=>$user_id)));
foreach ($token_info as $key => $value)
{
$access_token_database_database[$campaign_id][$value["id"]] = $value['page_access_token'];
$facebook_rx_fb_user_info_id = $value["facebook_rx_fb_user_info_id"];
$facebook_rx_fb_user_info_id_database[$campaign_id] = $facebook_rx_fb_user_info_id;
$page_ids_names[$value["id"]] = $value["page_name"];
}
// valid campaign info and campig ids
$campaign_info_fildered[] = $info;
$campaign_id_array[] = $info['id'];
$valid_campaign_count++;
}
if(count($campaign_id_array)==0) exit();
$this->db->where_in("id",$campaign_id_array);
$this->db->update("messenger_bot_broadcast_serial",array("posting_status"=>"1","is_try_again"=>"0"));
// get config id
$getdata= $this->basic->get_data("facebook_rx_fb_user_info",array("where_in"=>array("id"=>$facebook_rx_fb_user_info_id_database)),array("id","facebook_rx_config_id"));
foreach ($getdata as $key => $value)
{
$facebook_rx_config_id_database[$value["id"]] = $value["facebook_rx_config_id"];
}
$this->load->library("fb_rx_login");
// send message
foreach($campaign_info_fildered as $info)
{
$campaign_id= $info['id'];
$user_id = $info["user_id"];
$catch_error_count=$info["last_try_error_count"];
$successfully_sent=$info["successfully_sent"];
$successfully_delivered=$info["successfully_delivered"];
$fb_rx_fb_user_info_id = $facebook_rx_fb_user_info_id_database[$campaign_id]; // find gb user id for this campaign
$this->fb_rx_login->app_initialize($facebook_rx_config_id_database[$fb_rx_fb_user_info_id]);
$i=0;
$campaign_lead=$this->basic->get_data("messenger_bot_broadcast_serial_send",array("where"=>array("campaign_id"=>$campaign_id,"processed"=>"0")),'','',$broadcaster_number_of_message_to_be_sent_in_try);
foreach($campaign_lead as $key => $value)
{
if($catch_error_count>$subscriber_broadcaster_hold_after_number_of_errors) // if 30 catch block error then stop sending, mark as complete
{
$this->basic->update_data("messenger_bot_broadcast_serial",array("id"=>$campaign_id),array("posting_status"=>'4','successfully_sent'=>$successfully_sent,'completed_at'=>date("Y-m-d H:i:s"),"error_message"=>$error_msg,"is_try_again"=>"0","last_try_error_count"=>$catch_error_count));
break;
}
$campaign_message_send=$info["message"];
$page_id_send = $value["page_id"];
$send_table_id = $value['id'];
$subscribe_id = $value['subscribe_id'];
$subscribeauto_id = $value['subscriber_auto_id'];
$client_first_name = $value['subscriber_name'];
$client_last_name = $value['subscriber_lastname'];
$client_otn_token=$value['otn_token']; // if OTN Campaign
$error_msg="";
$message_error_code = "";
$message_sent_id = "";
if(!isset($access_token_database_database[$campaign_id][$page_id_send])) continue;
$page_access_token_send = $access_token_database_database[$campaign_id][$page_id_send]; // get access toke from our access token database
// generating message
$campaign_message_send = str_replace('{{first_name}}',$client_first_name,$campaign_message_send);
$campaign_message_send = str_replace('{{last_name}}',$client_last_name,$campaign_message_send);
$replace_search=array('PUT_SUBSCRIBER_ID','#SUBSCRIBER_ID_REPLACE#');
$campaign_message_send=str_replace($replace_search, $subscribe_id, $campaign_message_send);
if($client_otn_token!="")
$campaign_message_send = str_replace('PUT_OTN_TOKEN',$client_otn_token,$campaign_message_send);
// print_r($campaign_message_send); continue;
$message_sent_id="";
$now_sent_time=date("Y-m-d H:i:s");
$deliveryTime='';
$isDelivered='0';
$successfully_sent++;
try
{
// $campaign_message_send = spintax_process($campaign_message_send);
$response = $this->fb_rx_login->send_non_promotional_message_subscription($campaign_message_send,$page_access_token_send);
if(isset($response['message_id']))
{
$message_sent_id = $response['message_id'];
$successfully_delivered++;
$deliveryTime=date("Y-m-d H:i:s");
$isDelivered="1";
}
else
{
if(isset($response["error"]["message"])) $message_sent_id = $response["error"]["message"];
if(isset($response["error"]["code"])) $message_error_code = $response["error"]["code"];
if($message_error_code=="551") // unvalilable user
{
$this->basic->update_data("messenger_bot_subscriber",array("id"=>$subscribeauto_id),array("unavailable"=>"1","last_error_message"=>$message_sent_id));
}
else
{
$error_msg = $message_sent_id;
$catch_error_count++;
}
}
}
catch(Exception $e)
{
$error_msg = $e->getMessage();
$catch_error_count++;
}
// generating new report with send message info
$i++;
// after 10 send update report in database
if($i%$broadcaster_update_report_after_time==0)
{
$this->basic->update_data("messenger_bot_broadcast_serial",array("id"=>$campaign_id),array('successfully_sent'=>$successfully_sent,"successfully_delivered"=>$successfully_delivered,"error_message"=>$error_msg,"last_try_error_count"=>$catch_error_count));
}
// updating a lead, marked as processed
$this->basic->update_data("messenger_bot_broadcast_serial_send",array("id"=>$send_table_id),array('processed'=>'1',"sent_time"=>$now_sent_time,"delivered"=>$isDelivered,"delivery_time"=>$deliveryTime,"message_sent_id"=>$message_sent_id));
}
// one campaign completed, now update database finally
if((count($campaign_lead)<$broadcaster_number_of_message_to_be_sent_in_try) || $broadcaster_number_of_message_to_be_sent_in_try=="" || $catch_error_count>$subscriber_broadcaster_hold_after_number_of_errors)
{
$new_posting_status = ($catch_error_count>$subscriber_broadcaster_hold_after_number_of_errors) ? '4' : '2';
$complete_update=array("posting_status"=>$new_posting_status,'successfully_sent'=>$successfully_sent,"successfully_delivered"=>$successfully_delivered,'completed_at'=>date("Y-m-d H:i:s"),"is_try_again"=>"0","last_try_error_count"=>$catch_error_count);
if(isset($error_msg))
$complete_update["error_message"]=$error_msg;
$this->basic->update_data("messenger_bot_broadcast_serial",array("id"=>$campaign_id),$complete_update);
}
else // suppose broadcaster_update_report_after_time=20 but there are 19 message to sent, need to update report in that case
{
$this->basic->update_data("messenger_bot_broadcast_serial",array("id"=>$campaign_id),array('successfully_sent'=>$successfully_sent,"successfully_delivered"=>$successfully_delivered,"is_try_again"=>"1"));
}
$this->basic->update_data('otn_postback',['id'=>$info['otn_postback_id']],['rcn_last_sent_time'=>date("Y-m-d H:i:s")]);
}
}
i want to run all campaigns once without waiting to running campaign to finish.
i tried this modification but not working for me to running it into multitask
public function subscriber_broadcaster($api_key = "") // broadcast_message
{
// $this->api_key_check($api_key);
$broadcaster_number_of_message_to_be_sent_in_try = $this->config->item("broadcaster_number_of_message_to_be_sent_in_try");
if ($broadcaster_number_of_message_to_be_sent_in_try == 0) $broadcaster_number_of_message_to_be_sent_in_try = "";
$broadcaster_update_report_after_time = $this->config->item("broadcaster_update_report_after_time");
if ($broadcaster_update_report_after_time == "" || $broadcaster_update_report_after_time == 0) $broadcaster_update_report_after_time = 10;
$number_of_campaign_to_be_processed = 20; // max number of campaign that can be processed by this cron job
$subscriber_broadcaster_hold_after_number_of_errors = $this->config->item("subscriber_broadcaster_hold_after_number_of_errors");
if ($subscriber_broadcaster_hold_after_number_of_errors == "" || $subscriber_broadcaster_hold_after_number_of_errors == 0)
$subscriber_broadcaster_hold_after_number_of_errors = 30; // default 10
/****** Get all campaign from database where status=0 means pending ******/
$where_str = " (posting_status='0' OR is_try_again='1') AND posting_status!='3'";
$this->db->where($where_str);
$join = array('users' => 'messenger_bot_broadcast_serial.user_id = users.id, left');
$campaign_info = $this->basic->get_data("messenger_bot_broadcast_serial", '', array(
"messenger_bot_broadcast_serial.*",
"users.deleted as user_deleted",
"users.status as user_status",
"users.user_type as user_type"
), $join, $limit = $number_of_campaign_to_be_processed, $start = 0, $order_by = 'schedule_time ASC');
if (count($campaign_info) == 0) exit();
// Initialize arrays for handling multi-cURL
$multiHandle = curl_multi_init();
$curlHandles = [];
$campaignHandlesMap = []; // Map to track which curl handle corresponds to which campaign
foreach ($campaign_info as $info) {
// Skip invalid campaigns
if ($info['user_deleted'] == '1' || $info['user_status'] == "0") {
$this->db->where("id", $info['id']);
$this->db->update("messenger_bot_broadcast_serial", array("posting_status" => "4", "is_try_again" => "0"));
continue;
}
// Prepare data for each campaign
$campaign_id = $info['id'];
$time_zone = $info['timezone'];
$schedule_time = $info['schedule_time'];
$user_id = $info['user_id'];
if ($time_zone) date_default_timezone_set($time_zone);
$now_time = date("Y-m-d H:i:s");
// Skip campaigns scheduled for the future
if ((strtotime($now_time) < strtotime($schedule_time)) && $time_zone != "") continue;
// Get access tokens and prepare messages
$token_info = $this->basic->get_data('facebook_rx_fb_page_info', array("where" => array('id' => $info["page_id"], 'user_id' => $user_id)));
foreach ($token_info as $key => $value) {
$access_token_database[$campaign_id][$value["id"]] = $value['page_access_token'];
}
// Fetch leads to process for the current campaign
$campaign_lead = $this->basic->get_data("messenger_bot_broadcast_serial_send", array("where" => array("campaign_id" => $campaign_id, "processed" => "0")), '', '', $broadcaster_number_of_message_to_be_sent_in_try);
foreach ($campaign_lead as $key => $value) {
$campaign_message_send = $info["message"];
$page_id_send = $value["page_id"];
$client_first_name = $value['subscriber_name'];
$client_last_name = $value['subscriber_lastname'];
$campaign_message_send = str_replace('{{first_name}}', $client_first_name, $campaign_message_send);
$campaign_message_send = str_replace('{{last_name}}', $client_last_name, $campaign_message_send);
if (!isset($access_token_database[$campaign_id][$page_id_send])) continue;
$page_access_token_send = $access_token_database[$campaign_id][$page_id_send];
// Prepare CURL request for each lead
$url = "https://graph.facebook.com/v4.0/me/messages?access_token={$page_access_token_send}";
$curl = curl_init();
curl_setopt($curl, CURLOPT_URL, $url);
curl_setopt($curl, CURLOPT_POST, 1);
curl_setopt($curl, CURLOPT_RETURNTRANSFER, true);
curl_setopt($curl, CURLOPT_POSTFIELDS, json_encode($campaign_message_send));
curl_setopt($curl, CURLOPT_HTTPHEADER, array("Content-Type: application/json"));
// Add handle to multi-cURL
curl_multi_add_handle($multiHandle, $curl);
// Track the handles and campaign info
$curlHandles[] = $curl;
$campaignHandlesMap[(string)$curl] = [
'campaign_id' => $campaign_id,
'send_table_id' => $value['id']
];
}
// Set the campaign to "in progress"
$this->db->where("id", $campaign_id);
$this->db->update("messenger_bot_broadcast_serial", array("posting_status" => "1", "is_try_again" => "0"));
}
// Execute multi-cURL to process all requests concurrently
$running = null;
do {
curl_multi_exec($multiHandle, $running);
curl_multi_select($multiHandle);
} while ($running > 0);
// Process each result
foreach ($curlHandles as $curl) {
$response_data = curl_multi_getcontent($curl);
$response_data = json_decode($response_data, true);
$campaign_data = $campaignHandlesMap[(string)$curl];
$campaign_id = $campaign_data['campaign_id'];
$send_table_id = $campaign_data['send_table_id'];
// Handle response for each request
if (isset($response_data['message_id'])) {
// Message sent successfully
$this->basic->update_data("messenger_bot_broadcast_serial_send", array("id" => $send_table_id), array('processed' => '1'));
} else {
// Handle error
$error_msg = isset($response_data["error"]["message"]) ? $response_data["error"]["message"] : "Unknown error";
$this->basic->update_data("messenger_bot_broadcast_serial_send", array("id" => $send_table_id), array('processed' => '0', "error_message" => $error_msg));
}
// Remove handle
curl_multi_remove_handle($multiHandle, $curl);
curl_close($curl);
}
// Close the multi-handle
curl_multi_close($multiHandle);
// Finalize each campaign's status based on success/failure
foreach ($campaign_info as $info) {
$campaign_id = $info['id'];
$catch_error_count = $info["last_try_error_count"];
$successfully_sent = $info["successfully_sent"];
$successfully_delivered = $info["successfully_delivered"];
$error_msg = null;
// Check if all leads are processed for this campaign, if so, mark it as completed
$remaining_leads = $this->basic->get_data("messenger_bot_broadcast_serial_send", array("where" => array("campaign_id" => $campaign_id, "processed" => "0")));
if (count($remaining_leads) == 0 || $catch_error_count > $subscriber_broadcaster_hold_after_number_of_errors) {
$new_posting_status = ($catch_error_count > $subscriber_broadcaster_hold_after_number_of_errors) ? '4' : '2';
$complete_update = array(
"posting_status" => $new_posting_status,
'successfully_sent' => $successfully_sent,
"successfully_delivered" => $successfully_delivered,
'completed_at' => date("Y-m-d H:i:s"),
"is_try_again" => "0",
"last_try_error_count" => $catch_error_count
);
if (isset($error_msg)) {
$complete_update["error_message"] = $error_msg;
}
$this->basic->update_data("messenger_bot_broadcast_serial", array("id" => $campaign_id), $complete_update);
} else {
$this->basic->update_data("messenger_bot_broadcast_serial", array("id" => $campaign_id), array(
'successfully_sent' => $successfully_sent,
"successfully_delivered" => $successfully_delivered,
"is_try_again" => "1"
));
}
$this->basic->update_data('otn_postback', ['id' => $info['otn_postback_id']], ['rcn_last_sent_time' => date("Y-m-d H:i:s")]);
}
}