Skip to content

Commit

Permalink
fix(topicdata): fix null key and value (#1261)
Browse files Browse the repository at this point in the history
Co-authored-by: alozano3 <[email protected]>
  • Loading branch information
2 people authored and tchiotludo committed Apr 4, 2023
1 parent 1c11ac1 commit 2ab8506
Showing 1 changed file with 3 additions and 2 deletions.
5 changes: 3 additions & 2 deletions src/main/java/org/akhq/repositories/RecordRepository.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import jakarta.inject.Inject;
Expand Down Expand Up @@ -606,7 +607,7 @@ public RecordMetadata produce(
SchemaSerializer keySerializer = serializerFactory.createSerializer(clusterId, keySchemaId.get());
keyAsBytes = keySerializer.serialize(key.get());
} else {
keyAsBytes = key.get().getBytes();
keyAsBytes = key.filter(Predicate.not(String::isEmpty)).map(String::getBytes).orElse(null);
}
} else {
try {
Expand All @@ -622,7 +623,7 @@ public RecordMetadata produce(
SchemaSerializer valueSerializer = serializerFactory.createSerializer(clusterId, valueSchemaId.get());
valueAsBytes = valueSerializer.serialize(value.get());
} else {
valueAsBytes = value.map(String::getBytes).orElse(null);
valueAsBytes = value.filter(Predicate.not(String::isEmpty)).map(String::getBytes).orElse(null);
}

return produce(clusterId, topic, valueAsBytes, headers, keyAsBytes, partition, timestamp);
Expand Down

0 comments on commit 2ab8506

Please sign in to comment.