Skip to content

Commit

Permalink
AVRO-4010: [Rust] Avoid re-resolving schema on every read() (#2995)
Browse files Browse the repository at this point in the history
Co-authored-by: Michael Spector <[email protected]>
  • Loading branch information
spektom and Michael Spector committed Jul 3, 2024
1 parent b976076 commit f3b6ee2
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 52 deletions.
26 changes: 19 additions & 7 deletions lang/rust/avro/src/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@ use crate::{
decode::{decode, decode_internal},
from_value,
rabin::Rabin,
schema::{AvroSchema, Names, ResolvedOwnedSchema, ResolvedSchema, Schema},
schema::{
resolve_names, resolve_names_with_schemata, AvroSchema, Names, ResolvedOwnedSchema,
ResolvedSchema, Schema,
},
types::Value,
util, AvroResult, Codec, Error,
};
Expand All @@ -47,6 +50,7 @@ struct Block<'r, R> {
writer_schema: Schema,
schemata: Vec<&'r Schema>,
user_metadata: HashMap<String, Vec<u8>>,
names_refs: Names,
}

impl<'r, R: Read> Block<'r, R> {
Expand All @@ -61,6 +65,7 @@ impl<'r, R: Read> Block<'r, R> {
message_count: 0,
marker: [0; 16],
user_metadata: Default::default(),
names_refs: Default::default(),
};

block.read_header()?;
Expand Down Expand Up @@ -179,13 +184,18 @@ impl<'r, R: Read> Block<'r, R> {

let mut block_bytes = &self.buf[self.buf_idx..];
let b_original = block_bytes.len();
let schemata = if self.schemata.is_empty() {
vec![&self.writer_schema]
} else {
self.schemata.clone()

let item = decode_internal(
&self.writer_schema,
&self.names_refs,
&None,
&mut block_bytes,
)?;
let item = match read_schema {
Some(schema) => item.resolve(schema)?,
None => item,
};
let item =
from_avro_datum_schemata(&self.writer_schema, schemata, &mut block_bytes, read_schema)?;

if b_original == block_bytes.len() {
// from_avro_datum did not consume any bytes, so return an error to avoid an infinite loop
return Err(Error::ReadBlock);
Expand Down Expand Up @@ -214,8 +224,10 @@ impl<'r, R: Read> Block<'r, R> {
.map(|(name, schema)| (name.clone(), (*schema).clone()))
.collect();
self.writer_schema = Schema::parse_with_names(&json, names)?;
resolve_names_with_schemata(&self.schemata, &mut self.names_refs, &None)?;
} else {
self.writer_schema = Schema::parse(&json)?;
resolve_names(&self.writer_schema, &mut self.names_refs, &None)?;
}
Ok(())
}
Expand Down
101 changes: 56 additions & 45 deletions lang/rust/avro/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -545,7 +545,7 @@ impl TryFrom<Schema> for ResolvedOwnedSchema {
names,
root_schema: schema,
};
Self::from_internal(&rs.root_schema, &mut rs.names, &None)?;
resolve_names(&rs.root_schema, &mut rs.names, &None)?;
Ok(rs)
}
}
Expand All @@ -557,57 +557,68 @@ impl ResolvedOwnedSchema {
pub(crate) fn get_names(&self) -> &Names {
&self.names
}
}

fn from_internal(
schema: &Schema,
names: &mut Names,
enclosing_namespace: &Namespace,
) -> AvroResult<()> {
match schema {
Schema::Array(schema) => Self::from_internal(&schema.items, names, enclosing_namespace),
Schema::Map(schema) => Self::from_internal(&schema.types, names, enclosing_namespace),
Schema::Union(UnionSchema { schemas, .. }) => {
for schema in schemas {
Self::from_internal(schema, names, enclosing_namespace)?
}
Ok(())
pub(crate) fn resolve_names(
schema: &Schema,
names: &mut Names,
enclosing_namespace: &Namespace,
) -> AvroResult<()> {
match schema {
Schema::Array(schema) => resolve_names(&schema.items, names, enclosing_namespace),
Schema::Map(schema) => resolve_names(&schema.types, names, enclosing_namespace),
Schema::Union(UnionSchema { schemas, .. }) => {
for schema in schemas {
resolve_names(schema, names, enclosing_namespace)?
}
Schema::Enum(EnumSchema { name, .. }) | Schema::Fixed(FixedSchema { name, .. }) => {
let fully_qualified_name = name.fully_qualified_name(enclosing_namespace);
if names
.insert(fully_qualified_name.clone(), schema.clone())
.is_some()
{
Err(Error::AmbiguousSchemaDefinition(fully_qualified_name))
} else {
Ok(())
}
Ok(())
}
Schema::Enum(EnumSchema { name, .. }) | Schema::Fixed(FixedSchema { name, .. }) => {
let fully_qualified_name = name.fully_qualified_name(enclosing_namespace);
if names
.insert(fully_qualified_name.clone(), schema.clone())
.is_some()
{
Err(Error::AmbiguousSchemaDefinition(fully_qualified_name))
} else {
Ok(())
}
Schema::Record(RecordSchema { name, fields, .. }) => {
let fully_qualified_name = name.fully_qualified_name(enclosing_namespace);
if names
.insert(fully_qualified_name.clone(), schema.clone())
.is_some()
{
Err(Error::AmbiguousSchemaDefinition(fully_qualified_name))
} else {
let record_namespace = fully_qualified_name.namespace;
for field in fields {
Self::from_internal(&field.schema, names, &record_namespace)?
}
Ok(())
}
Schema::Record(RecordSchema { name, fields, .. }) => {
let fully_qualified_name = name.fully_qualified_name(enclosing_namespace);
if names
.insert(fully_qualified_name.clone(), schema.clone())
.is_some()
{
Err(Error::AmbiguousSchemaDefinition(fully_qualified_name))
} else {
let record_namespace = fully_qualified_name.namespace;
for field in fields {
resolve_names(&field.schema, names, &record_namespace)?
}
Ok(())
}
Schema::Ref { name } => {
let fully_qualified_name = name.fully_qualified_name(enclosing_namespace);
names
.get(&fully_qualified_name)
.map(|_| ())
.ok_or(Error::SchemaResolutionError(fully_qualified_name))
}
_ => Ok(()),
}
Schema::Ref { name } => {
let fully_qualified_name = name.fully_qualified_name(enclosing_namespace);
names
.get(&fully_qualified_name)
.map(|_| ())
.ok_or(Error::SchemaResolutionError(fully_qualified_name))
}
_ => Ok(()),
}
}

pub(crate) fn resolve_names_with_schemata(
schemata: &Vec<&Schema>,
names: &mut Names,
enclosing_namespace: &Namespace,
) -> AvroResult<()> {
for schema in schemata {
resolve_names(schema, names, enclosing_namespace)?;
}
Ok(())
}

/// Represents a `field` in a `record` Avro schema.
Expand Down

0 comments on commit f3b6ee2

Please sign in to comment.