use re_log_types::{ResolvedTimeRange, RowId, TimeInt, VecDequeSortingExt as _};
use re_types_core::{ComponentName, Loggable, SizeBytes as _};

use crate::{DataStore, IndexedBucket, IndexedBucketInner, IndexedTable};
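/// Returned by the `sanity_check` family of functions when an invariant violation has been
/// detected in the `DataStore`'s internal datastructures.
///
/// These violations can only stem from a bug in the store's implementation itself.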
#[derive(thiserror::Error, Debug)]
pub enum SanityError {
    #[error(
        "Reported time range for indexed bucket is out of sync: got {got:?}, expected {expected:?}"
    )]
    TimeRangeOutOfSync {
        expected: ResolvedTimeRange,
        got: ResolvedTimeRange,
    },

    #[error(
        "Reported max RowId for indexed bucket is out of sync: got {got}, expected {expected}"
    )]
    MaxRowIdOutOfSync { expected: RowId, got: RowId },

    #[error("Reported size for {origin} is out of sync: got {got}, expected {expected}")]
    SizeOutOfSync {
        origin: &'static str,
        expected: String,
        got: String,
    },

    #[error("Reported number of rows for {origin} is out of sync: got {got}, expected {expected}")]
    RowsOutOfSync {
        origin: &'static str,
        expected: String,
        got: String,
    },

    #[error("Column '{component}' has too few/many rows: got {got} instead of {expected}")]
    ColumnLengthMismatch {
        component: ComponentName,
        expected: u64,
        got: u64,
    },

    #[error("Found overlapping indexed buckets: {t1_max_formatted} ({t1_max}) <-> {t2_max_formatted} ({t2_max})")]
    OverlappingBuckets {
        t1_max: i64,
        t1_max_formatted: String,
        t2_max: i64,
        t2_max_formatted: String,
    },
}
pub type SanityResult<T> = ::std::result::Result<T, SanityError>;
impl DataStore {
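    /// Runs the sanity check suite for the entire datastore.
    ///
    /// Returns an error if anything looks wrong.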
    pub fn sanity_check(&self) -> SanityResult<()> {
        re_tracing::profile_function!();

        for table in self.tables.values() {
            table.sanity_check()?;
        }

        Ok(())
    }
}
impl IndexedTable {
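    /// Runs the sanity check suite for the entire table.
    ///
    /// Returns an error if anything looks wrong.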
    pub fn sanity_check(&self) -> SanityResult<()> {
        re_tracing::profile_function!();

        // No two buckets should ever overlap time-range-wise.
        {
            let time_ranges = self
                .buckets
                .values()
                .map(|bucket| bucket.inner.read().time_range)
                .collect::<Vec<_>>();
            for time_ranges in time_ranges.windows(2) {
                let &[t1, t2] = time_ranges else {
                    unreachable!()
                };
                if t1.max().as_i64() >= t2.min().as_i64() {
                    return Err(SanityError::OverlappingBuckets {
                        t1_max: t1.max().as_i64(),
                        t1_max_formatted: self.timeline.typ().format_utc(t1.max()),
                        t2_max: t2.max().as_i64(),
                        t2_max_formatted: self.timeline.typ().format_utc(t2.max()),
                    });
                }
            }
        }

        // Make sure the cached row count isn't out of sync with the actual data.
        {
            let num_rows = self.num_rows();
            let num_rows_uncached = self.num_rows_uncached();
            if num_rows != num_rows_uncached {
                return Err(SanityError::RowsOutOfSync {
                    origin: std::any::type_name::<Self>(),
                    expected: re_format::format_uint(num_rows_uncached),
                    got: re_format::format_uint(num_rows),
                });
            }
        }

        // Run the sanity check suite for each individual bucket too.
        for bucket in self.buckets.values() {
            bucket.sanity_check()?;
        }

        // Make sure the cached size isn't out of sync with the actual data.
        {
            let total_size_bytes = self.total_size_bytes();
            let total_size_bytes_uncached = self.size_bytes_uncached();
            if total_size_bytes != total_size_bytes_uncached {
                return Err(SanityError::SizeOutOfSync {
                    origin: std::any::type_name::<Self>(),
                    expected: re_format::format_bytes(total_size_bytes_uncached as _),
                    got: re_format::format_bytes(total_size_bytes as _),
                });
            }
        }

        Ok(())
    }
}
impl IndexedBucket {
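    /// Runs the sanity check suite for the entire bucket.
    ///
    /// Returns an error if anything looks wrong.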
    pub fn sanity_check(&self) -> SanityResult<()> {
        re_tracing::profile_function!();

        let Self { timeline: _, inner } = self;

        {
            let IndexedBucketInner {
                is_sorted: _,
                time_range,
                col_time,
                col_insert_id,
                col_row_id,
                max_row_id,
                columns,
                size_bytes: _,
            } = &*inner.read();

            // Make sure the cached time range is in sync with the actual data.
            {
                let mut times = col_time.clone();
                times.sort();

                let expected_min = times
                    .front()
                    .copied()
                    .and_then(|t| TimeInt::try_from(t).ok())
                    .unwrap_or(TimeInt::MAX);
                let expected_max = times
                    .back()
                    .copied()
                    .and_then(|t| TimeInt::try_from(t).ok())
                    .unwrap_or(TimeInt::MIN);
                let expected_time_range = ResolvedTimeRange::new(expected_min, expected_max);

                if expected_time_range != *time_range {
                    return Err(SanityError::TimeRangeOutOfSync {
                        expected: expected_time_range,
                        got: *time_range,
                    });
                }
            }

            // Make sure the cached `max_row_id` isn't out of sync with the actual data.
            {
                let expected = col_row_id.iter().max().copied().unwrap_or(RowId::ZERO);
                if expected != *max_row_id {
                    return Err(SanityError::MaxRowIdOutOfSync {
                        expected,
                        got: *max_row_id,
                    });
                }
            }
            // Every column should hold exactly `Self::num_rows` rows.
            {
                const COLUMN_TIMEPOINT: &str = "rerun.controls.TimePoint";

                let num_rows = self.num_rows();

                let column_lengths = [
                    (!col_insert_id.is_empty())
                        .then(|| (DataStore::insert_id_component_name(), col_insert_id.len())),
                    Some((COLUMN_TIMEPOINT.into(), col_time.len())),
                    Some((RowId::name(), col_row_id.len())),
                ]
                .into_iter()
                .flatten()
                .chain(
                    columns
                        .iter()
                        .map(|(component, column)| (*component, column.len())),
                )
                .map(|(component, len)| (component, len as u64));

                for (component, len) in column_lengths {
                    if len != num_rows {
                        return Err(SanityError::ColumnLengthMismatch {
                            component,
                            expected: num_rows,
                            got: len,
                        });
                    }
                }
            }
        }
        // Make sure the cached size isn't out of sync with the actual data.
        {
            let size_bytes = inner.read().size_bytes;
            let size_bytes_uncached = inner.write().compute_size_bytes();
            if size_bytes != size_bytes_uncached {
                return Err(SanityError::SizeOutOfSync {
                    origin: std::any::type_name::<Self>(),
                    expected: re_format::format_bytes(size_bytes_uncached as _),
                    got: re_format::format_bytes(size_bytes as _),
                });
            }
        }

        Ok(())
    }
}