Rollup merge of #119408 - betrusted-io:xous-fixes-add-network, r=Mark-Simulacrum

xous: misc fixes + add network support

This patchset makes several fixes to Xous support. Additionally, this patch adds networking support.

Many of these fixes stem from the recent patch that merged `unwinding` support, which made it possible to run the Rust test suite on Xous. Running that suite now shows 729 tests passing:

```
failures:
    env::tests::test
    env::tests::test_self_exe_path
    env::tests::vars_debug
    env::tests::vars_os_debug
    os::raw::tests::same
    path::tests::test_push
    path::tests::test_set_file_name
    time::tests::since_epoch
test result: FAILED. 729 passed; 8 failed; 1 ignored; 0 measured; 0 filtered out; finished in 214.54s
```

In the course of fixing several tests and getting the test sequence to reliably run, several issues were found. This patchset fixes those issues.
This commit is contained in:
Matthias Krüger 2024-01-22 16:13:26 +01:00 committed by GitHub
commit e9c2e1bfbe
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
20 changed files with 1777 additions and 183 deletions

View File

@ -88,29 +88,31 @@ fn lend_impl(
let a3 = opcode;
let a4 = data.as_ptr() as usize;
let a5 = data.len();
let mut a6 = arg1;
let mut a7 = arg2;
let a6 = arg1;
let a7 = arg2;
let mut ret1;
let mut ret2;
unsafe {
core::arch::asm!(
"ecall",
inlateout("a0") a0,
inlateout("a1") a1 => _,
inlateout("a2") a2 => _,
inlateout("a1") a1 => ret1,
inlateout("a2") a2 => ret2,
inlateout("a3") a3 => _,
inlateout("a4") a4 => _,
inlateout("a5") a5 => _,
inlateout("a6") a6,
inlateout("a7") a7,
inlateout("a6") a6 => _,
inlateout("a7") a7 => _,
)
};
let result = a0;
if result == SyscallResult::MemoryReturned as usize {
Ok((a6, a7))
Ok((ret1, ret2))
} else if result == SyscallResult::Error as usize {
Err(a1.into())
Err(ret1.into())
} else {
Err(Error::InternalError)
}
@ -405,7 +407,7 @@ pub(crate) unsafe fn map_memory<T>(
pub(crate) unsafe fn unmap_memory<T>(range: *mut [T]) -> Result<(), Error> {
let mut a0 = Syscall::UnmapMemory as usize;
let mut a1 = range.as_mut_ptr() as usize;
let a2 = range.len();
let a2 = range.len() * core::mem::size_of::<T>();
let a3 = 0;
let a4 = 0;
let a5 = 0;
@ -450,7 +452,7 @@ pub(crate) unsafe fn update_memory_flags<T>(
) -> Result<(), Error> {
let mut a0 = Syscall::UpdateMemoryFlags as usize;
let mut a1 = range.as_mut_ptr() as usize;
let a2 = range.len();
let a2 = range.len() * core::mem::size_of::<T>();
let a3 = new_flags.bits();
let a4 = 0; // Process ID is currently None
let a5 = 0;

View File

@ -1,9 +1,15 @@
use crate::os::xous::ffi::Connection;
use core::sync::atomic::{AtomicU32, Ordering};
mod dns;
pub(crate) use dns::*;
mod log;
pub(crate) use log::*;
mod net;
pub(crate) use net::*;
mod systime;
pub(crate) use systime::*;

View File

@ -0,0 +1,28 @@
use crate::os::xous::ffi::Connection;
use crate::os::xous::services::connect;
use core::sync::atomic::{AtomicU32, Ordering};
/// Opcodes for `LendMut` messages sent to the DNS resolver server.
#[repr(usize)]
pub(crate) enum DnsLendMut {
    /// Perform a hostname lookup on a raw query buffer.
    RawLookup = 6,
}

// Implemented as `From` rather than a hand-written `Into`: the standard
// library's blanket impl gives callers `.into()` for free, following the
// usual `From`-over-`Into` convention.
impl From<DnsLendMut> for usize {
    fn from(opcode: DnsLendMut) -> usize {
        opcode as usize
    }
}
/// Return a `Connection` to the DNS lookup server, used for resolving
/// domain names. The connection is established once and cached in a
/// process-wide atomic; subsequent calls reuse the cached value.
pub(crate) fn dns_server() -> Connection {
    static DNS_CONNECTION: AtomicU32 = AtomicU32::new(0);
    match DNS_CONNECTION.load(Ordering::Relaxed) {
        // Zero is the "not yet connected" sentinel: connect and cache.
        0 => {
            let connection = connect("_DNS Resolver Middleware_").unwrap();
            DNS_CONNECTION.store(connection.into(), Ordering::Relaxed);
            connection
        }
        cached => cached.into(),
    }
}

View File

@ -45,6 +45,17 @@ impl<'a> Into<[usize; 5]> for LogScalar<'a> {
}
}
/// Opcodes for `Lend` messages sent to the log server.
pub(crate) enum LogLend {
    /// Write the lent buffer to standard output.
    StandardOutput = 1,
    /// Write the lent buffer to standard error.
    StandardError = 2,
}

// `From` rather than a hand-written `Into`: the blanket impl still gives
// callers `.into()`, and `From` is the conventional direction to implement.
impl From<LogLend> for usize {
    fn from(opcode: LogLend) -> usize {
        opcode as usize
    }
}
/// Return a `Connection` to the log server, which is used for printing messages to
/// the console and reporting panics. If the log server has not yet started, this
/// will block until the server is running. It is safe to call this multiple times,

View File

@ -0,0 +1,95 @@
use crate::os::xous::ffi::Connection;
use crate::os::xous::services::connect;
use core::sync::atomic::{AtomicU32, Ordering};
/// Network operations issued to the network server as blocking scalar
/// messages. The trailing `/* nn */` notes record the opcode each variant is
/// packed into by the `Into<[usize; 5]>` impl below. Note that the UDP and
/// TCP TTL operations share opcodes 36/37 and are distinguished by the final
/// scalar argument (0 = TCP, 1 = UDP).
pub(crate) enum NetBlockingScalar {
StdGetTtlUdp(u16 /* fd */), /* 36 */
StdSetTtlUdp(u16 /* fd */, u32 /* ttl */), /* 37 */
StdGetTtlTcp(u16 /* fd */), /* 36 */
StdSetTtlTcp(u16 /* fd */, u32 /* ttl */), /* 37 */
StdGetNodelay(u16 /* fd */), /* 38 */
StdSetNodelay(u16 /* fd */, bool), /* 39 */
StdTcpClose(u16 /* fd */), /* 34 */
StdUdpClose(u16 /* fd */), /* 41 */
StdTcpStreamShutdown(u16 /* fd */, crate::net::Shutdown /* how */), /* 46 */
}
/// Opcodes for `LendMut` messages sent to the network server. The file
/// descriptor, when present, is packed into bits 16.. of the opcode word.
pub(crate) enum NetLendMut {
    StdTcpConnect,                        /* 30 */
    StdTcpTx(u16 /* fd */),               /* 31 */
    StdTcpPeek(u16 /* fd */, bool /* nonblocking */), /* 32 */
    StdTcpRx(u16 /* fd */, bool /* nonblocking */),   /* 33 */
    StdGetAddress(u16 /* fd */),          /* 35 */
    StdUdpBind,                           /* 40 */
    StdUdpRx(u16 /* fd */),               /* 42 */
    StdUdpTx(u16 /* fd */),               /* 43 */
    StdTcpListen,                         /* 44 */
    StdTcpAccept(u16 /* fd */),           /* 45 */
}

// `From` rather than hand-written `Into` (blanket impl keeps `.into()`
// working for callers). Layout of the packed word: opcode in the low 16
// bits, fd in bits 16.., and bit 15 set when the operation is nonblocking.
impl From<NetLendMut> for usize {
    fn from(opcode: NetLendMut) -> usize {
        match opcode {
            NetLendMut::StdTcpConnect => 30,
            NetLendMut::StdTcpTx(fd) => 31 | ((fd as usize) << 16),
            // The bool carried by Peek/Rx is the *nonblocking* flag (callers
            // pass `self.nonblocking`); the original binding was misleadingly
            // named `blocking`.
            NetLendMut::StdTcpPeek(fd, nonblocking) => {
                32 | ((fd as usize) << 16) | if nonblocking { 0x8000 } else { 0 }
            }
            NetLendMut::StdTcpRx(fd, nonblocking) => {
                33 | ((fd as usize) << 16) | if nonblocking { 0x8000 } else { 0 }
            }
            NetLendMut::StdGetAddress(fd) => 35 | ((fd as usize) << 16),
            NetLendMut::StdUdpBind => 40,
            NetLendMut::StdUdpRx(fd) => 42 | ((fd as usize) << 16),
            NetLendMut::StdUdpTx(fd) => 43 | ((fd as usize) << 16),
            NetLendMut::StdTcpListen => 44,
            NetLendMut::StdTcpAccept(fd) => 45 | ((fd as usize) << 16),
        }
    }
}
// Pack a `NetBlockingScalar` into the five scalar arguments of a blocking
// scalar message: opcode (with fd in bits 16..) in word 0, operation
// arguments in words 1..4, and a TCP(0)/UDP(1) selector in word 4 for the
// shared TTL/nodelay opcodes.
//
// Changes from the original: the unused `'a` lifetime parameter on the impl
// has been dropped, and this is now `From` (the blanket impl keeps `.into()`
// working for callers).
impl From<NetBlockingScalar> for [usize; 5] {
    fn from(opcode: NetBlockingScalar) -> [usize; 5] {
        match opcode {
            NetBlockingScalar::StdGetTtlTcp(fd) => [36 | ((fd as usize) << 16), 0, 0, 0, 0],
            NetBlockingScalar::StdGetTtlUdp(fd) => [36 | ((fd as usize) << 16), 0, 0, 0, 1],
            NetBlockingScalar::StdSetTtlTcp(fd, ttl) => {
                [37 | ((fd as usize) << 16), ttl as _, 0, 0, 0]
            }
            NetBlockingScalar::StdSetTtlUdp(fd, ttl) => {
                [37 | ((fd as usize) << 16), ttl as _, 0, 0, 1]
            }
            NetBlockingScalar::StdGetNodelay(fd) => [38 | ((fd as usize) << 16), 0, 0, 0, 0],
            NetBlockingScalar::StdSetNodelay(fd, enabled) => {
                [39 | ((fd as usize) << 16), if enabled { 1 } else { 0 }, 0, 0, 1]
            }
            NetBlockingScalar::StdTcpClose(fd) => [34 | ((fd as usize) << 16), 0, 0, 0, 0],
            NetBlockingScalar::StdUdpClose(fd) => [41 | ((fd as usize) << 16), 0, 0, 0, 0],
            NetBlockingScalar::StdTcpStreamShutdown(fd, how) => [
                46 | ((fd as usize) << 16),
                // Shutdown is encoded as a bitmask: read=1, write=2, both=3.
                match how {
                    crate::net::Shutdown::Read => 1,
                    crate::net::Shutdown::Write => 2,
                    crate::net::Shutdown::Both => 3,
                },
                0,
                0,
                0,
            ],
        }
    }
}
/// Return a `Connection` to the network server, which provides all OS-level
/// networking functions. The connection is established once and cached in a
/// process-wide atomic; later calls reuse the cached connection id.
pub(crate) fn net_server() -> Connection {
    static NET_CONNECTION: AtomicU32 = AtomicU32::new(0);
    match NET_CONNECTION.load(Ordering::Relaxed) {
        // Zero is the "not yet connected" sentinel: connect and cache.
        0 => {
            let connection = connect("_Middleware Network Server_").unwrap();
            NET_CONNECTION.store(connection.into(), Ordering::Relaxed);
            connection
        }
        cached => cached.into(),
    }
}

View File

@ -1,7 +1,15 @@
use crate::alloc::{GlobalAlloc, Layout, System};
#[cfg(not(test))]
#[export_name = "_ZN16__rust_internals3std3sys4xous5alloc8DLMALLOCE"]
static mut DLMALLOC: dlmalloc::Dlmalloc = dlmalloc::Dlmalloc::new();
#[cfg(test)]
extern "Rust" {
#[link_name = "_ZN16__rust_internals3std3sys4xous5alloc8DLMALLOCE"]
static mut DLMALLOC: dlmalloc::Dlmalloc;
}
#[stable(feature = "alloc_system_type", since = "1.28.0")]
unsafe impl GlobalAlloc for System {
#[inline]

View File

@ -1,14 +1,17 @@
use super::mutex::Mutex;
use crate::os::xous::ffi::{blocking_scalar, scalar};
use crate::os::xous::services::ticktimer_server;
use crate::sync::Mutex as StdMutex;
use crate::os::xous::services::{ticktimer_server, TicktimerScalar};
use crate::time::Duration;
use core::sync::atomic::{AtomicUsize, Ordering};
// The implementation is inspired by Andrew D. Birrell's paper
// "Implementing Condition Variables with Semaphores"
const NOTIFY_TRIES: usize = 3;
pub struct Condvar {
counter: StdMutex<usize>,
counter: AtomicUsize,
timed_out: AtomicUsize,
}
unsafe impl Send for Condvar {}
@ -18,94 +21,128 @@ impl Condvar {
#[inline]
#[rustc_const_stable(feature = "const_locks", since = "1.63.0")]
pub const fn new() -> Condvar {
Condvar { counter: StdMutex::new(0) }
Condvar { counter: AtomicUsize::new(0), timed_out: AtomicUsize::new(0) }
}
fn notify_some(&self, to_notify: usize) {
// Assumption: The Mutex protecting this condvar is locked throughout the
// entirety of this call, preventing calls to `wait` and `wait_timeout`.
// Logic check: Ensure that there aren't any missing waiters. Remove any that
// timed-out, ensuring the counter doesn't underflow.
assert!(self.timed_out.load(Ordering::Relaxed) <= self.counter.load(Ordering::Relaxed));
self.counter.fetch_sub(self.timed_out.swap(0, Ordering::Relaxed), Ordering::Relaxed);
// Figure out how many threads to notify. Note that it is impossible for `counter`
// to increase during this operation because Mutex is locked. However, it is
// possible for `counter` to decrease due to a condvar timing out, in which
// case the corresponding `timed_out` will increase accordingly.
let Ok(waiter_count) =
self.counter.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |counter| {
if counter == 0 {
return None;
} else {
Some(counter - counter.min(to_notify))
}
})
else {
// No threads are waiting on this condvar
return;
};
let mut remaining_to_wake = waiter_count.min(to_notify);
if remaining_to_wake == 0 {
return;
}
for _wake_tries in 0..NOTIFY_TRIES {
let result = blocking_scalar(
ticktimer_server(),
TicktimerScalar::NotifyCondition(self.index(), remaining_to_wake).into(),
)
.expect("failure to send NotifyCondition command");
// Remove the list of waiters that were notified
remaining_to_wake -= result[0];
// Also remove the number of waiters that timed out. Clamp it to 0 in order to
// ensure we don't wait forever in case the waiter woke up between the time
// we counted the remaining waiters and now.
remaining_to_wake =
remaining_to_wake.saturating_sub(self.timed_out.swap(0, Ordering::Relaxed));
if remaining_to_wake == 0 {
return;
}
crate::thread::yield_now();
}
}
pub fn notify_one(&self) {
let mut counter = self.counter.lock().unwrap();
if *counter <= 0 {
return;
} else {
*counter -= 1;
}
let result = blocking_scalar(
ticktimer_server(),
crate::os::xous::services::TicktimerScalar::NotifyCondition(self.index(), 1).into(),
);
drop(counter);
result.expect("failure to send NotifyCondition command");
self.notify_some(1)
}
pub fn notify_all(&self) {
let mut counter = self.counter.lock().unwrap();
if *counter <= 0 {
return;
}
let result = blocking_scalar(
ticktimer_server(),
crate::os::xous::services::TicktimerScalar::NotifyCondition(self.index(), *counter)
.into(),
);
*counter = 0;
drop(counter);
result.expect("failure to send NotifyCondition command");
self.notify_some(self.counter.load(Ordering::Relaxed))
}
fn index(&self) -> usize {
self as *const Condvar as usize
core::ptr::from_ref(self).addr()
}
/// Unlock the given Mutex and wait for the notification. Wait at most
/// `ms` milliseconds, or pass `0` to wait forever.
///
/// Returns `true` if the condition was received, `false` if it timed out
fn wait_ms(&self, mutex: &Mutex, ms: usize) -> bool {
self.counter.fetch_add(1, Ordering::Relaxed);
unsafe { mutex.unlock() };
// Threading concern: There is a chance that the `notify` thread wakes up here before
// we have a chance to wait for the condition. This is fine because we've recorded
// the fact that we're waiting by incrementing the counter.
let result = blocking_scalar(
ticktimer_server(),
TicktimerScalar::WaitForCondition(self.index(), ms).into(),
);
let awoken = result.expect("Ticktimer: failure to send WaitForCondition command")[0] == 0;
// If we awoke due to a timeout, increment the `timed_out` counter so that the
// main loop of `notify` knows there's a timeout.
//
// This is done with the Mutex still unlocked, because the Mutex might still
// be locked by the `notify` process above.
if !awoken {
self.timed_out.fetch_add(1, Ordering::Relaxed);
}
unsafe { mutex.lock() };
awoken
}
pub unsafe fn wait(&self, mutex: &Mutex) {
let mut counter = self.counter.lock().unwrap();
*counter += 1;
unsafe { mutex.unlock() };
drop(counter);
let result = blocking_scalar(
ticktimer_server(),
crate::os::xous::services::TicktimerScalar::WaitForCondition(self.index(), 0).into(),
);
unsafe { mutex.lock() };
result.expect("Ticktimer: failure to send WaitForCondition command");
// Wait for 0 ms, which is a special case to "wait forever"
self.wait_ms(mutex, 0);
}
pub unsafe fn wait_timeout(&self, mutex: &Mutex, dur: Duration) -> bool {
let mut counter = self.counter.lock().unwrap();
*counter += 1;
unsafe { mutex.unlock() };
drop(counter);
let mut millis = dur.as_millis() as usize;
// Ensure we don't wait for 0 ms, which would cause us to wait forever
if millis == 0 {
millis = 1;
}
let result = blocking_scalar(
ticktimer_server(),
crate::os::xous::services::TicktimerScalar::WaitForCondition(self.index(), millis)
.into(),
);
unsafe { mutex.lock() };
let result = result.expect("Ticktimer: failure to send WaitForCondition command")[0] == 0;
// If we awoke due to a timeout, decrement the wake count, as that would not have
// been done in the `notify()` call.
if !result {
*self.counter.lock().unwrap() -= 1;
}
result
self.wait_ms(mutex, millis)
}
}
impl Drop for Condvar {
fn drop(&mut self) {
scalar(
ticktimer_server(),
crate::os::xous::services::TicktimerScalar::FreeCondition(self.index()).into(),
)
.ok();
let remaining_count = self.counter.load(Ordering::Relaxed);
let timed_out = self.timed_out.load(Ordering::Relaxed);
assert!(
remaining_count - timed_out == 0,
"counter was {} and timed_out was {} not 0",
remaining_count,
timed_out
);
scalar(ticktimer_server(), TicktimerScalar::FreeCondition(self.index()).into()).ok();
}
}

View File

@ -1,5 +1,5 @@
use crate::os::xous::ffi::{blocking_scalar, do_yield, scalar};
use crate::os::xous::services::ticktimer_server;
use crate::os::xous::ffi::{blocking_scalar, do_yield};
use crate::os::xous::services::{ticktimer_server, TicktimerScalar};
use crate::sync::atomic::{AtomicBool, AtomicUsize, Ordering::Relaxed, Ordering::SeqCst};
pub struct Mutex {
@ -29,7 +29,7 @@ impl Mutex {
}
fn index(&self) -> usize {
self as *const Mutex as usize
core::ptr::from_ref(self).addr()
}
#[inline]
@ -83,10 +83,7 @@ impl Mutex {
}
// Unblock one thread that is waiting on this message.
scalar(
ticktimer_server(),
crate::os::xous::services::TicktimerScalar::UnlockMutex(self.index()).into(),
)
blocking_scalar(ticktimer_server(), TicktimerScalar::UnlockMutex(self.index()).into())
.expect("failure to send UnlockMutex command");
}
@ -106,10 +103,7 @@ impl Drop for Mutex {
// If there was Mutex contention, then we involved the ticktimer. Free
// the resources associated with this Mutex as it is deallocated.
if self.contended.load(Relaxed) {
scalar(
ticktimer_server(),
crate::os::xous::services::TicktimerScalar::FreeMutex(self.index()).into(),
)
blocking_scalar(ticktimer_server(), TicktimerScalar::FreeMutex(self.index()).into())
.ok();
}
}

View File

@ -1,5 +1,5 @@
use crate::os::xous::ffi::do_yield;
use crate::sync::atomic::{AtomicIsize, Ordering::SeqCst};
use crate::sync::atomic::{AtomicIsize, Ordering::Acquire};
use crate::thread::yield_now;
pub struct RwLock {
/// The "mode" value indicates how many threads are waiting on this
@ -14,6 +14,9 @@ pub struct RwLock {
mode: AtomicIsize,
}
const RWLOCK_WRITING: isize = -1;
const RWLOCK_FREE: isize = 0;
unsafe impl Send for RwLock {}
unsafe impl Sync for RwLock {}
@ -21,52 +24,51 @@ impl RwLock {
#[inline]
#[rustc_const_stable(feature = "const_locks", since = "1.63.0")]
pub const fn new() -> RwLock {
RwLock { mode: AtomicIsize::new(0) }
RwLock { mode: AtomicIsize::new(RWLOCK_FREE) }
}
#[inline]
pub unsafe fn read(&self) {
while !unsafe { self.try_read() } {
do_yield();
yield_now();
}
}
#[inline]
pub unsafe fn try_read(&self) -> bool {
// Non-atomically determine the current value.
let current = self.mode.load(SeqCst);
// If it's currently locked for writing, then we cannot read.
if current < 0 {
return false;
}
// Attempt to lock. If the `current` value has changed, then this
// operation will fail and we will not obtain the lock even if we
// could potentially keep it.
let new = current + 1;
self.mode.compare_exchange(current, new, SeqCst, SeqCst).is_ok()
self.mode
.fetch_update(
Acquire,
Acquire,
|v| if v == RWLOCK_WRITING { None } else { Some(v + 1) },
)
.is_ok()
}
#[inline]
pub unsafe fn write(&self) {
while !unsafe { self.try_write() } {
do_yield();
yield_now();
}
}
#[inline]
pub unsafe fn try_write(&self) -> bool {
self.mode.compare_exchange(0, -1, SeqCst, SeqCst).is_ok()
self.mode.compare_exchange(RWLOCK_FREE, RWLOCK_WRITING, Acquire, Acquire).is_ok()
}
#[inline]
pub unsafe fn read_unlock(&self) {
self.mode.fetch_sub(1, SeqCst);
let previous = self.mode.fetch_sub(1, Acquire);
assert!(previous != RWLOCK_FREE);
assert!(previous != RWLOCK_WRITING);
}
#[inline]
pub unsafe fn write_unlock(&self) {
assert_eq!(self.mode.compare_exchange(-1, 0, SeqCst, SeqCst), Ok(-1));
assert_eq!(
self.mode.compare_exchange(RWLOCK_WRITING, RWLOCK_FREE, Acquire, Acquire),
Ok(RWLOCK_WRITING)
);
}
}

View File

@ -12,10 +12,7 @@ pub mod fs;
#[path = "../unsupported/io.rs"]
pub mod io;
pub mod locks;
#[path = "../unsupported/net.rs"]
pub mod net;
#[path = "../unsupported/once.rs"]
pub mod once;
pub mod os;
#[path = "../unix/path.rs"]
pub mod path;

View File

@ -0,0 +1,127 @@
use crate::io;
use crate::net::{Ipv4Addr, SocketAddr, SocketAddrV4, SocketAddrV6};
use crate::os::xous::ffi::lend_mut;
use crate::os::xous::services::{dns_server, DnsLendMut};
use core::convert::{TryFrom, TryInto};
/// Error returned by the DNS server; `code` is byte 1 of the raw response.
pub struct DnsError {
pub code: u8,
}
// 4096-byte, page-aligned query/response buffer — presumably sized and
// aligned this way because Xous lends whole memory pages; confirm against
// the `lend_mut` contract.
#[repr(C, align(4096))]
struct LookupHostQuery([u8; 4096]);
/// Result of a DNS lookup: an iterator over the addresses decoded from the
/// raw server response.
pub struct LookupHost {
// Raw response buffer as returned by the DNS server.
data: LookupHostQuery,
// Port paired with every yielded `SocketAddr`.
port: u16,
// Read cursor into `data` (records start at byte 2).
offset: usize,
// Number of records reported by the server (byte 1 of the response).
count: usize,
}
impl LookupHost {
/// The port that each yielded address carries.
pub fn port(&self) -> u16 {
self.port
}
}
impl Iterator for LookupHost {
    type Item = SocketAddr;

    /// Decode the next address record from the response buffer.
    ///
    /// Each record starts with a one-byte tag: `4` (four IPv4 octets follow)
    /// or `6` (sixteen IPv6 octets follow). Any other tag, or a truncated
    /// record, ends iteration.
    fn next(&mut self) -> Option<SocketAddr> {
        if self.offset >= self.data.0.len() {
            return None;
        }
        match self.data.0.get(self.offset) {
            Some(&4) => {
                self.offset += 1;
                if self.offset + 4 > self.data.0.len() {
                    return None;
                }
                let result = Some(SocketAddr::V4(SocketAddrV4::new(
                    Ipv4Addr::new(
                        self.data.0[self.offset],
                        self.data.0[self.offset + 1],
                        self.data.0[self.offset + 2],
                        self.data.0[self.offset + 3],
                    ),
                    self.port,
                )));
                self.offset += 4;
                result
            }
            Some(&6) => {
                self.offset += 1;
                // Fix: the octets are read starting one byte past the tag
                // (`offset + 1`), so the guard must cover `offset + 17`; the
                // original checked only `offset + 16` and could panic on a
                // record ending exactly at the buffer edge.
                if self.offset + 16 + 1 > self.data.0.len() {
                    return None;
                }
                let mut new_addr = [0u8; 16];
                // NOTE(review): the extra `+ 1` skip here is inconsistent
                // with the IPv4 arm, and `offset` is only advanced by 16 —
                // confirm against the DNS server's record layout.
                for (src, octet) in self.data.0[(self.offset + 1)..(self.offset + 16 + 1)]
                    .iter()
                    .zip(new_addr.iter_mut())
                {
                    *octet = *src;
                }
                let result =
                    Some(SocketAddr::V6(SocketAddrV6::new(new_addr.into(), self.port, 0, 0)));
                self.offset += 16;
                result
            }
            _ => None,
        }
    }
}
/// Send `query` to the DNS server and return an iterator over the resolved
/// addresses, each paired with `port`.
pub fn lookup(query: &str, port: u16) -> Result<LookupHost, DnsError> {
    let mut result = LookupHost { data: LookupHostQuery([0u8; 4096]), offset: 0, count: 0, port };

    // Copy the query into the page-sized buffer that is lent to the server,
    // truncating if it is somehow longer than the buffer.
    let query_bytes = query.as_bytes();
    let copy_len = query_bytes.len().min(result.data.0.len());
    result.data.0[..copy_len].copy_from_slice(&query_bytes[..copy_len]);

    lend_mut(dns_server(), DnsLendMut::RawLookup.into(), &mut result.data.0, 0, query_bytes.len())
        .unwrap();

    // Byte 0 of the response is a status code; nonzero means the lookup
    // failed and byte 1 carries the error code.
    if result.data.0[0] != 0 {
        return Err(DnsError { code: result.data.0[1] });
    }
    assert_eq!(result.offset, 0);
    // Byte 1 holds the record count; records begin at byte 2.
    result.count = result.data.0[1] as usize;
    result.offset = 2;
    Ok(result)
}
impl TryFrom<&str> for LookupHost {
    type Error = io::Error;

    /// Parse a `"host:port"` string and resolve the host via DNS.
    fn try_from(s: &str) -> io::Result<LookupHost> {
        // Split on the *last* ':' so that colons inside the host portion
        // (e.g. IPv6 literals) are left intact.
        let Some((host, port_str)) = s.rsplit_once(':') else {
            return Err(io::const_io_error!(io::ErrorKind::InvalidInput, &"invalid socket address"));
        };
        let Ok(port) = port_str.parse::<u16>() else {
            return Err(io::const_io_error!(io::ErrorKind::InvalidInput, &"invalid port value"));
        };
        (host, port).try_into()
    }
}
// Resolve a `(host, port)` pair via the DNS server. Any resolver error is
// collapsed into a generic `InvalidInput` I/O error; the underlying DNS
// error code is discarded.
impl TryFrom<(&str, u16)> for LookupHost {
type Error = io::Error;
fn try_from(v: (&str, u16)) -> io::Result<LookupHost> {
lookup(v.0, v.1)
.map_err(|_e| io::const_io_error!(io::ErrorKind::InvalidInput, &"DNS failure"))
}
}

View File

@ -0,0 +1,84 @@
mod dns;
mod tcpstream;
pub use tcpstream::*;
mod tcplistener;
pub use tcplistener::*;
mod udp;
pub use udp::*;
// this structure needs to be synchronized with what's in net/src/api.rs
// Error codes returned in the response buffers from the network server.
// Commented-out variants document codes the server defines but this client
// never matches on directly.
#[repr(C)]
#[derive(Debug)]
enum NetError {
// Ok = 0,
Unaddressable = 1,
SocketInUse = 2,
// AccessDenied = 3,
Invalid = 4,
// Finished = 5,
LibraryError = 6,
// AlreadyUsed = 7,
TimedOut = 8,
WouldBlock = 9,
}
// The four buffer types below are 4096-byte, page-aligned message buffers
// that are lent to the network server — presumably one memory page each;
// confirm against the Xous lend contract.
#[repr(C, align(4096))]
struct ConnectRequest {
raw: [u8; 4096],
}
#[repr(C, align(4096))]
struct SendData {
raw: [u8; 4096],
}
#[repr(C, align(4096))]
pub struct ReceiveData {
raw: [u8; 4096],
}
#[repr(C, align(4096))]
pub struct GetAddress {
raw: [u8; 4096],
}
pub use dns::LookupHost;
// Minimal C-style socket address definitions; the `nonstandard_style` allow
// exists because these deliberately mirror libc naming.
#[allow(nonstandard_style)]
pub mod netc {
pub const AF_INET: u8 = 0;
pub const AF_INET6: u8 = 1;
pub type sa_family_t = u8;
#[derive(Copy, Clone)]
pub struct in_addr {
pub s_addr: u32,
}
#[derive(Copy, Clone)]
pub struct sockaddr_in {
pub sin_family: sa_family_t,
pub sin_port: u16,
pub sin_addr: in_addr,
}
#[derive(Copy, Clone)]
pub struct in6_addr {
pub s6_addr: [u8; 16],
}
#[derive(Copy, Clone)]
pub struct sockaddr_in6 {
pub sin6_family: sa_family_t,
pub sin6_port: u16,
pub sin6_addr: in6_addr,
pub sin6_flowinfo: u32,
pub sin6_scope_id: u32,
}
#[derive(Copy, Clone)]
pub struct sockaddr {}
}

View File

@ -0,0 +1,247 @@
use super::*;
use crate::fmt;
use crate::io;
use crate::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
use crate::os::xous::services;
use crate::sync::Arc;
use core::convert::TryInto;
use core::sync::atomic::{AtomicBool, AtomicU16, AtomicUsize, Ordering};
// Early-return an `Unsupported` error for operations this platform does not
// implement. Expands to a `return`, so it must be used in a function
// returning `io::Result`.
macro_rules! unimpl {
() => {
return Err(io::const_io_error!(
io::ErrorKind::Unsupported,
&"This function is not yet implemented",
));
};
}
/// A TCP listener backed by the Xous network server.
#[derive(Clone)]
pub struct TcpListener {
// File descriptor assigned by the network server; replaced on each
// `accept()` (the old fd becomes the accepted stream).
fd: Arc<AtomicU16>,
// Local address; the port is filled in by the server when bound to port 0.
local: SocketAddr,
// Number of live clones; the fd is closed when the last one drops.
handle_count: Arc<AtomicUsize>,
// Whether `accept()` should return `WouldBlock` instead of waiting.
nonblocking: Arc<AtomicBool>,
}
impl TcpListener {
/// Bind a new TCP listener to `socketaddr`. Port 0 requests an
/// OS-assigned port; the resolved address is stored in `local`.
pub fn bind(socketaddr: io::Result<&SocketAddr>) -> io::Result<TcpListener> {
    let mut local = *socketaddr?;
    let raw_fd = TcpListener::bind_inner(&mut local)?;
    Ok(TcpListener {
        fd: Arc::new(AtomicU16::new(raw_fd)),
        local,
        handle_count: Arc::new(AtomicUsize::new(1)),
        nonblocking: Arc::new(AtomicBool::new(false)),
    })
}
/// This returns the raw fd of a Listener, so that it can also be used by the
/// accept routine to replenish the Listener object after its handle has been converted into
/// a TcpStream object.
fn bind_inner(addr: &mut SocketAddr) -> io::Result<u16> {
// Construct the request
let mut connect_request = ConnectRequest { raw: [0u8; 4096] };
// Serialize the StdUdpBind structure. This is done "manually" because we don't want to
// make an auto-serdes (like bincode or rkyv) crate a dependency of Xous.
// NOTE(review): the comment above says "StdUdpBind" but the opcode sent
// below is StdTcpListen — looks like a copy/paste comment.
// Request layout: bytes 0..2 = port (LE), byte 2 = family tag (4 or 6),
// bytes 3.. = address octets.
let port_bytes = addr.port().to_le_bytes();
connect_request.raw[0] = port_bytes[0];
connect_request.raw[1] = port_bytes[1];
match addr.ip() {
IpAddr::V4(addr) => {
connect_request.raw[2] = 4;
for (dest, src) in connect_request.raw[3..].iter_mut().zip(addr.octets()) {
*dest = src;
}
}
IpAddr::V6(addr) => {
connect_request.raw[2] = 6;
for (dest, src) in connect_request.raw[3..].iter_mut().zip(addr.octets()) {
*dest = src;
}
}
}
// Lend the buffer to the network server; it rewrites it in place with the
// response.
let Ok((_, valid)) = crate::os::xous::ffi::lend_mut(
services::net_server(),
services::NetLendMut::StdTcpListen.into(),
&mut connect_request.raw,
0,
4096,
) else {
return Err(io::const_io_error!(io::ErrorKind::InvalidInput, &"Invalid response"));
};
// The first four bytes should be zero upon success, and will be nonzero
// for an error.
let response = connect_request.raw;
if response[0] != 0 || valid == 0 {
// On error, byte 1 carries a NetError code.
let errcode = response[1];
if errcode == NetError::SocketInUse as u8 {
return Err(io::const_io_error!(io::ErrorKind::ResourceBusy, &"Socket in use"));
} else if errcode == NetError::Invalid as u8 {
return Err(io::const_io_error!(
io::ErrorKind::AddrNotAvailable,
&"Invalid address"
));
} else if errcode == NetError::LibraryError as u8 {
return Err(io::const_io_error!(io::ErrorKind::Other, &"Library error"));
} else {
return Err(io::const_io_error!(
io::ErrorKind::Other,
&"Unable to connect or internal error"
));
}
}
// NOTE(review): the fd is taken from the single byte response[1], while
// accept() decodes a u16 fd from bytes 1..3 — confirm fds stay below 256.
let fd = response[1] as usize;
if addr.port() == 0 {
// oddly enough, this is a valid port and it means "give me something valid, up to you what that is"
let assigned_port = u16::from_le_bytes(response[2..4].try_into().unwrap());
addr.set_port(assigned_port);
}
// println!("TcpListening with file handle of {}\r\n", fd);
Ok(fd.try_into().unwrap())
}
/// The local address this listener is bound to (port already resolved if
/// the caller bound to port 0).
pub fn socket_addr(&self) -> io::Result<SocketAddr> {
Ok(self.local)
}
/// Accept one incoming connection. On success the listener's fd becomes
/// the accepted stream's fd, so a fresh listener fd is re-bound before
/// returning.
pub fn accept(&self) -> io::Result<(TcpStream, SocketAddr)> {
let mut receive_request = ReceiveData { raw: [0u8; 4096] };
// Byte 0 of the request selects blocking behaviour: 1 = block, 0 = don't.
if self.nonblocking.load(Ordering::Relaxed) {
// nonblocking
receive_request.raw[0] = 0;
} else {
// blocking
receive_request.raw[0] = 1;
}
if let Ok((_offset, _valid)) = crate::os::xous::ffi::lend_mut(
services::net_server(),
services::NetLendMut::StdTcpAccept(self.fd.load(Ordering::Relaxed)).into(),
&mut receive_request.raw,
0,
0,
) {
if receive_request.raw[0] != 0 {
// error case
if receive_request.raw[1] == NetError::TimedOut as u8 {
return Err(io::const_io_error!(io::ErrorKind::TimedOut, &"accept timed out",));
} else if receive_request.raw[1] == NetError::WouldBlock as u8 {
return Err(io::const_io_error!(
io::ErrorKind::WouldBlock,
&"accept would block",
));
} else if receive_request.raw[1] == NetError::LibraryError as u8 {
return Err(io::const_io_error!(io::ErrorKind::Other, &"Library error"));
} else {
return Err(io::const_io_error!(io::ErrorKind::Other, &"library error",));
}
} else {
// accept successful
// Response layout: bytes 1..3 = stream fd (LE u16), byte 3 = family
// tag (4 or 6), bytes 4..20 = peer address octets, bytes 20..22 =
// peer port (LE u16).
let rr = &receive_request.raw;
let stream_fd = u16::from_le_bytes(rr[1..3].try_into().unwrap());
let port = u16::from_le_bytes(rr[20..22].try_into().unwrap());
let addr = if rr[3] == 4 {
SocketAddr::new(IpAddr::V4(Ipv4Addr::new(rr[4], rr[5], rr[6], rr[7])), port)
} else if rr[3] == 6 {
SocketAddr::new(
IpAddr::V6(Ipv6Addr::new(
u16::from_be_bytes(rr[4..6].try_into().unwrap()),
u16::from_be_bytes(rr[6..8].try_into().unwrap()),
u16::from_be_bytes(rr[8..10].try_into().unwrap()),
u16::from_be_bytes(rr[10..12].try_into().unwrap()),
u16::from_be_bytes(rr[12..14].try_into().unwrap()),
u16::from_be_bytes(rr[14..16].try_into().unwrap()),
u16::from_be_bytes(rr[16..18].try_into().unwrap()),
u16::from_be_bytes(rr[18..20].try_into().unwrap()),
)),
port,
)
} else {
return Err(io::const_io_error!(io::ErrorKind::Other, &"library error",));
};
// replenish the listener
let mut local_copy = self.local.clone(); // port is non-0 by this time, but the method signature needs a mut
let new_fd = TcpListener::bind_inner(&mut local_copy)?;
self.fd.store(new_fd, Ordering::Relaxed);
// now return a stream converted from the old stream's fd
Ok((TcpStream::from_listener(stream_fd, self.local.port(), port, addr), addr))
}
} else {
Err(io::const_io_error!(io::ErrorKind::InvalidInput, &"Unable to accept"))
}
}
/// Clone this listener, bumping the shared handle count so the fd stays
/// open until the last clone is dropped.
pub fn duplicate(&self) -> io::Result<TcpListener> {
self.handle_count.fetch_add(1, Ordering::Relaxed);
Ok(self.clone())
}
/// Set the time-to-live for packets sent from this listener's socket.
///
/// The TTL is a single byte on the wire, so values above 255 are rejected
/// with `InvalidInput`.
pub fn set_ttl(&self, ttl: u32) -> io::Result<()> {
    if ttl > 255 {
        // Use the allocation-free const error, consistent with every other
        // error constructed in this module (the original used the
        // allocating `io::Error::new`).
        return Err(io::const_io_error!(
            io::ErrorKind::InvalidInput,
            &"TTL must be less than 256"
        ));
    }
    crate::os::xous::ffi::blocking_scalar(
        services::net_server(),
        services::NetBlockingScalar::StdSetTtlTcp(self.fd.load(Ordering::Relaxed), ttl).into(),
    )
    .or(Err(io::const_io_error!(io::ErrorKind::InvalidInput, &"Unexpected return value")))
    .map(|_| ())
}
/// Query the current time-to-live of this listener's socket.
pub fn ttl(&self) -> io::Result<u32> {
    // The original wrapped this expression in a redundant `Ok(expr?)`;
    // the mapped Result is returned directly instead.
    crate::os::xous::ffi::blocking_scalar(
        services::net_server(),
        services::NetBlockingScalar::StdGetTtlTcp(self.fd.load(Ordering::Relaxed)).into(),
    )
    .or(Err(io::const_io_error!(io::ErrorKind::InvalidInput, &"Unexpected return value")))
    .map(|res| res[0] as _)
}
/// Not supported on Xous; always returns `Unsupported`.
pub fn set_only_v6(&self, _: bool) -> io::Result<()> {
unimpl!();
}
/// Not supported on Xous; always returns `Unsupported`.
pub fn only_v6(&self) -> io::Result<bool> {
unimpl!();
}
pub fn take_error(&self) -> io::Result<Option<io::Error>> {
// this call doesn't have a meaning on our platform, but we can at least not panic if it's used.
Ok(None)
}
/// Toggle nonblocking mode; only consulted by `accept()` when it builds
/// its request.
pub fn set_nonblocking(&self, nonblocking: bool) -> io::Result<()> {
self.nonblocking.store(nonblocking, Ordering::Relaxed);
Ok(())
}
}
// Debug output shows only the local address, not the raw fd or flags.
impl fmt::Debug for TcpListener {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "TCP listening on {:?}", self.local)
}
}
impl Drop for TcpListener {
    /// Close the underlying socket once the final clone is dropped.
    fn drop(&mut self) {
        // `handle_count` tracks clones created by `duplicate()`; anything
        // but the last handle leaves the fd open.
        if self.handle_count.fetch_sub(1, Ordering::Relaxed) != 1 {
            return;
        }
        let close = services::NetBlockingScalar::StdTcpClose(self.fd.load(Ordering::Relaxed));
        crate::os::xous::ffi::blocking_scalar(services::net_server(), close.into()).unwrap();
    }
}

View File

@ -0,0 +1,435 @@
use super::*;
use crate::fmt;
use crate::io::{self, BorrowedCursor, IoSlice, IoSliceMut};
use crate::net::{IpAddr, Ipv4Addr, Shutdown, SocketAddr, SocketAddrV4, SocketAddrV6};
use crate::os::xous::services;
use crate::sync::Arc;
use crate::time::Duration;
use core::sync::atomic::{AtomicBool, AtomicU32, AtomicUsize, Ordering};
// Early-return an `Unsupported` error for operations this platform does not
// implement. Expands to a `return`, so it must be used in a function
// returning `io::Result`.
macro_rules! unimpl {
() => {
return Err(io::const_io_error!(
io::ErrorKind::Unsupported,
&"This function is not yet implemented",
));
};
}
// Selects which opcode `read_or_peek` sends: a consuming read or a
// non-consuming peek.
enum ReadOrPeek {
Read,
Peek,
}
/// A TCP stream backed by the Xous network server.
#[derive(Clone)]
pub struct TcpStream {
// File descriptor assigned by the network server.
fd: u16,
local_port: u16,
remote_port: u16,
peer_addr: SocketAddr,
// milliseconds
read_timeout: Arc<AtomicU32>,
// milliseconds
write_timeout: Arc<AtomicU32>,
// Number of live clones; shared so the fd can be closed exactly once.
handle_count: Arc<AtomicUsize>,
// Whether reads/peeks should return `WouldBlock` instead of waiting.
nonblocking: Arc<AtomicBool>,
}
/// Serialize a connect request into `buf`:
/// bytes 0..2  = port (little-endian),
/// bytes 2..10 = timeout in milliseconds (little-endian u64),
/// byte  10    = address family tag (4 or 6),
/// bytes 11..  = the address octets.
fn sockaddr_to_buf(duration: Duration, addr: &SocketAddr, buf: &mut [u8]) {
    let port = addr.port().to_le_bytes();
    buf[0] = port[0];
    buf[1] = port[1];

    let timeout_ms = (duration.as_millis() as u64).to_le_bytes();
    for (slot, byte) in buf[2..].iter_mut().zip(timeout_ms) {
        *slot = byte;
    }

    match addr.ip() {
        IpAddr::V4(v4) => {
            buf[10] = 4;
            for (slot, byte) in buf[11..].iter_mut().zip(v4.octets()) {
                *slot = byte;
            }
        }
        IpAddr::V6(v6) => {
            buf[10] = 6;
            for (slot, byte) in buf[11..].iter_mut().zip(v6.octets()) {
                *slot = byte;
            }
        }
    }
}
impl TcpStream {
/// Build a `TcpStream` from an fd handed over by `TcpListener::accept`,
/// with no timeouts set and blocking mode enabled.
pub(crate) fn from_listener(
fd: u16,
local_port: u16,
remote_port: u16,
peer_addr: SocketAddr,
) -> TcpStream {
TcpStream {
fd,
local_port,
remote_port,
peer_addr,
// 0 means "no timeout configured".
read_timeout: Arc::new(AtomicU32::new(0)),
write_timeout: Arc::new(AtomicU32::new(0)),
handle_count: Arc::new(AtomicUsize::new(1)),
nonblocking: Arc::new(AtomicBool::new(false)),
}
}
/// Connect with no timeout (`Duration::ZERO` is the "wait forever" value
/// passed down to the server).
pub fn connect(socketaddr: io::Result<&SocketAddr>) -> io::Result<TcpStream> {
Self::connect_timeout(socketaddr?, Duration::ZERO)
}
/// Connect to `addr`, giving the network server `duration` to complete the
/// handshake before it reports a timeout.
pub fn connect_timeout(addr: &SocketAddr, duration: Duration) -> io::Result<TcpStream> {
let mut connect_request = ConnectRequest { raw: [0u8; 4096] };
// Construct the request.
sockaddr_to_buf(duration, &addr, &mut connect_request.raw);
// Lend the buffer to the network server; it rewrites it with the response.
let Ok((_, valid)) = crate::os::xous::ffi::lend_mut(
services::net_server(),
services::NetLendMut::StdTcpConnect.into(),
&mut connect_request.raw,
0,
4096,
) else {
return Err(io::const_io_error!(io::ErrorKind::InvalidInput, &"Invalid response"));
};
// The first four bytes should be zero upon success, and will be nonzero
// for an error.
let response = connect_request.raw;
if response[0] != 0 || valid == 0 {
// errcode is a u8 but stuck in a u16 where the upper byte is invalid. Mask & decode accordingly.
// NOTE(review): the code is read from response[0] (the status byte)
// here, while TcpListener::bind_inner reads the code from response[1] —
// confirm which byte actually carries the NetError value.
let errcode = response[0];
if errcode == NetError::SocketInUse as u8 {
return Err(io::const_io_error!(io::ErrorKind::ResourceBusy, &"Socket in use",));
} else if errcode == NetError::Unaddressable as u8 {
return Err(io::const_io_error!(
io::ErrorKind::AddrNotAvailable,
&"Invalid address",
));
} else {
return Err(io::const_io_error!(
io::ErrorKind::InvalidInput,
&"Unable to connect or internal error",
));
}
}
// Success layout: bytes 2..4 = fd, 4..6 = local port, 6..8 = remote port
// (all little-endian u16).
let fd = u16::from_le_bytes([response[2], response[3]]);
let local_port = u16::from_le_bytes([response[4], response[5]]);
let remote_port = u16::from_le_bytes([response[6], response[7]]);
// println!(
// "Connected with local port of {}, remote port of {}, file handle of {}",
// local_port, remote_port, fd
// );
Ok(TcpStream {
fd,
local_port,
remote_port,
peer_addr: *addr,
read_timeout: Arc::new(AtomicU32::new(0)),
write_timeout: Arc::new(AtomicU32::new(0)),
handle_count: Arc::new(AtomicUsize::new(1)),
nonblocking: Arc::new(AtomicBool::new(false)),
})
}
pub fn set_read_timeout(&self, timeout: Option<Duration>) -> io::Result<()> {
if let Some(to) = timeout {
if to.is_zero() {
return Err(io::const_io_error!(
io::ErrorKind::InvalidInput,
&"Zero is an invalid timeout",
));
}
}
self.read_timeout.store(
timeout.map(|t| t.as_millis().min(u32::MAX as u128) as u32).unwrap_or_default(),
Ordering::Relaxed,
);
Ok(())
}
pub fn set_write_timeout(&self, timeout: Option<Duration>) -> io::Result<()> {
if let Some(to) = timeout {
if to.is_zero() {
return Err(io::const_io_error!(
io::ErrorKind::InvalidInput,
&"Zero is an invalid timeout",
));
}
}
self.write_timeout.store(
timeout.map(|t| t.as_millis().min(u32::MAX as u128) as u32).unwrap_or_default(),
Ordering::Relaxed,
);
Ok(())
}
pub fn read_timeout(&self) -> io::Result<Option<Duration>> {
match self.read_timeout.load(Ordering::Relaxed) {
0 => Ok(None),
t => Ok(Some(Duration::from_millis(t as u64))),
}
}
pub fn write_timeout(&self) -> io::Result<Option<Duration>> {
match self.write_timeout.load(Ordering::Relaxed) {
0 => Ok(None),
t => Ok(Some(Duration::from_millis(t as u64))),
}
}
fn read_or_peek(&self, buf: &mut [u8], op: ReadOrPeek) -> io::Result<usize> {
let mut receive_request = ReceiveData { raw: [0u8; 4096] };
let data_to_read = buf.len().min(receive_request.raw.len());
let opcode = match op {
ReadOrPeek::Read => {
services::NetLendMut::StdTcpRx(self.fd, self.nonblocking.load(Ordering::Relaxed))
}
ReadOrPeek::Peek => {
services::NetLendMut::StdTcpPeek(self.fd, self.nonblocking.load(Ordering::Relaxed))
}
};
let Ok((offset, length)) = crate::os::xous::ffi::lend_mut(
services::net_server(),
opcode.into(),
&mut receive_request.raw,
// Reuse the `offset` as the read timeout
self.read_timeout.load(Ordering::Relaxed) as usize,
data_to_read,
) else {
return Err(io::const_io_error!(
io::ErrorKind::InvalidInput,
&"Library failure: wrong message type or messaging error"
));
};
if offset != 0 {
for (dest, src) in buf.iter_mut().zip(receive_request.raw[..length].iter()) {
*dest = *src;
}
Ok(length)
} else {
let result = receive_request.raw;
if result[0] != 0 {
if result[1] == 8 {
// timed out
return Err(io::const_io_error!(io::ErrorKind::TimedOut, &"Timeout",));
}
if result[1] == 9 {
// would block
return Err(io::const_io_error!(io::ErrorKind::WouldBlock, &"Would block",));
}
}
Err(io::const_io_error!(io::ErrorKind::Other, &"recv_slice failure"))
}
}
pub fn peek(&self, buf: &mut [u8]) -> io::Result<usize> {
self.read_or_peek(buf, ReadOrPeek::Peek)
}
pub fn read(&self, buf: &mut [u8]) -> io::Result<usize> {
self.read_or_peek(buf, ReadOrPeek::Read)
}
pub fn read_vectored(&self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> {
crate::io::default_read_vectored(|b| self.read(b), bufs)
}
pub fn read_buf(&self, cursor: BorrowedCursor<'_>) -> io::Result<()> {
crate::io::default_read_buf(|buf| self.read(buf), cursor)
}
pub fn is_read_vectored(&self) -> bool {
false
}
pub fn write(&self, buf: &[u8]) -> io::Result<usize> {
let mut send_request = SendData { raw: [0u8; 4096] };
for (dest, src) in send_request.raw.iter_mut().zip(buf) {
*dest = *src;
}
let buf_len = send_request.raw.len().min(buf.len());
let (_offset, _valid) = crate::os::xous::ffi::lend_mut(
services::net_server(),
services::NetLendMut::StdTcpTx(self.fd).into(),
&mut send_request.raw,
// Reuse the offset as the timeout
self.write_timeout.load(Ordering::Relaxed) as usize,
buf_len,
)
.or(Err(io::const_io_error!(io::ErrorKind::InvalidInput, &"Internal error")))?;
if send_request.raw[0] != 0 {
if send_request.raw[4] == 8 {
// timed out
return Err(io::const_io_error!(
io::ErrorKind::BrokenPipe,
&"Timeout or connection closed",
));
} else if send_request.raw[4] == 9 {
// would block
return Err(io::const_io_error!(io::ErrorKind::WouldBlock, &"Would block",));
} else {
return Err(io::const_io_error!(
io::ErrorKind::InvalidInput,
&"Error when sending",
));
}
}
Ok(u32::from_le_bytes([
send_request.raw[4],
send_request.raw[5],
send_request.raw[6],
send_request.raw[7],
]) as usize)
}
pub fn write_vectored(&self, bufs: &[IoSlice<'_>]) -> io::Result<usize> {
crate::io::default_write_vectored(|b| self.write(b), bufs)
}
pub fn is_write_vectored(&self) -> bool {
false
}
pub fn peer_addr(&self) -> io::Result<SocketAddr> {
Ok(self.peer_addr)
}
pub fn socket_addr(&self) -> io::Result<SocketAddr> {
let mut get_addr = GetAddress { raw: [0u8; 4096] };
let Ok((_offset, _valid)) = crate::os::xous::ffi::lend_mut(
services::net_server(),
services::NetLendMut::StdGetAddress(self.fd).into(),
&mut get_addr.raw,
0,
0,
) else {
return Err(io::const_io_error!(io::ErrorKind::InvalidInput, &"Internal error"));
};
let mut i = get_addr.raw.iter();
match *i.next().unwrap() {
4 => Ok(SocketAddr::V4(SocketAddrV4::new(
Ipv4Addr::new(
*i.next().unwrap(),
*i.next().unwrap(),
*i.next().unwrap(),
*i.next().unwrap(),
),
self.local_port,
))),
6 => {
let mut new_addr = [0u8; 16];
for (src, octet) in i.zip(new_addr.iter_mut()) {
*octet = *src;
}
Ok(SocketAddr::V6(SocketAddrV6::new(new_addr.into(), self.local_port, 0, 0)))
}
_ => Err(io::const_io_error!(io::ErrorKind::InvalidInput, &"Internal error")),
}
}
pub fn shutdown(&self, how: Shutdown) -> io::Result<()> {
crate::os::xous::ffi::blocking_scalar(
services::net_server(),
services::NetBlockingScalar::StdTcpStreamShutdown(self.fd, how).into(),
)
.or(Err(io::const_io_error!(io::ErrorKind::InvalidInput, &"Unexpected return value")))
.map(|_| ())
}
pub fn duplicate(&self) -> io::Result<TcpStream> {
self.handle_count.fetch_add(1, Ordering::Relaxed);
Ok(self.clone())
}
pub fn set_linger(&self, _: Option<Duration>) -> io::Result<()> {
unimpl!();
}
pub fn linger(&self) -> io::Result<Option<Duration>> {
unimpl!();
}
pub fn set_nodelay(&self, enabled: bool) -> io::Result<()> {
crate::os::xous::ffi::blocking_scalar(
services::net_server(),
services::NetBlockingScalar::StdSetNodelay(self.fd, enabled).into(),
)
.or(Err(io::const_io_error!(io::ErrorKind::InvalidInput, &"Unexpected return value")))
.map(|_| ())
}
pub fn nodelay(&self) -> io::Result<bool> {
Ok(crate::os::xous::ffi::blocking_scalar(
services::net_server(),
services::NetBlockingScalar::StdGetNodelay(self.fd).into(),
)
.or(Err(io::const_io_error!(io::ErrorKind::InvalidInput, &"Unexpected return value")))
.map(|res| res[0] != 0)?)
}
pub fn set_ttl(&self, ttl: u32) -> io::Result<()> {
if ttl > 255 {
return Err(io::Error::new(io::ErrorKind::InvalidInput, "TTL must be less than 256"));
}
crate::os::xous::ffi::blocking_scalar(
services::net_server(),
services::NetBlockingScalar::StdSetTtlTcp(self.fd, ttl).into(),
)
.or(Err(io::const_io_error!(io::ErrorKind::InvalidInput, &"Unexpected return value")))
.map(|_| ())
}
pub fn ttl(&self) -> io::Result<u32> {
Ok(crate::os::xous::ffi::blocking_scalar(
services::net_server(),
services::NetBlockingScalar::StdGetTtlTcp(self.fd).into(),
)
.or(Err(io::const_io_error!(io::ErrorKind::InvalidInput, &"Unexpected return value")))
.map(|res| res[0] as _)?)
}
pub fn take_error(&self) -> io::Result<Option<io::Error>> {
// this call doesn't have a meaning on our platform, but we can at least not panic if it's used.
Ok(None)
}
pub fn set_nonblocking(&self, nonblocking: bool) -> io::Result<()> {
self.nonblocking.store(nonblocking, Ordering::SeqCst);
Ok(())
}
}
impl fmt::Debug for TcpStream {
    /// Renders a human-readable connection summary: peer address plus the
    /// remote and local port numbers.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let peer = &self.peer_addr;
        let remote = self.remote_port;
        let local = self.local_port;
        write!(f, "TCP connection to {peer:?} port {remote} to local port {local}")
    }
}
impl Drop for TcpStream {
    /// Decrements the shared handle count; only the clone that takes the
    /// count from 1 to 0 actually asks the net server to close the socket.
    fn drop(&mut self) {
        let previous = self.handle_count.fetch_sub(1, Ordering::Relaxed);
        if previous == 1 {
            crate::os::xous::ffi::blocking_scalar(
                services::net_server(),
                services::NetBlockingScalar::StdTcpClose(self.fd).into(),
            )
            .unwrap();
        }
    }
}

View File

@ -0,0 +1,471 @@
use super::*;
use crate::cell::Cell;
use crate::fmt;
use crate::io;
use crate::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
use crate::os::xous::services;
use crate::sync::Arc;
use crate::time::Duration;
use core::convert::TryInto;
use core::sync::atomic::{AtomicUsize, Ordering};
// Shorthand for socket options this platform does not support: bails out of
// the enclosing function with an `Unsupported` error.
macro_rules! unimpl {
    () => {
        return Err(io::const_io_error!(
            io::ErrorKind::Unsupported,
            &"This function is not yet implemented",
        ));
    };
}
/// A UDP socket backed by the Xous net server. Clones share the same file
/// descriptor; `handle_count` tracks how many clones exist so the descriptor
/// is only closed when the last one is dropped.
#[derive(Clone)]
pub struct UdpSocket {
    // File descriptor assigned by the net server in `bind()`.
    fd: u16,
    // The local address this socket was bound to.
    local: SocketAddr,
    // Destination set by `connect()`; `None` until then.
    remote: Cell<Option<SocketAddr>>,
    // in milliseconds. The setting applies only to `recv` calls after the timeout is set.
    read_timeout: Cell<u64>,
    // in milliseconds. The setting applies only to `send` calls after the timeout is set.
    write_timeout: Cell<u64>,
    // Number of live clones of this socket (see `duplicate()` / `Drop`).
    handle_count: Arc<AtomicUsize>,
    // Non-blocking mode flag consumed by the recv and send paths.
    nonblocking: Cell<bool>,
}
impl UdpSocket {
    /// Binds a UDP socket to `socketaddr` by hand-serializing a request into
    /// a page-sized buffer and mutably lending it to the net server, which
    /// overwrites the same buffer with its response.
    pub fn bind(socketaddr: io::Result<&SocketAddr>) -> io::Result<UdpSocket> {
        let addr = socketaddr?;

        // Construct the request
        let mut connect_request = ConnectRequest { raw: [0u8; 4096] };

        // Serialize the StdUdpBind structure. This is done "manually" because we don't want to
        // make an auto-serdes (like bincode or rkyv) crate a dependency of Xous.
        // Layout: [0..2] little-endian port, [2] tag (4 = IPv4, 6 = IPv6),
        // [3..] address octets.
        let port_bytes = addr.port().to_le_bytes();
        connect_request.raw[0] = port_bytes[0];
        connect_request.raw[1] = port_bytes[1];
        match addr.ip() {
            IpAddr::V4(addr) => {
                connect_request.raw[2] = 4;
                for (dest, src) in connect_request.raw[3..].iter_mut().zip(addr.octets()) {
                    *dest = src;
                }
            }
            IpAddr::V6(addr) => {
                connect_request.raw[2] = 6;
                for (dest, src) in connect_request.raw[3..].iter_mut().zip(addr.octets()) {
                    *dest = src;
                }
            }
        }

        let response = crate::os::xous::ffi::lend_mut(
            services::net_server(),
            services::NetLendMut::StdUdpBind.into(),
            &mut connect_request.raw,
            0,
            4096,
        );

        if let Ok((_, valid)) = response {
            // The first four bytes should be zero upon success, and will be nonzero
            // for an error.
            let response = connect_request.raw;
            if response[0] != 0 || valid == 0 {
                let errcode = response[1];
                if errcode == NetError::SocketInUse as u8 {
                    return Err(io::const_io_error!(io::ErrorKind::ResourceBusy, &"Socket in use"));
                } else if errcode == NetError::Invalid as u8 {
                    return Err(io::const_io_error!(
                        io::ErrorKind::InvalidInput,
                        &"Port can't be 0 or invalid address"
                    ));
                } else if errcode == NetError::LibraryError as u8 {
                    return Err(io::const_io_error!(io::ErrorKind::Other, &"Library error"));
                } else {
                    return Err(io::const_io_error!(
                        io::ErrorKind::Other,
                        &"Unable to connect or internal error"
                    ));
                }
            }
            // On success the new file descriptor is in the second byte.
            let fd = response[1] as u16;
            return Ok(UdpSocket {
                fd,
                local: *addr,
                remote: Cell::new(None),
                // A stored value of 0 means "no timeout set".
                read_timeout: Cell::new(0),
                write_timeout: Cell::new(0),
                handle_count: Arc::new(AtomicUsize::new(1)),
                nonblocking: Cell::new(false),
            });
        }
        Err(io::const_io_error!(io::ErrorKind::InvalidInput, &"Invalid response"))
    }

    /// Returns the remote address set by `connect()`, or `NotConnected`.
    pub fn peer_addr(&self) -> io::Result<SocketAddr> {
        match self.remote.get() {
            Some(dest) => Ok(dest),
            None => Err(io::const_io_error!(io::ErrorKind::NotConnected, &"No peer specified")),
        }
    }

    /// Returns the local address recorded at bind time.
    pub fn socket_addr(&self) -> io::Result<SocketAddr> {
        Ok(self.local)
    }

    /// Common implementation of `recv_from()` and `peek_from()`.
    ///
    /// Request layout: byte 0 selects blocking (1) or non-blocking (0) mode;
    /// when blocking, bytes 1..9 carry the read timeout in little-endian
    /// milliseconds. The reply reuses the buffer: byte 0 nonzero means an
    /// error code in byte 1; otherwise bytes 1..3 are the payload length,
    /// byte 3 is the address tag, bytes 4..20 the sender address,
    /// bytes 20..22 the sender port, and the payload starts at byte 22.
    fn recv_inner(&self, buf: &mut [u8], do_peek: bool) -> io::Result<(usize, SocketAddr)> {
        let mut receive_request = ReceiveData { raw: [0u8; 4096] };

        if self.nonblocking.get() {
            // nonblocking
            receive_request.raw[0] = 0;
        } else {
            // blocking
            receive_request.raw[0] = 1;
            for (&s, d) in self
                .read_timeout
                .get()
                .to_le_bytes()
                .iter()
                .zip(receive_request.raw[1..9].iter_mut())
            {
                *d = s;
            }
        }
        if let Ok((_offset, _valid)) = crate::os::xous::ffi::lend_mut(
            services::net_server(),
            services::NetLendMut::StdUdpRx(self.fd).into(),
            &mut receive_request.raw,
            // Reuse the `offset` argument as the peek flag.
            if do_peek { 1 } else { 0 },
            0,
        ) {
            if receive_request.raw[0] != 0 {
                // error case
                if receive_request.raw[1] == NetError::TimedOut as u8 {
                    return Err(io::const_io_error!(io::ErrorKind::TimedOut, &"recv timed out",));
                } else if receive_request.raw[1] == NetError::WouldBlock as u8 {
                    return Err(io::const_io_error!(
                        io::ErrorKind::WouldBlock,
                        &"recv would block",
                    ));
                } else if receive_request.raw[1] == NetError::LibraryError as u8 {
                    return Err(io::const_io_error!(io::ErrorKind::Other, &"Library error"));
                } else {
                    return Err(io::const_io_error!(io::ErrorKind::Other, &"library error",));
                }
            } else {
                let rr = &receive_request.raw;
                let rxlen = u16::from_le_bytes(rr[1..3].try_into().unwrap());
                let port = u16::from_le_bytes(rr[20..22].try_into().unwrap());
                let addr = if rr[3] == 4 {
                    SocketAddr::new(IpAddr::V4(Ipv4Addr::new(rr[4], rr[5], rr[6], rr[7])), port)
                } else if rr[3] == 6 {
                    // IPv6 segments are transmitted big-endian, per convention.
                    SocketAddr::new(
                        IpAddr::V6(Ipv6Addr::new(
                            u16::from_be_bytes(rr[4..6].try_into().unwrap()),
                            u16::from_be_bytes(rr[6..8].try_into().unwrap()),
                            u16::from_be_bytes(rr[8..10].try_into().unwrap()),
                            u16::from_be_bytes(rr[10..12].try_into().unwrap()),
                            u16::from_be_bytes(rr[12..14].try_into().unwrap()),
                            u16::from_be_bytes(rr[14..16].try_into().unwrap()),
                            u16::from_be_bytes(rr[16..18].try_into().unwrap()),
                            u16::from_be_bytes(rr[18..20].try_into().unwrap()),
                        )),
                        port,
                    )
                } else {
                    return Err(io::const_io_error!(io::ErrorKind::Other, &"library error",));
                };
                // Copy the payload out; `zip` truncates to the caller's buffer.
                for (&s, d) in rr[22..22 + rxlen as usize].iter().zip(buf.iter_mut()) {
                    *d = s;
                }
                Ok((rxlen as usize, addr))
            }
        } else {
            Err(io::const_io_error!(io::ErrorKind::InvalidInput, &"Unable to recv"))
        }
    }

    /// Receives a datagram, consuming it from the queue.
    pub fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
        self.recv_inner(buf, false)
    }

    pub fn recv(&self, buf: &mut [u8]) -> io::Result<usize> {
        self.recv_from(buf).map(|(len, _addr)| len)
    }

    /// Receives a datagram without consuming it from the queue.
    pub fn peek_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
        self.recv_inner(buf, true)
    }

    pub fn peek(&self, buf: &mut [u8]) -> io::Result<usize> {
        self.peek_from(buf).map(|(len, _addr)| len)
    }

    /// Records a default destination for `send()`; no network traffic occurs.
    pub fn connect(&self, maybe_addr: io::Result<&SocketAddr>) -> io::Result<()> {
        let addr = maybe_addr?;
        self.remote.set(Some(*addr));
        Ok(())
    }

    /// Sends to the destination previously set via `connect()`.
    pub fn send(&self, buf: &[u8]) -> io::Result<usize> {
        if let Some(addr) = self.remote.get() {
            self.send_to(buf, &addr)
        } else {
            Err(io::const_io_error!(io::ErrorKind::NotConnected, &"No remote specified"))
        }
    }

    /// Sends `buf` to `addr`, retrying while the net server's queue is full
    /// until the write timeout elapses.
    ///
    /// Request layout: [0..2] little-endian port, [2] address tag,
    /// [3..19] address octets, [19..21] little-endian payload length,
    /// payload from byte 21.
    // NOTE(review): `buf.len() as u16` truncates and the 4096-byte buffer
    // limits the copied payload, yet `len as usize` is returned on success —
    // confirm behavior for payloads larger than the buffer.
    pub fn send_to(&self, buf: &[u8], addr: &SocketAddr) -> io::Result<usize> {
        let mut tx_req = SendData { raw: [0u8; 4096] };

        // Construct the request.
        let port_bytes = addr.port().to_le_bytes();
        tx_req.raw[0] = port_bytes[0];
        tx_req.raw[1] = port_bytes[1];
        match addr.ip() {
            IpAddr::V4(addr) => {
                tx_req.raw[2] = 4;
                for (dest, src) in tx_req.raw[3..].iter_mut().zip(addr.octets()) {
                    *dest = src;
                }
            }
            IpAddr::V6(addr) => {
                tx_req.raw[2] = 6;
                for (dest, src) in tx_req.raw[3..].iter_mut().zip(addr.octets()) {
                    *dest = src;
                }
            }
        }
        let len = buf.len() as u16;
        let len_bytes = len.to_le_bytes();
        tx_req.raw[19] = len_bytes[0];
        tx_req.raw[20] = len_bytes[1];
        for (&s, d) in buf.iter().zip(tx_req.raw[21..].iter_mut()) {
            *d = s;
        }

        // let buf = unsafe {
        //     xous::MemoryRange::new(
        //         &mut tx_req as *mut SendData as usize,
        //         core::mem::size_of::<SendData>(),
        //     )
        //     .unwrap()
        // };

        // write time-outs are implemented on the caller side. Basically, if the Net crate server
        // is too busy to take the call immediately: retry, until the timeout is reached.
        let now = crate::time::Instant::now();
        let write_timeout = if self.nonblocking.get() {
            // nonblocking
            core::time::Duration::ZERO
        } else {
            // blocking
            if self.write_timeout.get() == 0 {
                // forever
                core::time::Duration::from_millis(u64::MAX)
            } else {
                // or this amount of time
                core::time::Duration::from_millis(self.write_timeout.get())
            }
        };
        loop {
            // `try_lend_mut` fails fast with `ServerQueueFull` instead of
            // blocking, which lets us implement the timeout loop here.
            let response = crate::os::xous::ffi::try_lend_mut(
                services::net_server(),
                services::NetLendMut::StdUdpTx(self.fd).into(),
                &mut tx_req.raw,
                0,
                4096,
            );
            match response {
                Ok((_, valid)) => {
                    let response = &tx_req.raw;
                    if response[0] != 0 || valid == 0 {
                        let errcode = response[1];
                        if errcode == NetError::SocketInUse as u8 {
                            return Err(io::const_io_error!(
                                io::ErrorKind::ResourceBusy,
                                &"Socket in use"
                            ));
                        } else if errcode == NetError::Invalid as u8 {
                            return Err(io::const_io_error!(
                                io::ErrorKind::InvalidInput,
                                &"Socket not valid"
                            ));
                        } else if errcode == NetError::LibraryError as u8 {
                            return Err(io::const_io_error!(
                                io::ErrorKind::Other,
                                &"Library error"
                            ));
                        } else {
                            return Err(io::const_io_error!(
                                io::ErrorKind::Other,
                                &"Unable to connect"
                            ));
                        }
                    } else {
                        // no error
                        return Ok(len as usize);
                    }
                }
                Err(crate::os::xous::ffi::Error::ServerQueueFull) => {
                    if now.elapsed() >= write_timeout {
                        return Err(io::const_io_error!(
                            io::ErrorKind::WouldBlock,
                            &"Write timed out"
                        ));
                    } else {
                        // question: do we want to do something a bit more gentle than immediately retrying?
                        crate::thread::yield_now();
                    }
                }
                _ => return Err(io::const_io_error!(io::ErrorKind::Other, &"Library error")),
            }
        }
    }

    /// Clones the handle, bumping the shared handle count so the descriptor
    /// is only closed when the last clone is dropped.
    pub fn duplicate(&self) -> io::Result<UdpSocket> {
        self.handle_count.fetch_add(1, Ordering::Relaxed);
        Ok(self.clone())
    }

    /// Sets the receive timeout in milliseconds; `None` clears it and a
    /// zero duration is rejected.
    pub fn set_read_timeout(&self, timeout: Option<Duration>) -> io::Result<()> {
        if let Some(d) = timeout {
            if d.is_zero() {
                return Err(io::const_io_error!(
                    io::ErrorKind::InvalidInput,
                    &"Zero duration is invalid"
                ));
            }
        }
        self.read_timeout
            .set(timeout.map(|t| t.as_millis().min(u64::MAX as u128) as u64).unwrap_or_default());
        Ok(())
    }

    /// Sets the send timeout; same semantics as `set_read_timeout`.
    pub fn set_write_timeout(&self, timeout: Option<Duration>) -> io::Result<()> {
        if let Some(d) = timeout {
            if d.is_zero() {
                return Err(io::const_io_error!(
                    io::ErrorKind::InvalidInput,
                    &"Zero duration is invalid"
                ));
            }
        }
        self.write_timeout
            .set(timeout.map(|t| t.as_millis().min(u64::MAX as u128) as u64).unwrap_or_default());
        Ok(())
    }

    /// Returns the current receive timeout; a stored 0 means "unset".
    pub fn read_timeout(&self) -> io::Result<Option<Duration>> {
        match self.read_timeout.get() {
            0 => Ok(None),
            t => Ok(Some(Duration::from_millis(t as u64))),
        }
    }

    /// Returns the current send timeout; a stored 0 means "unset".
    pub fn write_timeout(&self) -> io::Result<Option<Duration>> {
        match self.write_timeout.get() {
            0 => Ok(None),
            t => Ok(Some(Duration::from_millis(t as u64))),
        }
    }

    /// Sets the IP TTL. Values above 255 are rejected since TTL is a
    /// single byte on the wire.
    pub fn set_ttl(&self, ttl: u32) -> io::Result<()> {
        if ttl > 255 {
            return Err(io::Error::new(io::ErrorKind::InvalidInput, "TTL must be less than 256"));
        }
        crate::os::xous::ffi::blocking_scalar(
            services::net_server(),
            services::NetBlockingScalar::StdSetTtlUdp(self.fd, ttl).into(),
        )
        .or(Err(io::const_io_error!(io::ErrorKind::InvalidInput, &"Unexpected return value")))
        .map(|_| ())
    }

    pub fn ttl(&self) -> io::Result<u32> {
        Ok(crate::os::xous::ffi::blocking_scalar(
            services::net_server(),
            services::NetBlockingScalar::StdGetTtlUdp(self.fd).into(),
        )
        .or(Err(io::const_io_error!(io::ErrorKind::InvalidInput, &"Unexpected return value")))
        .map(|res| res[0] as _)?)
    }

    pub fn take_error(&self) -> io::Result<Option<io::Error>> {
        // this call doesn't have a meaning on our platform, but we can at least not panic if it's used.
        Ok(None)
    }

    /// Toggles non-blocking mode; the flag is read by `recv_inner()` and
    /// `send_to()` on each call.
    pub fn set_nonblocking(&self, nonblocking: bool) -> io::Result<()> {
        self.nonblocking.set(nonblocking);
        Ok(())
    }

    // ------------- smoltcp base stack does not have multicast or broadcast support ---------------
    pub fn set_broadcast(&self, _: bool) -> io::Result<()> {
        unimpl!();
    }

    pub fn broadcast(&self) -> io::Result<bool> {
        unimpl!();
    }

    pub fn set_multicast_loop_v4(&self, _: bool) -> io::Result<()> {
        unimpl!();
    }

    pub fn multicast_loop_v4(&self) -> io::Result<bool> {
        unimpl!();
    }

    pub fn set_multicast_ttl_v4(&self, _: u32) -> io::Result<()> {
        unimpl!();
    }

    pub fn multicast_ttl_v4(&self) -> io::Result<u32> {
        unimpl!();
    }

    pub fn set_multicast_loop_v6(&self, _: bool) -> io::Result<()> {
        unimpl!();
    }

    pub fn multicast_loop_v6(&self) -> io::Result<bool> {
        unimpl!();
    }

    pub fn join_multicast_v4(&self, _: &Ipv4Addr, _: &Ipv4Addr) -> io::Result<()> {
        unimpl!();
    }

    pub fn join_multicast_v6(&self, _: &Ipv6Addr, _: u32) -> io::Result<()> {
        unimpl!();
    }

    pub fn leave_multicast_v4(&self, _: &Ipv4Addr, _: &Ipv4Addr) -> io::Result<()> {
        unimpl!();
    }

    pub fn leave_multicast_v6(&self, _: &Ipv6Addr, _: u32) -> io::Result<()> {
        unimpl!();
    }
}
impl fmt::Debug for UdpSocket {
    /// Renders a human-readable summary: the bound local address and the
    /// connected remote address (if any).
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let remote = self.remote.get();
        write!(f, "UDP listening on {:?} to {:?}", self.local, remote,)
    }
}
impl Drop for UdpSocket {
    /// Decrements the shared handle count; only the clone that takes the
    /// count from 1 to 0 actually asks the net server to close the socket.
    fn drop(&mut self) {
        let previous = self.handle_count.fetch_sub(1, Ordering::Relaxed);
        if previous == 1 {
            crate::os::xous::ffi::blocking_scalar(
                services::net_server(),
                services::NetBlockingScalar::StdUdpClose(self.fd).into(),
            )
            .unwrap();
        }
    }
}

