Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support sqlite #185

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,4 @@ bimap = { version = "0.6.0", features = [ "std" ] }
humantime-serde = "1.0.1"
bloomfilter = "1.0.5"
dynfmt = { version = "0.1.5", features = [ "curly" ] }
sqlx = { version = "0.5.7", features = [ "postgres", "mysql", "runtime-async-std-native-tls", "decimal", "chrono" ] }
sqlx = { version = "0.5.7", features = [ "sqlite", "postgres", "mysql", "runtime-async-std-native-tls", "decimal", "chrono" ] }
100 changes: 99 additions & 1 deletion core/src/graph/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ use anyhow::{Context, Result};

use chrono::{DateTime, FixedOffset, NaiveDate, NaiveDateTime, NaiveTime, Utc};
use sqlx::mysql::MySqlTypeInfo;
use sqlx::{MySql, Postgres};
use sqlx::sqlite::SqliteTypeInfo;
use sqlx::{Sqlite, MySql, Postgres};
use sqlx::postgres::{PgArgumentBuffer, PgTypeInfo};
use sqlx::{Type, Encode, encode::IsNull};

Expand Down Expand Up @@ -202,6 +203,16 @@ impl Type<MySql> for Value {
}
}

impl Type<Sqlite> for Value {
fn type_info() -> SqliteTypeInfo {
<serde_json::value::Value as Type<Sqlite>>::type_info()
}

fn compatible(_ty: &SqliteTypeInfo) -> bool {
unreachable!("This should never happen. Please reach out to https://github.com/getsynth/synth/issues if it does.")
}
}

impl Encode<'_, Postgres> for Value {
fn encode_by_ref(
&self,
Expand Down Expand Up @@ -322,6 +333,93 @@ impl Encode<'_, MySql> for Value {
}
}

impl Encode<'_, Sqlite> for Value {
fn encode_by_ref(
&self,
buf: &mut Vec<sqlx::sqlite::SqliteArgumentValue>
) -> IsNull {
match self {
Value::Null(_) => IsNull::Yes,
Value::Bool(b) => <bool as Encode<'_, Sqlite>>::encode_by_ref(b, buf),
Value::Number(num) => {
match *num {
Number::I8(i) => <i8 as Encode<'_, Sqlite>>::encode_by_ref(&i, buf),
Number::I16(i) => <i16 as Encode<'_, Sqlite>>::encode_by_ref(&i, buf),
Number::I32(i) => <i32 as Encode<'_, Sqlite>>::encode_by_ref(&i, buf),
Number::I64(i) => <i64 as Encode<'_, Sqlite>>::encode_by_ref(&i, buf),
Number::I128(_i) => unreachable!("not supported in sqlite"),
Number::U8(i) => <i8 as Encode<'_, Sqlite>>::encode_by_ref(&(i as i8), buf),
Number::U16(i) => <i16 as Encode<'_, Sqlite>>::encode_by_ref(&(i as i16), buf),
Number::U32(i) => <u32 as Encode<'_, Sqlite>>::encode_by_ref(&i, buf),
Number::U64(i) => {
if let Ok(i) = u32::try_from(i) {
<u32 as Encode<'_, Sqlite>>::encode_by_ref(&i, buf)
} else {
panic!("sqlx (the lib for Sqlite) does not support u64. As a workaround we currently use u32 as a fallback, \
but it appears that the supplied number `{}` could not fit in an u32.", i);
}
},
Number::U128(_i) => unreachable!("not supported in sqlite"),
Number::F32(f) => <f32 as Encode<'_, Sqlite>>::encode_by_ref(&f, buf),
Number::F64(f) => <f64 as Encode<'_, Sqlite>>::encode_by_ref(&f, buf),
}
},
Value::String(s) => <String as Encode<'_, Sqlite>>::encode_by_ref(s, buf),
Value::DateTime(ChronoValueAndFormat { value, .. }) => {
match value {
ChronoValue::NaiveDate(nd) => <NaiveDate as Encode<'_, Sqlite>>::encode_by_ref(nd, buf),
ChronoValue::NaiveTime(nt) => <NaiveTime as Encode<'_, Sqlite>>::encode_by_ref(nt, buf),
ChronoValue::NaiveDateTime(ndt) => <NaiveDateTime as Encode<'_, Sqlite>>::encode_by_ref(ndt, buf),
ChronoValue::DateTime(dt) => <DateTime<Utc> as Encode<'_, Sqlite>>::encode_by_ref(&dt.with_timezone(&Utc), buf),
}
}
Value::Object(_) => {
<serde_json::Value as Encode<'_, Sqlite>>::encode(json::synth_val_to_json(self.clone()), buf)
},
Value::Array(_arr) => todo!()//<Vec<Value> as Encode<'_, Sqlite>>::encode_by_ref(arr, buf), //TODO special-case for u8 arrays?
}
}

