Commit dfc70c6

feat(schema): add support for GLUE schema registry (only deserialization) (#1650)

close #777
---------
Co-authored-by: apatra <[email protected]>
arindampatra33 committed Jan 24, 2024
1 parent 879cb91 commit dfc70c6
Showing 7 changed files with 52 additions and 8 deletions.
4 changes: 4 additions & 0 deletions build.gradle
@@ -153,6 +153,10 @@ dependencies {
     //AWS MSK IAM Auth
     implementation group: 'software.amazon.msk', name: 'aws-msk-iam-auth', version: '2.0.0'

+    // AWS Glue serde
+    implementation ("software.amazon.glue:schema-registry-serde:1.1.15")
+
+
     implementation group: 'io.projectreactor', name: 'reactor-core', version: '3.5.11'

     implementation 'io.jsonwebtoken:jjwt-impl:0.12.3'
3 changes: 2 additions & 1 deletion src/main/java/org/akhq/configs/Connection.java
@@ -33,7 +33,8 @@ public static class SchemaRegistry {
         String basicAuthUsername;
         String basicAuthPassword;
         SchemaRegistryType type = SchemaRegistryType.CONFLUENT;
-
+        String glueSchemaRegistryName;
+        String awsRegion;
         @MapFormat(transformation = MapFormat.MapTransformation.FLAT)
         Map<String, String> properties;
     }
3 changes: 2 additions & 1 deletion src/main/java/org/akhq/configs/SchemaRegistryType.java
@@ -5,7 +5,8 @@
 @Getter
 public enum SchemaRegistryType {
     CONFLUENT((byte) 0x0),
-    TIBCO((byte) 0x80);
+    TIBCO((byte) 0x80),
+    GLUE((byte) 0x0);

     private byte magicByte;
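Worth noting: GLUE deliberately reuses CONFLUENT's 0x0 magic byte. Glue payloads are not recognized by sniffing the magic byte; they are routed to the dedicated Glue deserializer based on the configured registry type (see RecordRepository below). A minimal sketch of that property, using only names from this commit:

    import org.akhq.configs.SchemaRegistryType;

    class MagicByteSketch {
        public static void main(String[] args) {
            // GLUE and CONFLUENT expose the same magic byte, so the byte
            // alone cannot distinguish the two registry types.
            assert SchemaRegistryType.GLUE.getMagicByte()
                == SchemaRegistryType.CONFLUENT.getMagicByte();
            // That is fine: Glue records never take the
            // magic-byte/schema-id decode path at all.
        }
    }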
5 changes: 3 additions & 2 deletions src/main/java/org/akhq/controllers/TopicController.java
@@ -192,7 +192,7 @@ public List<Record> produce(
                 key.map(String::getBytes).orElse(null),
                 value.map(String::getBytes).orElse(null),
                 headers,
-                targetTopic))
+                targetTopic, null))
             .collect(Collectors.toList());
     }

@@ -369,7 +369,8 @@ public Record deleteRecordApi(String cluster, String topicName, Integer partitio
             Base64.getDecoder().decode(key),
             null,
             new ArrayList<>(),
-            topicRepository.findByName(cluster, topicName)
+            topicRepository.findByName(cluster, topicName),
+            null
         );
     }
12 changes: 10 additions & 2 deletions src/main/java/org/akhq/models/Record.java
@@ -87,8 +87,10 @@ public class Record {

     @JsonIgnore
     private Boolean truncated;
+    private Deserializer awsGlueKafkaDeserializer;

-    public Record(RecordMetadata record, SchemaRegistryType schemaRegistryType, byte[] bytesKey, byte[] bytesValue, List<KeyValue<String, String>> headers, Topic topic) {
+
+    public Record(RecordMetadata record, SchemaRegistryType schemaRegistryType, byte[] bytesKey, byte[] bytesValue, List<KeyValue<String, String>> headers, Topic topic, Deserializer awsGlueKafkaDeserializer) {
         this.MAGIC_BYTE = schemaRegistryType.getMagicByte();
         this.topic = topic;
         this.partition = record.partition();

@@ -102,11 +104,12 @@ public Record(RecordMetadata record, SchemaRegistryType schemaRegistryType, byte
         this.valueSubject = getAvroSchemaSubject(this.valueSchemaId);
         this.headers = headers;
         this.truncated = false;
+        this.awsGlueKafkaDeserializer = awsGlueKafkaDeserializer;
     }

     public Record(SchemaRegistryClient client, ConsumerRecord<byte[], byte[]> record, SchemaRegistryType schemaRegistryType, Deserializer kafkaAvroDeserializer,
                   Deserializer kafkaJsonDeserializer, Deserializer kafkaProtoDeserializer, AvroToJsonSerializer avroToJsonSerializer,
-                  ProtobufToJsonDeserializer protobufToJsonDeserializer, AvroToJsonDeserializer avroToJsonDeserializer, byte[] bytesValue, Topic topic) {
+                  ProtobufToJsonDeserializer protobufToJsonDeserializer, AvroToJsonDeserializer avroToJsonDeserializer, byte[] bytesValue, Topic topic, Deserializer awsGlueKafkaDeserializer) {
         if (schemaRegistryType == SchemaRegistryType.TIBCO) {
             this.MAGIC_BYTE = (byte) 0x80;
         } else {

@@ -136,6 +139,7 @@ public Record(SchemaRegistryClient client, ConsumerRecord<byte[], byte[]> record
         this.avroToJsonSerializer = avroToJsonSerializer;
         this.kafkaJsonDeserializer = kafkaJsonDeserializer;
         this.truncated = false;
+        this.awsGlueKafkaDeserializer = awsGlueKafkaDeserializer;
     }

     public String getKey() {

@@ -178,6 +182,10 @@ public void setTruncated(Boolean truncated) {
     private String convertToString(byte[] payload, Integer schemaId, boolean isKey) {
         if (payload == null) {
             return null;
+        }
+        else if (this.awsGlueKafkaDeserializer != null) {
+            return this.awsGlueKafkaDeserializer.deserialize(this.topic.getName(), payload).toString();
+
         } else if (schemaId != null) {
             try {
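The new branch in convertToString is the heart of the feature: when a Glue deserializer is present, the raw payload is handed to it and the result is rendered with toString(). A self-contained sketch of that path, assuming an Avro-backed topic (the Glue serde then yields an Avro GenericRecord, whose toString() produces a JSON-like rendering); note the committed code calls toString() unconditionally, while this sketch adds a null guard:

    import org.apache.kafka.common.serialization.Deserializer;

    class GlueRenderSketch {
        // Mirrors the Glue branch of Record#convertToString.
        static String render(Deserializer<Object> glueDeserializer,
                             String topicName, byte[] payload) {
            if (payload == null) {
                return null; // same early-out as convertToString
            }
            Object decoded = glueDeserializer.deserialize(topicName, payload);
            return decoded == null ? null : decoded.toString();
        }
    }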
6 changes: 4 additions & 2 deletions src/main/java/org/akhq/repositories/RecordRepository.java
@@ -471,7 +471,8 @@ private Record newRecord(ConsumerRecord<byte[], byte[]> record, String clusterId
             this.customDeserializerRepository.getAvroToJsonDeserializer(clusterId),
             avroWireFormatConverter.convertValueToWireFormat(record, client,
                 this.schemaRegistryRepository.getSchemaRegistryType(clusterId)),
-            topic
+            topic,
+            schemaRegistryType == SchemaRegistryType.GLUE ? schemaRegistryRepository.getAwsGlueKafkaDeserializer(clusterId): null
         ));
     }

@@ -490,7 +491,8 @@ private Record newRecord(ConsumerRecord<byte[], byte[]> record, BaseOptions opti
             this.customDeserializerRepository.getAvroToJsonDeserializer(options.clusterId),
             avroWireFormatConverter.convertValueToWireFormat(record, client,
                 this.schemaRegistryRepository.getSchemaRegistryType(options.clusterId)),
-            topic
+            topic,
+            schemaRegistryType == SchemaRegistryType.GLUE ? schemaRegistryRepository.getAwsGlueKafkaDeserializer(options.getClusterId()): null
         ));
     }
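Both newRecord overloads apply the same rule: only a cluster whose registry type is GLUE receives the cached Glue deserializer; every other type passes null, which keeps Record on the pre-existing magic-byte/schema-id path. The same selection as a hypothetical standalone helper (not part of the commit):

    import org.akhq.configs.SchemaRegistryType;
    import org.akhq.repositories.SchemaRegistryRepository;
    import org.apache.kafka.common.serialization.Deserializer;

    class GlueSelectionSketch {
        // null means "no Glue": Record then falls back to the schema-id path.
        static Deserializer pickGlueDeserializer(SchemaRegistryType type,
                                                 SchemaRegistryRepository repository,
                                                 String clusterId) {
            return type == SchemaRegistryType.GLUE
                ? repository.getAwsGlueKafkaDeserializer(clusterId)
                : null;
        }
    }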
27 changes: 27 additions & 0 deletions src/main/java/org/akhq/repositories/SchemaRegistryRepository.java
@@ -1,5 +1,8 @@
 package org.akhq.repositories;

+import com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryKafkaDeserializer;
+import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants;
+import com.amazonaws.services.schemaregistry.utils.AvroRecordType;
 import com.fasterxml.jackson.databind.DeserializationFeature;
 import io.confluent.kafka.schemaregistry.ParsedSchema;
 import io.confluent.kafka.schemaregistry.client.rest.RestService;

@@ -23,6 +26,9 @@

 import jakarta.inject.Inject;
 import jakarta.inject.Singleton;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
+
 import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
 import java.util.*;

@@ -38,6 +44,8 @@ public class SchemaRegistryRepository extends AbstractRepository {
     private final Map<String, Deserializer> kafkaAvroDeserializers = new HashMap<>();
     private final Map<String, Deserializer> kafkaJsonDeserializers = new HashMap<>();
     private final Map<String, Deserializer> kafkaProtoDeserializers = new HashMap<>();
+    private final Map<String, Deserializer> awsGlueKafkaDeserializers = new HashMap<>();
+

     public PagedList<Schema> list(String clusterId, Pagination pagination, Optional<String> search, List<String> filters) throws IOException, RestClientException, ExecutionException, InterruptedException {
         return PagedList.of(all(clusterId, search, filters), pagination, list -> this.toSchemasLatestVersion(list, clusterId));

@@ -310,6 +318,25 @@ public SchemaRegistryType getSchemaRegistryType(String clusterId) {
         }
         return schemaRegistryType;
     }
+    public Deserializer getAwsGlueKafkaDeserializer(String clusterId) {
+
+        if (!this.awsGlueKafkaDeserializers.containsKey(clusterId)){
+            Connection.SchemaRegistry schemaRegistry = kafkaModule.getConnection(clusterId).getSchemaRegistry();
+            Map<String, Object> params = new HashMap<>();
+            params.put(AWSSchemaRegistryConstants.REGISTRY_NAME, schemaRegistry.getGlueSchemaRegistryName());
+            params.put(AWSSchemaRegistryConstants.AWS_REGION, schemaRegistry.getAwsRegion());
+            params.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.GENERIC_RECORD.getName());
+
+            // Adding secondary deserializer so that messages that aren't serialized using avro, proto or json are deserialized using StringDeserializer
+            params.put(AWSSchemaRegistryConstants.SECONDARY_DESERIALIZER, StringDeserializer.class.getName());
+            Map<String, String> otherParams = schemaRegistry.getProperties();
+            if (otherParams != null) {
+                params.putAll(otherParams);
+            }
+            this.awsGlueKafkaDeserializers.put(clusterId, new GlueSchemaRegistryKafkaDeserializer(DefaultCredentialsProvider.builder().build(), params));
+        }
+        return this.awsGlueKafkaDeserializers.get(clusterId);
+    }

     static {
         JacksonMapper.INSTANCE.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
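For local experimentation, the lazily cached construction above can be reproduced standalone. The sketch below mirrors getAwsGlueKafkaDeserializer; the registry name and region are placeholders (in AKHQ they come from the new glueSchemaRegistryName and awsRegion connection settings), and credentials are resolved through the default AWS provider chain:

    import com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryKafkaDeserializer;
    import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants;
    import com.amazonaws.services.schemaregistry.utils.AvroRecordType;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;

    import java.util.HashMap;
    import java.util.Map;

    class GlueDeserializerSketch {
        static GlueSchemaRegistryKafkaDeserializer build() {
            Map<String, Object> params = new HashMap<>();
            params.put(AWSSchemaRegistryConstants.REGISTRY_NAME, "my-registry"); // placeholder
            params.put(AWSSchemaRegistryConstants.AWS_REGION, "eu-west-1");      // placeholder
            // Decode Avro payloads as GenericRecord rather than generated classes.
            params.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.GENERIC_RECORD.getName());
            // Anything not serialized through Glue falls back to a plain string.
            params.put(AWSSchemaRegistryConstants.SECONDARY_DESERIALIZER, StringDeserializer.class.getName());
            return new GlueSchemaRegistryKafkaDeserializer(DefaultCredentialsProvider.builder().build(), params);
        }
    }

Calling build().deserialize(topicName, rawBytes) then decodes a record the same way Record#convertToString does for GLUE clusters.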
