Skip to content

Commit

Permalink
fix(topci): create topics with configuration in one call (#1273)
Browse files Browse the repository at this point in the history
close #1272
  • Loading branch information
ebrard authored and tchiotludo committed Apr 4, 2023
1 parent ca12e00 commit bc44f7d
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 9 deletions.
11 changes: 9 additions & 2 deletions src/main/java/org/akhq/modules/AbstractKafkaWrapper.java
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,17 @@ public Map<String, TopicDescription> describeTopics(String clusterId, List<Strin
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}

public void createTopics(String clusterId, String name, int partitions, short replicationFactor) throws ExecutionException {
public void createTopics(String clusterId, String name, int partitions, short replicationFactor,
List<org.akhq.models.Config> configs) throws ExecutionException {
Map<String, String> kafkaTopicConfigs = new HashMap<>();

configs.forEach(c-> kafkaTopicConfigs.put(c.getName(), c.getValue()));

NewTopic topic = new NewTopic(name, partitions, replicationFactor).configs(kafkaTopicConfigs);

Logger.call(kafkaModule
.getAdminClient(clusterId)
.createTopics(Collections.singleton(new NewTopic(name, partitions, replicationFactor)))
.createTopics(Collections.singleton(topic))
.all(),
"Create Topics",
Collections.singletonList(name)
Expand Down
4 changes: 1 addition & 3 deletions src/main/java/org/akhq/repositories/TopicRepository.java
Original file line number Diff line number Diff line change
Expand Up @@ -137,9 +137,7 @@ private boolean isStream(String name) {
}

public void create(String clusterId, String name, int partitions, short replicationFactor, List<org.akhq.models.Config> configs) throws ExecutionException, InterruptedException {
kafkaWrapper.createTopics(clusterId, name, partitions, replicationFactor);
checkIfTopicExists(clusterId, name);
configRepository.updateTopic(clusterId, name, configs);
kafkaWrapper.createTopics(clusterId, name, partitions, replicationFactor, configs);
}

public void delete(String clusterId, String name) throws ExecutionException, InterruptedException {
Expand Down
18 changes: 14 additions & 4 deletions src/test/java/org/akhq/repositories/TopicRepositoryTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -130,20 +130,30 @@ void findByNameWithTopicRegex() throws ExecutionException, InterruptedException

@Test
void create() throws ExecutionException, InterruptedException {
topicRepository.create(KafkaTestCluster.CLUSTER_ID, "create", 8, (short) 1, Collections.singletonList(
topicRepository.create(KafkaTestCluster.CLUSTER_ID, "createEmptyConfig", 8, (short) 1, Collections.emptyList()
);

assertEquals(8, topicRepository.findByName(KafkaTestCluster.CLUSTER_ID, "createEmptyConfig").getPartitions().size());

topicRepository.delete(KafkaTestCluster.CLUSTER_ID, "createEmptyConfig");
}

@Test
void createWithConfig() throws ExecutionException, InterruptedException {
topicRepository.create(KafkaTestCluster.CLUSTER_ID, "createWithConfig", 8, (short) 1, Collections.singletonList(
new Config(TopicConfig.SEGMENT_MS_CONFIG, "1000")
));

Optional<String> option = configRepository.findByTopic(KafkaTestCluster.CLUSTER_ID, "create")
Optional<String> option = configRepository.findByTopic(KafkaTestCluster.CLUSTER_ID, "createWithConfig")
.stream()
.filter(r -> r.getName().equals(TopicConfig.SEGMENT_MS_CONFIG))
.findFirst()
.map(Config::getValue);

assertEquals(8, topicRepository.findByName(KafkaTestCluster.CLUSTER_ID, "create").getPartitions().size());
assertEquals(8, topicRepository.findByName(KafkaTestCluster.CLUSTER_ID, "createWithConfig").getPartitions().size());
assertEquals("1000", option.get());

topicRepository.delete(KafkaTestCluster.CLUSTER_ID, "create");
topicRepository.delete(KafkaTestCluster.CLUSTER_ID, "createWithConfig");
}

@Test
Expand Down

0 comments on commit bc44f7d

Please sign in to comment.