pub mod stream;
use re_build_info::CrateVersion;
use re_log_types::LogMsg;
use crate::FileHeader;
use crate::MessageHeader;
use crate::OLD_RRD_HEADERS;
use crate::{Compression, EncodingOptions, Serializer};
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum VersionPolicy {
Warn,
Error,
}
fn warn_on_version_mismatch(
version_policy: VersionPolicy,
encoded_version: [u8; 4],
) -> Result<(), DecodeError> {
let encoded_version = if encoded_version == [0, 0, 0, 0] {
CrateVersion::new(0, 2, 0)
} else {
CrateVersion::from_bytes(encoded_version)
};
if encoded_version.is_compatible_with(CrateVersion::LOCAL) {
Ok(())
} else {
match version_policy {
VersionPolicy::Warn => {
re_log::warn_once!(
"Found log stream with Rerun version {encoded_version}, \
which is incompatible with the local Rerun version {}. \
Loading will try to continue, but might fail in subtle ways.",
CrateVersion::LOCAL,
);
Ok(())
}
VersionPolicy::Error => Err(DecodeError::IncompatibleRerunVersion {
file: encoded_version,
local: CrateVersion::LOCAL,
}),
}
}
}
#[derive(thiserror::Error, Debug)]
pub enum DecodeError {
#[error("Not an .rrd file")]
NotAnRrd,
#[error("Data was from an old, incompatible Rerun version")]
OldRrdVersion,
#[error("Data from Rerun version {file}, which is incompatible with the local Rerun version {local}")]
IncompatibleRerunVersion {
file: CrateVersion,
local: CrateVersion,
},
#[error("Failed to decode the options: {0}")]
Options(#[from] crate::OptionsError),
#[error("Failed to read: {0}")]
Read(std::io::Error),
#[error("lz4 error: {0}")]
Lz4(lz4_flex::block::DecompressError),
#[error("MsgPack error: {0}")]
MsgPack(#[from] rmp_serde::decode::Error),
}
pub fn decode_bytes(
version_policy: VersionPolicy,
bytes: &[u8],
) -> Result<Vec<LogMsg>, DecodeError> {
re_tracing::profile_function!();
let decoder = Decoder::new(version_policy, std::io::Cursor::new(bytes))?;
let mut msgs = vec![];
for msg in decoder {
msgs.push(msg?);
}
Ok(msgs)
}
pub fn read_options(
version_policy: VersionPolicy,
bytes: &[u8],
) -> Result<(CrateVersion, EncodingOptions), DecodeError> {
let mut read = std::io::Cursor::new(bytes);
let FileHeader {
magic,
version,
options,
} = FileHeader::decode(&mut read)?;
if OLD_RRD_HEADERS.contains(&magic) {
return Err(DecodeError::OldRrdVersion);
} else if &magic != crate::RRD_HEADER {
return Err(DecodeError::NotAnRrd);
}
warn_on_version_mismatch(version_policy, version)?;
match options.serializer {
Serializer::MsgPack => {}
}
Ok((CrateVersion::from_bytes(version), options))
}
pub struct Decoder<R: std::io::Read> {
version: CrateVersion,
compression: Compression,
read: R,
uncompressed: Vec<u8>, compressed: Vec<u8>, }
impl<R: std::io::Read> Decoder<R> {
pub fn new(version_policy: VersionPolicy, mut read: R) -> Result<Self, DecodeError> {
re_tracing::profile_function!();
let mut data = [0_u8; FileHeader::SIZE];
read.read_exact(&mut data).map_err(DecodeError::Read)?;
let (version, options) = read_options(version_policy, &data)?;
let compression = options.compression;
Ok(Self {
version,
compression,
read,
uncompressed: vec![],
compressed: vec![],
})
}
#[inline]
pub fn version(&self) -> CrateVersion {
self.version
}
}
impl<R: std::io::Read> Iterator for Decoder<R> {
type Item = Result<LogMsg, DecodeError>;
fn next(&mut self) -> Option<Self::Item> {
re_tracing::profile_function!();
let header = match MessageHeader::decode(&mut self.read) {
Ok(header) => header,
Err(err) => match err {
DecodeError::Read(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
return None
}
other => return Some(Err(other)),
},
};
let uncompressed_len = header.uncompressed_len as usize;
self.uncompressed
.resize(self.uncompressed.len().max(uncompressed_len), 0);
match self.compression {
Compression::Off => {
re_tracing::profile_scope!("read uncompressed");
if let Err(err) = self
.read
.read_exact(&mut self.uncompressed[..uncompressed_len])
{
return Some(Err(DecodeError::Read(err)));
}
}
Compression::LZ4 => {
let compressed_len = header.compressed_len as usize;
self.compressed
.resize(self.compressed.len().max(compressed_len), 0);
{
re_tracing::profile_scope!("read compressed");
if let Err(err) = self.read.read_exact(&mut self.compressed[..compressed_len]) {
return Some(Err(DecodeError::Read(err)));
}
}
re_tracing::profile_scope!("lz4");
if let Err(err) = lz4_flex::block::decompress_into(
&self.compressed[..compressed_len],
&mut self.uncompressed[..uncompressed_len],
) {
return Some(Err(DecodeError::Lz4(err)));
}
}
}
re_tracing::profile_scope!("MsgPack deser");
match rmp_serde::from_slice(&self.uncompressed[..uncompressed_len]) {
Ok(re_log_types::LogMsg::SetStoreInfo(mut msg)) => {
msg.info.store_version = Some(self.version());
Some(Ok(re_log_types::LogMsg::SetStoreInfo(msg)))
}
Ok(msg) => Some(Ok(msg)),
Err(err) => Some(Err(err.into())),
}
}
}
#[cfg(all(feature = "decoder", feature = "encoder"))]
#[test]
fn test_encode_decode() {
use re_log_types::{
ApplicationId, LogMsg, RowId, SetStoreInfo, StoreId, StoreInfo, StoreKind, StoreSource,
Time,
};
let rrd_version = CrateVersion::LOCAL;
let messages = vec![LogMsg::SetStoreInfo(SetStoreInfo {
row_id: RowId::new(),
info: StoreInfo {
application_id: ApplicationId("test".to_owned()),
store_id: StoreId::random(StoreKind::Recording),
cloned_from: None,
is_official_example: true,
started: Time::now(),
store_source: StoreSource::RustSdk {
rustc_version: String::new(),
llvm_version: String::new(),
},
store_version: Some(rrd_version),
},
})];
let options = [
EncodingOptions {
compression: Compression::Off,
serializer: Serializer::MsgPack,
},
EncodingOptions {
compression: Compression::LZ4,
serializer: Serializer::MsgPack,
},
];
for options in options {
let mut file = vec![];
crate::encoder::encode(rrd_version, options, messages.iter(), &mut file).unwrap();
let decoded_messages = Decoder::new(VersionPolicy::Error, &mut file.as_slice())
.unwrap()
.collect::<Result<Vec<LogMsg>, DecodeError>>()
.unwrap();
assert_eq!(messages, decoded_messages);
}
}