I’m creating a kafka client consumer (kafkajs 2.2.4) for a microservice hosted in a kubernetes cluster in GCP. I’ve created a producer in another microservice with the same configuration without problems. My issue is that after the ssl/sasl authentication handshake my consumer attempts to authenticate again and I assume that’s the reason it disconnects itself. After disconnecting the service attempts to connect again several times until it crashes. Logging doesn’t provide a lot of information on how to debug this but it happens right after the second sasl handshake according to the logs. Any information on how to approach this would be awesome. I’m using NestJS for the backend.
Here is my configuration:
const brokers = [`${process.env.KAFKA_B1}:${process.env.KAFKA_PORT}`];
console.log(brokers)
let kafkaConfig: any = {
clientId: 'consumer-client',
brokers: brokers,
connectionTimeout: 300000,
enforceRequestTimeout: false,
logLevel: logLevel.ERROR,
// retry: {
// initialRetryTime: 1000,
// retries: 5,
// },
};
if (process.env.KAFKA_SASL_ENABLED) {
kafkaConfig = {
...kafkaConfig,
ssl: true,
sasl: {
mechanism: 'plain', // scram-sha-256 or scram-sha-512
username: process.env.KAFKA_USERNAME,
password: process.env.KAFKA_PASSWORD,
},
};
}
console.log('START creating kafka instance');
const kafka = new Kafka({
...kafkaConfig,
clientId: `microservice-${process.env.HOSTNAME}`,
});
console.log('FINISHED creating kafka instance');
console.log('START creating kafka consumer');
const consumer = kafka.consumer({
groupId: 'microservice',
sessionTimeout: 90000,
heartbeatInterval: 30000,
retry: {
retries: 5,
initialRetryTime: 1000,
maxRetryTime: 60000,
},
});
const topic: ConsumerSubscribeTopics = {
topics: ['MyTopicStream'],
fromBeginning: false,
};
try {
console.log('START kafka consumer connection');
await this.kafkaConsumer.connect();
await this.kafkaConsumer.subscribe(topic);
console.log('FINISHED kafka consumer connection');
console.log('START kafka consumer run');
await this.kafkaConsumer.run({
eachMessage: async (messagePayload: EachMessagePayload) => {
const { topic, partition, message } = messagePayload;
const prefix = `${topic}[${partition} | ${message.offset}] / ${message.timestamp}`;
console.log(`- ${prefix} ${message.key}#${message.value}`);
},
});
console.log('FINISHED kafka consumer run');
} catch (error) {
console.log('Error: ', error);
}
Logs I get on disconnect:
network request
InstrumentationEvent {
id: 22,
type: 'consumer.network.request',
timestamp: 1727916074589,
payload: {
broker: 'broker-1.kafka-sandbox.us-central1.managedkafka.carbon-sandbox-434719.cloud.goog:9092',
clientId: 'black-box-black-box-88668575b-pvsp5',
correlationId: 12,
size: 52,
createdAt: 1727916074586,
sentAt: 1727916074586,
pendingDuration: 0,
duration: 3,
apiName: 'ListOffsets',
apiKey: 2,
apiVersion: 3
}
}
[NestWinston] Error 10/2/2024, 8:41:14 PM [ClientKafka] [ClientKafka] ERROR [BrokerPool] Closed connection {"timestamp":"2024-10-03T00:41:14.593Z","logger":"kafkajs","retryCount":0,"retryTime":50} - {"stack":[null]}
network request
InstrumentationEvent {
id: 23,
type: 'consumer.network.request',
timestamp: 1727916074609,
payload: {
broker: 'broker-0.kafka-sandbox.us-central1.managedkafka.carbon-sandbox-434719.cloud.goog:9092',
clientId: 'black-box-black-box-88668575b-pvsp5',
correlationId: 0,
size: 30,
createdAt: 1727916074602,
sentAt: 1727916074602,
pendingDuration: 0,
duration: 7,
apiName: 'SaslHandshake',
apiKey: 17,
apiVersion: 1
}
}
network request
InstrumentationEvent {
id: 24,
type: 'consumer.network.request',
timestamp: 1727916074610,
payload: {
broker: 'broker-2.kafka-sandbox.us-central1.managedkafka.carbon-sandbox-434719.cloud.goog:9092',
clientId: 'black-box-black-box-88668575b-pvsp5',
correlationId: 0,
size: 30,
createdAt: 1727916074608,
sentAt: 1727916074608,
pendingDuration: 0,
duration: 2,
apiName: 'SaslHandshake',
apiKey: 17,
apiVersion: 1
}
}
[NestWinston] Error 10/2/2024, 8:41:14 PM [ClientKafka] [ClientKafka] ERROR [BrokerPool] Closed connection {"timestamp":"2024-10-03T00:41:14.650Z","logger":"kafkajs","retryCount":1,"retryTime":75} - {"stack":[null]}