How to invoke a Durable Functions activity from within a callback function?

I’m trying to consume messages from a Kafka topic using an Azure Durable Functions orchestrator, and as new messages arrive I want to invoke an activity function to process them. The problem is that the callback function isn’t able to access the context and other local variables that are needed to invoke the activity function.

Please suggest whether I’m doing this correctly and whether there are any better or alternative approaches.

Here is my function code:

/* The orchestrator is invoked by a timer trigger. */
/* The consumer is created with the Kafka JavaScript SDK (@confluentinc/kafka-javascript). */

const df = require('durable-functions');

const kafkaConsumerOrchestratorName = 'kafka_consumer_orchestrator';
const kafkaConsumerActivityName = 'kafka_consumer_activity';

df.app.orchestration(kafkaConsumerOrchestratorName, function* (context) {
    yield kafkaService.consumer.run({
        eachMessage: async ({ topic, partition, message }) => {
            /* context not accessible here */
            yield context.df.callActivity(kafkaConsumerActivityName, { topic, partition, message });
        }
    });
});

df.app.activity(kafkaConsumerActivityName, {
    handler: async (input, context) => {
        /* process messages */
        console.log(input);
    }
});
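
For comparison, here is a minimal sketch of one alternative shape, where every `yield` stays in the generator body and the Kafka I/O moves into an activity that returns a batch. This is only an illustration under assumptions, not a confirmed solution: `kafka_poll_activity`, `kafkaBatchOrchestrator`, `makeStubContext` and `runWithStub` are hypothetical names, and the stub context stands in for the real orchestration context so the snippet runs without the Functions host. Only `context.df.callActivity` and `context.df.Task.all` mirror calls from the durable-functions library.

```javascript
// Sketch only: plain JavaScript with a stub context so it runs without the Functions host.
// In a real app this generator would be registered with df.app.orchestration(...).

const pollActivityName = 'kafka_poll_activity';        // hypothetical activity that returns a batch
const processActivityName = 'kafka_consumer_activity'; // activity name from the code above

// All yields live in the generator body, never inside a nested callback.
function* kafkaBatchOrchestrator(context) {
    // Step 1: an activity does the Kafka I/O and returns a plain array of messages.
    const batch = yield context.df.callActivity(pollActivityName);

    // Step 2: fan out one activity call per message, then wait for all of them.
    const tasks = batch.map((item) => context.df.callActivity(processActivityName, item));
    return yield context.df.Task.all(tasks);
}

// Stub context (assumption, for illustration): tasks are plain objects describing the call.
function makeStubContext() {
    return {
        df: {
            callActivity: (name, input) => ({ name, input }),
            Task: { all: (tasks) => ({ all: tasks }) },
        },
    };
}

// Tiny driver standing in for the Durable Functions runtime.
function runWithStub(batch) {
    const gen = kafkaBatchOrchestrator(makeStubContext());
    const first = gen.next();        // orchestrator asks for the poll activity
    const second = gen.next(batch);  // the "runtime" resumes it with the batch
    const results = second.value.all.map((t) => `processed ${t.input.message}`);
    return { first: first.value, scheduled: second.value.all, final: gen.next(results) };
}
```

Calling `runWithStub` with a small array of `{ topic, partition, message }` objects shows the order of events: one poll request, then one scheduled activity per message. Whether a polling activity or a separate Kafka-triggered function that starts orchestrations fits better depends on the workload.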