From 193992fd14e88d91a3695f10204232d4c81192dc Mon Sep 17 00:00:00 2001 From: Aleksey Kladov Date: Tue, 18 Dec 2018 12:45:20 +0300 Subject: [PATCH] move thread worker to a separate crate --- Cargo.lock | 10 ++++ crates/ra_lsp_server/Cargo.toml | 1 + crates/ra_lsp_server/src/lib.rs | 1 - crates/ra_lsp_server/src/main_loop.rs | 6 +- crates/ra_lsp_server/src/project_model.rs | 10 ++-- crates/ra_lsp_server/src/vfs.rs | 7 +-- .../tests/heavy_tests/support.rs | 6 +- crates/thread_worker/Cargo.toml | 11 ++++ .../src/lib.rs} | 56 +++++++++---------- 9 files changed, 62 insertions(+), 46 deletions(-) create mode 100644 crates/thread_worker/Cargo.toml rename crates/{ra_lsp_server/src/thread_watcher.rs => thread_worker/src/lib.rs} (63%) diff --git a/Cargo.lock b/Cargo.lock index 56d5c65b9cd..15209184dcf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -700,6 +700,7 @@ dependencies = [ "tempdir 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)", "test_utils 0.1.0", "text_unit 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)", + "thread_worker 0.1.0", "threadpool 1.7.1 (registry+https://github.com/rust-lang/crates.io-index)", "url_serde 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", "walkdir 2.2.6 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1116,6 +1117,15 @@ dependencies = [ "lazy_static 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "thread_worker" +version = "0.1.0" +dependencies = [ + "crossbeam-channel 0.2.6 (registry+https://github.com/rust-lang/crates.io-index)", + "drop_bomb 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", + "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "threadpool" version = "1.7.1" diff --git a/crates/ra_lsp_server/Cargo.toml b/crates/ra_lsp_server/Cargo.toml index 133decc5257..30a8d35cd3a 100644 --- a/crates/ra_lsp_server/Cargo.toml +++ b/crates/ra_lsp_server/Cargo.toml @@ -26,6 +26,7 @@ text_unit = { version = "0.1.2", features = ["serde"] } smol_str = { version = "0.1.5", features = ["serde"] } rustc-hash = "1.0" +thread_worker = { path = "../thread_worker" } ra_syntax = { path = "../ra_syntax" } ra_editor = { path = "../ra_editor" } ra_text_edit = { path = "../ra_text_edit" } diff --git a/crates/ra_lsp_server/src/lib.rs b/crates/ra_lsp_server/src/lib.rs index 75c6fa1b8da..1d7258c359b 100644 --- a/crates/ra_lsp_server/src/lib.rs +++ b/crates/ra_lsp_server/src/lib.rs @@ -5,7 +5,6 @@ mod path_map; mod project_model; pub mod req; mod server_world; -pub mod thread_watcher; mod vfs; pub type Result = ::std::result::Result; diff --git a/crates/ra_lsp_server/src/main_loop.rs b/crates/ra_lsp_server/src/main_loop.rs index 41f70f263fa..eab82ee8599 100644 --- a/crates/ra_lsp_server/src/main_loop.rs +++ b/crates/ra_lsp_server/src/main_loop.rs @@ -10,6 +10,7 @@ use gen_lsp_server::{ use languageserver_types::NumberOrString; use ra_analysis::{Canceled, FileId, LibraryData}; use rayon; +use thread_worker::Worker; use threadpool::ThreadPool; use rustc_hash::FxHashSet; use serde::{de::DeserializeOwned, Serialize}; @@ -21,7 +22,6 @@ use crate::{ project_model::{workspace_loader, CargoWorkspace}, req, server_world::{ServerWorld, ServerWorldState}, - thread_watcher::Worker, vfs::{self, FileEvent}, Result, }; @@ -92,8 +92,8 @@ pub fn main_loop( let ws_res = ws_watcher.stop(); main_res?; - fs_res?; - ws_res?; + fs_res.map_err(|_| format_err!("fs watcher died"))?; + ws_res.map_err(|_| format_err!("ws watcher died"))?; Ok(()) } diff --git a/crates/ra_lsp_server/src/project_model.rs b/crates/ra_lsp_server/src/project_model.rs index cb91ada90ac..b881f8b6f30 100644 --- a/crates/ra_lsp_server/src/project_model.rs +++ b/crates/ra_lsp_server/src/project_model.rs @@ -4,11 +4,9 @@ use cargo_metadata::{metadata_run, CargoOpt}; use ra_syntax::SmolStr; use rustc_hash::{FxHashMap, FxHashSet}; use failure::{format_err, bail}; +use thread_worker::{WorkerHandle, Worker}; -use crate::{ - Result, - thread_watcher::{ThreadWatcher, Worker}, -}; +use crate::Result; /// `CargoWorksapce` represents the logical structure of, well, a Cargo /// workspace. It pretty closely mirrors `cargo metadata` output. @@ -199,8 +197,8 @@ impl TargetKind { } } -pub fn workspace_loader() -> (Worker>, ThreadWatcher) { - Worker::>::spawn( +pub fn workspace_loader() -> (Worker>, WorkerHandle) { + thread_worker::spawn::, _>( "workspace loader", 1, |input_receiver, output_sender| { diff --git a/crates/ra_lsp_server/src/vfs.rs b/crates/ra_lsp_server/src/vfs.rs index 00ab3e6c3d6..fcf7693d868 100644 --- a/crates/ra_lsp_server/src/vfs.rs +++ b/crates/ra_lsp_server/src/vfs.rs @@ -4,8 +4,7 @@ use std::{ }; use walkdir::WalkDir; - -use crate::thread_watcher::{ThreadWatcher, Worker}; +use thread_worker::{WorkerHandle, Worker}; #[derive(Debug)] pub struct FileEvent { @@ -18,8 +17,8 @@ pub enum FileEventKind { Add(String), } -pub fn roots_loader() -> (Worker)>, ThreadWatcher) { - Worker::)>::spawn( +pub fn roots_loader() -> (Worker)>, WorkerHandle) { + thread_worker::spawn::), _>( "roots loader", 128, |input_receiver, output_sender| { diff --git a/crates/ra_lsp_server/tests/heavy_tests/support.rs b/crates/ra_lsp_server/tests/heavy_tests/support.rs index 4b75be3ee74..07a878a2648 100644 --- a/crates/ra_lsp_server/tests/heavy_tests/support.rs +++ b/crates/ra_lsp_server/tests/heavy_tests/support.rs @@ -17,11 +17,11 @@ use languageserver_types::{ use serde::Serialize; use serde_json::{to_string_pretty, Value}; use tempdir::TempDir; +use thread_worker::{WorkerHandle, Worker}; use test_utils::{parse_fixture, find_mismatch}; use ra_lsp_server::{ main_loop, req, - thread_watcher::{ThreadWatcher, Worker}, }; pub fn project(fixture: &str) -> Server { @@ -45,13 +45,13 @@ pub struct Server { messages: RefCell>, dir: TempDir, worker: Option>, - watcher: Option, + watcher: Option, } impl Server { fn new(dir: TempDir, files: Vec<(PathBuf, String)>) -> Server { let path = dir.path().to_path_buf(); - let (worker, watcher) = Worker::::spawn( + let (worker, watcher) = thread_worker::spawn::( "test server", 128, move |mut msg_receiver, mut msg_sender| { diff --git a/crates/thread_worker/Cargo.toml b/crates/thread_worker/Cargo.toml new file mode 100644 index 00000000000..62d66a1a3dd --- /dev/null +++ b/crates/thread_worker/Cargo.toml @@ -0,0 +1,11 @@ +[package] +edition = "2018" +name = "thread_worker" +version = "0.1.0" +authors = ["Aleksey Kladov "] + +[dependencies] +drop_bomb = "0.1.0" +crossbeam-channel = "0.2.4" +log = "0.4.3" + diff --git a/crates/ra_lsp_server/src/thread_watcher.rs b/crates/thread_worker/src/lib.rs similarity index 63% rename from crates/ra_lsp_server/src/thread_watcher.rs rename to crates/thread_worker/src/lib.rs index 99825d440d8..e558559ef20 100644 --- a/crates/ra_lsp_server/src/thread_watcher.rs +++ b/crates/thread_worker/src/lib.rs @@ -1,28 +1,35 @@ +//! Small utility to correctly spawn crossbeam-channel based worker threads. + use std::thread; use crossbeam_channel::{bounded, unbounded, Receiver, Sender}; use drop_bomb::DropBomb; -use failure::format_err; - -use crate::Result; pub struct Worker { pub inp: Sender, pub out: Receiver, } -impl Worker { - pub fn spawn(name: &'static str, buf: usize, f: F) -> (Self, ThreadWatcher) - where - F: FnOnce(Receiver, Sender) + Send + 'static, - I: Send + 'static, - O: Send + 'static, - { - let (worker, inp_r, out_s) = worker_chan(buf); - let watcher = ThreadWatcher::spawn(name, move || f(inp_r, out_s)); - (worker, watcher) - } +pub struct WorkerHandle { + name: &'static str, + thread: thread::JoinHandle<()>, + bomb: DropBomb, +} +pub fn spawn(name: &'static str, buf: usize, f: F) -> (Worker, WorkerHandle) +where + F: FnOnce(Receiver, Sender) + Send + 'static, + I: Send + 'static, + O: Send + 'static, +{ + let (worker, inp_r, out_s) = worker_chan(buf); + let watcher = WorkerHandle::spawn(name, move || f(inp_r, out_s)); + (worker, watcher) +} + +impl Worker { + /// Stops the worker. Returns the message receiver to fetch results which + /// have become ready before the worker is stopped. pub fn stop(self) -> Receiver { self.out } @@ -32,30 +39,21 @@ impl Worker { } } -pub struct ThreadWatcher { - name: &'static str, - thread: thread::JoinHandle<()>, - bomb: DropBomb, -} - -impl ThreadWatcher { - fn spawn(name: &'static str, f: impl FnOnce() + Send + 'static) -> ThreadWatcher { +impl WorkerHandle { + fn spawn(name: &'static str, f: impl FnOnce() + Send + 'static) -> WorkerHandle { let thread = thread::spawn(f); - ThreadWatcher { + WorkerHandle { name, thread, - bomb: DropBomb::new(format!("ThreadWatcher {} was not stopped", name)), + bomb: DropBomb::new(format!("WorkerHandle {} was not stopped", name)), } } - pub fn stop(mut self) -> Result<()> { + pub fn stop(mut self) -> thread::Result<()> { log::info!("waiting for {} to finish ...", self.name); let name = self.name; self.bomb.defuse(); - let res = self - .thread - .join() - .map_err(|_| format_err!("ThreadWatcher {} died", name)); + let res = self.thread.join(); match &res { Ok(()) => log::info!("... {} terminated with ok", name), Err(_) => log::error!("... {} terminated with err", name),