241 lines
9 KiB
Rust
241 lines
9 KiB
Rust
|
|
use deno_core::futures::StreamExt;
|
||
|
|
use deno_core::{Extension, OpState};
|
||
|
|
use elm_project_utils::ElmResult;
|
||
|
|
use serde::{Deserialize, Serialize};
|
||
|
|
use serde_json::value::Number;
|
||
|
|
use serde_json::Value;
|
||
|
|
use sqlx::sqlite::SqlitePool;
|
||
|
|
use sqlx::Row;
|
||
|
|
use sqlx::{Column, TypeInfo, ValueRef};
|
||
|
|
use std::thread::JoinHandle;
|
||
|
|
use std::time::Instant;
|
||
|
|
use tokio;
|
||
|
|
use tracing::{info_span, Instrument};
|
||
|
|
|
||
|
|
/// Sending half of the channel the deno op uses to hand work to the SQL
/// worker thread spawned in [`init`].
///
/// Each message pairs a oneshot reply sender (used to deliver the batch
/// result back to the blocked op) with the batch itself: a list of
/// `(fetch_all, sql, args)` query triples.
type SQLWorkerMailbox = std::sync::mpsc::Sender<(
    oneshot::Sender<ElmResult<Vec<Vec<String>>, AstridQueryError>>,
    Vec<(bool, String, Vec<String>)>,
)>;
|
||
|
|
|
||
|
|
/// Build the deno extension for batched SQL queries plus the dedicated worker
/// thread that executes them against `db_pool`.
///
/// The worker owns a single-threaded tokio runtime and services one request
/// batch at a time from an mpsc channel. The returned `JoinHandle` lets the
/// caller join the worker; the thread exits once every mailbox sender has
/// been dropped (i.e. `rx.recv()` starts failing).
pub(crate) fn init(db_pool: SqlitePool) -> Result<(Extension, JoinHandle<()>), ()> {
    // Requests are (reply-channel, batch-of-queries) pairs; see SQLWorkerMailbox.
    let (worker_mailbox, rx) = std::sync::mpsc::channel::<(
        oneshot::Sender<ElmResult<_, _>>,
        Vec<(bool, String, Vec<String>)>,
    )>();

    let sql_worker_thread = std::thread::spawn(move || {
        // Current-thread runtime: each batch is driven to completion in turn.
        let worker = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .unwrap();
        loop {
            if let Ok((response, queries)) = rx.recv() {
                // I am not sure if I should only work on one database task at a time, or
                // submit as many tasks as possible. Just spawning the future onto this
                // executor does not seem to work, even though the docs say the thread pool
                // will poll the future until it completes.
                let db_pool_clone = db_pool.clone();
                let span = info_span!("inside sql queries futures");
                // Run the whole batch; stop at the first failure and report it.
                let f = async move {
                    let _start = Instant::now();
                    let db_pool = db_pool_clone;
                    // One Vec of JSON-encoded row strings per query in the batch.
                    let mut result: Vec<Vec<String>> = vec![];
                    let mut failure: Option<AstridQueryError> = None;
                    // NOTE(review): `_args` are ignored here, so queries are not
                    // parameterized at this layer — confirm binding happens upstream.
                    for (fetch_all, sql, _args) in queries {
                        let mut acc = Vec::new();
                        if fetch_all {
                            // Stream every row of the result set.
                            let mut stream = sqlx::query(&sql).fetch(&db_pool);
                            loop {
                                match stream.next().await {
                                    None => break,
                                    Some(Ok(row)) => {
                                        // Try to auto marshall the row into a javascript
                                        // object to make it easier on our no-code users.
                                        match try_marshal(&row) {
                                            Ok(json) => acc.push(json.to_string()),
                                            Err(err) => {
                                                failure = Some(AstridQueryError::Execute {
                                                    sql: sql.clone(),
                                                    message: err.to_string(),
                                                });
                                                break;
                                            }
                                        }
                                    }
                                    Some(Err(err)) => {
                                        eprintln!("got fetch_all sql error {:?}", err);
                                        failure = Some(AstridQueryError::Execute {
                                            sql: sql.clone(),
                                            message: err.to_string(),
                                        });
                                        break;
                                    }
                                }
                            }
                            // Pushed even after a mid-stream failure; the partial data
                            // is never seen because we reply with the error below.
                            result.push(acc);
                        } else {
                            // Exactly-one-row query; RowNotFound maps to a dedicated error.
                            match sqlx::query(&sql)
                                .fetch_one(&db_pool)
                                .await
                                .and_then(|row| try_marshal(&row).map(|json| json.to_string()))
                            {
                                Ok(s) => result.push(vec![s]),
                                Err(sqlx::Error::RowNotFound) => {
                                    failure = Some(AstridQueryError::NotFound { sql });
                                }
                                Err(err) => {
                                    eprintln!("got fetchOne sql error {:?}", err);
                                    failure = Some(AstridQueryError::Execute {
                                        sql,
                                        message: err.to_string(),
                                    });
                                }
                            }
                        }
                        // The first failure aborts the remainder of the batch.
                        if failure.is_some() {
                            break;
                        }
                    }
                    // Reply with either the first failure or all accumulated rows.
                    if let Some(msg) = failure {
                        response.send(ElmResult::err(msg))
                    } else {
                        response.send(ElmResult::ok(result))
                    }
                };

                // I found it interesting that the runtime of the future from the viewpoint of
                // tracing was around 230us for a trivial select 2 rows query, but walltime I
                // measured was around 700us. So polling the future or waiting for file IO is
                // more expensive than I thought.
                worker.block_on(f.instrument(span)).unwrap();
            } else {
                // All mailbox senders dropped: shut the worker down.
                break;
            }
        }
    });
    let worker_mailbox_clone = worker_mailbox.clone();
    let extension = Extension::builder()
        .ops(vec![op_starmelon_batch_queries::decl()])
        .state(move |state| {
            // Expose the mailbox to ops via OpState (borrowed as SQLWorkerMailbox).
            state.put(worker_mailbox_clone.clone());

            Ok(())
        })
        .build();

    Ok((extension, sql_worker_thread))
}
|
||
|
|
|
||
|
|
#[deno_core::op]
|
||
|
|
fn op_starmelon_batch_queries(
|
||
|
|
state: &mut OpState,
|
||
|
|
queries: Vec<(bool, String, Vec<String>)>,
|
||
|
|
) -> Result<ElmResult<Vec<Vec<String>>, AstridQueryError>, deno_core::error::AnyError> {
|
||
|
|
let worker_mailbox_clone = state.borrow::<SQLWorkerMailbox>();
|
||
|
|
|
||
|
|
let worker_mailbox = worker_mailbox_clone.clone();
|
||
|
|
let (sender, receiver) = oneshot::channel::<ElmResult<Vec<Vec<String>>, _>>();
|
||
|
|
|
||
|
|
let span = info_span!("run sql");
|
||
|
|
let timing_guard = span.enter();
|
||
|
|
|
||
|
|
worker_mailbox.send((sender, queries)).unwrap();
|
||
|
|
let elm_result = receiver.recv().unwrap();
|
||
|
|
|
||
|
|
drop(timing_guard);
|
||
|
|
|
||
|
|
Ok(elm_result)
|
||
|
|
}
|
||
|
|
|
||
|
|
/// Errors reported back to JavaScript for a failed query batch.
///
/// Serialized with a compact tagged layout (`$` for the variant tag, `a`/`b`
/// for fields), matching the Elm-style representation the JS side expects.
#[derive(Debug, Serialize, Deserialize)]
#[serde(tag = "$")]
enum AstridQueryError {
    /// A query failed to execute, or a fetched row failed to marshal to JSON.
    Execute {
        #[serde(rename = "a")]
        sql: String,
        #[serde(rename = "b")]
        message: String,
    },
    /// A `fetch_one` query matched no rows (`sqlx::Error::RowNotFound`).
    NotFound {
        #[serde(rename = "a")]
        sql: String,
    },
}
|
||
|
|
|
||
|
|
fn try_marshal(row: &sqlx::sqlite::SqliteRow) -> Result<serde_json::Value, sqlx::Error> {
|
||
|
|
use serde_json::value::Map;
|
||
|
|
use serde_json::Value;
|
||
|
|
use sqlx::{Column, TypeInfo};
|
||
|
|
|
||
|
|
let mut object = Map::new();
|
||
|
|
for i in 0..row.len() {
|
||
|
|
let value = try_decode_column(row, i)?;
|
||
|
|
let key = row.column(i).name().to_owned();
|
||
|
|
object.insert(key, value);
|
||
|
|
}
|
||
|
|
|
||
|
|
Ok(Value::Object(object))
|
||
|
|
}
|
||
|
|
|
||
|
|
fn try_decode_column(
|
||
|
|
row: &sqlx::sqlite::SqliteRow,
|
||
|
|
i: usize,
|
||
|
|
) -> Result<serde_json::Value, sqlx::Error> {
|
||
|
|
let value = row
|
||
|
|
.try_get_raw(i)
|
||
|
|
.map_err(|err| sqlx::Error::ColumnDecode {
|
||
|
|
index: row.column(i).name().to_owned(),
|
||
|
|
source: Box::new(StringError(format!("{}", err))),
|
||
|
|
})?;
|
||
|
|
match value.type_info().name() {
|
||
|
|
"NULL" => {
|
||
|
|
if let Ok(_x) = row.try_get::<i64, _>(i) {
|
||
|
|
eprintln!("it was actually an int");
|
||
|
|
};
|
||
|
|
Ok(Value::Null)
|
||
|
|
}
|
||
|
|
"TEXT" => Ok(Value::String(row.try_get::<String, _>(i)?)),
|
||
|
|
"REAL" => {
|
||
|
|
let x = row.try_get::<f64, _>(i)?;
|
||
|
|
match Number::from_f64(x) {
|
||
|
|
Some(n) => Ok(Value::Number(n)),
|
||
|
|
None => {
|
||
|
|
Err(sqlx::Error::ColumnDecode {
|
||
|
|
index: row.column(i).name().to_owned(),
|
||
|
|
source: Box::new(StringError(format!("While parsing a SQL type `REAL` I expected a finite number but got {} instead ", x))),
|
||
|
|
})
|
||
|
|
}
|
||
|
|
}
|
||
|
|
}
|
||
|
|
//"BLOB" =>
|
||
|
|
"INTEGER" => Ok(Value::Number(row.try_get::<i64, _>(i)?.into())),
|
||
|
|
//"NUMERIC" =>
|
||
|
|
"BOOLEAN" => Ok(Value::Bool(row.try_get::<bool, _>(i)?)),
|
||
|
|
//"DATE" =>
|
||
|
|
//"TIME" =>
|
||
|
|
//"DATETIME" =>
|
||
|
|
unknown => Err(sqlx::Error::ColumnDecode {
|
||
|
|
index: row.column(i).name().to_owned(),
|
||
|
|
source: Box::new(StringError(format!(
|
||
|
|
"I don't know how to automatically convert the SQL type `{}`` into a JSON value.",
|
||
|
|
unknown
|
||
|
|
))),
|
||
|
|
}),
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
/// Minimal string-backed error type, used as the boxed `source` of
/// `sqlx::Error::ColumnDecode` when no richer error value is available.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct StringError(String);
|
||
|
|
|
||
|
|
impl ::std::fmt::Display for StringError {
|
||
|
|
fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result {
|
||
|
|
self.0.fmt(f)
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
impl std::error::Error for StringError {
|
||
|
|
#[inline]
|
||
|
|
fn description(&self) -> &str {
|
||
|
|
&self.0
|
||
|
|
}
|
||
|
|
}
|