Skip to content

Commit

Permalink
AVRO-4010: [Rust] Avoid re-resolving schema on every read()
Browse files Browse the repository at this point in the history
  • Loading branch information
Michael Spector committed Jul 2, 2024
1 parent b976076 commit a5cca39
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 13 deletions.
32 changes: 19 additions & 13 deletions lang/rust/avro/src/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ struct Block<'r, R> {
message_count: usize,
marker: [u8; 16],
codec: Codec,
writer_schema: Schema,
writer_schema: ResolvedOwnedSchema,
schemata: Vec<&'r Schema>,
user_metadata: HashMap<String, Vec<u8>>,
}
Expand All @@ -54,7 +54,7 @@ impl<'r, R: Read> Block<'r, R> {
let mut block = Block {
reader,
codec: Codec::Null,
writer_schema: Schema::Null,
writer_schema: Default::default(),
schemata,
buf: vec![],
buf_idx: 0,
Expand Down Expand Up @@ -179,13 +179,18 @@ impl<'r, R: Read> Block<'r, R> {

let mut block_bytes = &self.buf[self.buf_idx..];
let b_original = block_bytes.len();
let schemata = if self.schemata.is_empty() {
vec![&self.writer_schema]
} else {
self.schemata.clone()

let item = decode_internal(
self.writer_schema.get_root_schema(),
self.writer_schema.get_names(),
&None,
&mut block_bytes,
)?;
let item = match read_schema {
Some(schema) => item.resolve(schema)?,
None => item,
};
let item =
from_avro_datum_schemata(&self.writer_schema, schemata, &mut block_bytes, read_schema)?;

if b_original == block_bytes.len() {
// The decoder did not consume any bytes, so return an error to avoid an infinite loop
return Err(Error::ReadBlock);
Expand All @@ -206,17 +211,18 @@ impl<'r, R: Read> Block<'r, R> {
}
})
.ok_or(Error::GetAvroSchemaFromMap)?;
if !self.schemata.is_empty() {
let writer_schema = if !self.schemata.is_empty() {
let rs = ResolvedSchema::try_from(self.schemata.clone())?;
let names: Names = rs
.get_names()
.iter()
.map(|(name, schema)| (name.clone(), (*schema).clone()))
.collect();
self.writer_schema = Schema::parse_with_names(&json, names)?;
Schema::parse_with_names(&json, names)?
} else {
self.writer_schema = Schema::parse(&json)?;
}
Schema::parse(&json)?
};
self.writer_schema = ResolvedOwnedSchema::try_from(writer_schema)?;
Ok(())
}

Expand Down Expand Up @@ -341,7 +347,7 @@ impl<'a, R: Read> Reader<'a, R> {
/// Get a reference to the writer `Schema`.
#[inline]
pub fn writer_schema(&self) -> &Schema {
&self.block.writer_schema
self.block.writer_schema.get_root_schema()
}

/// Get a reference to the optional reader `Schema`.
Expand Down
10 changes: 10 additions & 0 deletions lang/rust/avro/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -531,11 +531,21 @@ impl<'s> ResolvedSchema<'s> {
}
}

/// A root [`Schema`] stored together with a [`Names`] table, both held by value.
///
/// Unlike [`ResolvedSchema`], this type has no lifetime parameter, so it can be
/// kept as a plain struct field.
#[derive(Debug, Clone)]
pub(crate) struct ResolvedOwnedSchema {
// NOTE(review): presumably the named schemas resolved from `root_schema` —
// confirm against the `TryFrom<Schema>` impl.
names: Names,
// NOTE(review): presumably the schema this value was converted from — confirm
// against the `TryFrom<Schema>` impl.
root_schema: Schema,
}

impl Default for ResolvedOwnedSchema {
fn default() -> Self {
Self {
names: Default::default(),
root_schema: Schema::Null,
}
}
}

impl TryFrom<Schema> for ResolvedOwnedSchema {
type Error = Error;

Expand Down

0 comments on commit a5cca39

Please sign in to comment.