How do I apply a limit to the number of bytes read by futures::Stream::concat2?


An answer to How do I read the entire body of a Tokio-based Hyper request? suggests:

you may wish to establish some kind of cap on the number of bytes read [when using futures::Stream::concat2]

How can I actually achieve this? For example, here's some code that mimics a malicious user who is sending my service an infinite amount of data:

extern crate futures; // 0.1.25

use futures::{prelude::*, stream};

// An endless source of 16-byte chunks, standing in for the malicious client
fn some_bytes() -> impl Stream<Item = Vec<u8>, Error = ()> {
    stream::repeat(b"0123456789ABCDEF".to_vec())
}

fn limited() -> impl Future<Item = Vec<u8>, Error = ()> {
    some_bytes().concat2()
}

fn main() {
    let v = limited().wait().unwrap();
    println!("{}", v.len());
}


1 Answer


One solution is to create a stream combinator that ends the stream once some threshold of bytes has passed. Here's one possible implementation:

// Requires: #[macro_use] extern crate futures; and use futures::{Async, Poll};
struct TakeBytes<S> {
    inner: S,
    seen: usize,
    limit: usize,
}

impl<S> Stream for TakeBytes<S>
where
    S: Stream<Item = Vec<u8>>,
{
    type Item = Vec<u8>;
    type Error = S::Error;

    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
        match try_ready!(self.inner.poll()) {
            Some(v) => {
                if self.seen >= self.limit {
                    Ok(Async::Ready(None)) // Stream is over
                } else {
                    self.seen += v.len();
                    Ok(Async::Ready(Some(v)))
                }
            }
            None => Ok(Async::Ready(None)),
        }
    }
}

trait TakeBytesExt: Sized {
    fn take_bytes(self, limit: usize) -> TakeBytes<Self>;
}

impl<S> TakeBytesExt for S
where
    S: Stream<Item = Vec<u8>>,
{
    fn take_bytes(self, limit: usize) -> TakeBytes<Self> {
        TakeBytes {
            inner: self,
            seen: 0,
            limit,
        }
    }
}
This can then be chained onto the stream before concat2:

fn limited() -> impl Future<Item = Vec<u8>, Error = ()> {
    some_bytes().take_bytes(999).concat2()
}

This implementation has caveats:

  • it only works for Vec<u8>. You can introduce generics to make it more broadly applicable, of course.
  • it allows more bytes than the limit to come in; it only stops the stream after that point. Those types of decisions are application-dependent.

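To make the cutoff behavior of that accounting concrete without pulling in the futures crate, here is an illustrative analogue over std iterators (the function name `take_bytes_iter` is hypothetical, not part of any library):

```rust
// Illustrative std-iterator analogue of TakeBytes: yields chunks until the
// running byte count reaches `limit`, then ends the sequence.
fn take_bytes_iter<I>(chunks: I, limit: usize) -> impl Iterator<Item = Vec<u8>>
where
    I: IntoIterator<Item = Vec<u8>>,
{
    let mut seen = 0;
    chunks.into_iter().take_while(move |chunk| {
        if seen >= limit {
            false // limit already reached: end the stream
        } else {
            seen += chunk.len(); // count the chunk, but still yield it
            true
        }
    })
}

fn main() {
    // An "infinite" source of 16-byte chunks, like some_bytes() above.
    let source = std::iter::repeat(b"0123456789ABCDEF".to_vec());
    let total: usize = take_bytes_iter(source, 64).map(|c| c.len()).sum();
    println!("{}", total); // prints 64: four 16-byte chunks fit exactly
}
```

Note that, as with the combinator above, the check happens before counting, so a limit that falls mid-chunk still lets that whole chunk through.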
Another thing to keep in mind is that you want to tackle this problem as low in the stack as you can: if the source of the data has already allocated a gigabyte of memory, placing a limit at this point won't help as much.
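Relatedly, since these decisions are application-dependent, a service might prefer to fail fast once the cap is exceeded rather than silently truncate the body. A std-iterator sketch of that variant (the name `collect_capped` is illustrative, not a library API):

```rust
// Illustrative: return an error as soon as the cap would be exceeded,
// instead of quietly ending the stream like TakeBytes does.
fn collect_capped<I>(chunks: I, limit: usize) -> Result<Vec<u8>, String>
where
    I: IntoIterator<Item = Vec<u8>>,
{
    let mut out = Vec::new();
    for chunk in chunks {
        if out.len() + chunk.len() > limit {
            return Err(format!("body exceeded {} bytes", limit));
        }
        out.extend_from_slice(&chunk);
    }
    Ok(out)
}

fn main() {
    let source = std::iter::repeat(b"0123456789ABCDEF".to_vec());
    match collect_capped(source, 64) {
        Ok(body) => println!("read {} bytes", body.len()),
        Err(e) => println!("{}", e), // prints "body exceeded 64 bytes"
    }
}
```

Failing fast here also rejects the request before buffering anything near the limit, which lines up with the advice to enforce the cap as early as possible.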

