use deno_core::futures::StreamExt; use deno_core::{Extension, OpState}; use elm_project_utils::ElmResult; use serde::{Deserialize, Serialize}; use serde_json::{value::Map, value::Number, Value}; use sqlx::{sqlite::SqlitePool, Column, Row, TypeInfo, ValueRef}; use std::thread::JoinHandle; use std::time::Instant; use tokio; use tracing::{info_span, Instrument}; type SQLWorkerMailbox = std::sync::mpsc::Sender<( oneshot::Sender>, AstridQueryError>>, Vec<(bool, String, Vec)>, )>; pub(crate) fn init(db_pool: SqlitePool) -> Result<(Extension, JoinHandle<()>), ()> { let (worker_mailbox, rx) = std::sync::mpsc::channel::<( oneshot::Sender>, Vec<(bool, String, Vec)>, )>(); let sql_worker_thread = std::thread::spawn(move || { let worker = tokio::runtime::Builder::new_current_thread() .enable_all() .build() .unwrap(); loop { if let Ok((response, queries)) = rx.recv() { // I am not sure if I should only work on one database task at a time, or // submit as many takes as possible. Just spawning the future onto this // exectutor does not seem to work, even though the docs say the thread pool // will poll the future until it completes. let db_pool_clone = db_pool.clone(); let span = info_span!("inside sql queries futures"); let f = async move { let _start = Instant::now(); let db_pool = db_pool_clone; let mut result: Vec> = vec![]; let mut failure: Option = None; for (fetch_all, sql, _args) in queries { let mut acc = Vec::new(); if fetch_all { let mut stream = sqlx::query(&sql).fetch(&db_pool); loop { match stream.next().await { None => break, Some(Ok(row)) => { // Try to auto marshall the row into a javascript // object to make it easier on our no-code users. match try_marshal(&row) { Ok(json) => acc.push(json.to_string()), Err(err) => { failure = Some(AstridQueryError::Execute { sql: sql.clone(), message: err.to_string(), }); break; } } } Some(Err(err)) => { eprintln!("got fetch_all sql error {:?}", err); failure = Some(AstridQueryError::Execute { sql: sql.clone(), message: err.to_string(), }); break; } } } result.push(acc); } else { match sqlx::query(&sql) .fetch_one(&db_pool) .await .and_then(|row| try_marshal(&row).map(|json| json.to_string())) { Ok(s) => result.push(vec![s]), Err(sqlx::Error::RowNotFound) => { failure = Some(AstridQueryError::NotFound { sql }); } Err(err) => { eprintln!("got fetchOne sql error {:?}", err); failure = Some(AstridQueryError::Execute { sql, message: err.to_string(), }); } } } if failure.is_some() { break; } } if let Some(msg) = failure { response.send(ElmResult::err(msg)) } else { response.send(ElmResult::ok(result)) } }; // I found it interesting that the runtime of the future from the viewpoint of // tracing was around 230us for a trivial select 2 rows query, but walltime I // measured was around 700us. So polling the future or waiting for file IO is // more expensive than I thought. worker.block_on(f.instrument(span)).unwrap(); } else { break; } } }); let worker_mailbox_clone = worker_mailbox.clone(); let extension = Extension::builder() .ops(vec![op_starmelon_batch_queries::decl()]) .state(move |state| { state.put(worker_mailbox_clone.clone()); Ok(()) }) .build(); Ok((extension, sql_worker_thread)) } #[deno_core::op] fn op_starmelon_batch_queries( state: &mut OpState, queries: Vec<(bool, String, Vec)>, ) -> Result>, AstridQueryError>, deno_core::error::AnyError> { let worker_mailbox_clone = state.borrow::(); let worker_mailbox = worker_mailbox_clone.clone(); let (sender, receiver) = oneshot::channel::>, _>>(); let span = info_span!("run sql"); let timing_guard = span.enter(); worker_mailbox.send((sender, queries)).unwrap(); let elm_result = receiver.recv().unwrap(); drop(timing_guard); Ok(elm_result) } #[derive(Debug, Serialize, Deserialize)] #[serde(tag = "$")] enum AstridQueryError { Execute { #[serde(rename = "a")] sql: String, #[serde(rename = "b")] message: String, }, NotFound { #[serde(rename = "a")] sql: String, }, } fn try_marshal(row: &sqlx::sqlite::SqliteRow) -> Result { let mut object = Map::new(); for i in 0..row.len() { let value = try_decode_column(row, i)?; let key = row.column(i).name().to_owned(); object.insert(key, value); } Ok(Value::Object(object)) } fn try_decode_column( row: &sqlx::sqlite::SqliteRow, i: usize, ) -> Result { let value = row .try_get_raw(i) .map_err(|err| sqlx::Error::ColumnDecode { index: row.column(i).name().to_owned(), source: Box::new(StringError(format!("{}", err))), })?; match value.type_info().name() { "NULL" => { if let Ok(_x) = row.try_get::(i) { eprintln!("it was actually an int"); }; Ok(Value::Null) } "TEXT" => Ok(Value::String(row.try_get::(i)?)), "REAL" => { let x = row.try_get::(i)?; match Number::from_f64(x) { Some(n) => Ok(Value::Number(n)), None => { Err(sqlx::Error::ColumnDecode { index: row.column(i).name().to_owned(), source: Box::new(StringError(format!("While parsing a SQL type `REAL` I expected a finite number but got {} instead ", x))), }) } } } //"BLOB" => "INTEGER" => Ok(Value::Number(row.try_get::(i)?.into())), //"NUMERIC" => "BOOLEAN" => Ok(Value::Bool(row.try_get::(i)?)), //"DATE" => //"TIME" => //"DATETIME" => unknown => Err(sqlx::Error::ColumnDecode { index: row.column(i).name().to_owned(), source: Box::new(StringError(format!( "I don't know how to automatically convert the SQL type `{}`` into a JSON value.", unknown ))), }), } } #[derive(Clone, Debug, PartialEq, Eq)] pub struct StringError(String); impl ::std::fmt::Display for StringError { fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result { self.0.fmt(f) } } impl std::error::Error for StringError { #[inline] fn description(&self) -> &str { &self.0 } }