Skip to content

Commit

Permalink
feat(connect): include worker override properties in connect configs …
Browse files Browse the repository at this point in the history
…tab (#1344)

close #1336 

Co-authored-by: alozano3 <[email protected]>
  • Loading branch information
2 people authored and tchiotludo committed Apr 4, 2023
1 parent 7392420 commit 0be6762
Show file tree
Hide file tree
Showing 6 changed files with 59 additions and 121 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,11 @@ import Joi from 'joi-browser';
import './styles.scss';
import {
uriConnectDefinitionConfigs,
uriConnectPlugin,
uriUpdateDefinition
uriUpdateDefinition,
uriValidatePluginConfigs
} from '../../../../utils/endpoints';
import constants from '../../../../utils/constants';
import Form from '../../../../components/Form/Form';
import AceEditor from 'react-ace';
import filter from 'lodash/filter';
import 'ace-builds/webpack-resolver';
import 'ace-builds/src-noconflict/mode-json';
Expand Down Expand Up @@ -49,9 +48,10 @@ class ConnectConfigs extends Form {

async getPlugin(pluginId) {
const { connectId, clusterId } = this.state;
const { configs } = this.state;
let plugin = {};

plugin = await this.getApi(uriConnectPlugin(clusterId, connectId, pluginId));
let body = { configs };
plugin = await this.putApi(uriValidatePluginConfigs(clusterId, connectId, pluginId), body);
this.setState({ plugin: plugin.data }, () => {
this.renderForm();
});
Expand All @@ -68,10 +68,6 @@ class ConnectConfigs extends Form {
definitions.forEach(definition => {
formData[definition.name] = this.getConfigValue(definition.name);
this.schema[definition.name] = this.handleDefinition(definition);
if (definition.name === 'transforms') {
formData['transformsprops'] = this.getTransformAdditionalProperties() || '{}';
this.schema['transformsprops'] = Joi.object().required();
}
});
this.setState({ formData });
};
Expand All @@ -83,17 +79,6 @@ class ConnectConfigs extends Form {
return existingConfig ? configs[existingConfig] : '';
};

getTransformAdditionalProperties() {
const { configs } = this.state;
const filtered = Object.keys(configs)
.filter(configKey => configKey.startsWith('transforms.'))
.reduce((obj, configKey) => {
obj[configKey] = configs[configKey];
return obj;
}, {});
return JSON.stringify(filtered, null, 2);
}

handleDefinition = definition => {
let def = '';
if (definition.required) {
Expand Down Expand Up @@ -256,7 +241,6 @@ class ConnectConfigs extends Form {
};

handleGroup(group) {
let { formData } = this.state;
let groupDisplay = [
<tr key={0} className="bg-primary">
<td colSpan="3">{group[0].group}</td>
Expand All @@ -265,66 +249,7 @@ class ConnectConfigs extends Form {

group.forEach(element => {
const rows = this.renderTableRows(element);
const errors = [];
const roles = this.state.roles || {};

groupDisplay.push(<tr>{rows}</tr>);
if (element.name === 'transforms') {
const errorMessage = this.validateProperty({
name: 'transformsprops',
value: formData['transformsprops']
});
if (errorMessage) {
errors['transformsprops'] = errorMessage;
}
let transform = (
<React.Fragment>
<td>
<code>Transforms additional properties</code>
<small className="form-text text-muted">
{`Json object to be added to configurations. example:
{
"transforms.createKey.type":"org.apache.kafka.connect.transforms.ValueToKey",
"transforms.createKey.fields":"c1",
"transforms.extractInt.type":"org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.extractInt.field":"c1"
}`}
</small>
</td>
<td>
<AceEditor
mode="json"
id={'transformsprops'}
theme="merbivore_soft"
value={formData['transformsprops']}
onChange={value => {
let { formData } = this.state;
const errors = { ...this.state.errors };
const errorMessage = this.validateProperty({ name: 'transformsprops', value });
if (errorMessage) {
errors['transformsprops'] = errorMessage;
} else {
delete errors['transformsprops'];
}
formData['transformsprops'] = value;
this.handleData();
this.setState({ formData });
}}
name="UNIQUE_ID_OF_DIV"
readOnly={!(roles.connect && roles.connect['connect/update'])}
editorProps={{ $blockScrolling: true }}
style={{ width: '100%', minHeight: '25vh' }}
/>
{errors['transformsprops'] && (
<div id="input-error" className="alert alert-danger mt-1 p-1">
{errors['transformsprops']}
</div>
)}
</td>
</React.Fragment>
);
groupDisplay.push(transform);
}
});
return groupDisplay;
}
Expand All @@ -342,24 +267,13 @@ class ConnectConfigs extends Form {
};
let configs = {};
Object.keys(formData).forEach(key => {
if (
key !== 'subject' &&
key !== 'transformsprops' &&
key !== 'type' &&
key !== 'name' &&
formData[key] !== ''
) {
if (key !== 'subject' && key !== 'type' && key !== 'name' && formData[key] !== '') {
configs[`${key}`] = formData[key];
} else if (key === 'type') {
configs['connector.class'] = formData[key];
}
});

const transformsValue = JSON.parse(formData.transformsprops);
Object.keys(transformsValue).forEach(key => {
configs[key] = transformsValue[key];
});

body.configs = configs;

const { history } = this.props;
Expand Down
4 changes: 4 additions & 0 deletions client/src/utils/endpoints.js
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,10 @@ export const uriConnectPlugin = (clusterId, connectId, pluginId) => {
return `${apiUrl}/${clusterId}/connect/${connectId}/plugins/${pluginId}`;
};

export const uriValidatePluginConfigs = (clusterId, connectId, pluginId) => {
return `${apiUrl}/${clusterId}/connect/${connectId}/plugins/${pluginId}/validate`;
};

export const uriCreateConnect = (clusterId, connectId) => {
return `${apiUrl}/${clusterId}/connect/${connectId}`;
};
Expand Down
8 changes: 8 additions & 0 deletions src/main/java/org/akhq/controllers/ConnectController.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
import io.micronaut.http.annotation.Delete;
import io.micronaut.http.annotation.Get;
import io.micronaut.http.annotation.Post;
import io.micronaut.http.annotation.Put;

import io.micronaut.security.annotation.Secured;
import io.swagger.v3.oas.annotations.Operation;
import org.akhq.configs.Role;
Expand Down Expand Up @@ -69,6 +71,12 @@ public ConnectPlugin plugins(String cluster, String connectId, String type) {
.orElseThrow();
}

@Put("/plugins/{type}/validate")
@Operation(tags = {"connect"}, summary = "Validate plugin configs")
public ConnectPlugin validatePlugin(String cluster, String connectId, String type, Map<String, String> configs) {
return connectRepository.validatePlugin(cluster, connectId, type, configs).orElseThrow();
}

@Secured(Role.ROLE_CONNECT_INSERT)
@Post
@Operation(tags = {"connect"}, summary = "Create a new connect definition")
Expand Down
36 changes: 21 additions & 15 deletions src/main/java/org/akhq/repositories/ConnectRepository.java
Original file line number Diff line number Diff line change
Expand Up @@ -100,15 +100,15 @@ public List<ConnectDefinition> getDefinitions(String clusterId, String connectId

return filtered;
}

public Optional<ConnectPlugin> getPlugin(String clusterId, String connectId, String className) {
public Optional<ConnectPlugin> validatePlugin(String clusterId, String connectId, String className,
Map<String, String> configs) {
return this.kafkaModule
.getConnectRestClient(clusterId)
.get(connectId)
.getConnectorPlugins()
.stream()
.filter(connectPlugin -> getShortClassName(connectPlugin.getClassName()).equals(className))
.map(s -> mapToConnectPlugin(s, clusterId, connectId))
.filter(connectPlugin -> connectPlugin.getClassName().equals(className))
.map(s -> mapToConnectPlugin(s, clusterId, connectId, configs))
.findFirst();
}

Expand Down Expand Up @@ -229,18 +229,24 @@ public static Map<String, String> validConfigs(Map<String, String> configs, Stri
}

private ConnectPlugin mapToConnectPlugin(ConnectorPlugin plugin, String clusterId, String connectId) {
Map<String,String> config = ImmutableMap.of(
"connector.class", plugin.getClassName(),
"topics", "getPlugins"
);
return this.mapToConnectPlugin(plugin, clusterId, connectId, config);
}

private ConnectPlugin mapToConnectPlugin(ConnectorPlugin plugin, String clusterId, String connectId,
Map<String,String> config) {
return new ConnectPlugin(
plugin,
this.kafkaModule
.getConnectRestClient(clusterId)
.get(connectId)
.validateConnectorPluginConfig(new ConnectorPluginConfigDefinition(
Iterables.getLast(Arrays.asList(plugin.getClassName().split("/"))),
ImmutableMap.of(
"connector.class", plugin.getClassName(),
"topics", "getPlugins"
)
)));
plugin,
this.kafkaModule
.getConnectRestClient(clusterId)
.get(connectId)
.validateConnectorPluginConfig(new ConnectorPluginConfigDefinition(
Iterables.getLast(Arrays.asList(plugin.getClassName().split("/"))),
config
)));
}

private String getShortClassName(String className) {
Expand Down
20 changes: 20 additions & 0 deletions src/test/java/org/akhq/controllers/ConnectControllerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,26 @@ void pluginsApi() {
assertTrue(result.getDefinitions().size() > 0);
}

@Test
@Order(2)
void validatePluginApi() {
ConnectPlugin result = this.retrieve(HttpRequest.PUT(
BASE_URL + "/plugins/org.apache.kafka.connect.file.FileStreamSinkConnector/validate",
ImmutableMap.of(
"configs", ImmutableMap.of(
"connector.class", "org.apache.kafka.connect.file.FileStreamSinkConnector",
"file", PATH2,
"topics", KafkaTestCluster.TOPIC_CONNECT,
"consumer.override.retry.backoff.ms","400"
)
)
), ConnectPlugin.class);

assertEquals("sink", result.getType());
assertTrue(result.getDefinitions().size() > 0);
assertTrue(result.getDefinitions().stream().anyMatch(definition -> definition.getName().equals("consumer.override.retry.backoff.ms")));
}

@Test
@Order(5)
void updateApi() {
Expand Down
14 changes: 0 additions & 14 deletions src/test/java/org/akhq/repositories/ConnectRepositoryTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,20 +45,6 @@ void getPlugins() {
assertEquals(2, all.size());
}

@Test
void getPlugin() {
Optional<ConnectPlugin> plugin = repository.getPlugin(
KafkaTestCluster.CLUSTER_ID,
"connect-1",
"FileStreamSinkConnector"
);

assertTrue(plugin.isPresent());
assertEquals("FileStreamSinkConnector", plugin.get().getShortClassName());
assertEquals("sink", plugin.get().getType());
assertTrue(plugin.get().getDefinitions().stream().anyMatch(definition -> definition.getName().equals("file")));
}

@AfterEach
void cleanup() {
try {
Expand Down

0 comments on commit 0be6762

Please sign in to comment.