fn produces(&self) -> Option<SqliteTypeInfo> {
Some(match self {
Value::Null(_) => return <serde_json::Value as Encode<'_, Sqlite>>::produces(&serde_json::Value::Null),
Value::Bool(_) => <bool as Type<Sqlite>>::type_info(),
Value::Number(num) => match num {
Number::I8(_) => <i8 as Type<Sqlite>>::type_info(),
Number::I16(_) => <i16 as Type<Sqlite>>::type_info(),
Number::I32(_) => <i32 as Type<Sqlite>>::type_info(),
Number::I64(_) => <i64 as Type<Sqlite>>::type_info(),
Number::I128(_) => unreachable!("not supported in sqlite"),
Number::U8(_) => <u8 as Type<Sqlite>>::type_info(),
Number::U16(_) => <u16 as Type<Sqlite>>::type_info(),
Number::U32(_) => <u32 as Type<Sqlite>>::type_info(),
// FIXME:(rasvi) [2021-10-03]
Number::U64(_) => <u32 as Type<Sqlite>>::type_info(),
Number::U128(_) => unreachable!("not supported in sqlite"),
Number::F32(_) => <f32 as Type<Sqlite>>::type_info(),
Number::F64(_) => <f64 as Type<Sqlite>>::type_info(),
},
Value::DateTime(ChronoValueAndFormat { value, .. }) => {
match value {
ChronoValue::NaiveDate(_) => <NaiveDate as Type<Sqlite>>::type_info(),
ChronoValue::NaiveTime(_) => <NaiveTime as Type<Sqlite>>::type_info(),
ChronoValue::NaiveDateTime(_) => <NaiveDateTime as Type<Sqlite>>::type_info(),
ChronoValue::DateTime(_) => <DateTime<Utc> as Type<Sqlite>>::type_info(),
}
},
Value::String(_) => <String as Type<Sqlite>>::type_info(),
Value::Object(_) => return None, //TODO: Use JSON here?
Value::Array(elems) => if elems.is_empty() {
return None
} else if let Value::Number(Number::U8(_) | Number::I8(_)) = elems[0] {
<Vec<u8> as Type<Sqlite>>::type_info()
} else {
return None //TODO: other variants that would make sense?
}
})
}
}

