use std::collections::BTreeMap;
use arrow2::Either;
use re_log_types::{
DataCellColumn, DataRow, DataTable, ErasedTimeVec, ResolvedTimeRange, RowId, RowIdVec, TableId,
TimeInt, TimePoint, Timeline,
};
use crate::{store::IndexedBucketInner, DataStore, IndexedBucket};
impl DataStore {
pub fn to_rows(&self) -> re_log_types::DataReadResult<Vec<DataRow>> {
re_tracing::profile_function!();
let mut rows = ahash::HashMap::<RowId, DataRow>::default();
for table in self.to_data_tables(None) {
for row in table.to_rows().collect::<Vec<_>>() {
let row = row?;
match rows.entry(row.row_id()) {
std::collections::hash_map::Entry::Occupied(mut entry) => {
for (timeline, time) in row.timepoint() {
entry.get_mut().timepoint.insert(*timeline, *time);
}
}
std::collections::hash_map::Entry::Vacant(entry) => {
entry.insert(row);
}
}
}
}
let mut rows = rows.into_values().collect::<Vec<_>>();
{
re_tracing::profile_scope!("sort_rows");
rows.sort_by_key(|row| (row.timepoint.clone(), row.row_id));
}
Ok(rows)
}
pub fn to_data_table(&self) -> re_log_types::DataReadResult<DataTable> {
re_tracing::profile_function!();
let rows = self.to_rows()?;
Ok(re_log_types::DataTable::from_rows(
re_log_types::TableId::new(),
rows,
))
}
pub fn to_data_tables(
&self,
time_filter: Option<(Timeline, ResolvedTimeRange)>,
) -> impl Iterator<Item = DataTable> + '_ {
let static_tables = self.dump_static_tables();
let temporal = if let Some(time_filter) = time_filter {
Either::Left(self.dump_temporal_tables_filtered(time_filter))
} else {
Either::Right(self.dump_temporal_tables())
};
static_tables.chain(temporal)
}
fn dump_static_tables(&self) -> impl Iterator<Item = DataTable> + '_ {
self.static_tables.values().map(|static_table| {
let mut cells_per_row_id: BTreeMap<RowId, Vec<_>> = Default::default();
for static_cell in static_table.cells.values() {
cells_per_row_id
.entry(static_cell.row_id)
.or_default()
.push(static_cell.clone());
}
let rows = cells_per_row_id
.into_iter()
.filter_map(|(row_id, static_cells)| {
DataRow::from_cells(
row_id,
TimePoint::default(),
static_table.entity_path.clone(),
static_cells.into_iter().map(|static_cell| static_cell.cell),
)
.ok()
});
DataTable::from_rows(TableId::ZERO, rows)
})
}
fn dump_temporal_tables(&self) -> impl Iterator<Item = DataTable> + '_ {
self.tables.values().flat_map(|table| {
re_tracing::profile_scope!("temporal_table");
table.buckets.values().map(move |bucket| {
re_tracing::profile_scope!("temporal_bucket");
bucket.sort_indices_if_needed();
let IndexedBucket { timeline, inner } = bucket;
let IndexedBucketInner {
is_sorted: _,
time_range: _,
col_time,
col_insert_id: _,
col_row_id,
max_row_id: _,
columns,
size_bytes: _,
} = &*inner.read();
DataTable {
table_id: TableId::new(),
col_row_id: col_row_id.clone(),
col_timelines: [(*timeline, col_time.iter().copied().map(Some).collect())]
.into(),
col_entity_path: std::iter::repeat_with(|| table.entity_path.clone())
.take(col_row_id.len())
.collect(),
columns: columns.clone().into_iter().collect(), }
})
})
}
fn dump_temporal_tables_filtered(
&self,
(timeline_filter, time_filter): (Timeline, ResolvedTimeRange),
) -> impl Iterator<Item = DataTable> + '_ {
self.tables
.values()
.filter_map(move |table| {
re_tracing::profile_scope!("temporal_table_filtered");
if table.timeline != timeline_filter {
return None;
}
Some(table.buckets.values().filter_map(move |bucket| {
re_tracing::profile_scope!("temporal_bucket_filtered");
bucket.sort_indices_if_needed();
let IndexedBucket { timeline, inner } = bucket;
let IndexedBucketInner {
is_sorted: _,
time_range,
col_time,
col_insert_id: _,
col_row_id,
max_row_id: _,
columns,
size_bytes: _,
} = &*inner.read();
if !time_range.intersects(time_filter) {
return None;
}
let col_row_id: RowIdVec =
filter_column(col_time, col_row_id.iter(), time_filter).collect();
debug_assert!(!col_row_id.is_empty());
if col_row_id.is_empty() {
return None;
}
let col_timelines = [(
*timeline,
filter_column(col_time, col_time.iter(), time_filter)
.map(Some)
.collect(),
)]
.into();
let col_entity_path = std::iter::repeat_with(|| table.entity_path.clone())
.take(col_row_id.len())
.collect();
let mut columns2 = BTreeMap::default();
for (component, column) in columns {
let column = filter_column(col_time, column.iter(), time_filter).collect();
columns2.insert(*component, DataCellColumn(column));
}
Some(DataTable {
table_id: TableId::new(),
col_row_id,
col_timelines,
col_entity_path,
columns: columns2,
})
}))
})
.flatten()
}
}
fn filter_column<'a, T: 'a + Clone>(
col_time: &'a ErasedTimeVec,
column: impl Iterator<Item = &'a T> + 'a,
time_filter: ResolvedTimeRange,
) -> impl Iterator<Item = T> + 'a {
col_time
.iter()
.zip(column)
.filter(move |(&time, _)| time_filter.contains(TimeInt::new_temporal(time)))
.map(|(_, v)| v.clone())
}