Skip to content

Commit

Permalink
implement Worker.terminate() and self.close() (#4684)
Browse files Browse the repository at this point in the history
  • Loading branch information
bartlomieju authored Apr 9, 2020
1 parent ac215a2 commit be71885
Show file tree
Hide file tree
Showing 12 changed files with 370 additions and 102 deletions.
11 changes: 6 additions & 5 deletions cli/compilers/ts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ use crate::startup_data;
use crate::state::*;
use crate::tokio_util;
use crate::version;
use crate::web_worker::WebWorkerHandle;
use crate::worker::WorkerEvent;
use crate::worker::WorkerHandle;
use deno_core::Buf;
use deno_core::ErrBox;
use deno_core::ModuleSpecifier;
Expand Down Expand Up @@ -609,7 +609,7 @@ async fn execute_in_thread(
req: Buf,
) -> Result<Buf, ErrBox> {
let (handle_sender, handle_receiver) =
std::sync::mpsc::sync_channel::<Result<WorkerHandle, ErrBox>>(1);
std::sync::mpsc::sync_channel::<Result<WebWorkerHandle, ErrBox>>(1);
let builder =
std::thread::Builder::new().name("deno-ts-compiler".to_string());
let join_handle = builder.spawn(move || {
Expand All @@ -618,15 +618,16 @@ async fn execute_in_thread(
drop(handle_sender);
tokio_util::run_basic(worker).expect("Panic in event loop");
})?;
let mut handle = handle_receiver.recv().unwrap()?;
handle.post_message(req).await?;
let handle = handle_receiver.recv().unwrap()?;
handle.post_message(req)?;
let event = handle.get_event().await.expect("Compiler didn't respond");
let buf = match event {
WorkerEvent::Message(buf) => Ok(buf),
WorkerEvent::Error(error) => Err(error),
WorkerEvent::TerminalError(error) => Err(error),
}?;
// Shutdown worker and wait for thread to finish
handle.sender.close_channel();
handle.terminate();
join_handle.join().unwrap();
Ok(buf)
}
Expand Down
11 changes: 6 additions & 5 deletions cli/compilers/wasm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ use crate::global_state::GlobalState;
use crate::startup_data;
use crate::state::*;
use crate::tokio_util;
use crate::web_worker::WebWorkerHandle;
use crate::worker::WorkerEvent;
use crate::worker::WorkerHandle;
use deno_core::Buf;
use deno_core::ErrBox;
use deno_core::ModuleSpecifier;
Expand Down Expand Up @@ -118,7 +118,7 @@ async fn execute_in_thread(
req: Buf,
) -> Result<Buf, ErrBox> {
let (handle_sender, handle_receiver) =
std::sync::mpsc::sync_channel::<Result<WorkerHandle, ErrBox>>(1);
std::sync::mpsc::sync_channel::<Result<WebWorkerHandle, ErrBox>>(1);
let builder =
std::thread::Builder::new().name("deno-wasm-compiler".to_string());
let join_handle = builder.spawn(move || {
Expand All @@ -127,15 +127,16 @@ async fn execute_in_thread(
drop(handle_sender);
tokio_util::run_basic(worker).expect("Panic in event loop");
})?;
let mut handle = handle_receiver.recv().unwrap()?;
handle.post_message(req).await?;
let handle = handle_receiver.recv().unwrap()?;
handle.post_message(req)?;
let event = handle.get_event().await.expect("Compiler didn't respond");
let buf = match event {
WorkerEvent::Message(buf) => Ok(buf),
WorkerEvent::Error(error) => Err(error),
WorkerEvent::TerminalError(error) => Err(error),
}?;
// Shutdown worker and wait for thread to finish
handle.sender.close_channel();
handle.terminate();
join_handle.join().unwrap();
Ok(buf)
}
Expand Down
8 changes: 8 additions & 0 deletions cli/js/web/workers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,14 @@ export class WorkerImpl extends EventTarget implements Worker {

const type = event.type;

if (type === "terminalError") {
this.#terminated = true;
if (!this.#handleError(event.error)) {
throw Error(event.error.message);
}
continue;
}

if (type === "msg") {
if (this.onmessage) {
const message = decodeMessage(new Uint8Array(event.data));
Expand Down
44 changes: 39 additions & 5 deletions cli/ops/web_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@ use super::dispatch_json::{JsonOp, Value};
use crate::op_error::OpError;
use crate::ops::json_op;
use crate::state::State;
use crate::web_worker::WebWorkerHandle;
use crate::worker::WorkerEvent;
use deno_core::*;
use futures::channel::mpsc;
use futures::sink::SinkExt;
use std::convert::From;

pub fn web_worker_op<D>(
Expand All @@ -25,7 +25,32 @@ where
-> Result<JsonOp, OpError> { dispatcher(&sender, args, zero_copy) }
}

pub fn init(i: &mut Isolate, s: &State, sender: &mpsc::Sender<WorkerEvent>) {
pub fn web_worker_op2<D>(
handle: WebWorkerHandle,
sender: mpsc::Sender<WorkerEvent>,
dispatcher: D,
) -> impl Fn(Value, Option<ZeroCopyBuf>) -> Result<JsonOp, OpError>
where
D: Fn(
WebWorkerHandle,
&mpsc::Sender<WorkerEvent>,
Value,
Option<ZeroCopyBuf>,
) -> Result<JsonOp, OpError>,
{
move |args: Value,
zero_copy: Option<ZeroCopyBuf>|
-> Result<JsonOp, OpError> {
dispatcher(handle.clone(), &sender, args, zero_copy)
}
}

pub fn init(
i: &mut Isolate,
s: &State,
sender: &mpsc::Sender<WorkerEvent>,
handle: WebWorkerHandle,
) {
i.register_op(
"op_worker_post_message",
s.core_op(json_op(web_worker_op(
Expand All @@ -35,7 +60,11 @@ pub fn init(i: &mut Isolate, s: &State, sender: &mpsc::Sender<WorkerEvent>) {
);
i.register_op(
"op_worker_close",
s.core_op(json_op(web_worker_op(sender.clone(), op_worker_close))),
s.core_op(json_op(web_worker_op2(
handle,
sender.clone(),
op_worker_close,
))),
);
}

Expand All @@ -47,18 +76,23 @@ fn op_worker_post_message(
) -> Result<JsonOp, OpError> {
let d = Vec::from(data.unwrap().as_ref()).into_boxed_slice();
let mut sender = sender.clone();
let fut = sender.send(WorkerEvent::Message(d));
futures::executor::block_on(fut).expect("Failed to post message to host");
sender
.try_send(WorkerEvent::Message(d))
.expect("Failed to post message to host");
Ok(JsonOp::Sync(json!({})))
}

/// Notify host that guest worker closes
fn op_worker_close(
handle: WebWorkerHandle,
sender: &mpsc::Sender<WorkerEvent>,
_args: Value,
_data: Option<ZeroCopyBuf>,
) -> Result<JsonOp, OpError> {
let mut sender = sender.clone();
// Notify parent that we're finished
sender.close_channel();
// Terminate execution of current worker
handle.terminate();
Ok(JsonOp::Sync(json!({})))
}
67 changes: 53 additions & 14 deletions cli/ops/worker_host.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,17 @@
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
use super::dispatch_json::{Deserialize, JsonOp, Value};
use crate::fmt_errors::JSError;
use crate::futures::SinkExt;
use crate::global_state::GlobalState;
use crate::op_error::OpError;
use crate::permissions::DenoPermissions;
use crate::startup_data;
use crate::state::State;
use crate::tokio_util::create_basic_runtime;
use crate::web_worker::WebWorker;
use crate::web_worker::WebWorkerHandle;
use crate::worker::WorkerEvent;
use crate::worker::WorkerHandle;
use deno_core::*;
use futures::future::FutureExt;
use futures::future::TryFutureExt;
use std::convert::From;
use std::thread::JoinHandle;

Expand Down Expand Up @@ -58,9 +56,9 @@ fn run_worker_thread(
specifier: ModuleSpecifier,
has_source_code: bool,
source_code: String,
) -> Result<(JoinHandle<()>, WorkerHandle), ErrBox> {
) -> Result<(JoinHandle<()>, WebWorkerHandle), ErrBox> {
let (handle_sender, handle_receiver) =
std::sync::mpsc::sync_channel::<Result<WorkerHandle, ErrBox>>(1);
std::sync::mpsc::sync_channel::<Result<WebWorkerHandle, ErrBox>>(1);

let builder =
std::thread::Builder::new().name(format!("deno-worker-{}", name));
Expand All @@ -78,6 +76,7 @@ fn run_worker_thread(
}

let mut worker = result.unwrap();
let name = worker.name.to_string();
// Send thread safe handle to newly created worker to host thread
handle_sender.send(Ok(worker.thread_safe_handle())).unwrap();
drop(handle_sender);
Expand Down Expand Up @@ -109,7 +108,8 @@ fn run_worker_thread(

if let Err(e) = result {
let mut sender = worker.internal_channels.sender.clone();
futures::executor::block_on(sender.send(WorkerEvent::Error(e)))
sender
.try_send(WorkerEvent::TerminalError(e))
.expect("Failed to post message to host");

// Failure to execute script is a terminal error, bye, bye.
Expand All @@ -120,6 +120,7 @@ fn run_worker_thread(
// that means that we should store JoinHandle to thread to ensure
// that it actually terminates.
rt.block_on(worker).expect("Panic in event loop");
debug!("Worker thread shuts down {}", &name);
})?;

let worker_handle = handle_receiver.recv().unwrap()?;
Expand Down Expand Up @@ -205,6 +206,28 @@ fn op_host_terminate_worker(
fn serialize_worker_event(event: WorkerEvent) -> Value {
match event {
WorkerEvent::Message(buf) => json!({ "type": "msg", "data": buf }),
WorkerEvent::TerminalError(error) => {
let mut serialized_error = json!({
"type": "terminalError",
"error": {
"message": error.to_string(),
}
});

if let Ok(js_error) = error.downcast::<JSError>() {
serialized_error = json!({
"type": "terminalError",
"error": {
"message": js_error.message,
"fileName": js_error.script_resource_name,
"lineNumber": js_error.line_number,
"columnNumber": js_error.start_column,
}
});
}

serialized_error
}
WorkerEvent::Error(error) => {
let mut serialized_error = json!({
"type": "error",
Expand Down Expand Up @@ -247,13 +270,30 @@ fn op_host_get_message(
let state_ = state.clone();
let op = async move {
let response = match worker_handle.get_event().await {
Some(event) => serialize_worker_event(event),
Some(event) => {
// Terminal error means that worker should be removed from worker table.
if let WorkerEvent::TerminalError(_) = &event {
let mut state_ = state_.borrow_mut();
if let Some((join_handle, mut worker_handle)) =
state_.workers.remove(&id)
{
worker_handle.sender.close_channel();
join_handle.join().expect("Worker thread panicked");
}
}
serialize_worker_event(event)
}
None => {
// Worker shuts down
let mut state_ = state_.borrow_mut();
let (join_handle, mut worker_handle) =
state_.workers.remove(&id).expect("No worker handle found");
worker_handle.sender.close_channel();
join_handle.join().expect("Worker thread panicked");
// Try to remove worker from workers table - NOTE: `Worker.terminate()` might have been called
// already meaning that we won't find worker in table - in that case ignore.
if let Some((join_handle, mut worker_handle)) =
state_.workers.remove(&id)
{
worker_handle.sender.close_channel();
join_handle.join().expect("Worker thread panicked");
}
json!({ "type": "close" })
}
};
Expand All @@ -276,9 +316,8 @@ fn op_host_post_message(
let state = state.borrow();
let (_, worker_handle) =
state.workers.get(&id).expect("No worker handle found");
let fut = worker_handle
worker_handle
.post_message(msg)
.map_err(|e| OpError::other(e.to_string()));
futures::executor::block_on(fut)?;
.map_err(|e| OpError::other(e.to_string()))?;
Ok(JsonOp::Sync(json!({})))
}
4 changes: 2 additions & 2 deletions cli/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::op_error::OpError;
use crate::ops::JsonOp;
use crate::ops::MinimalOp;
use crate::permissions::DenoPermissions;
use crate::worker::WorkerHandle;
use crate::web_worker::WebWorkerHandle;
use deno_core::Buf;
use deno_core::CoreOp;
use deno_core::ErrBox;
Expand Down Expand Up @@ -62,7 +62,7 @@ pub struct StateInner {
pub import_map: Option<ImportMap>,
pub metrics: Metrics,
pub global_timer: GlobalTimer,
pub workers: HashMap<u32, (JoinHandle<()>, WorkerHandle)>,
pub workers: HashMap<u32, (JoinHandle<()>, WebWorkerHandle)>,
pub next_worker_id: u32,
pub start_time: Instant,
pub seeded_rng: Option<StdRng>,
Expand Down
8 changes: 8 additions & 0 deletions cli/tests/subdir/busy_worker.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
self.onmessage = function (_evt) {
// infinite loop
for (let i = 0; true; i++) {
if (i % 1000 == 0) {
postMessage(i);
}
}
};
21 changes: 21 additions & 0 deletions cli/tests/subdir/racy_worker.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// See issue for details
// https:/denoland/deno/issues/4080
//
// After first call to `postMessage() this worker schedules
// [close(), postMessage()] ops on the same turn of microtask queue
// (because message is rather big).
// Only single `postMessage()` call should make it
// to host, ie. after calling `close()` no more code should be run.

setTimeout(() => {
close();
}, 50);

while (true) {
await new Promise((done) => {
setTimeout(() => {
postMessage({ buf: new Array(999999) });
done();
});
});
}
14 changes: 8 additions & 6 deletions cli/tests/workers_test.out
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
running 4 tests
test workersBasic ... ok [WILDCARD]
test nestedWorker ... ok [WILDCARD]
test workerThrowsWhenExecuting ... ok [WILDCARD]
test workerCanUseFetch ... ok [WILDCARD]
running 6 tests
test worker terminate ... ok [WILDCARD]
test worker nested ... ok [WILDCARD]
test worker throws when executing ... ok [WILDCARD]
test worker fetch API ... ok [WILDCARD]
test worker terminate busy loop ... ok [WILDCARD]
test worker race condition ... ok [WILDCARD]

test result: ok. 4 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out [WILDCARD]
test result: ok. 6 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out [WILDCARD]
Loading

0 comments on commit be71885

Please sign in to comment.