diff --git a/src/lib.rs b/src/lib.rs old mode 100644 new mode 100755 index fec8ae6..0bd1446 --- a/src/lib.rs +++ b/src/lib.rs @@ -28,14 +28,14 @@ //! } //! } //! ``` -use std::collections::VecDeque; -use std::ops::Deref; -use std::pin::Pin; -use std::task::{Context, Poll, ready}; use futures::Stream; -use serde::de::DeserializeOwned; use pin_project::pin_project; +use serde::de::DeserializeOwned; use serde_json::Deserializer; +use std::collections::VecDeque; +use std::ops::Deref; +use std::pin::Pin; +use std::task::{ready, Context, Poll}; use tracing::trace; // should be 2^n - 1 for VecDeque to work efficiently @@ -60,6 +60,7 @@ pub struct JsonStream { byte_buffer: VecDeque, finished: bool, max_buffer_capacity: usize, + respect_eof: bool, } impl JsonStream { @@ -85,12 +86,26 @@ impl JsonStream { Self { stream, entry_buffer: Vec::new(), - byte_buffer: VecDeque::with_capacity(std::cmp::min(DEFAULT_BUFFER_CAPACITY, max_capacity)), + byte_buffer: VecDeque::with_capacity(std::cmp::min( + DEFAULT_BUFFER_CAPACITY, + max_capacity, + )), finished: false, - max_buffer_capacity: max_capacity + max_buffer_capacity: max_capacity, + respect_eof: false, } } + /// Toggles a mode where the stream will "complete" when the + /// buffer is empty, there are no processed messages, and the + /// underlying reader returns an EOF error. All other error + /// messages, either from the parsing (because there might be a + /// partial json object,) or from the IO reader (because more data + /// may arrive or recover eventually,) are ignored. + pub fn set_respect_eof(&mut self) { + self.respect_eof = true + } + /// Controls how large the internal buffer can grow in bytes. If the buffer grows larger than this /// the stream is terminated as it is assumed that the stream is malformed. If this number is too /// large, a malformed stream can cause the server to run out of memory. @@ -115,10 +130,10 @@ impl JsonStream { } impl Stream for JsonStream - where - T: DeserializeOwned, - B: Deref, - S: Stream> + Unpin +where + T: DeserializeOwned, + B: Deref, + S: Stream> + Unpin, { type Item = Result; @@ -143,7 +158,7 @@ impl Stream for JsonStream Some(Err(err)) => { self.finish(); return Poll::Ready(Some(Err(err))); - }, + } None => { self.finish(); return Poll::Ready(None); @@ -157,7 +172,7 @@ impl Stream for JsonStream // no room for this chunk self.finish(); return Poll::Ready(None); - }, + } None => { // overflow occurred self.finish(); @@ -182,14 +197,23 @@ impl Stream for JsonStream Some(Ok(entry)) => { last_read_pos = json_iter.byte_offset(); this.entry_buffer.push(entry); - }, + } // if there was an error, log it but move on because this could be a partial entry Some(Err(err)) => { - trace!(err = ?err, "failed to parse json entry"); - break - }, + if *this.respect_eof + && err.is_eof() + && this.entry_buffer.is_empty() + && buffer.is_empty() + { + self.finish(); + return Poll::Ready(None); + } else { + trace!(err = ?err, "failed to parse json entry"); + break; + }; + } // nothing left then we move on - None => break + None => break, } } @@ -201,4 +225,4 @@ impl Stream for JsonStream this.byte_buffer.make_contiguous(); } } -} \ No newline at end of file +}