use re_log_types::{EntityPathHash, ResolvedTimeRange, TimePoint};
use re_types_core::SizeBytes;
use crate::{store::IndexedBucketInner, DataStore, IndexedBucket, IndexedTable, MetadataRegistry};
#[derive(Default, Debug, Clone, Copy, PartialEq, Eq, PartialOrd)]
pub struct DataStoreRowStats {
pub num_rows: u64,
pub num_bytes: u64,
}
impl std::ops::Sub for DataStoreRowStats {
type Output = Self;
fn sub(self, rhs: Self) -> Self::Output {
Self {
num_rows: self.num_rows - rhs.num_rows,
num_bytes: self.num_bytes - rhs.num_bytes,
}
}
}
impl std::ops::Add for DataStoreRowStats {
type Output = Self;
fn add(self, rhs: Self) -> Self::Output {
Self {
num_rows: self.num_rows + rhs.num_rows,
num_bytes: self.num_bytes + rhs.num_bytes,
}
}
}
#[derive(Default, Debug, Clone, PartialEq, Eq, PartialOrd)]
pub struct DataStoreStats {
pub type_registry: DataStoreRowStats,
pub metadata_registry: DataStoreRowStats,
pub static_tables: DataStoreRowStats,
pub temporal: DataStoreRowStats,
pub temporal_buckets: u64,
pub total: DataStoreRowStats,
}
impl std::ops::Sub for DataStoreStats {
type Output = Self;
fn sub(self, rhs: Self) -> Self::Output {
Self {
type_registry: self.type_registry - rhs.type_registry,
metadata_registry: self.metadata_registry - rhs.metadata_registry,
static_tables: self.static_tables - rhs.static_tables,
temporal: self.temporal - rhs.temporal,
temporal_buckets: self.temporal_buckets - rhs.temporal_buckets,
total: self.total - rhs.total,
}
}
}
impl std::ops::Add for DataStoreStats {
type Output = Self;
fn add(self, rhs: Self) -> Self::Output {
Self {
type_registry: self.type_registry + rhs.type_registry,
metadata_registry: self.metadata_registry + rhs.metadata_registry,
static_tables: self.static_tables + rhs.static_tables,
temporal: self.temporal + rhs.temporal,
temporal_buckets: self.temporal_buckets + rhs.temporal_buckets,
total: self.total + rhs.total,
}
}
}
impl DataStoreStats {
pub fn from_store(store: &DataStore) -> Self {
re_tracing::profile_function!();
let type_registry = {
re_tracing::profile_scope!("type_registry");
DataStoreRowStats {
num_rows: store.type_registry.len() as _,
num_bytes: store.type_registry.total_size_bytes(),
}
};
let metadata_registry = {
re_tracing::profile_scope!("metadata_registry");
DataStoreRowStats {
num_rows: store.metadata_registry.len() as _,
num_bytes: store.metadata_registry.total_size_bytes(),
}
};
let static_tables = {
re_tracing::profile_scope!("static data");
DataStoreRowStats {
num_rows: store.num_static_rows(),
num_bytes: store.static_size_bytes(),
}
};
let (temporal, temporal_buckets) = {
re_tracing::profile_scope!("temporal");
(
DataStoreRowStats {
num_rows: store.num_temporal_rows(),
num_bytes: store.temporal_size_bytes(),
},
store.num_temporal_buckets(),
)
};
let total = DataStoreRowStats {
num_rows: static_tables.num_rows + temporal.num_rows,
num_bytes: type_registry.num_bytes
+ metadata_registry.num_bytes
+ static_tables.num_bytes
+ temporal.num_bytes,
};
Self {
type_registry,
metadata_registry,
static_tables,
temporal,
temporal_buckets,
total,
}
}
pub fn total_rows_and_bytes(&self) -> (u64, f64) {
let mut num_rows = self.temporal.num_rows + self.metadata_registry.num_rows;
let mut num_bytes = (self.temporal.num_bytes + self.metadata_registry.num_bytes) as f64;
num_rows += self.static_tables.num_rows;
num_bytes += self.static_tables.num_bytes as f64;
(num_rows, num_bytes)
}
}
impl SizeBytes for MetadataRegistry<(TimePoint, EntityPathHash)> {
#[inline]
fn heap_size_bytes(&self) -> u64 {
self.heap_size_bytes
}
}
impl SizeBytes for DataStore {
#[inline]
fn heap_size_bytes(&self) -> u64 {
self.static_size_bytes() + self.temporal_size_bytes() }
}
impl DataStore {
#[inline]
pub fn num_static_rows(&self) -> u64 {
self.static_tables.len() as _
}
#[inline]
pub fn static_size_bytes(&self) -> u64 {
re_tracing::profile_function!();
self.static_tables
.values()
.map(|static_table| {
static_table
.cells
.values()
.map(|static_cell| static_cell.cell.total_size_bytes())
.sum::<u64>()
})
.sum()
}
#[inline]
pub fn num_temporal_rows(&self) -> u64 {
re_tracing::profile_function!();
self.tables.values().map(|table| table.num_rows()).sum()
}
#[inline]
pub fn temporal_size_bytes(&self) -> u64 {
re_tracing::profile_function!();
self.tables
.values()
.map(|table| table.total_size_bytes())
.sum()
}
#[inline]
pub fn num_temporal_buckets(&self) -> u64 {
re_tracing::profile_function!();
self.tables.values().map(|table| table.num_buckets()).sum()
}
pub fn entity_stats(
&self,
timeline: re_log_types::Timeline,
entity_path_hash: re_log_types::EntityPathHash,
) -> EntityStats {
let mut entity_stats = self.tables.get(&(entity_path_hash, timeline)).map_or(
EntityStats::default(),
|table| EntityStats {
num_rows: table.buckets_num_rows,
size_bytes: table.buckets_size_bytes,
time_range: table.time_range(),
num_static_cells: 0,
static_size_bytes: 0,
},
);
if let Some(static_table) = self.static_tables.get(&entity_path_hash) {
entity_stats.num_static_cells = static_table.cells.len() as _;
entity_stats.static_size_bytes = static_table
.cells
.values()
.map(|static_cell| static_cell.cell.total_size_bytes())
.sum();
}
entity_stats
}
}
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct EntityStats {
pub num_rows: u64,
pub size_bytes: u64,
pub time_range: re_log_types::ResolvedTimeRange,
pub num_static_cells: u64,
pub static_size_bytes: u64,
}
impl Default for EntityStats {
fn default() -> Self {
Self {
num_rows: 0,
size_bytes: 0,
time_range: re_log_types::ResolvedTimeRange::EMPTY,
num_static_cells: 0,
static_size_bytes: 0,
}
}
}
impl IndexedTable {
#[inline]
pub fn num_rows(&self) -> u64 {
self.buckets_num_rows
}
#[inline]
pub(crate) fn num_rows_uncached(&self) -> u64 {
re_tracing::profile_function!();
self.buckets.values().map(|bucket| bucket.num_rows()).sum()
}
#[inline]
pub(crate) fn size_bytes_uncached(&self) -> u64 {
re_tracing::profile_function!();
self.stack_size_bytes()
+ self
.buckets
.values()
.map(|bucket| bucket.total_size_bytes())
.sum::<u64>()
}
#[inline]
pub fn num_buckets(&self) -> u64 {
self.buckets.len() as _
}
pub fn time_range(&self) -> ResolvedTimeRange {
if let (Some((_, first)), Some((_, last))) = (
self.buckets.first_key_value(),
self.buckets.last_key_value(),
) {
first
.inner
.read()
.time_range
.union(last.inner.read().time_range)
} else {
ResolvedTimeRange::EMPTY
}
}
}
impl SizeBytes for IndexedTable {
#[inline]
fn heap_size_bytes(&self) -> u64 {
self.buckets_size_bytes
}
}
impl IndexedBucket {
#[inline]
pub fn num_rows(&self) -> u64 {
self.inner.read().col_time.len() as u64
}
}
impl SizeBytes for IndexedBucket {
#[inline]
fn heap_size_bytes(&self) -> u64 {
self.inner.read().size_bytes
}
}
impl IndexedBucketInner {
#[inline]
pub fn compute_size_bytes(&mut self) -> u64 {
re_tracing::profile_function!();
let Self {
is_sorted,
time_range,
col_time,
col_insert_id,
col_row_id,
max_row_id,
columns,
size_bytes,
} = self;
*size_bytes = is_sorted.total_size_bytes()
+ time_range.total_size_bytes()
+ col_time.total_size_bytes()
+ col_insert_id.total_size_bytes()
+ col_row_id.total_size_bytes()
+ max_row_id.total_size_bytes()
+ columns.total_size_bytes()
+ size_bytes.total_size_bytes();
*size_bytes
}
}