1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use bytes::{Buf, BufMut};
use futures_core::ready;
use std::io::{self, IoSlice};
use std::mem::MaybeUninit;
use std::pin::Pin;
use std::task::{Context, Poll};
/// Try to read data from an `AsyncRead` into an implementer of the [`BufMut`] trait.
///
/// [`BufMut`]: bytes::Buf
///
/// # Example
///
/// ```
/// use bytes::{Bytes, BytesMut};
/// use tokio_stream as stream;
/// use tokio::io::Result;
/// use tokio_util::io::{StreamReader, poll_read_buf};
/// use futures::future::poll_fn;
/// use std::pin::Pin;
/// # #[tokio::main]
/// # async fn main() -> std::io::Result<()> {
///
/// // Create a reader from an iterator. This particular reader will always be
/// // ready.
/// let mut read = StreamReader::new(stream::iter(vec![Result::Ok(Bytes::from_static(&[0, 1, 2, 3]))]));
///
/// let mut buf = BytesMut::new();
/// let mut reads = 0;
///
/// loop {
/// reads += 1;
/// let n = poll_fn(|cx| poll_read_buf(Pin::new(&mut read), cx, &mut buf)).await?;
///
/// if n == 0 {
/// break;
/// }
/// }
///
/// // one or more reads might be necessary.
/// assert!(reads >= 1);
/// assert_eq!(&buf[..], &[0, 1, 2, 3]);
/// # Ok(())
/// # }
/// ```
#[cfg_attr(not(feature = "io"), allow(unreachable_pub))]
pub fn poll_read_buf<T: AsyncRead, B: BufMut>(
io: Pin<&mut T>,
cx: &mut Context<'_>,
buf: &mut B,
) -> Poll<io::Result<usize>> {
if !buf.has_remaining_mut() {
return Poll::Ready(Ok(0));
}
let n = {
let dst = buf.chunk_mut();
// Safety: `chunk_mut()` returns a `&mut UninitSlice`, and `UninitSlice` is a
// transparent wrapper around `[MaybeUninit<u8>]`.
let dst = unsafe { &mut *(dst as *mut _ as *mut [MaybeUninit<u8>]) };
let mut buf = ReadBuf::uninit(dst);
let ptr = buf.filled().as_ptr();
ready!(io.poll_read(cx, &mut buf)?);
// Ensure the pointer does not change from under us
assert_eq!(ptr, buf.filled().as_ptr());
buf.filled().len()
};
// Safety: This is guaranteed to be the number of initialized (and read)
// bytes due to the invariants provided by `ReadBuf::filled`.
unsafe {
buf.advance_mut(n);
}
Poll::Ready(Ok(n))
}
/// Try to write data from an implementer of the [`Buf`] trait to an
/// [`AsyncWrite`], advancing the buffer's internal cursor.
///
/// This function will use [vectored writes] when the [`AsyncWrite`] supports
/// vectored writes.
///
/// # Examples
///
/// [`File`] implements [`AsyncWrite`] and [`Cursor<&[u8]>`] implements
/// [`Buf`]:
///
/// ```no_run
/// use tokio_util::io::poll_write_buf;
/// use tokio::io;
/// use tokio::fs::File;
///
/// use bytes::Buf;
/// use std::io::Cursor;
/// use std::pin::Pin;
/// use futures::future::poll_fn;
///
/// #[tokio::main]
/// async fn main() -> io::Result<()> {
/// let mut file = File::create("foo.txt").await?;
/// let mut buf = Cursor::new(b"data to write");
///
/// // Loop until the entire contents of the buffer are written to
/// // the file.
/// while buf.has_remaining() {
/// poll_fn(|cx| poll_write_buf(Pin::new(&mut file), cx, &mut buf)).await?;
/// }
///
/// Ok(())
/// }
/// ```
///
/// [`Buf`]: bytes::Buf
/// [`AsyncWrite`]: tokio::io::AsyncWrite
/// [`File`]: tokio::fs::File
/// [vectored writes]: tokio::io::AsyncWrite::poll_write_vectored
#[cfg_attr(not(feature = "io"), allow(unreachable_pub))]
pub fn poll_write_buf<T: AsyncWrite, B: Buf>(
io: Pin<&mut T>,
cx: &mut Context<'_>,
buf: &mut B,
) -> Poll<io::Result<usize>> {
const MAX_BUFS: usize = 64;
if !buf.has_remaining() {
return Poll::Ready(Ok(0));
}
let n = if io.is_write_vectored() {
let mut slices = [IoSlice::new(&[]); MAX_BUFS];
let cnt = buf.chunks_vectored(&mut slices);
ready!(io.poll_write_vectored(cx, &slices[..cnt]))?
} else {
ready!(io.poll_write(cx, buf.chunk()))?
};
buf.advance(n);
Poll::Ready(Ok(n))
}