From 07f8555b3e5d1b0a31936ed7ca08358d2919dd2f Mon Sep 17 00:00:00 2001 From: Brian Anderson Date: Sat, 21 Jan 2012 17:29:52 -0800 Subject: [PATCH] std: Add some hacks to use libuv --- mk/rt.mk | 1 + src/libstd/std.rc | 3 +- src/libstd/uvtmp.rs | 149 ++++++++++++++++++ src/rt/rust_uv.cpp | 1 + src/rt/rust_uvtmp.cpp | 346 ++++++++++++++++++++++++++++++++++++++++++ src/rt/rustrt.def.in | 9 ++ 6 files changed, 508 insertions(+), 1 deletion(-) create mode 100644 src/libstd/uvtmp.rs create mode 100644 src/rt/rust_uvtmp.cpp diff --git a/mk/rt.mk b/mk/rt.mk index 2eae4208729..a161f298a63 100644 --- a/mk/rt.mk +++ b/mk/rt.mk @@ -48,6 +48,7 @@ RUNTIME_CS_$(1) := \ rt/rust_port.cpp \ rt/rust_upcall.cpp \ rt/rust_uv.cpp \ + rt/rust_uvtmp.cpp \ rt/rust_log.cpp \ rt/rust_timer.cpp \ rt/circular_buffer.cpp \ diff --git a/src/libstd/std.rc b/src/libstd/std.rc index f8d105ef77e..bcfe108eae5 100644 --- a/src/libstd/std.rc +++ b/src/libstd/std.rc @@ -7,7 +7,7 @@ #[license = "MIT"]; #[crate_type = "lib"]; -export fs, io, net, run, uv; +export fs, io, net, run, uv, uvtmp; export c_vec, four, tri, util; export bitv, deque, fun_treemap, list, map, smallintmap, sort, treemap, ufind; export rope; @@ -25,6 +25,7 @@ mod net; #[path = "run_program.rs"] mod run; mod uv; +mod uvtmp; // Utility modules diff --git a/src/libstd/uvtmp.rs b/src/libstd/uvtmp.rs new file mode 100644 index 00000000000..e2059890511 --- /dev/null +++ b/src/libstd/uvtmp.rs @@ -0,0 +1,149 @@ +// Some temporary libuv hacks for servo + +#[cfg(target_os = "linux")]; +#[cfg(target_os = "macos")]; +#[cfg(target_os = "freebsd")]; + + +#[nolink] +native mod rustrt { + fn rust_uvtmp_create_thread() -> thread; + fn rust_uvtmp_start_thread(thread: thread); + fn rust_uvtmp_join_thread(thread: thread); + fn rust_uvtmp_delete_thread(thread: thread); + fn rust_uvtmp_connect( + thread: thread, + ip: str::sbuf, + chan: comm::chan); + fn rust_uvtmp_close_connection(thread: thread, cd: connect_data); + fn rust_uvtmp_write( + thread: thread, + cd: connect_data, + buf: *u8, + len: ctypes::size_t, + chan: comm::chan); + fn rust_uvtmp_read_start( + thread: thread, + cd: connect_data, + chan: comm::chan); + fn rust_uvtmp_delete_buf(buf: *u8); +} + +type thread = *ctypes::void; + +type connect_data = *ctypes::void; + +enum iomsg { + whatever, + connected(connect_data), + wrote(connect_data), + read(connect_data, *u8, ctypes::ssize_t) +} + +fn create_thread() -> thread { + rustrt::rust_uvtmp_create_thread() +} + +fn start_thread(thread: thread) { + rustrt::rust_uvtmp_start_thread(thread) +} + +fn join_thread(thread: thread) { + rustrt::rust_uvtmp_join_thread(thread) +} + +fn delete_thread(thread: thread) { + rustrt::rust_uvtmp_delete_thread(thread) +} + +fn connect(thread: thread, ip: str, ch: comm::chan) { + str::as_buf(ip) {|ipbuf| + rustrt::rust_uvtmp_connect(thread, ipbuf, ch) + } +} + +fn close_connection(thread: thread, cd: connect_data) { + rustrt::rust_uvtmp_close_connection(thread ,cd); +} + +fn write(thread: thread, cd: connect_data,bytes: [u8], + chan: comm::chan) unsafe { + rustrt::rust_uvtmp_write( + thread, cd, vec::to_ptr(bytes), vec::len(bytes), chan); +} + +fn read_start(thread: thread, cd: connect_data, + chan: comm::chan) { + rustrt::rust_uvtmp_read_start(thread, cd, chan); +} + +fn delete_buf(buf: *u8) { + rustrt::rust_uvtmp_delete_buf(buf); +} + +#[test] +fn test_start_stop() { + let thread = create_thread(); + start_thread(thread); + join_thread(thread); + delete_thread(thread); +} + +#[test] +#[ignore] +fn test_connect() { + let thread = create_thread(); + start_thread(thread); + let port = comm::port(); + let chan = comm::chan(port); + connect(thread, "74.125.224.146", chan); + alt comm::recv(port) { + connected(cd) { + close_connection(thread, cd); + } + } + join_thread(thread); + delete_thread(thread); +} + +#[test] +#[ignore] +fn test_http() { + let thread = create_thread(); + start_thread(thread); + let port = comm::port(); + let chan = comm::chan(port); + connect(thread, "74.125.224.146", chan); + alt comm::recv(port) { + connected(cd) { + write(thread, cd, str::bytes("GET / HTTP/1.0\n\n"), chan); + alt comm::recv(port) { + wrote(cd) { + read_start(thread, cd, chan); + let keep_going = true; + while keep_going { + alt comm::recv(port) { + read(_, buf, -1) { + keep_going = false; + delete_buf(buf); + } + read(_, buf, len) { + unsafe { + log(error, len); + let buf = vec::unsafe::from_buf(buf, len as uint); + let str = str::unsafe_from_bytes(buf); + #error("read something"); + io::println(str); + } + delete_buf(buf); + } + } + } + close_connection(thread, cd); + } + } + } + } + join_thread(thread); + delete_thread(thread); +} \ No newline at end of file diff --git a/src/rt/rust_uv.cpp b/src/rt/rust_uv.cpp index b339d77c043..9ee1b844add 100644 --- a/src/rt/rust_uv.cpp +++ b/src/rt/rust_uv.cpp @@ -48,3 +48,4 @@ extern "C" CDECL size_t rust_uv_size_of_idle_t() { return sizeof(uv_idle_t); } + diff --git a/src/rt/rust_uvtmp.cpp b/src/rt/rust_uvtmp.cpp new file mode 100644 index 00000000000..3d4cb89f711 --- /dev/null +++ b/src/rt/rust_uvtmp.cpp @@ -0,0 +1,346 @@ +#include +#include +#include +#include "rust_internal.h" +#include "uv.h" + +class rust_uvtmp_thread; + +struct connect_data { + rust_uvtmp_thread *thread; + uv_connect_t connect; + uv_tcp_t tcp; + chan_handle chan; +}; + +const intptr_t connected_tag = 1; +const intptr_t wrote_tag = 2; +const intptr_t read_tag = 3; + +struct iomsg { + intptr_t tag; + union { + connect_data *connected_val; + connect_data *wrote_val; + struct { + connect_data *cd; + uint8_t *buf; + ssize_t nread; + } read_val; + } val; +}; + +struct write_data { + connect_data *cd; + uint8_t *buf; + size_t len; + chan_handle chan; +}; + +struct read_start_data { + connect_data *cd; + chan_handle chan; +}; + +// FIXME: Copied from rust_builtins.cpp. Could bitrot easily +static void +send(rust_task *task, chan_handle chan, void *data) { + rust_task *target_task = task->kernel->get_task_by_id(chan.task); + if(target_task) { + rust_port *port = target_task->get_port_by_id(chan.port); + if(port) { + port->send(data); + scoped_lock with(target_task->lock); + port->deref(); + } + target_task->deref(); + } +} + +class rust_uvtmp_thread : public rust_thread { + +private: + rust_task *task; + uv_loop_t *loop; + uv_idle_t idle; + lock_and_signal lock; + bool stop_flag; + std::queue > connect_queue; + std::queue close_connection_queue; + std::queue write_queue; + std::queue read_start_queue; + +public: + + rust_uvtmp_thread() { + task = rust_scheduler::get_task(); + stop_flag = false; + loop = uv_loop_new(); + uv_idle_init(loop, &idle); + idle.data = this; + uv_idle_start(&idle, idle_cb); + } + + ~rust_uvtmp_thread() { + uv_loop_delete(loop); + } + + void stop() { + scoped_lock with(lock); + stop_flag = true; + } + + void connect(char *ip, chan_handle chan) { + scoped_lock with(lock); + connect_queue.push(std::pair + (std::string(ip), chan)); + } + + void + close_connection(connect_data *cd) { + scoped_lock with(lock); + close_connection_queue.push(cd); + } + + void + write(connect_data *cd, uint8_t *buf, size_t len, chan_handle chan) { + scoped_lock with(lock); + write_data *wd = new write_data(); + wd->cd = cd; + wd->buf = new uint8_t[len]; + wd->len = len; + wd->chan = chan; + + memcpy(wd->buf, buf, len); + + write_queue.push(wd); + } + + void + read_start(connect_data *cd, chan_handle chan) { + scoped_lock with(lock); + read_start_data *rd = new read_start_data(); + rd->cd = cd; + rd->chan = chan; + + read_start_queue.push(rd); + } + +private: + + virtual void + run() { + uv_run(loop); + } + + static void + idle_cb(uv_idle_t* handle, int status) { + rust_uvtmp_thread *self = (rust_uvtmp_thread*) handle->data; + self->on_idle(); + } + + void + on_idle() { + scoped_lock with(lock); + make_new_connections(); + close_connections(); + write_buffers(); + start_reads(); + close_idle_if_stop(); + } + + void + make_new_connections() { + assert(lock.lock_held_by_current_thread()); + while (!connect_queue.empty()) { + std::pair pair = connect_queue.front(); + connect_queue.pop(); + struct sockaddr_in client_addr = uv_ip4_addr("0.0.0.0", 0); + struct sockaddr_in server_addr = uv_ip4_addr(pair.first.c_str(), 80); + + connect_data *cd = new connect_data(); + cd->thread = this; + cd->chan = pair.second; + cd->connect.data = cd; + + uv_tcp_init(loop, &cd->tcp); + uv_tcp_bind(&cd->tcp, client_addr); + + uv_tcp_connect(&cd->connect, &cd->tcp, server_addr, connect_cb); + } + } + + static void + connect_cb(uv_connect_t *handle, int status) { + connect_data *cd = (connect_data*)handle->data; + cd->thread->on_connect(cd); + } + + void + on_connect(connect_data *cd) { + iomsg msg; + msg.tag = connected_tag; + msg.val.connected_val = cd; + + send(task, cd->chan, &msg); + } + + void + close_connections() { + assert(lock.lock_held_by_current_thread()); + while (!close_connection_queue.empty()) { + connect_data *cd = close_connection_queue.front(); + close_connection_queue.pop(); + + cd->tcp.data = cd; + + uv_close((uv_handle_t*)&cd->tcp, tcp_close_cb); + } + } + + static void + tcp_close_cb(uv_handle_t *handle) { + connect_data *cd = (connect_data*)handle->data; + delete cd; + } + + void + write_buffers() { + assert(lock.lock_held_by_current_thread()); + while (!write_queue.empty()) { + write_data *wd = write_queue.front(); + write_queue.pop(); + + uv_write_t *write = new uv_write_t(); + + write->data = wd; + + uv_buf_t buf; + buf.base = (char*)wd->buf; + buf.len = wd->len; + + uv_write(write, (uv_stream_t*)&wd->cd->tcp, &buf, 1, write_cb); + } + } + + static void + write_cb(uv_write_t *handle, int status) { + write_data *wd = (write_data*)handle->data; + rust_uvtmp_thread *self = wd->cd->thread; + self->on_write(handle, wd); + } + + void + on_write(uv_write_t *handle, write_data *wd) { + iomsg msg; + msg.tag = wrote_tag; + msg.val.wrote_val = wd->cd; + + send(task, wd->chan, &msg); + + delete [] wd->buf; + delete wd; + delete handle; + } + + void + start_reads() { + assert (lock.lock_held_by_current_thread()); + while (!read_start_queue.empty()) { + read_start_data *rd = read_start_queue.front(); + read_start_queue.pop(); + + connect_data *cd = rd->cd; + cd->tcp.data = rd; + + uv_read_start((uv_stream_t*)&cd->tcp, alloc_cb, read_cb); + } + } + + static uv_buf_t + alloc_cb(uv_handle_t* handle, size_t size) { + uv_buf_t buf; + buf.base = new char[size]; + buf.len = size; + return buf; + } + + static void + read_cb(uv_stream_t *handle, ssize_t nread, uv_buf_t buf) { + read_start_data *rd = (read_start_data*)handle->data; + rust_uvtmp_thread *self = rd->cd->thread; + self->on_read(rd, nread, buf); + } + + void + on_read(read_start_data *rd, ssize_t nread, uv_buf_t buf) { + iomsg msg; + msg.tag = read_tag; + msg.val.read_val.cd = rd->cd; + msg.val.read_val.buf = (uint8_t*)buf.base; + msg.val.read_val.nread = nread; + + send(task, rd->chan, &msg); + if (nread == -1) { + delete rd; + } + } + + void + close_idle_if_stop() { + assert(lock.lock_held_by_current_thread()); + if (stop_flag) { + uv_close((uv_handle_t*)&idle, NULL); + } + } + +}; + +extern "C" rust_uvtmp_thread * +rust_uvtmp_create_thread() { + rust_uvtmp_thread *thread = new rust_uvtmp_thread(); + return thread; +} + +extern "C" void +rust_uvtmp_start_thread(rust_uvtmp_thread *thread) { + thread->start(); +} + +extern "C" void +rust_uvtmp_join_thread(rust_uvtmp_thread *thread) { + thread->stop(); + thread->join(); +} + +extern "C" void +rust_uvtmp_delete_thread(rust_uvtmp_thread *thread) { + delete thread; +} + +extern "C" void +rust_uvtmp_connect(rust_uvtmp_thread *thread, char *ip, chan_handle *chan) { + thread->connect(ip, *chan); +} + +extern "C" void +rust_uvtmp_close_connection(rust_uvtmp_thread *thread, connect_data *cd) { + thread->close_connection(cd); +} + +extern "C" void +rust_uvtmp_write(rust_uvtmp_thread *thread, connect_data *cd, + uint8_t *buf, size_t len, chan_handle *chan) { + thread->write(cd, buf, len, *chan); +} + +extern "C" void +rust_uvtmp_read_start(rust_uvtmp_thread *thread, connect_data *cd, + chan_handle *chan) { + thread->read_start(cd, *chan); +} + +extern "C" void +rust_uvtmp_delete_buf(uint8_t *buf) { + delete [] buf; +} diff --git a/src/rt/rustrt.def.in b/src/rt/rustrt.def.in index 745e4c6e34b..86f24a20f96 100644 --- a/src/rt/rustrt.def.in +++ b/src/rt/rustrt.def.in @@ -88,3 +88,12 @@ rust_uv_unref rust_uv_idle_init rust_uv_idle_start rust_uv_size_of_idle_t +rust_uvtmp_create_thread +rust_uvtmp_start_thread +rust_uvtmp_join_thread +rust_uvtmp_delete_thread +rust_uvtmp_connect +rust_uvtmp_close_connection +rust_uvtmp_write +rust_uvtmp_read_start +rust_uvtmp_delete_buf