View File

@ -5,7 +5,7 @@ pub struct Stdout {}
pub struct Stderr;
use crate::os::xous::ffi::{lend, try_lend, try_scalar, Connection};
use crate::os::xous::services::{log_server, try_connect, LogScalar};
use crate::os::xous::services::{log_server, try_connect, LogLend, LogScalar};
impl Stdin {
pub const fn new() -> Stdin {
@ -27,7 +27,7 @@ impl Stdout {
impl io::Write for Stdout {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
#[repr(align(4096))]
#[repr(C, align(4096))]
struct LendBuffer([u8; 4096]);
let mut lend_buffer = LendBuffer([0u8; 4096]);
let connection = log_server();
@ -35,7 +35,8 @@ impl io::Write for Stdout {
for (dest, src) in lend_buffer.0.iter_mut().zip(chunk) {
*dest = *src;
}
lend(connection, 1, &lend_buffer.0, 0, chunk.len()).unwrap();
lend(connection, LogLend::StandardOutput.into(), &lend_buffer.0, 0, chunk.len())
.unwrap();
}
Ok(buf.len())
}
@ -53,7 +54,7 @@ impl Stderr {
impl io::Write for Stderr {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
#[repr(align(4096))]
#[repr(C, align(4096))]
struct LendBuffer([u8; 4096]);
let mut lend_buffer = LendBuffer([0u8; 4096]);
let connection = log_server();
@ -61,7 +62,8 @@ impl io::Write for Stderr {
for (dest, src) in lend_buffer.0.iter_mut().zip(chunk) {
*dest = *src;
}
lend(connection, 1, &lend_buffer.0, 0, chunk.len()).unwrap();
lend(connection, LogLend::StandardError.into(), &lend_buffer.0, 0, chunk.len())
.unwrap();
}
Ok(buf.len())
}

View File

@ -68,14 +68,18 @@ impl Thread {
)
.map_err(|code| io::Error::from_raw_os_error(code as i32))?;
extern "C" fn thread_start(main: *mut usize, guard_page_pre: usize, stack_size: usize) {
extern "C" fn thread_start(
main: *mut usize,
guard_page_pre: usize,
stack_size: usize,
) -> ! {
unsafe {
// Finally, let's run some code.
// Run the contents of the new thread.
Box::from_raw(main as *mut Box<dyn FnOnce()>)();
}
// Destroy TLS, which will free the TLS page and call the destructor for
// any thread local storage.
// any thread local storage (if any).
unsafe {
crate::sys::thread_local_key::destroy_tls();
}

View File

@ -23,10 +23,25 @@ pub type Dtor = unsafe extern "C" fn(*mut u8);
const TLS_MEMORY_SIZE: usize = 4096;
/// TLS keys start at `1` to mimic POSIX.
/// TLS keys start at `1`. Index `0` is unused
#[cfg(not(test))]
#[export_name = "_ZN16__rust_internals3std3sys4xous16thread_local_key13TLS_KEY_INDEXE"]
static TLS_KEY_INDEX: AtomicUsize = AtomicUsize::new(1);
fn tls_ptr_addr() -> *mut usize {
#[cfg(not(test))]
#[export_name = "_ZN16__rust_internals3std3sys4xous16thread_local_key9DTORSE"]
static DTORS: AtomicPtr<Node> = AtomicPtr::new(ptr::null_mut());
#[cfg(test)]
extern "Rust" {
#[link_name = "_ZN16__rust_internals3std3sys4xous16thread_local_key13TLS_KEY_INDEXE"]
static TLS_KEY_INDEX: AtomicUsize;
#[link_name = "_ZN16__rust_internals3std3sys4xous16thread_local_key9DTORSE"]
static DTORS: AtomicPtr<Node>;
}
fn tls_ptr_addr() -> *mut *mut u8 {
let mut tp: usize;
unsafe {
asm!(
@ -34,50 +49,50 @@ fn tls_ptr_addr() -> *mut usize {
out(reg) tp,
);
}
core::ptr::from_exposed_addr_mut::<usize>(tp)
core::ptr::from_exposed_addr_mut::<*mut u8>(tp)
}
/// Create an area of memory that's unique per thread. This area will
/// contain all thread local pointers.
fn tls_ptr() -> *mut usize {
let mut tp = tls_ptr_addr();
fn tls_table() -> &'static mut [*mut u8] {
let tp = tls_ptr_addr();
if !tp.is_null() {
return unsafe {
core::slice::from_raw_parts_mut(tp, TLS_MEMORY_SIZE / core::mem::size_of::<*mut u8>())
};
}
// If the TP register is `0`, then this thread hasn't initialized
// its TLS yet. Allocate a new page to store this memory.
if tp.is_null() {
tp = unsafe {
let tp = unsafe {
map_memory(
None,
None,
TLS_MEMORY_SIZE / core::mem::size_of::<usize>(),
TLS_MEMORY_SIZE / core::mem::size_of::<*mut u8>(),
MemoryFlags::R | MemoryFlags::W,
)
}
.expect("Unable to allocate memory for thread local storage")
.as_mut_ptr();
};
for val in tp.iter() {
assert!(*val as usize == 0);
}
unsafe {
// Key #0 is currently unused.
(tp).write_volatile(0);
// Set the thread's `$tp` register
asm!(
"mv tp, {}",
in(reg) tp as usize,
in(reg) tp.as_mut_ptr() as usize,
);
}
}
tp
}
/// Allocate a new TLS key. These keys are shared among all threads.
fn tls_alloc() -> usize {
TLS_KEY_INDEX.fetch_add(1, SeqCst)
}
#[inline]
pub unsafe fn create(dtor: Option<Dtor>) -> Key {
let key = tls_alloc();
// Allocate a new TLS key. These keys are shared among all threads.
#[allow(unused_unsafe)]
let key = unsafe { TLS_KEY_INDEX.fetch_add(1, SeqCst) };
if let Some(f) = dtor {
unsafe { register_dtor(key, f) };
}
@ -87,18 +102,20 @@ pub unsafe fn create(dtor: Option<Dtor>) -> Key {
#[inline]
pub unsafe fn set(key: Key, value: *mut u8) {
assert!((key < 1022) && (key >= 1));
unsafe { tls_ptr().add(key).write_volatile(value as usize) };
let table = tls_table();
table[key] = value;
}
#[inline]
pub unsafe fn get(key: Key) -> *mut u8 {
assert!((key < 1022) && (key >= 1));
core::ptr::from_exposed_addr_mut::<u8>(unsafe { tls_ptr().add(key).read_volatile() })
tls_table()[key]
}
#[inline]
pub unsafe fn destroy(_key: Key) {
panic!("can't destroy keys on Xous");
// Just leak the key. Probably not great on long-running systems that create
// lots of TLS variables, but in practice that's not an issue.
}
// -------------------------------------------------------------------------
@ -127,8 +144,6 @@ pub unsafe fn destroy(_key: Key) {
// key but also a slot for the destructor queue on windows. An optimization for
// another day!
static DTORS: AtomicPtr<Node> = AtomicPtr::new(ptr::null_mut());
struct Node {
dtor: Dtor,
key: Key,
@ -138,10 +153,12 @@ struct Node {
unsafe fn register_dtor(key: Key, dtor: Dtor) {
let mut node = ManuallyDrop::new(Box::new(Node { key, dtor, next: ptr::null_mut() }));
let mut head = DTORS.load(SeqCst);
#[allow(unused_unsafe)]
let mut head = unsafe { DTORS.load(SeqCst) };
loop {
node.next = head;
match DTORS.compare_exchange(head, &mut **node, SeqCst, SeqCst) {
#[allow(unused_unsafe)]
match unsafe { DTORS.compare_exchange(head, &mut **node, SeqCst, SeqCst) } {
Ok(_) => return, // nothing to drop, we successfully added the node to the list
Err(cur) => head = cur,
}
@ -155,6 +172,7 @@ pub unsafe fn destroy_tls() {
if tp.is_null() {
return;
}
unsafe { run_dtors() };
// Finally, free the TLS array
@ -169,12 +187,19 @@ pub unsafe fn destroy_tls() {
unsafe fn run_dtors() {
let mut any_run = true;
// Run the destructor "some" number of times. This is 5x on Windows,
// so we copy it here. This allows TLS variables to create new
// TLS variables upon destruction that will also get destroyed.
// Keep going until we run out of tries or until we have nothing
// left to destroy.
for _ in 0..5 {
if !any_run {
break;
}
any_run = false;
let mut cur = DTORS.load(SeqCst);
#[allow(unused_unsafe)]
let mut cur = unsafe { DTORS.load(SeqCst) };
while !cur.is_null() {
let ptr = unsafe { get((*cur).key) };

View File

@ -29,31 +29,40 @@ impl Parker {
// Change NOTIFIED to EMPTY and EMPTY to PARKED.
let state = self.state.fetch_sub(1, Acquire);
if state == NOTIFIED {
// The state has gone from NOTIFIED (1) to EMPTY (0)
return;
}
// The state has gone from EMPTY (0) to PARKED (-1)
assert!(state == EMPTY);
// The state was set to PARKED. Wait until the `unpark` wakes us up.
// The state is now PARKED (-1). Wait until the `unpark` wakes us up.
blocking_scalar(
ticktimer_server(),
TicktimerScalar::WaitForCondition(self.index(), 0).into(),
)
.expect("failed to send WaitForCondition command");
self.state.swap(EMPTY, Acquire);
let state = self.state.swap(EMPTY, Acquire);
assert!(state == NOTIFIED || state == PARKED);
}
pub unsafe fn park_timeout(self: Pin<&Self>, timeout: Duration) {
// Change NOTIFIED to EMPTY and EMPTY to PARKED.
let state = self.state.fetch_sub(1, Acquire);
if state == NOTIFIED {
// The state has gone from NOTIFIED (1) to EMPTY (0)
return;
}
// The state has gone from EMPTY (0) to PARKED (-1)
assert!(state == EMPTY);
// A value of zero indicates an indefinite wait. Clamp the number of
// milliseconds to the allowed range.
let millis = usize::max(timeout.as_millis().try_into().unwrap_or(usize::MAX), 1);
let was_timeout = blocking_scalar(
// The state is now PARKED (-1). Wait until the `unpark` wakes us up,
// or things time out.
let _was_timeout = blocking_scalar(
ticktimer_server(),
TicktimerScalar::WaitForCondition(self.index(), millis).into(),
)
@ -61,28 +70,37 @@ impl Parker {
!= 0;
let state = self.state.swap(EMPTY, Acquire);
if was_timeout && state == NOTIFIED {
// The state was set to NOTIFIED after we returned from the wait
// but before we reset the state. Therefore, a wakeup is on its
// way, which we need to consume here.
// NOTICE: this is a priority hole.
blocking_scalar(
ticktimer_server(),
TicktimerScalar::WaitForCondition(self.index(), 0).into(),
)
.expect("failed to send WaitForCondition command");
}
assert!(state == PARKED || state == NOTIFIED);
}
pub fn unpark(self: Pin<&Self>) {
let state = self.state.swap(NOTIFIED, Release);
if state == PARKED {
// The thread is parked, wake it up.
blocking_scalar(
// If the state is already `NOTIFIED`, then another thread has
// indicated it wants to wake up the target thread.
//
// If the state is `EMPTY` then there is nothing to wake up, and
// the target thread will immediately exit from `park()` the
// next time that function is called.
if self.state.swap(NOTIFIED, Release) != PARKED {
return;
}
// The thread is parked, wake it up. Keep trying until we wake something up.
// This will happen when the `NotifyCondition` call returns the fact that
// 1 condition was notified.
// Alternately, keep going until the state is seen as `EMPTY`, indicating
// the thread woke up and kept going. This can happen when the Park
// times out before we can send the NotifyCondition message.
while blocking_scalar(
ticktimer_server(),
TicktimerScalar::NotifyCondition(self.index(), 1).into(),
)
.expect("failed to send NotifyCondition command");
.expect("failed to send NotifyCondition command")[0]
== 0
&& self.state.load(Acquire) != EMPTY
{
// The target thread hasn't yet hit the `WaitForCondition` call.
// Yield to let the target thread run some more.
crate::thread::yield_now();
}
}
}

View File

@ -25,6 +25,7 @@ cfg_if::cfg_if! {
target_family = "unix",
all(target_vendor = "fortanix", target_env = "sgx"),
target_os = "solid_asp3",
target_os = "xous",
))] {
mod queue;
pub use queue::{Once, OnceState};