Skip to content

Commit

Permalink
AVRO-3666 [Java] Use the new schema parser (#2642)
Browse files Browse the repository at this point in the history
* AVRO-3666: Redo schema parsing code

This undoes the split schema parsing to allow forward references, which
is to be handles by the SchemaParser & ParseContext classes. It uses the
new ParseContext for the classic schema parser to accommodate this.

Next step: use the new SchemaParser and resolve unresolved / forward
references after parsing. This will also resolve "forward" references
that were parsed in subsequent files.

* AVRO-3666: Resolve references after parsing

By resolving references after parsing, we allow both forward references
within a file as between subsequent files.

This change also includes using the new SchemaParser everywhere, as
using it is the best way to flush out bugs.

* AVRO-3666: Remove wrong test

* AVRO-1535: Fix aliases as well

* AVRO-3666: Re-enable disabled test

Also includes changes necessary to debug.

* AVRO-3666: Fix RAT exclusion

The wrong exclusion was removed.

* AVRO-3666: Remove unused field

* AVRO-3666: Introduce SchemaParser.ParseResult

This ensures the SchemaParser never returns unresolved schemata.

* AVRO-3666: Use SchemaParser for documentation

* AVRO-3666: Refactor after review

* AVRO-3666: Fix javadoc

* AVRO-3666: Fix merge bug

* AVRO-3666: Fix CodeQL warnings

* AVRO-3666: Increase test coverage

* AVRO-3666: Fix tests

* AVRO-3666: Refactor schema parsing for readability

The JSON schema parser is quite complex (it is a large method). This
change splits it in multiple methods, naming the various stages.

* AVRO-3666: rename method to avoid confusion

* AVRO-3666: Reduce PR size

This change reduces the PR size, but does require some extra work after
merging: the new SchemaParser class is hardly used, and the (now)
obsolete Schema.Parser class is used heavily.

* AVRO-3666: Reduce PR size more

* AVRO-3666: Reduce PR size again

* AVRO-3666: Spotless

* Update lang/java/avro/src/main/java/org/apache/avro/Schema.java

Co-authored-by: Fokko Driesprong <[email protected]>

* AVRO-3666: Spotless

---------

Co-authored-by: Fokko Driesprong <[email protected]>
  • Loading branch information
opwvhk and Fokko committed Apr 4, 2024
1 parent 44a2355 commit 876eae3
Show file tree
Hide file tree
Showing 23 changed files with 809 additions and 784 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ Let's go over the same example as in the previous section, but without using cod
First, we use a SchemaParser to read our schema definition and create a Schema object.

```java
Schema schema = new SchemaParser().parse(new File("user.avsc"));
Schema schema = new SchemaParser().parse(new File("user.avsc")).mainSchema();
```

Using this schema, let's create some users.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public interface FormattedSchemaParser {
* when expecting JSON), it is a good idea not to do anything (especially
* calling methods on the @code ParseContext}).</li>
* <li>The parameter {@code parseContext} is not thread-safe.</li>
* <li>When parsing, all parsed schema definitions should be added to the
* <li>All named schema definitions that are parsed should be added to the
* provided {@link ParseContext}.</li>
* <li>Optionally, you may return a "main" schema. Some schema definitions have
* one, for example the schema defined by the root of the JSON document in a
Expand All @@ -62,9 +62,9 @@ public interface FormattedSchemaParser {
* the parsing process, so reserve that for rethrowing exceptions.</li>
* </ul>
*
* @param parseContext the current parse context: all parsed schemata should
* be added here to resolve names with; contains all
* previously known types
* @param parseContext the current parse context: all named schemata that are
* parsed should be added here, otherwise resolving
* schemata can fail; contains all previously known types
* @param baseUri the base location of the schema, or {@code null} if
* not known
* @param formattedSchema the text of the schema definition(s) to parse
Expand Down
28 changes: 17 additions & 11 deletions lang/java/avro/src/main/java/org/apache/avro/JsonSchemaParser.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,26 +57,32 @@ public static Schema parseInternal(String... fragments) {
for (String fragment : fragments) {
buffer.append(fragment);
}
return new JsonSchemaParser().parse(new ParseContext(NameValidator.NO_VALIDATION), buffer, null);

boolean saved = Schema.getValidateDefaults();
try {
Schema.setValidateDefaults(false);
ParseContext context = new ParseContext(NameValidator.NO_VALIDATION);
Schema schema = new JsonSchemaParser().parse(context, buffer, true);
context.commit();
context.resolveAllSchemas();
return context.resolve(schema);
} finally {
Schema.setValidateDefaults(saved);
}
}

@Override
public Schema parse(ParseContext parseContext, URI baseUri, CharSequence formattedSchema)
throws IOException, SchemaParseException {
return parse(parseContext, formattedSchema, parseContext.nameValidator);
return parse(parseContext, formattedSchema, false);
}

private Schema parse(ParseContext parseContext, CharSequence formattedSchema, NameValidator nameValidator)
private Schema parse(ParseContext parseContext, CharSequence formattedSchema, boolean allowInvalidDefaults)
throws SchemaParseException {
Schema.Parser parser = new Schema.Parser(nameValidator);
if (nameValidator == NameValidator.NO_VALIDATION) {
Schema.Parser parser = new Schema.Parser(parseContext);
if (allowInvalidDefaults) {
parser.setValidateDefaults(false);
} else {
parser = new Schema.Parser(nameValidator);
}
parser.addTypes(parseContext.typesByName().values());
Schema schema = parser.parse(formattedSchema.toString());
parser.getTypes().values().forEach(parseContext::put);
return schema;
return parser.parseInternal(formattedSchema.toString());
}
}
198 changes: 158 additions & 40 deletions lang/java/avro/src/main/java/org/apache/avro/ParseContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,25 +18,36 @@
package org.apache.avro;

import org.apache.avro.util.SchemaResolver;
import org.apache.avro.util.Schemas;

import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import static java.util.Objects.requireNonNull;

/**
* Class to define a name context, useful to reference schemata with. This
* allows for the following:
*
* <ul>
* <li>Provide a default namespace for nested contexts, as found for example in
* JSON based schema definitions.</li>
* <li>Find schemata by name, including primitives.</li>
* <li>Collect new named schemata.</li>
* <li>Find schemata by name, including primitives.</li>
* <li>Find schemas that do not exist yet.</li>
* <li>Resolve references to schemas that didn't exist yet when first used.</li>
* </ul>
*
* <p>
* This class is NOT thread-safe.
* </p>
*
* <p>
* Note: this class has no use for most Avro users, but is a key component when
* implementing a schema parser.
* </p>
Expand All @@ -60,10 +71,27 @@ public class ParseContext {

private static final Set<Schema.Type> NAMED_SCHEMA_TYPES = EnumSet.of(Schema.Type.RECORD, Schema.Type.ENUM,
Schema.Type.FIXED);
/**
* Collection of old schemata. Can contain unresolved references if !isResolved.
*/
private final Map<String, Schema> oldSchemas;
/**
* Collection of new schemata. Can contain unresolved references.
*/
private final Map<String, Schema> newSchemas;
/**
* The name validator to use.
*/
// Visible for use in JsonSchemaParser
final NameValidator nameValidator;
/**
* Visitor that was used to resolve schemata with. If not available, some
* schemata in {@code oldSchemas} may not be fully resolved. If available, all
* schemata in {@code oldSchemas} are resolved, and {@code newSchemas} is empty.
* After visiting a schema, it can return the corresponding resolved schema for
* a schema that possibly contains unresolved references.
*/
private SchemaResolver.ResolvingVisitor resolvingVisitor;

/**
* Create a {@code ParseContext} for the default/{@code null} namespace, using
Expand All @@ -78,22 +106,14 @@ public ParseContext() {
* schemata.
*/
public ParseContext(NameValidator nameValidator) {
this(nameValidator, new LinkedHashMap<>(), new LinkedHashMap<>());
this(requireNonNull(nameValidator), new LinkedHashMap<>(), new LinkedHashMap<>());
}

private ParseContext(NameValidator nameValidator, Map<String, Schema> oldSchemas, Map<String, Schema> newSchemas) {
this.nameValidator = nameValidator;
this.oldSchemas = oldSchemas;
this.newSchemas = newSchemas;
}

/**
* Create a derived context using a different fallback namespace.
*
* @return a new context
*/
public ParseContext namespace() {
return new ParseContext(nameValidator, oldSchemas, newSchemas);
resolvingVisitor = null;
}

/**
Expand All @@ -109,56 +129,71 @@ public boolean contains(String name) {

/**
* <p>
* Resolve a schema by name.
* Find a schema by name and namespace.
* </p>
*
* <p>
* That is:
* </p>
*
* <ul>
* <li>If {@code fullName} is a primitive name, return a (new) schema for
* it</li>
* <li>Otherwise: resolve the schema in its own namespace and in the null
* namespace (the former takes precedence)</li>
* </ul>
* <ol>
* <li>If {@code name} is a primitive name, return a (new) schema for it</li>
* <li>Otherwise, determine the full schema name (using the given
* {@code namespace} if necessary), and find it</li>
* <li>If no schema was found and {@code name} is a simple name, find the schema
* in the default (null) namespace</li>
* <li>If still no schema was found, return an unresolved reference for the full
* schema name (see step 2)</li>
* </ol>
*
* Resolving means that the schema is returned if known, and otherwise an
* unresolved schema (a reference) is returned.
* <p>
* Note: as an unresolved reference might be returned, the schema is not
* directly usable. Please {@link #put(Schema)} the schema using it in the
* context. The {@link SchemaParser} and protocol parsers will ensure you'll
* only get a resolved schema that is usable.
* </p>
*
* @param fullName the full schema name to resolve
* @return the schema
* @throws SchemaParseException when the schema does not exist
* @param name the schema name to find
* @param namespace the namespace to find the schema against
* @return the schema, or an unresolved reference
*/
public Schema resolve(String fullName) {
Schema.Type type = PRIMITIVES.get(fullName);
public Schema find(String name, String namespace) {
Schema.Type type = PRIMITIVES.get(name);
if (type != null) {
return Schema.create(type);
}

Schema schema = getSchema(fullName);
String fullName = fullName(name, namespace);
Schema schema = getNamedSchema(fullName);
if (schema == null) {
// Not found; attempt to resolve in the default namespace
int lastDot = fullName.lastIndexOf('.');
String name = fullName.substring(lastDot + 1);
schema = getSchema(name);
schema = getNamedSchema(name);
}

return schema != null ? schema : SchemaResolver.unresolvedSchema(fullName);
}

private Schema getSchema(String fullName) {
private String fullName(String name, String namespace) {
if (namespace != null && name.lastIndexOf('.') < 0) {
return namespace + "." + name;
}
return name;
}

/**
* Get a schema by name. Note that the schema might not (yet) be resolved/usable
* until {@link #resolveAllSchemas()} has been called.
*
* @param fullName a full schema name
* @return the schema, if known
*/
public Schema getNamedSchema(String fullName) {
Schema schema = oldSchemas.get(fullName);
if (schema == null) {
schema = newSchemas.get(fullName);
}
return schema;
}

private boolean notEmpty(String str) {
return str != null && !str.isEmpty();
}

/**
* Put the schema into this context. This is an idempotent operation: it only
* fails if this context already has a different schema with the same name.
Expand All @@ -184,6 +219,7 @@ public void put(Schema schema) {
throw new SchemaParseException("Can't redefine: " + fullName);
}
} else {
resolvingVisitor = null;
Schema previouslyAddedSchema = newSchemas.putIfAbsent(fullName, schema);
if (previouslyAddedSchema != null && !previouslyAddedSchema.equals(schema)) {
throw new SchemaParseException("Can't redefine: " + fullName);
Expand All @@ -200,10 +236,10 @@ private String requireValidFullName(String fullName) {
return fullName;
}

private void validateName(String name, String what) {
private void validateName(String name, String typeOfName) {
NameValidator.Result result = nameValidator.validate(name);
if (!result.isOK()) {
throw new SchemaParseException(what + " \"" + name + "\" is invalid: " + result.getErrors());
throw new SchemaParseException(typeOfName + " \"" + name + "\" is invalid: " + result.getErrors());
}
}

Expand All @@ -216,12 +252,94 @@ public void commit() {
newSchemas.clear();
}

public SchemaParser.ParseResult commit(Schema mainSchema) {
Collection<Schema> parsedNamedSchemas = newSchemas.values();
SchemaParser.ParseResult parseResult = new SchemaParser.ParseResult() {
@Override
public Schema mainSchema() {
return mainSchema == null ? null : resolve(mainSchema);
}

@Override
public List<Schema> parsedNamedSchemas() {
return parsedNamedSchemas.stream().map(ParseContext.this::resolve).collect(Collectors.toList());
}
};
commit();
return parseResult;
}

public void rollback() {
newSchemas.clear();
}

/**
* Return all known types by their fullname.
* Resolve all (named) schemas that were parsed. This resolves all forward
* references, even if parsed from different files. Note: the context must be
* committed for this method to work.
*
* @return all parsed schemas, in the order they were parsed
* @throws AvroTypeException if a schema reference cannot be resolved
*/
public List<Schema> resolveAllSchemas() {
ensureSchemasAreResolved();

return new ArrayList<>(oldSchemas.values());
}

private void ensureSchemasAreResolved() {
if (hasNewSchemas()) {
throw new IllegalStateException("Schemas cannot be resolved unless the ParseContext is committed.");
}
if (resolvingVisitor == null) {
NameValidator saved = Schema.getNameValidator();
try {
// Ensure we use the same validation when copying schemas as when they were
// defined.
Schema.setNameValidator(nameValidator);
SchemaResolver.ResolvingVisitor visitor = new SchemaResolver.ResolvingVisitor(oldSchemas::get);
oldSchemas.values().forEach(schema -> Schemas.visit(schema, visitor));
// Before this point is where we can get exceptions due to resolving failures.
for (Map.Entry<String, Schema> entry : oldSchemas.entrySet()) {
entry.setValue(visitor.getResolved(entry.getValue()));
}
resolvingVisitor = visitor;
} finally {
Schema.setNameValidator(saved);
}
}
}

/**
* Resolve unresolved references in a schema <em>that was parsed for this
* context</em> using the types known to this context. Note: this method will
* ensure all known schemas are resolved, or throw, and thus requires the
* context to be committed.
*
* @param schema the schema resolve
* @return the fully resolved schema
* @throws AvroTypeException if a schema reference cannot be resolved
*/
public Schema resolve(Schema schema) {
ensureSchemasAreResolved();

// As all (named) schemas are resolved now, we know:
// — All named types are either in oldSchemas or unknown.
// — All unnamed types can be visited&resolved without validation.

if (NAMED_SCHEMA_TYPES.contains(schema.getType()) && schema.getFullName() != null) {
return requireNonNull(oldSchemas.get(schema.getFullName()), () -> "Unknown schema: " + schema.getFullName());
} else {
// Unnamed or anonymous schema
// (protocol message request parameters are anonymous records)
Schemas.visit(schema, resolvingVisitor); // This field is set, as ensureSchemasAreResolved(); was called.
return resolvingVisitor.getResolved(schema);
}
}

/**
* Return all known types by their fullname. Warning: this returns all types,
* even uncommitted ones, including unresolved references!
*
* @return a map of all types by their name
*/
Expand Down
Loading

0 comments on commit 876eae3

Please sign in to comment.