From 3cf23637d5982e27d41e050d5dc8a338e8d2309d Mon Sep 17 00:00:00 2001 From: YetAnotherMinion Date: Fri, 7 Jan 2022 02:27:33 +0000 Subject: [PATCH] feat: add sqlite support to starmelon interpreter --- Cargo.toml | 8 + lib/sql/src/Astrid/Query.elm | 49 ++++++ src/elm.rs | 2 +- src/fixture.rs | 4 +- src/fixture/query.js | 306 +++++++++++++++++++++++++++++++++++ src/main.rs | 203 ++++++++++++++++++++--- src/reporting.rs | 18 ++- 7 files changed, 566 insertions(+), 24 deletions(-) create mode 100644 src/fixture/query.js diff --git a/Cargo.toml b/Cargo.toml index 3b0a72c..1c76d6e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -37,3 +37,11 @@ rusty_v8 = "0.32" futures = "0.3.15" serde_v8 = "0.15" +# Required to add sql query support to interpreter. Because deno expects sync +# ops to be synchronous, we have to use a second async executor to run the sqlx +# functions. I read the code for oneshot +# (https://github.com/faern/oneshot/commit/9aa237f185e1b65d61bf92c20350cf7bee0aa88b) +# and it looks reasonable. +sqlx = { version = "0.5", features = [ "sqlite", "macros", "runtime-tokio-rustls", "chrono", "json", "uuid" ] } +oneshot = "0.1.3" + diff --git a/lib/sql/src/Astrid/Query.elm b/lib/sql/src/Astrid/Query.elm index f93a33e..35724e7 100644 --- a/lib/sql/src/Astrid/Query.elm +++ b/lib/sql/src/Astrid/Query.elm @@ -1,8 +1,11 @@ module Astrid.Query exposing ( Query + , errorToString + , execute , fetch , fetchOne + , fetchOptional , map , map2 , map3 @@ -11,12 +14,54 @@ module Astrid.Query ) import Json.Decode +import Json.Encode import Array exposing (Array) {-| A value that knows how to load data from a SQL database. 
-} type Query a = Dummy +type Error + = Execute String String + | Decode String Int Json.Decode.Error + | Failure String + | NotFound String + +errorToString : Error -> String +errorToString error = + case error of + Execute sql message -> + "Execute `" ++ sql ++ "` failed `" ++ message ++ "`" + + Decode sql index decodeError -> + "Decode results for `" ++ sql ++ "` failed at index `" ++ (String.fromInt index) ++ "` with error " ++ Json.Decode.errorToString(decodeError) + + Failure message -> + "Failure `" ++ message ++ "`" + + NotFound sql -> + "NotFound `" ++ sql ++ "`" + +execute : Query a -> Result Error a +execute query = + dummyExecute + +{- Constructing the type inside this helper function will force Elm to generate +the javascript of the Query type constructor above the execute function +javascript. +-} +dummyExecute : Result Error a +dummyExecute = + let + query = Dummy + _ = Execute "" "" + _ = Decode "" 0 (Json.Decode.Failure "" Json.Encode.null) + _ = Failure "" + _ = NotFound "" + in + Err (Failure "This package only works inside the starmelon interpreter") + + fetch : String -> List String -> Json.Decode.Decoder a -> Query (Array a) fetch sql parameters decoder = Dummy @@ -25,6 +70,10 @@ fetchOne : String -> List String -> Json.Decode.Decoder a -> Query a fetchOne sql parameters decoder = Dummy +fetchOptional : String -> List String -> Json.Decode.Decoder a -> Query (Maybe a) +fetchOptional sql parameters decoder = + Dummy + map : (a -> value) -> Query a -> Query value map f a = Dummy diff --git a/src/elm.rs b/src/elm.rs index 9d41587..bf3afa3 100644 --- a/src/elm.rs +++ b/src/elm.rs @@ -69,7 +69,7 @@ pub fn load_interfaces( interfaces.insert(module_name, interface); } elmi::DependencyInterface::Private(package_name, unions, aliases) => { - println!("skipping private interface {}", package_name); + //println!("skipping private interface {}", package_name); //for (k, v) in unions { // println!(" {}", k); //} diff --git a/src/fixture.rs b/src/fixture.rs 
index 1769988..fd7d2b9 100644 --- a/src/fixture.rs +++ b/src/fixture.rs @@ -21,8 +21,6 @@ pub(crate) fn generate>( buffer.push_str("-- START CUSTOMIZED PART\n"); write!(buffer, "import {}\n\n", target_module.as_ref()).unwrap(); - // if the input type is none then we don't have to generate an apply, and the port input type - // is () match output { OutputType::String => buffer.push_str("encodeOutput = encodeString\n"), OutputType::Value => buffer.push_str("encodeOutput = encodeJson\n"), @@ -30,6 +28,8 @@ pub(crate) fn generate>( OutputType::Html => buffer.push_str("encodeOutput = encodeHtml\n"), } + // if the input type is none then we don't have to generate an apply, and the port input type + // is () match input { None => { buffer.push_str(&zero_arg_apply( diff --git a/src/fixture/query.js b/src/fixture/query.js new file mode 100644 index 0000000..55328d0 --- /dev/null +++ b/src/fixture/query.js @@ -0,0 +1,306 @@ +// CORE QUERIES + +function __Debug_print(object) { + //Deno.core.print(JSON.stringify(object)); + //Deno.core.print("\n"); +} + +function _Query_succeed(value) +{ + return { + $: 0, + a: value + }; +} + + +var _Query_fetchOptional = F3(function(sql, args, decoder) +{ + return { + $: 1, + b: sql, + c: args, + d: { $: 0, a: decoder } + }; +}); + +var _Query_fetchOne = F3(function(sql, args, decoder) +{ + return { + $: 2, + b: sql, + c: args, + d: { $: 1, a: decoder } + }; +}); + +var _Query_fetchAll = F3(function(sql, args, decoder) +{ + return { + $: 3, + b: sql, + c: args, + d: { $: 2, a: decoder } + }; +}); + +function _Query_mapMany(f, queries) +{ + return { + $: 4, + f: f, + g: queries + }; +} + +var _Query_andThen = F2(function(callback, query) +{ + return { + $: 5, + e: query, + h: callback + }; +}); + +var _Query_map1 = F2(function(f, q1) +{ + return _Query_mapMany(f, [q1]); +}); + +var _Query_map2 = F3(function(f, q1, q2) +{ + return _Query_mapMany(f, [q1, q2]); +}); + +var _Query_map3 = F4(function(f, q1, q2, q3) +{ + return _Query_mapMany(f, 
[q1, q2, q3]); +}); + +var _Query_map4 = F5(function(f, q1, q2, q3, q4) +{ + return _Query_mapMany(f, [q1, q2, q3, q4]); +}); + +var _Query_map5 = F6(function(f, q1, q2, q3, q4, q5) +{ + return _Query_mapMany(f, [q1, q2, q3, q4, q5]); +}); + +var _Query_map6 = F7(function(f, q1, q2, q3, q4, q5, q6) +{ + return _Query_mapMany(f, [q1, q2, q3, q4, q5, q6]); +}); + +var _Query_map7 = F8(function(f, q1, q2, q3, q4, q5, q6, q7) +{ + return _Query_mapMany(f, [q1, q2, q3, q4, q5, q6, q7]); +}); + +var _Query_map8 = F9(function(f, q1, q2, q3, q4, q5, q6, q7, q8) +{ + return _Query_mapMany(f, [q1, q2, q3, q4, q5, q6, q7, q8]); +}); + +// RUN + +function _Query_runDecoder(decoder, sql, xs) +{ + switch (decoder.$) { + case 0: + if (xs.length === 0) { + return $elm$core$Result$Ok($elm$core$Maybe$Nothing); + } + var result = _Json_runOnString.f(decoder.a, xs[0]); + + if (!$elm$core$Result$isOk(result)) + { + return $elm$core$Result$Err( + A3( + $author$project$Astrid$Query$Decode, + sql, + 0, + result.a + ) + ); + } + return $elm$core$Result$Ok($elm$core$Maybe$Just(result.a)); + + case 1: + if (xs.length === 0) { + return $elm$core$Result$Err($author$project$Astrid$Query$NotFound(sql)); + } + var result = _Json_runOnString.f(decoder.a, xs[0]); + + if (!$elm$core$Result$isOk(result)) + { + return $elm$core$Result$Err( + A3( + $author$project$Astrid$Query$Decode, + sql, + 0, + result.a + ) + ); + } + return result; + + case 2: + var len = xs.length; + var array = new Array(len); + for (var i = 0; i < len; i++) + { + var string = xs[i]; + try + { + var value = JSON.parse(string); + __Debug_print("parsed the json"); + __Debug_print(value); + __Debug_print(decoder); + var result = _Json_runHelp(decoder.a, value); + __Debug_print("result of parsing the json"); + __Debug_print(result); + if (!$elm$core$Result$isOk(result)) + { + return $elm$core$Result$Err( + A3( + $author$project$Astrid$Query$Decode, + sql, + i, + result.a + ) + ); + } + array[i] = result.a; + } + catch (e) + { + 
return $elm$core$Result$Err( + A3( + $author$project$Astrid$Query$Decode, + sql, + i, + A2( + $elm$json$Json$Decode$Failure, + 'This is not valid JSON! ' + e.message, _Json_wrap(string) + ) + ) + ); + } + } + return $elm$core$Result$Ok(_Json_toElmArray(array)); + } +} + +var _Query_execute = function(query) +{ + var queries = new Array; + var statements = new Array; + var decoders = new Array; + var values = new Array; + var callbacks = new Array; + + queries.push(query); + + while (true) { + var q; + while(q = queries.pop()) { + switch (q.$) { + case 0: + values.push(q.a); + break; + + case 1: + case 2: + case 3: + var moreThanOneRow = (q.$ === 3) + var sql = q.b; + var bindParameters = _List_toArray(q.c); + var decoder = q.d; + statements.push([moreThanOneRow, sql, bindParameters]); + decoders.push(decoder); + break; + + case 4: + callbacks.push({ $:'Map', a: q.f }) + // We know that the list of queries is limited to length 8, + // which is much less then browser's stack overflow limits that + // start around 100,000 elements. + Array.prototype.push.apply(queries, q.g); + break; + + case 5: + callbacks.push({ $:'AndThen', a: q.h }) + queries.push(q.e) + break; + } + } + + if (statements.length > 0) { + __Debug_print("statements = "); + __Debug_print(statements); + var queryResult = Deno.core.opSync( + 'op_starmelon_batch_queries', + statements, + ); + __Debug_print(queryResult); + // I am assuming here that the Rust code is serializing the same + // structure that the Elm compiler thinks we have. 
+ if (!$elm$core$Result$isOk(queryResult)) { + return queryResult + } + var results = queryResult.a; + + var len = results.length; + for (var i = 0; i < len; i++) { + var result = _Query_runDecoder(decoders[i], statements[i][1], results[i]) + if (!$elm$core$Result$isOk(result)) + { + return result + } + values.push(result.a); + } + statements.length = 0; + decoders.length = 0; + } + + __Debug_print("processing callbacks stack"); + + reduce: + while(callbacks.length > 0) { + var last = callbacks[callbacks.length - 1]; + switch (last.$) { + case 'Map': + var arity = last.a.a; + if (values.length < arity) { + // This implies that queries.length > 0 because we must + // have a way to generate the missing value(s) to call + // this function. + break reduce; + } + callbacks.pop(); + // Directly call the wrapped Elm function since we know all + // the arguments + var fun = last.a.f; + var args = values.slice(-arity); + values.length = values.length - args.length; + values.push(Function.prototype.apply(fun, args)); + break; + + case 'AndThen': + callbacks.pop(); + var fun = last.a.f; + // I think that if we have an AndThen then we will always have at least one value + queries.push(fun(values.pop())); + break; + } + } + + if (queries.length == 0 && callbacks.length == 0) { + // values must have one element in it. 
+ return $elm$core$Result$Ok(values.pop()) + } + } +}; + +var $author$project$Astrid$Query$execute = _Query_execute; diff --git a/src/main.rs b/src/main.rs index ba83aa9..89318f8 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,10 +1,13 @@ extern crate naive_wadler_prettier as pretty; use crate::reporting::{CompilerError, InterpreterError, Problem, SetupError, TypeError}; use crate::timings::Timings; +use deno_core::futures::StreamExt; use elm_project_utils::ChecksumConstraint; use os_pipe::dup_stderr; use pretty::pretty; use serde::{Deserialize, Serialize}; +use sqlx::sqlite::SqlitePool; +use sqlx::Row; use std::cell::RefCell; use std::fs::{self, canonicalize}; use std::hash::Hasher; @@ -15,7 +18,7 @@ use std::sync::Arc; use std::time::Instant; use structopt::StructOpt; use tokio; -use tracing::info_span; +use tracing::{info_span, Instrument}; mod elm; mod fixture; @@ -23,6 +26,23 @@ mod reporting; mod timings; mod transpile; +#[derive(Serialize)] +#[serde(tag = "$")] +pub(crate) enum ElmResult { + Ok { a: T }, + Err { a: E }, +} + +impl ElmResult { + fn ok(a: T) -> Self { + Self::Ok { a } + } + + fn err(a: E) -> Self { + Self::Err { a } + } +} + fn main() { let args = Arguments::from_args(); @@ -38,11 +58,12 @@ fn main() { input, output, verbosity, + sqlite, } => { let start = Instant::now(); let span = info_span!("exec"); let timing_guard = span.enter(); - let result = exec(file, debug, function, input, output, verbosity); + let result = exec(file, debug, function, input, output, verbosity, sqlite); drop(timing_guard); if let Err(problem) = result { let span = info_span!("pretty print problem"); @@ -88,6 +109,7 @@ fn exec( input_source: Option, output: Option, verbosity: u64, + sqlite_path: Option, ) -> Result<(), Problem> { // Our first elm make call is where we build the users program. There is a pretty good chance // this won't work. 
@@ -219,7 +241,6 @@ fn exec( let final_script = (|| { let mut final_script = data .replace("'REPLACE_ME_WITH_JSON_STRINGIFY'", "JSON.stringify(x)") - //.replace("setTimeout", "(function(x, y) { globalThis.__bootstrap.timers.setTimeout(x, y)})") .replace( "$elm$json$Json$Decode$fail('REPLACE_ME_WITH_BYTES_DECODER');", r#" _Json_decodePrim(function(value) { @@ -230,6 +251,35 @@ fn exec( ) .replace(";}(this));", ";}(globalThis));"); + if sqlite_path.is_some() { + final_script = final_script + .replace( + "var $author$project$Astrid$Query$execute = function (query) {\n\treturn $author$project$Astrid$Query$dummyExecute;\n};", + include_str!("fixture/query.js"), + ) + .replace( + "var $author$project$Astrid$Query$fetch = F3(\n\tfunction (sql, parameters, decoder) {\n\t\treturn $author$project$Astrid$Query$Dummy;\n\t});", + "var $author$project$Astrid$Query$fetch = _Query_fetchAll;", + ) + .replace( + r#"var $author$project$Astrid$Query$map2 = {$: 'Dummy'};"#, + r#"var $author$project$Astrid$Query$map2 = _Query_map2;"#, + ); + + // final_script.replace("var $author$project$Astrid$Query$run = ", "JSON.stringify(x)"); + final_script.push_str("\n\n"); + //final_script.push_str(r#" + // Deno.core.print(JSON.stringify( + // Deno.core.opSync( + // 'op_starmelon_batch_queries', + // [ [true, "select json_object('id', id, 'foo', foo) from foobar", []] + // , [false, "select json_object('id', id, 'foo', foo) from foobar", []] + // ] + // ) + // )) + //"#); + } + final_script.push_str("\n\n"); // I think that when I set this script to be the main module, I am skipping the // deno/runtime/js/99_main.js script that sets up a bunch of global variables. If I @@ -237,19 +287,19 @@ fn exec( // NB. there are 706 lines of setup code that add a bunch of apis to the global window // scope. Figure out if I need to include all of them. For example, starmelon does not need // to perform http calls right now, but I eventually want to. 
- final_script.push_str("const { setTimeout } = globalThis.__bootstrap.timers;"); + final_script.push_str("const { setTimeout } = globalThis.__bootstrap.timers;\n"); final_script.push_str( "Deno.core.setMacrotaskCallback(globalThis.__bootstrap.timers.handleTimerMacrotask);\n", ); - final_script.push_str("globalThis.setTimeout = setTimeout;"); + final_script.push_str("globalThis.setTimeout = setTimeout;\n"); final_script.push_str(&format!("var worker = Elm.{}.init();\n", &gen_module_name)); - // add a short cut for invoking the function so I don't have to traverse so many object + // add a shortcut for invoking the function so I don't have to traverse so many object // lookups using the rust v8 API. match input_type { None => { final_script.push_str( - "globalThis.runOnInput = function() { worker.ports.onInput.send(null)) };\n", + "globalThis.runOnInput = function() { worker.ports.onInput.send(null) };\n", ); } Some(InputType::Value) => { @@ -267,8 +317,7 @@ fn exec( globalThis.runOnInput = function(data) { const dv = new DataView(data.buffer) worker.ports.onInput.send(dv) - }; - "#, + };"#, ); } } @@ -326,6 +375,17 @@ fn exec( std::fs::write(&final_file, final_script) .map_err(|io_err| CompilerError::WriteOutputFailed(io_err, final_file.clone()))?; + // Create a tokio runtime before registering ops so we can block on futures inside sync ops + let span = info_span!("create tokio runtime"); + let timing_guard = span.enter(); + let sys = tokio::runtime::Builder::new_current_thread() + // The default number of additional threads for running blocking FnOnce is 512. + .max_blocking_threads(1) + .enable_all() + .build() + .unwrap(); + drop(timing_guard); + // step 10 create a v8 isolate. 
We need to register a different callback depending on // the output type (string, or bytes) @@ -370,6 +430,107 @@ fn exec( }), ); + // Step 10.B setup the sqlite database feature + let sql_background_thread_handle = if let Some(database_url) = sqlite_path { + // I want to construct the connection in the initial thread so I can tell if the connection + // failed + let db_pool = sys + .block_on(async { SqlitePool::connect(&database_url.to_string_lossy()).await }) + .unwrap(); + + let (worker_mailbox, rx) = std::sync::mpsc::channel::<( + oneshot::Sender>, + Vec<(bool, String, Vec)>, + )>(); + + let sql_worker_thread = std::thread::spawn(move || { + let worker = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + loop { + if let Ok((response, queries)) = rx.recv() { + // I am not sure if I should only work on one database task at a time, or + // submit as many tasks as possible. Just spawning the future onto this + // executor does not seem to work, even though the docs say the thread pool + // will poll the future until it completes. 
+ let db_pool_clone = db_pool.clone(); + let span = info_span!("inside sql queries futures"); + let f = async move { + let start = Instant::now(); + let db_pool = db_pool_clone; + let mut result: Vec> = vec![]; + for (fetch_all, sql, _args) in queries { + let mut acc = Vec::new(); + if fetch_all { + let mut stream = sqlx::query(&sql).fetch(&db_pool); + loop { + match stream.next().await { + None => break, + Some(Ok(row)) => { + match row.try_get::(0) { + Ok(s) => acc.push(s), + // TODO set an error flag before returning this one + Err(_) => break, + }; + } + Some(Err(err)) => { + eprintln!("got fetch_all sql error {:?}", err); + break; + } + } + } + result.push(acc); + } else { + let s = sqlx::query(&sql) + .fetch_one(&db_pool) + .await + .and_then(|row| row.try_get::(0)) + .unwrap(); + result.push(vec![s]); + } + } + response.send(ElmResult::ok(result)) + }; + + // I found it interesting that the runtime of the future from the viewpoint of + // tracing was around 230us for a trivial select 2 rows query, but walltime I + // measured was around 700us. So polling the future or waiting for file IO is + // more expensive than I thought. + worker.block_on(f.instrument(span)); + } else { + break; + } + } + }); + let worker_mailbox_clone = worker_mailbox.clone(); + + worker.js_runtime.register_op( + "op_starmelon_batch_queries", + deno_core::op_sync( + move |_state, queries: Vec<(bool, String, Vec)>, _: ()| { + let worker_mailbox = worker_mailbox_clone.clone(); + let (sender, receiver) = + oneshot::channel::>, String>>(); + + let span = info_span!("run sql"); + let timing_guard = span.enter(); + + worker_mailbox.send((sender, queries)).unwrap(); + let elm_result = receiver.recv().unwrap(); + + drop(timing_guard); + + Ok(elm_result) + }, + ), + ); + + Some((worker_mailbox, sql_worker_thread)) + } else { + None + }; + worker.js_runtime.sync_ops_cache(); // step 11 marshal the input into the v8 isolate. 
If we are reading from an @@ -411,15 +572,6 @@ fn exec( } }; - let _start = Instant::now(); - let span = info_span!("create tokio runtime"); - let timing_guard = span.enter(); - let sys = tokio::runtime::Builder::new_current_thread() - .enable_all() - .build() - .unwrap(); - drop(timing_guard); - let span = info_span!("eval javascript"); let timing_guard = span.enter(); sys.block_on(async move { runtime::xyz(worker, main_module, input).await })?; @@ -451,6 +603,11 @@ fn exec( None => println!("nothing in the mailbox"), } + if let Some((tx, thread_handle)) = sql_background_thread_handle { + drop(tx); + thread_handle.join(); + } + Ok(()) } @@ -506,6 +663,8 @@ enum Arguments { output: Option, #[structopt(short = "v", parse(from_occurrences))] verbosity: u64, + #[structopt(long)] + sqlite: Option, }, Transpile { #[structopt(parse(from_os_str))] @@ -561,7 +720,12 @@ fn resolve_function_type(tipe: &elmi::Type) -> Result<(Option, Output Err(TypeError::CantEvalType(tipe.clone())) } - elmi::Type::TType(_, _, _) => Err(TypeError::CantEvalCustomType), + elmi::Type::TType(module_name, name, args) => { + if module_name == "elm/virtual-dom/VirtualDom" && name == "Node" { + return Ok((None, OutputType::Html)); + } + Err(TypeError::CantEvalCustomType) + } elmi::Type::TRecord(_, _) => Err(TypeError::CantEvalRecord), elmi::Type::TUnit => Err(TypeError::CantEvalUnit), elmi::Type::TTuple(_, _, _) => Err(TypeError::CantEvalTuple), @@ -872,7 +1036,6 @@ mod runtime { let this = v8::undefined(scope).into(); - let _start = Instant::now(); let span = info_span!("dispatch v8 call"); let timing_guard = span.enter(); match input { diff --git a/src/reporting.rs b/src/reporting.rs index 3a62f5f..bfc4f05 100644 --- a/src/reporting.rs +++ b/src/reporting.rs @@ -123,7 +123,23 @@ impl From for InterpreterError { impl TypeError { pub fn to_doc(&self) -> Doc { - Doc::text("type error") + let mut title = "TYPE ERROR"; + use TypeError::*; + let message = match self { + CantEvalRecord => Doc::text("cant 
eval record"), + CantEvalUnit => Doc::text("can't eval unit"), + CantEvalCustomType => reflow("can't eval custom type"), + _ => Doc::text("todo"), + // CantEvalHoleyAlias, + // CantEvalTuple, + // CantEvalGeneric, + // CantEvalType(elmi::Type), + // InputTypeNotSupported(elmi::Type), + // OutputTypeNotSupported(elmi::Type), + // EvalRequiresSingleArgument(elmi::Type), + }; + + vcat([to_message_bar(title, ""), Doc::text(""), message]) } }