Skip to content

Commit

Permalink
AVRO-3904: [Rust] schema_compatibility module refactor (#2742)
Browse files Browse the repository at this point in the history
  • Loading branch information
marcosschroh committed Feb 20, 2024
1 parent 258571b commit 9f4f84e
Showing 1 changed file with 70 additions and 114 deletions.
184 changes: 70 additions & 114 deletions lang/rust/avro/src/schema_compatibility.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,15 +181,14 @@ impl Checker {
// Check whether any of the possible fields names are in the writer schema.
// If the field was found, then it must have the exact same name with the writer field,
// otherwise we would have a false positive with the writers aliases
let mut position = None;
for field_name in fields_names {
if let Some(pos) = w_lookup.get(field_name) {
if &w_fields[*pos].name == field_name {
position = Some(pos);
break;
let position = fields_names.iter().find_map(|field_name| {
if let Some(pos) = w_lookup.get(*field_name) {
if &w_fields[*pos].name == *field_name {
return Some(pos);
}
}
}
None
});

match position {
Some(pos) => {
Expand Down Expand Up @@ -324,10 +323,10 @@ impl SchemaCompatibility {

fn check_writer_type(
writers_schema: &Schema,
allowed_schema: Schema,
allowed_schema: &Schema,
expected_schema_types: Vec<SchemaKind>,
) -> Result<(), CompatibilityError> {
if allowed_schema == *writers_schema {
if *allowed_schema == *writers_schema {
Ok(())
} else {
Err(CompatibilityError::TypeExpected {
Expand Down Expand Up @@ -355,24 +354,24 @@ impl SchemaCompatibility {
if let Schema::Record(RecordSchema { name: r_name, .. }) = readers_schema {
if w_name.name == r_name.name {
return Ok(());
} else {
return Err(CompatibilityError::NameMismatch {
writer_name: w_name.name.clone(),
reader_name: r_name.name.clone(),
});
}
} else {
return Err(CompatibilityError::TypeExpected {
schema_type: String::from("readers_schema"),
expected_type: vec![SchemaKind::Record],

return Err(CompatibilityError::NameMismatch {
writer_name: w_name.name.clone(),
reader_name: r_name.name.clone(),
});
}
} else {

return Err(CompatibilityError::TypeExpected {
schema_type: String::from("writers_schema"),
expected_type: vec![SchemaKind::Record],
schema_type: String::from("readers_schema"),
expected_type: vec![r_type],
});
}

return Err(CompatibilityError::TypeExpected {
schema_type: String::from("writers_schema"),
expected_type: vec![r_type],
});
}
SchemaKind::Fixed => {
if let Schema::Fixed(FixedSchema {
Expand All @@ -394,144 +393,101 @@ impl SchemaCompatibility {
return (w_name.name == r_name.name && w_size == r_size)
.then_some(())
.ok_or(CompatibilityError::FixedMismatch);
} else {
return Err(CompatibilityError::TypeExpected {
schema_type: String::from("writers_schema"),
expected_type: vec![SchemaKind::Fixed],
});
}

return Err(CompatibilityError::TypeExpected {
schema_type: String::from("writers_schema"),
expected_type: vec![r_type],
});
}
}
SchemaKind::Enum => {
if let Schema::Enum(EnumSchema { name: w_name, .. }) = writers_schema {
if let Schema::Enum(EnumSchema { name: r_name, .. }) = readers_schema {
if w_name.name == r_name.name {
return Ok(());
} else {
return Err(CompatibilityError::NameMismatch {
writer_name: w_name.name.clone(),
reader_name: r_name.name.clone(),
});
}
} else {
return Err(CompatibilityError::TypeExpected {
schema_type: String::from("readers_schema"),
expected_type: vec![SchemaKind::Enum],

return Err(CompatibilityError::NameMismatch {
writer_name: w_name.name.clone(),
reader_name: r_name.name.clone(),
});
}
} else {

return Err(CompatibilityError::TypeExpected {
schema_type: String::from("writers_schema"),
expected_type: vec![SchemaKind::Enum],
schema_type: String::from("readers_schema"),
expected_type: vec![r_type],
});
}

return Err(CompatibilityError::TypeExpected {
schema_type: String::from("writers_schema"),
expected_type: vec![r_type],
});
}
SchemaKind::Map => {
if let Schema::Map(w_m) = writers_schema {
if let Schema::Map(r_m) = readers_schema {
return SchemaCompatibility::match_schemas(&w_m.types, &r_m.types);
} else {
return Err(CompatibilityError::TypeExpected {
schema_type: String::from("readers_schema"),
expected_type: vec![SchemaKind::Map],
});
}
} else {

return Err(CompatibilityError::TypeExpected {
schema_type: String::from("writers_schema"),
expected_type: vec![SchemaKind::Map],
schema_type: String::from("readers_schema"),
expected_type: vec![r_type],
});
}

return Err(CompatibilityError::TypeExpected {
schema_type: String::from("writers_schema"),
expected_type: vec![r_type],
});
}
SchemaKind::Array => {
if let Schema::Array(w_a) = writers_schema {
if let Schema::Array(r_a) = readers_schema {
return SchemaCompatibility::match_schemas(&w_a.items, &r_a.items);
} else {
return Err(CompatibilityError::TypeExpected {
schema_type: String::from("readers_schema"),
expected_type: vec![SchemaKind::Array],
});
}
} else {

return Err(CompatibilityError::TypeExpected {
schema_type: String::from("writers_schema"),
expected_type: vec![SchemaKind::Array],
schema_type: String::from("readers_schema"),
expected_type: vec![r_type],
});
}

return Err(CompatibilityError::TypeExpected {
schema_type: String::from("writers_schema"),
expected_type: vec![r_type],
});
}
SchemaKind::Date => {
return check_writer_type(
writers_schema,
Schema::Date,
vec![SchemaKind::Date, SchemaKind::Int],
);
}
SchemaKind::TimeMillis => {
return check_writer_type(
writers_schema,
Schema::TimeMillis,
vec![SchemaKind::Date, SchemaKind::Int],
);
}
SchemaKind::TimeMicros => {
return check_writer_type(
writers_schema,
Schema::TimeMicros,
vec![SchemaKind::TimeMicros, SchemaKind::Long],
);
}
SchemaKind::TimestampNanos => {
return check_writer_type(
writers_schema,
Schema::TimestampNanos,
vec![SchemaKind::TimeMicros, SchemaKind::Long],
);
}
SchemaKind::TimestampMillis => {
return check_writer_type(
writers_schema,
Schema::TimestampMillis,
vec![SchemaKind::TimeMicros, SchemaKind::Long],
);
}
SchemaKind::TimestampMicros => {
return check_writer_type(
writers_schema,
Schema::TimestampMicros,
vec![SchemaKind::TimeMicros, SchemaKind::Long],
);
}
SchemaKind::LocalTimestampMillis => {
return check_writer_type(
writers_schema,
Schema::LocalTimestampMillis,
vec![SchemaKind::TimeMicros, SchemaKind::Long],
);
}
SchemaKind::LocalTimestampMicros => {
SchemaKind::Date | SchemaKind::TimeMillis => {
return check_writer_type(
writers_schema,
Schema::LocalTimestampMicros,
vec![SchemaKind::TimeMicros, SchemaKind::Long],
readers_schema,
vec![r_type, SchemaKind::Int],
);
}
SchemaKind::LocalTimestampNanos => {
SchemaKind::TimeMicros
| SchemaKind::TimestampNanos
| SchemaKind::TimestampMillis
| SchemaKind::TimestampMicros
| SchemaKind::LocalTimestampMillis
| SchemaKind::LocalTimestampMicros
| SchemaKind::LocalTimestampNanos => {
return check_writer_type(
writers_schema,
Schema::TimeMicros,
vec![SchemaKind::TimeMicros, SchemaKind::Long],
readers_schema,
vec![r_type, SchemaKind::Long],
);
}
SchemaKind::Duration => {
if let Schema::Duration = writers_schema {
return Ok(());
} else {
return Err(CompatibilityError::TypeExpected {
schema_type: String::from("writers_schema"),
expected_type: vec![SchemaKind::Duration, SchemaKind::Fixed],
});
}

return Err(CompatibilityError::TypeExpected {
schema_type: String::from("writers_schema"),
expected_type: vec![r_type, SchemaKind::Fixed],
});
}
_ => {
return Err(CompatibilityError::Inconclusive(String::from(
Expand Down

0 comments on commit 9f4f84e

Please sign in to comment.