Skip to content

Commit

Permalink
feat(topicdata): kafka header of type int, long and short (#1352)
Browse files Browse the repository at this point in the history
close #1342
  • Loading branch information
gschmutz authored and tchiotludo committed Apr 4, 2023
1 parent 0be6762 commit e372458
Show file tree
Hide file tree
Showing 3 changed files with 168 additions and 2 deletions.
4 changes: 2 additions & 2 deletions src/main/java/org/akhq/models/Record.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.json.JsonSchema;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
import io.micronaut.context.annotation.Value;
import kafka.coordinator.group.GroupMetadataManager;
import kafka.coordinator.transaction.TransactionLog;
import kafka.coordinator.transaction.TxnKey;
import lombok.*;
import org.akhq.configs.SchemaRegistryType;
import org.akhq.utils.AvroToJsonDeserializer;
import org.akhq.utils.AvroToJsonSerializer;
import org.akhq.utils.ContentUtils;
import org.akhq.utils.ProtobufToJsonDeserializer;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
Expand Down Expand Up @@ -115,7 +115,7 @@ public Record(SchemaRegistryClient client, ConsumerRecord<byte[], byte[]> record
this.bytesValue = bytesValue;
this.valueSchemaId = getAvroSchemaId(this.bytesValue);
for (Header header: record.headers()) {
String headerValue = header.value() != null ? new String(header.value()) : null;
String headerValue = String.valueOf(ContentUtils.convertToObject(header.value()));
this.headers.add(new KeyValue<>(header.key(), headerValue));
}

Expand Down
93 changes: 93 additions & 0 deletions src/main/java/org/akhq/utils/ContentUtils.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package org.akhq.utils;

import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.regex.Pattern;

public class ContentUtils {

/**
* Detects if bytes contain a UTF-8 string or something else
* Source: https://stackoverflow.com/questions/1193200/how-can-i-check-whether-a-byte-array-contains-a-unicode-string-in-java
* @param value the bytes to test for a UTF-8 encoded {@code java.lang.String} value
* @return true, if the byte[] contains a UTF-8 encode {@code java.lang.String}, false if it hold something else (e.g. a {@code int)
* @throws UnsupportedEncodingException
*/
private static boolean isValidUTF8(byte[] value) throws UnsupportedEncodingException
{
Pattern p = Pattern.compile("\\A(\n" +
" [\\x09\\x0A\\x0D\\x20-\\x7E] # ASCII\\n" +
"| [\\xC2-\\xDF][\\x80-\\xBF] # non-overlong 2-byte\n" +
"| \\xE0[\\xA0-\\xBF][\\x80-\\xBF] # excluding overlongs\n" +
"| [\\xE1-\\xEC\\xEE\\xEF][\\x80-\\xBF]{2} # straight 3-byte\n" +
"| \\xED[\\x80-\\x9F][\\x80-\\xBF] # excluding surrogates\n" +
"| \\xF0[\\x90-\\xBF][\\x80-\\xBF]{2} # planes 1-3\n" +
"| [\\xF1-\\xF3][\\x80-\\xBF]{3} # planes 4-15\n" +
"| \\xF4[\\x80-\\x8F][\\x80-\\xBF]{2} # plane 16\n" +
")*\\z", Pattern.COMMENTS);

String phonyString = new String(value, "ISO-8859-1");
return p.matcher(phonyString).matches();
}

/**
* Converts bytes to long.
*
* @param value the bytes to convert in to a long
* @return the long build from the given bytes
*/
private static Long asLong(byte[] value) {
return value != null ? ByteBuffer.wrap(value).getLong() : null;
}

/**
* Converts the given bytes to {@code int}.
*
* @param value the bytes to convert into a {@code int}
* @return the {@code int} build from the given bytes
*/
private static Integer asInt(byte[] value) {
return value != null ? ByteBuffer.wrap(value).getInt() : null;
}

/**
* Converts the given bytes to {@code short}.
*
* @param value the bytes to convert into a {@code short}
* @return the {@code short} build from the given bytes
*/
private static Short asShort(byte[] value) {
return value != null ? ByteBuffer.wrap(value).getShort() : null;
}

/**
* Converts the given bytes either into a {@code java.lang.string}, {@code int}, {@code long} or {@code short} depending on the content it contains.
* @param value the bytes to convert
* @return the value as an {@code java.lang.string}, {@code int}, {@code long} or {@code short}
*/
public static Object convertToObject(byte[] value) {
Object valueAsObject = null;

if (value != null) {
try {
if (ContentUtils.isValidUTF8(value)) {
valueAsObject = new String(value);
} else {
try {
valueAsObject = ContentUtils.asLong(value);
} catch (Exception e) {
try {
valueAsObject = ContentUtils.asInt(value);
} catch (Exception ex) {
valueAsObject = ContentUtils.asShort(value);
}
}
}
} catch(UnsupportedEncodingException ex) {
valueAsObject = "[encoding error]";
}
}
return valueAsObject;
}

}
73 changes: 73 additions & 0 deletions src/test/java/org/akhq/utils/ContentUtilsTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package org.akhq.utils;

import org.akhq.models.Record;
import org.apache.kafka.common.header.Header;
import org.junit.jupiter.api.Test;

import javax.swing.text.AbstractDocument;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

import static org.junit.jupiter.api.Assertions.assertEquals;

public class ContentUtilsTest {

private static byte[] toBytes(Short value) {
ByteBuffer buffer = ByteBuffer.allocate(Short.BYTES);
buffer.putShort(value);
return buffer.array();
}

private static byte[] toBytes(Integer value) {
ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES);
buffer.putInt(value);
return buffer.array();
}

private static byte[] toBytes(Long value) {
ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
buffer.putLong(value);
return buffer.array();
}

private static byte[] toBytes(Float value) {
ByteBuffer buffer = ByteBuffer.allocate(Float.BYTES);
buffer.putFloat(value);
return buffer.array();
}

private static byte[] toBytes(Double value) {
ByteBuffer buffer = ByteBuffer.allocate(Double.BYTES);
buffer.putDouble(value);
return buffer.array();
}

@Test
void testHeaderValueStringUTF8() {
String testValue = "Test";

assertEquals(testValue, ContentUtils.convertToObject(testValue.getBytes(StandardCharsets.UTF_8)));
}

@Test
void testHeaderValueInteger() {
int testValue = 1;

assertEquals(testValue, ContentUtils.convertToObject(toBytes(testValue)));
}

@Test
void testHeaderValueLong() {
long testValue = 111l;

assertEquals(testValue, ContentUtils.convertToObject(toBytes(testValue)));
}

@Test
void testHeaderValueShort() {
short testValue = 10;

assertEquals(testValue, ContentUtils.convertToObject(toBytes(testValue)));
}

}

0 comments on commit e372458

Please sign in to comment.