Rollup merge of #78641 - the8472:buffered-copy, r=sfackler

Let io::copy reuse BufWriter buffers

This optimization allows users to implicitly set the buffer size for `io::copy` by wrapping the writer in a `BufWriter` when the default block size is insufficient, which should fix #49921
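As a minimal usage sketch (the file names and the 1 MiB capacity here are arbitrary placeholders, not part of this PR):

```rust
use std::fs::File;
use std::io::{self, BufWriter, Write};

fn main() -> io::Result<()> {
    let mut reader = File::open("source.dat")?;
    // Wrapping the writer in a BufWriter with a larger capacity lets io::copy
    // read directly into that buffer instead of its default-sized stack buffer.
    let mut writer = BufWriter::with_capacity(1 << 20, File::create("dest.dat")?);
    io::copy(&mut reader, &mut writer)?;
    // io::copy does not flush the BufWriter, so flush explicitly to make sure
    // any remaining buffered data reaches the file and errors are surfaced.
    writer.flush()?;
    Ok(())
}
```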

Due to `min_specialization` limitations this approach works for `BufWriter` but not for `BufReader<R>`, since `R` is unconstrained and thus the necessary specialization on `R: Read` is not always applicable. Once specialization becomes more powerful, this optimization could be extended to look at both the reader and the writer side and use whichever buffer is larger.
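To illustrate the limitation (a nightly-only sketch with made-up trait and method names, not code from this PR): the writer-side impl is accepted because `BufWriter<W>` already carries a `W: Write` bound on the struct itself, whereas the reader-side impl would need an `R: Read` bound that `BufReader<R>` does not imply:

```rust
#![feature(min_specialization)] // nightly-only illustration

use std::io::Read;

trait ReaderSpec {
    fn copy_from(&mut self);
}

impl<R: Read + ?Sized> ReaderSpec for R {
    default fn copy_from(&mut self) { /* generic read-write loop */ }
}

// Rejected under min_specialization: the extra `R: Read` bound is not implied
// by `std::io::BufReader<R>` (the struct places no bound on `R`), so this
// specializing impl is not "always applicable".
//
// impl<R: Read> ReaderSpec for std::io::BufReader<R> {
//     fn copy_from(&mut self) { /* would reuse the BufReader's buffer */ }
// }

fn main() {}
```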
This commit is contained in:
Jonas Schievink 2021-02-01 14:29:28 +01:00 committed by GitHub
commit a7a6f013a2
4 changed files with 138 additions and 8 deletions

View File

@@ -117,7 +117,7 @@ impl<W: Write> BufWriter<W> {
/// "successfully written" (by returning nonzero success values from
/// `write`), any 0-length writes from `inner` must be reported as i/o
/// errors from this method.
pub(super) fn flush_buf(&mut self) -> io::Result<()> {
pub(in crate::io) fn flush_buf(&mut self) -> io::Result<()> {
/// Helper struct to ensure the buffer is updated after all the writes
/// are complete. It tracks the number of written bytes and drains them
/// all from the front of the buffer when dropped.
@@ -243,6 +243,18 @@ impl<W: Write> BufWriter<W> {
&self.buf
}
/// Returns a mutable reference to the internal buffer.
///
/// This can be used to write data directly into the buffer without triggering writes
/// to the underlying writer.
///
/// That the buffer is a `Vec` is an implementation detail.
/// Callers should not modify the capacity, as there is currently no public API to do so,
/// and thus any capacity changes would be unexpected by the user.
pub(in crate::io) fn buffer_mut(&mut self) -> &mut Vec<u8> {
&mut self.buf
}
/// Returns the number of bytes the internal buffer can hold without flushing.
///
/// # Examples

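For context on the `buffer_mut` accessor added above, a conceptual sketch of the pattern it enables (the method is `pub(in crate::io)`, so this is only usable from inside `std::io`; the helper name is hypothetical):

```rust
use crate::io::{self, BufWriter, Write};

// Append bytes directly into the BufWriter's Vec and drain them later via
// flush_buf(), similar to what the io::copy specialization in the next file does.
// Staying within the existing spare capacity keeps the Vec from growing,
// matching the doc comment's note about not changing the capacity.
fn append_directly<W: Write>(writer: &mut BufWriter<W>, data: &[u8]) -> io::Result<()> {
    let buf = writer.buffer_mut();
    debug_assert!(buf.capacity() - buf.len() >= data.len());
    buf.extend_from_slice(data);
    writer.flush_buf()
}
```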
View File

@@ -1,4 +1,4 @@
use crate::io::{self, ErrorKind, Read, Write};
use super::{BufWriter, ErrorKind, Read, Result, Write, DEFAULT_BUF_SIZE};
use crate::mem::MaybeUninit;
/// Copies the entire contents of a reader into a writer.
@@ -40,7 +40,7 @@ use crate::mem::MaybeUninit;
/// }
/// ```
#[stable(feature = "rust1", since = "1.0.0")]
pub fn copy<R: ?Sized, W: ?Sized>(reader: &mut R, writer: &mut W) -> io::Result<u64>
pub fn copy<R: ?Sized, W: ?Sized>(reader: &mut R, writer: &mut W) -> Result<u64>
where
R: Read,
W: Write,
@@ -54,14 +54,82 @@ where
}
}
/// The general read-write-loop implementation of
/// `io::copy` that is used when specializations are not available or not applicable.
pub(crate) fn generic_copy<R: ?Sized, W: ?Sized>(reader: &mut R, writer: &mut W) -> io::Result<u64>
/// The userspace read-write-loop implementation of `io::copy` that is used when
/// OS-specific specializations for copy offloading are not available or not applicable.
pub(crate) fn generic_copy<R: ?Sized, W: ?Sized>(reader: &mut R, writer: &mut W) -> Result<u64>
where
R: Read,
W: Write,
{
let mut buf = MaybeUninit::<[u8; super::DEFAULT_BUF_SIZE]>::uninit();
BufferedCopySpec::copy_to(reader, writer)
}
/// Specialization of the read-write loop that either uses a stack buffer
/// or reuses the internal buffer of a BufWriter
trait BufferedCopySpec: Write {
fn copy_to<R: Read + ?Sized>(reader: &mut R, writer: &mut Self) -> Result<u64>;
}
impl<W: Write + ?Sized> BufferedCopySpec for W {
default fn copy_to<R: Read + ?Sized>(reader: &mut R, writer: &mut Self) -> Result<u64> {
stack_buffer_copy(reader, writer)
}
}
impl<I: Write> BufferedCopySpec for BufWriter<I> {
fn copy_to<R: Read + ?Sized>(reader: &mut R, writer: &mut Self) -> Result<u64> {
if writer.capacity() < DEFAULT_BUF_SIZE {
return stack_buffer_copy(reader, writer);
}
// FIXME: #42788
//
// - This creates a (mut) reference to a slice of
// _uninitialized_ integers, which is **undefined behavior**
//
// - Only the standard library gets to soundly "ignore" this,
// based on its privileged knowledge of unstable rustc
// internals;
unsafe {
let spare_cap = writer.buffer_mut().spare_capacity_mut();
reader.initializer().initialize(MaybeUninit::slice_assume_init_mut(spare_cap));
}
let mut len = 0;
loop {
let buf = writer.buffer_mut();
let spare_cap = buf.spare_capacity_mut();
if spare_cap.len() >= DEFAULT_BUF_SIZE {
match reader.read(unsafe { MaybeUninit::slice_assume_init_mut(spare_cap) }) {
Ok(0) => return Ok(len), // EOF reached
Ok(bytes_read) => {
assert!(bytes_read <= spare_cap.len());
// Safety: The initializer contract guarantees that either it or `read`
// will have initialized these bytes. And we just checked that the number
// of bytes is within the buffer capacity.
unsafe { buf.set_len(buf.len() + bytes_read) };
len += bytes_read as u64;
// Read again if the buffer still has enough capacity, as BufWriter itself would do
// This will occur if the reader returns short reads
continue;
}
Err(ref e) if e.kind() == ErrorKind::Interrupted => continue,
Err(e) => return Err(e),
}
}
writer.flush_buf()?;
}
}
}
fn stack_buffer_copy<R: Read + ?Sized, W: Write + ?Sized>(
reader: &mut R,
writer: &mut W,
) -> Result<u64> {
let mut buf = MaybeUninit::<[u8; DEFAULT_BUF_SIZE]>::uninit();
// FIXME: #42788
//
// - This creates a (mut) reference to a slice of

View File

@@ -1,5 +1,8 @@
use crate::cmp::{max, min};
use crate::io::prelude::*;
use crate::io::{copy, empty, repeat, sink, Empty, Repeat, SeekFrom, Sink};
use crate::io::{
copy, empty, repeat, sink, BufWriter, Empty, Repeat, Result, SeekFrom, Sink, DEFAULT_BUF_SIZE,
};
#[test]
fn copy_copies() {
@@ -11,6 +14,51 @@ fn copy_copies() {
assert_eq!(copy(&mut r as &mut dyn Read, &mut w as &mut dyn Write).unwrap(), 1 << 17);
}
struct ShortReader {
cap: usize,
read_size: usize,
observed_buffer: usize,
}
impl Read for ShortReader {
fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
let bytes = min(self.cap, self.read_size);
self.cap -= bytes;
self.observed_buffer = max(self.observed_buffer, buf.len());
Ok(bytes)
}
}
struct WriteObserver {
observed_buffer: usize,
}
impl Write for WriteObserver {
fn write(&mut self, buf: &[u8]) -> Result<usize> {
self.observed_buffer = max(self.observed_buffer, buf.len());
Ok(buf.len())
}
fn flush(&mut self) -> Result<()> {
Ok(())
}
}
#[test]
fn copy_specializes_bufwriter() {
let cap = 117 * 1024;
let buf_sz = 16 * 1024;
let mut r = ShortReader { cap, observed_buffer: 0, read_size: 1337 };
let mut w = BufWriter::with_capacity(buf_sz, WriteObserver { observed_buffer: 0 });
assert_eq!(
copy(&mut r, &mut w).unwrap(),
cap as u64,
"expected the whole capacity to be copied"
);
assert_eq!(r.observed_buffer, buf_sz, "expected a large buffer to be provided to the reader");
assert!(w.get_mut().observed_buffer > DEFAULT_BUF_SIZE, "expected coalesced writes");
}
#[test]
fn sink_sinks() {
let mut s = sink();

View File

@@ -284,6 +284,7 @@
#![feature(maybe_uninit_extra)]
#![feature(maybe_uninit_ref)]
#![feature(maybe_uninit_slice)]
#![feature(maybe_uninit_uninit_array)]
#![feature(min_specialization)]
#![feature(needs_panic_runtime)]
#![feature(negative_impls)]
@@ -327,6 +328,7 @@
#![feature(unsafe_cell_raw_get)]
#![feature(unwind_attributes)]
#![feature(vec_into_raw_parts)]
#![feature(vec_spare_capacity)]
#![feature(wake_trait)]
// NB: the above list is sorted to minimize merge conflicts.
#![default_lib_allocator]