Skip to content

Commit

Permalink
feat(query): add currentRun field to turbo query (#9196)
Browse files Browse the repository at this point in the history
### Description

We need the `currentRun` field to exist on `turbo query` to easily
generate types from the GraphQL server. This adds the field and also
deduplicates the server code so both `wui` and `query` use the same
`run_server` code

### Testing Instructions
  • Loading branch information
NicholasLYang authored Sep 30, 2024
1 parent 672218b commit c793376
Show file tree
Hide file tree
Showing 7 changed files with 69 additions and 62 deletions.
2 changes: 1 addition & 1 deletion crates/turborepo-lib/src/commands/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ pub async fn run(
}
}
} else {
query::run_server(run, handler).await?;
query::run_query_server(run, handler).await?;
}

Ok(0)
Expand Down
20 changes: 8 additions & 12 deletions crates/turborepo-lib/src/query/mod.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
mod file;
mod package;
mod server;

use std::{io, sync::Arc};

use async_graphql::{http::GraphiQLSource, *};
use async_graphql_axum::GraphQL;
use axum::{response, response::IntoResponse, routing::get, Router};
use axum::{response, response::IntoResponse};
use itertools::Itertools;
use miette::Diagnostic;
use package::Package;
pub use server::run_server;
use thiserror::Error;
use tokio::{net::TcpListener, select};
use tokio::select;
use turbo_trace::TraceError;
use turbopath::AbsoluteSystemPathBuf;
use turborepo_repository::package_graph::PackageName;
Expand Down Expand Up @@ -40,6 +41,8 @@ pub enum Error {
Run(#[from] crate::run::Error),
#[error(transparent)]
Path(#[from] turbopath::PathError),
#[error(transparent)]
UI(#[from] turborepo_ui::Error),
}

pub struct RepositoryQuery {
Expand Down Expand Up @@ -365,14 +368,7 @@ pub async fn graphiql() -> impl IntoResponse {
response::Html(GraphiQLSource::build().endpoint("/").finish())
}

pub async fn run_server(run: Run, signal: SignalHandler) -> Result<(), Error> {
let schema = Schema::new(
RepositoryQuery::new(Arc::new(run)),
EmptyMutation,
EmptySubscription,
);
let app = Router::new().route("/", get(graphiql).post_service(GraphQL::new(schema)));

pub async fn run_query_server(run: Run, signal: SignalHandler) -> Result<(), Error> {
let subscriber = signal.subscribe().ok_or(Error::NoSignalHandler)?;
println!("GraphiQL IDE: http://localhost:8000");
webbrowser::open("http://localhost:8000")?;
Expand All @@ -382,7 +378,7 @@ pub async fn run_server(run: Run, signal: SignalHandler) -> Result<(), Error> {
println!("Shutting down GraphQL server");
return Ok(());
}
result = axum::serve(TcpListener::bind("127.0.0.1:8000").await?, app) => {
result = server::run_server(None, Arc::new(run)) => {
result?;
}
}
Expand Down
45 changes: 45 additions & 0 deletions crates/turborepo-lib/src/query/server.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
use std::sync::Arc;

use async_graphql::{EmptyMutation, EmptySubscription, MergedObject, Schema};
use async_graphql_axum::GraphQL;
use axum::{http::Method, routing::get, Router};
use tokio::net::TcpListener;
use tower_http::cors::{Any, CorsLayer};
use turborepo_ui::wui::query::SharedState;

use crate::{query, query::graphiql, run::Run};

#[derive(MergedObject)]
struct Query(turborepo_ui::wui::RunQuery, query::RepositoryQuery);

pub async fn run_server(
state: Option<SharedState>,
run: Arc<Run>,
) -> Result<(), turborepo_ui::Error> {
let cors = CorsLayer::new()
// allow `GET` and `POST` when accessing the resource
.allow_methods([Method::GET, Method::POST])
.allow_headers(Any)
// allow requests from any origin
.allow_origin(Any);

let web_ui_query = turborepo_ui::wui::RunQuery::new(state.clone());
let turbo_query = query::RepositoryQuery::new(run);
let combined_query = Query(web_ui_query, turbo_query);

let schema = Schema::new(combined_query, EmptyMutation, EmptySubscription);
let app = Router::new()
.route("/", get(graphiql).post_service(GraphQL::new(schema)))
.layer(cors);

axum::serve(
TcpListener::bind("127.0.0.1:8000")
.await
.map_err(turborepo_ui::wui::Error::Server)?,
app,
)
.await
.map_err(turborepo_ui::wui::Error::Server)?;

Ok(())
}
43 changes: 3 additions & 40 deletions crates/turborepo-lib/src/run/ui.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,8 @@
use std::sync::Arc;

use async_graphql::{EmptyMutation, EmptySubscription, MergedObject, Schema};
use async_graphql_axum::GraphQL;
use axum::{http::Method, routing::get, Router};
use tokio::net::TcpListener;
use tower_http::cors::{Any, CorsLayer};
use turborepo_ui::wui::{event::WebUIEvent, server::SharedState};
use turborepo_ui::wui::{event::WebUIEvent, query::SharedState};

use crate::{query, query::graphiql, run::Run};
use crate::{query, run::Run};

pub async fn start_web_ui_server(
rx: tokio::sync::mpsc::UnboundedReceiver<WebUIEvent>,
Expand All @@ -17,39 +12,7 @@ pub async fn start_web_ui_server(
let subscriber = turborepo_ui::wui::subscriber::Subscriber::new(rx);
tokio::spawn(subscriber.watch(state.clone()));

run_server(state.clone(), run).await?;

Ok(())
}

#[derive(MergedObject)]
struct Query(turborepo_ui::wui::RunQuery, query::RepositoryQuery);

async fn run_server(state: SharedState, run: Arc<Run>) -> Result<(), turborepo_ui::Error> {
let cors = CorsLayer::new()
// allow `GET` and `POST` when accessing the resource
.allow_methods([Method::GET, Method::POST])
.allow_headers(Any)
// allow requests from any origin
.allow_origin(Any);

let web_ui_query = turborepo_ui::wui::RunQuery::new(state.clone());
let turbo_query = query::RepositoryQuery::new(run);
let combined_query = Query(web_ui_query, turbo_query);

let schema = Schema::new(combined_query, EmptyMutation, EmptySubscription);
let app = Router::new()
.route("/", get(graphiql).post_service(GraphQL::new(schema)))
.layer(cors);

axum::serve(
TcpListener::bind("127.0.0.1:8000")
.await
.map_err(turborepo_ui::wui::Error::Server)?,
app,
)
.await
.map_err(turborepo_ui::wui::Error::Server)?;
query::run_server(Some(state.clone()), run).await?;

Ok(())
}
4 changes: 2 additions & 2 deletions crates/turborepo-ui/src/wui/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@
//! by a web client to display the status of tasks.

pub mod event;
pub mod query;
pub mod sender;
pub mod server;
pub mod subscriber;

use event::WebUIEvent;
pub use server::RunQuery;
pub use query::RunQuery;
use thiserror::Error;

#[derive(Debug, Error)]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,19 +39,22 @@ pub type SharedState = Arc<Mutex<WebUIState>>;

/// The query for actively running tasks (as opposed to the query for general
/// repository state `RepositoryQuery` in `turborepo_lib::query`)
/// This is `None` when we're not actually running a task (e.g. `turbo query`)
pub struct RunQuery {
state: SharedState,
state: Option<SharedState>,
}

impl RunQuery {
pub fn new(state: SharedState) -> Self {
pub fn new(state: Option<SharedState>) -> Self {
Self { state }
}
}

#[Object]
impl RunQuery {
async fn current_run(&self) -> CurrentRun {
CurrentRun { state: &self.state }
async fn current_run(&self) -> Option<CurrentRun> {
Some(CurrentRun {
state: self.state.as_ref()?,
})
}
}
6 changes: 3 additions & 3 deletions crates/turborepo-ui/src/wui/subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use tokio::sync::Mutex;

use crate::{
tui::event::{CacheResult, TaskResult},
wui::{event::WebUIEvent, server::SharedState},
wui::{event::WebUIEvent, query::SharedState},
};

/// Subscribes to the Web UI events and updates the state
Expand Down Expand Up @@ -150,7 +150,7 @@ mod test {
use super::*;
use crate::{
tui::event::OutputLogs,
wui::{sender::WebUISender, server::RunQuery},
wui::{query::RunQuery, sender::WebUISender},
};

#[tokio::test]
Expand Down Expand Up @@ -200,7 +200,7 @@ mod test {
);

// Now let's check with the GraphQL API
let schema = Schema::new(RunQuery::new(state), EmptyMutation, EmptySubscription);
let schema = Schema::new(RunQuery::new(Some(state)), EmptyMutation, EmptySubscription);
let result = schema
.execute("query { currentRun { tasks { name state { status } } } }")
.await;
Expand Down

0 comments on commit c793376

Please sign in to comment.