#[allow(unused)]
impl Value {
pub fn is_null(&self) -> bool {
Expand Down
21 changes: 10 additions & 11 deletions core/src/graph/number.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ macro_rules! any_range_int_impl {
fn is_empty(&self) -> bool {
match (&self.low, &self.high) {
(Bound::Excluded(low), Bound::Included(high))
| (Bound::Included(low), Bound::Excluded(high)) => low >= high,
| (Bound::Included(low), Bound::Excluded(high)) => low > high,
(Bound::Included(low), Bound::Included(high)) => low > high,
(Bound::Excluded(low), Bound::Excluded(high)) => *low + 1 >= *high,
_ => false
Expand All @@ -30,16 +30,15 @@ macro_rules! any_range_int_impl {

impl SampleRange<$target> for AnyRange<$target> {
fn sample_single<R: rand::RngCore + ?Sized>(self, rng: &mut R) -> $target {
let low = match self.low {
Bound::Unbounded => panic!("cannot sample {} range unbounded on the left", stringify!($target)),
Bound::Included(low) => low,
Bound::Excluded(low) => low + 1
};

match self.high {
Bound::Excluded(high) => rng.gen_range(low..high),
Bound::Included(high) => rng.gen_range(low..=high),
Bound::Unbounded => panic!("cannot sample {} range unbounded on the right", stringify!($target))
match (self.low, self.high) {
(Bound::Included(low), Bound::Included(high) | Bound::Excluded(high)) if low == high => low,
(Bound::Included(low) | Bound::Excluded(low), Bound::Included(high)) if low == high => high,
(Bound::Included(low), Bound::Excluded(high)) => {rng.gen_range(low..high)},
(Bound::Included(low), Bound::Included(high)) => {rng.gen_range(low..=high)},
(Bound::Excluded(low), Bound::Included(high)) => {rng.gen_range(low + 1..=high)},
(Bound::Excluded(low), Bound::Excluded(high)) => {rng.gen_range(low + 1..high)},
(Bound::Unbounded, _) => panic!("cannot sample {} range unbounded on the left", stringify!($target)),
(_, Bound::Unbounded) => panic!("cannot sample {} range unbounded on the right", stringify!($target)),
}
}

Expand Down
2 changes: 1 addition & 1 deletion synth/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ indicatif = "0.15.0"
dirs = "3.0.2"
mongodb = {version = "2.0.0-beta.3", features = ["sync", "bson-chrono-0_4"] , default-features = false}

sqlx = { version = "0.5.7", features = [ "postgres", "mysql", "runtime-async-std-native-tls", "decimal", "chrono" ] }
sqlx = { version = "0.5.7", features = [ "sqlite", "postgres", "mysql", "runtime-async-std-native-tls", "decimal", "chrono" ] }

beau_collector = "0.2.1"

Expand Down
7 changes: 6 additions & 1 deletion synth/src/cli/export.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::convert::TryFrom;

use crate::cli::mongo::MongoExportStrategy;
use crate::cli::mysql::MySqlExportStrategy;
use crate::cli::sqlite::SqliteExportStrategy;
use crate::datasource::DataSource;
use crate::sampler::{Sampler, SamplerOutput};
use async_std::task;
Expand Down Expand Up @@ -47,9 +48,13 @@ impl TryFrom<DataSourceParams> for Box<dyn ExportStrategy> {
Box::new(MySqlExportStrategy {
uri
})
} else if uri.starts_with("sqlite://") {
Box::new(SqliteExportStrategy {
uri
})
} else {
return Err(anyhow!(
"Data sink not recognized. Was expecting one of 'mongodb' or 'postgres' or 'mysql' or 'mariadb'"
"Data sink not recognized. Was expecting one of 'mongodb' or 'postgres' or 'mysql' or 'sqlite' or 'mariadb'"
));
};
Ok(export_strategy)
Expand Down
6 changes: 6 additions & 0 deletions synth/src/cli/import.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@ use synth_core::schema::Namespace;
use crate::cli::db_utils::DataSourceParams;
use crate::cli::mongo::MongoImportStrategy;
use crate::cli::mysql::MySqlImportStrategy;
use crate::cli::sqlite::SqliteImportStrategy;
use crate::cli::postgres::PostgresImportStrategy;
use crate::cli::stdf::{FileImportStrategy, StdinImportStrategy};


pub trait ImportStrategy {
fn import(&self) -> Result<Namespace> {
ns_from_value(self.as_value()?)
Expand Down Expand Up @@ -45,6 +47,10 @@ impl TryFrom<DataSourceParams> for Box<dyn ImportStrategy> {
Box::new(MySqlImportStrategy {
uri,
})
} else if uri.starts_with("sqlite://") {
Box::new(SqliteImportStrategy {
uri,
})
} else if let Ok(path) = PathBuf::from_str(&uri) {
Box::new(FileImportStrategy {
from_file: path,
Expand Down
5 changes: 3 additions & 2 deletions synth/src/cli/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ mod import;
mod import_utils;
mod mongo;
mod mysql;
mod sqlite;
mod postgres;
mod stdf;
mod store;
Expand Down Expand Up @@ -137,9 +138,9 @@ impl Cli {
self.store.save_collection_path(&path, collection, content)?;
Ok(())
}
} else if self.store.ns_exists(&path) {
} else if self.store.ns_exists(&path) && !self.store.ns_is_empty_dir(&path) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be a separate PR.

Err(anyhow!(
"The directory at `{}` already exists. Will not import into an existing directory.",
"The directory at `{}` already exists and is not empty. Will not import into an existing directory.",
path.display()
))
} else {
Expand Down
46 changes: 46 additions & 0 deletions synth/src/cli/sqlite.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
use crate::cli::export::{create_and_insert_values, ExportParams, ExportStrategy};
use crate::cli::import::ImportStrategy;
use crate::cli::import_utils::build_namespace_import;
use crate::datasource::sqlite_datasource::SqliteDataSource;
use crate::datasource::DataSource;
use anyhow::Result;
use serde_json::Value;
use synth_core::schema::Namespace;
use synth_core::{Content, Name};

#[derive(Clone, Debug)]
pub struct SqliteExportStrategy {
pub uri: String,
}

impl ExportStrategy for SqliteExportStrategy {
fn export(&self, params: ExportParams) -> Result<()> {
let datasource = SqliteDataSource::new(&self.uri)?;

create_and_insert_values(params, &datasource)
}
}

#[derive(Clone, Debug)]
pub struct SqliteImportStrategy {
pub uri: String,
}

impl ImportStrategy for SqliteImportStrategy {
fn import(&self) -> Result<Namespace> {
let datasource = SqliteDataSource::new(&self.uri)?;

build_namespace_import(&datasource)
}

fn import_collection(&self, name: &Name) -> Result<Content> {
self.import()?
.collections
.remove(name)
.ok_or_else(|| anyhow!("Could not find table '{}' in Sqlite database.", name))
}

fn as_value(&self) -> Result<Value> {
bail!("Sqlite import doesn't support conversion into value")
}
}
5 changes: 5 additions & 0 deletions synth/src/cli/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,11 @@ impl Store {
self.ns_path(namespace).exists()
}

pub fn ns_is_empty_dir(&self, namespace: &Path) -> bool {
self.ns_path(namespace).is_dir()
&& self.ns_path(namespace).read_dir().map(|mut dir| dir.next().is_none()).unwrap_or_default()
}

pub fn collection_exists(&self, namespace: &Path, collection: &Name) -> bool {
self.collection_path(namespace, collection).exists()
}
Expand Down
1 change: 1 addition & 0 deletions synth/src/datasource/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use anyhow::Result;
use async_trait::async_trait;
use synth_core::Value;

pub(crate) mod sqlite_datasource;
pub(crate) mod mysql_datasource;
pub(crate) mod postgres_datasource;
pub(crate) mod relational_datasource;
Expand Down
Loading