Skip to content

Commit

Permalink
fix(schema): fix subject redirection from message on large registry (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
AlexisSouquiere committed Jan 24, 2024
1 parent dfc70c6 commit 8831744
Show file tree
Hide file tree
Showing 6 changed files with 73 additions and 10 deletions.
4 changes: 2 additions & 2 deletions client/src/containers/Topic/Topic/TopicData/TopicData.jsx
Original file line number Diff line number Diff line change
Expand Up @@ -603,9 +603,9 @@ class TopicData extends Root {
}

_redirectToSchema(id) {
const { selectedCluster } = this.state;
const { selectedCluster, selectedTopic } = this.state;

this.getApi(uriSchemaId(selectedCluster, id)).then(response => {
this.getApi(uriSchemaId(selectedCluster, id, selectedTopic)).then(response => {
if (response.data) {
this.props.history.push({
pathname: `/ui/${selectedCluster}/schema/details/${response.data.subject}`,
Expand Down
4 changes: 2 additions & 2 deletions client/src/utils/endpoints.js
Original file line number Diff line number Diff line change
Expand Up @@ -209,8 +209,8 @@ export const uriSchemaRegistry = (clusterId, search, pageNumber) => {
return `${apiUrl}/${clusterId}/schema?&search=${search}&page=${pageNumber}`;
};

export const uriSchemaId = (clusterId, id) => {
return `${apiUrl}/${clusterId}/schema/id/${id}`;
export const uriSchemaId = (clusterId, id, topic) => {
return `${apiUrl}/${clusterId}/schema/id/${id}?topic=${topic}`;
};

export const uriSchemaVersions = (clusterId, subject) => {
Expand Down
16 changes: 10 additions & 6 deletions src/main/java/org/akhq/controllers/SchemaController.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import io.micronaut.http.HttpResponse;
import io.micronaut.http.annotation.*;
import io.swagger.v3.oas.annotations.Operation;
import jakarta.annotation.Nullable;
import org.akhq.configs.security.Role;
import org.akhq.middlewares.SchemaComparator;
import org.akhq.models.Schema;
Expand Down Expand Up @@ -157,17 +158,20 @@ private Schema registerSchema(String cluster, @Body Schema schema) throws IOExce
}

@Get("api/{cluster}/schema/id/{id}")
@Operation(tags = {"schema registry"}, summary = "Find a schema by id")
public Schema redirectId(
@Operation(tags = {"schema registry"}, summary = "Find a subject by the schema id")
public Schema getSubjectBySchemaIdAndTopic(
HttpRequest<?> request,
String cluster,
Integer id
) throws IOException, RestClientException, ExecutionException, InterruptedException {
Integer id,
@Nullable @QueryValue String topic
) throws IOException, RestClientException {
// TODO Do the check on the subject name too
checkIfClusterAllowed(cluster);

return this.schemaRepository
.getById(cluster, id)
return this.schemaRepository.getSubjectsBySchemaId(cluster, id)
.stream()
.filter(s -> topic == null || s.getSubject().contains(topic))
.findFirst()
.orElse(null);
}

Expand Down
6 changes: 6 additions & 0 deletions src/main/java/org/akhq/models/Schema.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,12 @@ public class Schema {

private String exception;

public Schema(int schemaId, String subject, int version) {
this.id = schemaId;
this.subject = subject;
this.version = version;
}

public Schema(Schema schema, Schema.Config config) {
this.id = schema.id;
this.subject = schema.subject;
Expand Down
14 changes: 14 additions & 0 deletions src/main/java/org/akhq/repositories/SchemaRegistryRepository.java
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,20 @@ public boolean exist(String clusterId, String subject) throws IOException, RestC
return found;
}

public List<Schema> getSubjectsBySchemaId(String clusterId, int id) throws IOException, RestClientException {
Optional<RestService> maybeRegistryRestClient = Optional.ofNullable(kafkaModule
.getRegistryRestClient(clusterId));
if(maybeRegistryRestClient.isEmpty()){
return List.of();
}

return maybeRegistryRestClient.get()
.getAllVersionsById(id)
.stream()
.map(v -> new Schema(id, v.getSubject(), v.getVersion()))
.collect(Collectors.toList());
}

public Optional<Schema> getById(String clusterId, Integer id) throws IOException, RestClientException, ExecutionException, InterruptedException {
for (String subject: this.all(clusterId, Optional.empty(), List.of())) {
for (Schema version: this.getAllVersions(clusterId, subject)) {
Expand Down
39 changes: 39 additions & 0 deletions src/test/java/org/akhq/controllers/SchemaControllerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -104,4 +104,43 @@ void deleteNotExistApi() {
);
assertTrue(e.getMessage().contains("doesn't exist"));
}

@Test
void findSubjectBySchemaId() {
// Create 3 subjects (including 2 with the same schema)
var subject1Schema1 = new Schema("subjectTopic1-value", SchemaRegistryRepositoryTest.SCHEMA_1_V1,
Schema.Config.CompatibilityLevelConfig.FORWARD);
var subject1Schema1V2 = new Schema("subjectTopic1-value", SchemaRegistryRepositoryTest.SCHEMA_1_V2,
Schema.Config.CompatibilityLevelConfig.FORWARD);
var subject2Schema1 = new Schema("subjectTopic2-value", SchemaRegistryRepositoryTest.SCHEMA_1_V1,
Schema.Config.CompatibilityLevelConfig.FORWARD);

var subject1Schema1Response = this.retrieve(HttpRequest.POST(BASE_URL, subject1Schema1), Schema.class);
var subject1Schema1V2Response = this.retrieve(HttpRequest.POST(BASE_URL + "/subjectTopic1-value", subject1Schema1V2), Schema.class);
var subject2Schema1Response = this.retrieve(HttpRequest.POST(BASE_URL, subject2Schema1), Schema.class);

// Subject v1 and v2 should be different
assertNotEquals(subject1Schema1Response.getId(), subject1Schema1V2Response.getId());
assertNotEquals(subject1Schema1Response.getSchema(), subject1Schema1V2Response.getSchema());

// Subject 1 and 2 should have the same ID, schema but different subject
assertEquals(subject1Schema1Response.getId(), subject2Schema1Response.getId());
assertEquals(subject1Schema1Response.getSchema(), subject2Schema1Response.getSchema());
assertNotEquals(subject1Schema1Response.getSubject(), subject2Schema1Response.getSubject());

// Searching subject by schema ID should give the right subject depending on the topic
var subject1FromSchemaIdAndTopic =
this.retrieve(HttpRequest.GET(BASE_URL + "/id/" + subject1Schema1Response.getId() + "?topic=subjectTopic1"), Schema.class);
assertEquals(subject1Schema1Response.getId(), subject1FromSchemaIdAndTopic.getId());
assertEquals(subject1Schema1Response.getSubject(), subject1FromSchemaIdAndTopic.getSubject());

var subject2FromSchemaIdAndTopic =
this.retrieve(HttpRequest.GET(BASE_URL + "/id/" + subject1Schema1Response.getId() + "?topic=subjectTopic2"), Schema.class);
assertEquals(subject2Schema1Response.getId(), subject2FromSchemaIdAndTopic.getId());
assertEquals(subject2Schema1Response.getSubject(), subject2FromSchemaIdAndTopic.getSubject());

// Clean
this.exchange(HttpRequest.DELETE(BASE_URL + "/subjectTopic1-value"));
this.exchange(HttpRequest.DELETE(BASE_URL + "/subjectTopic2-value"));
}
}

0 comments on commit 8831744

Please sign in to comment.