Problem: I have an object attribute, .status, that is updated from a Kafka topic and then sent over a websocket. My problem is that each time I ask for it from the client side (JavaScript), the server (websockets + asyncio in Python) starts a new Kafka consumer that reads the topic from the beginning.
Question: Is it possible to have the Kafka for loop (for msg in consumer:) keep updating my custom_obj object, and send its .status value only when asked for it?
What I’ve tried
This is what I have so far on the server side:
import asyncio
import websockets
from kafka import KafkaConsumer
import Custom_obj


async def test(websocket):
    consumer = KafkaConsumer(
        'kafka-topic',
        bootstrap_servers=['kafka.server.com:1234'],
        auto_offset_reset='earliest',  # Must start from the beginning to build the object correctly
        enable_auto_commit=True,
    )
    custom_obj = Custom_obj()
    for msg in consumer:
        msg_dec = msg.value.decode()
        custom_obj.update(msg_dec)
        await websocket.send(custom_obj.status)


async def main():
    async with websockets.serve(test, "localhost", 1234):
        await asyncio.Future()  # run forever


if __name__ == "__main__":
    asyncio.run(main())
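
For reference, the behavior being asked for could be sketched roughly as follows: the consumer loop runs once per process in a background thread and keeps a shared object up to date, while the websocket handler only reads the current .status when the client sends a request. This is a minimal sketch under stated assumptions, not a definitive implementation. CustomObj is a hypothetical stand-in for Custom_obj, and consume_forever, start_consumer_thread, and make_handler are illustrative names that do not come from kafka-python or websockets. The sketch assumes only that the consumer is a blocking iterable of messages with a .value bytes attribute, and that the websocket object supports async for and an awaitable send().

```python
import asyncio
import threading


class CustomObj:
    """Hypothetical stand-in for the question's Custom_obj."""

    def __init__(self):
        self.status = "empty"
        self._count = 0

    def update(self, msg_dec):
        # Placeholder logic: the real object builds its state from each message.
        self._count += 1
        self.status = f"{self._count} messages, last={msg_dec}"


def consume_forever(consumer, custom_obj, lock):
    """Blocking consumer loop; intended to run once, in a background thread."""
    for msg in consumer:
        with lock:
            custom_obj.update(msg.value.decode())


def start_consumer_thread(consumer, custom_obj, lock):
    """Start the consumer loop in a daemon thread so the event loop stays free."""
    thread = threading.Thread(
        target=consume_forever, args=(consumer, custom_obj, lock), daemon=True
    )
    thread.start()
    return thread


def make_handler(custom_obj, lock):
    """Build a connection handler that shares one custom_obj across clients."""

    async def handler(websocket):
        # Reply with the current status each time the client sends a message.
        async for _request in websocket:
            with lock:  # held only long enough to read one attribute
                status = custom_obj.status
            await websocket.send(status)

    return handler


# Possible wiring (assumption, not executed here), reusing the question's setup:
#   consumer = KafkaConsumer('kafka-topic', ..., auto_offset_reset='earliest')
#   custom_obj, lock = CustomObj(), threading.Lock()
#   start_consumer_thread(consumer, custom_obj, lock)
#   async with websockets.serve(make_handler(custom_obj, lock), "localhost", 1234):
#       await asyncio.Future()  # run forever
```

With this layout the topic is read from the beginning once, at server startup, instead of once per connection, and every client sees the same object. The trade-off is that a client asking very early may receive a status built from only part of the topic.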
Client-side code (JavaScript in a Vue component):
created() {
  const ws = new WebSocket('ws://localhost:1234');
  ws.onopen = function(e) {
    ws.send('Got here!')
    this.connectionStatus = 'Connected.'
  }
  ws.onerror = function(e) {
    ws.close()
  }
  ws.onclose = function(e) {
    this.connectionStatus = 'Disconnected.'
  }
  ws.onmessage = (e) => {
    console.log(1)
  }
}