We have written a Kafka consumer in Node.js using the KafkaJS
library, and we use @kafkajs/confluent-schema-registry
to deserialize the messages.
After initializing the schema registry, when I call the registry.decode
method to decode a message, I get the following error:
ConfluentSchemaRegistryValidationError: invalid payload
    at JsonSchema.validatePayload
Also, if I console.log the registry
after initialization, it shows an older schema ID:
SchemaRegistry {
cacheMissRequests: {},
api: {
_manifest: Manifest {
host: 'abc', // dummy data
allowResourceHostOverride: false,
parameterEncoder: [Function: encodeURIComponent],
bodyAttr: undefined,
headersAttr: undefined,
authAttr: undefined,
timeoutAttr: undefined,
hostAttr: undefined,
clientId: 'Confluent_Schema_Registry',
gatewayConfigs: [Object],
resources: [Object],
context: {},
middleware: [Array]
},
Schema: { find: [Function: resourceMethod] },
Subject: {
all: [Function: resourceMethod],
latestVersion: [Function: resourceMethod],
version: [Function: resourceMethod],
registered: [Function: resourceMethod],
config: [Function: resourceMethod],
updateConfig: [Function: resourceMethod],
register: [Function: resourceMethod],
compatible: [Function: resourceMethod]
}
},
cache: Cache {
getLatestRegistryId: [Function (anonymous)],
setLatestRegistryId: [Function (anonymous)],
getSchema: [Function (anonymous)],
setSchema: [Function (anonymous)],
clear: [Function (anonymous)],
registryIdBySubject: {},
schemasByRegistryId: { '100248': [Object] }
},
options: { JSON: { ajvInstance: [Ajv] } }
}
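As far as I understand, the ID that ends up in schemasByRegistryId is read from the message itself: in the Confluent wire format, byte 0 is a magic byte (0) and bytes 1-4 hold the schema ID as a big-endian integer, followed by the payload. Below is a minimal sketch for checking which ID a message actually carries, assuming the value is a Buffer as KafkaJS delivers it. The helper name inspectConfluentMessage and the sample values are hypothetical and not part of either library.

```javascript
// Hypothetical helper (not part of KafkaJS or @kafkajs/confluent-schema-registry).
// Splits a Confluent-framed message into its schema ID and raw payload.
function inspectConfluentMessage(value) {
  if (!Buffer.isBuffer(value) || value.length < 5) {
    throw new Error('Message is too short to carry a Confluent header');
  }
  const magicByte = value.readUInt8(0);
  if (magicByte !== 0) {
    throw new Error(`Unexpected magic byte: ${magicByte}`);
  }
  return {
    schemaId: value.readInt32BE(1), // bytes 1-4, big-endian
    payload: value.subarray(5),     // everything after the 5-byte header
  };
}

// Example with a hand-built message (assumed values, for illustration only).
const header = Buffer.alloc(5);
header.writeInt32BE(100248, 1); // byte 0 stays 0, which is the magic byte
const sample = Buffer.concat([header, Buffer.from(JSON.stringify({ id: 1 }))]);

const { schemaId, payload } = inspectConfluentMessage(sample);
console.log(schemaId, payload.toString('utf8'));
```

In the consumer this would be called as inspectConfluentMessage(message.value) inside eachMessage. For JSON Schema subjects the payload should be plain JSON text, so it can be compared by eye against the schema registered under that ID.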
How can I make the registry fetch the latest schema and decode messages using it?
Also, is there a way to decode a message using a local schema?