1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
use std::collections::{BTreeMap, BTreeSet};

use ahash::HashSet;
use itertools::Itertools;

use nohash_hasher::IntMap;
use re_data_store::{StoreDiff, StoreDiffKind, StoreEvent, StoreSubscriber};
use re_log_types::{
    ComponentPath, EntityPath, EntityPathHash, EntityPathPart, RowId, TimeInt, TimePoint, Timeline,
};
use re_types_core::{ComponentName, Loggable};

// Used all over in docstrings.
#[allow(unused_imports)]
use re_data_store::DataStore;

use crate::TimeHistogramPerTimeline;

// ----------------------------------------------------------------------------

/// A recursive, manually updated [`re_data_store::StoreSubscriber`] that maintains the entity hierarchy.
///
/// The tree contains a list of subtrees, and so on recursively.
///
/// Every node describes one [`EntityPath`]; its children are keyed by the next
/// [`EntityPathPart`] of their own path.
///
/// The tree is not driven by [`StoreSubscriber::on_events`] (which is unimplemented): it must be
/// updated explicitly through [`EntityTree::on_store_additions`] and
/// [`EntityTree::on_store_deletions`].
pub struct EntityTree {
    /// Full path prefix to the root of this (sub)tree.
    pub path: EntityPath,

    /// Direct descendants of this (sub)tree, keyed by the last part of their path.
    pub children: BTreeMap<EntityPathPart, EntityTree>,

    /// Information about this specific entity (excluding children).
    pub entity: EntityInfo,

    /// Info about this subtree, including all children, recursively.
    pub subtree: SubtreeInfo,
}

// NOTE: `EntityTree` implements [`StoreSubscriber`] purely as a signal that it already _is_ one,
// so that people A) don't try to implement it on their own and B) don't try to register it.
impl StoreSubscriber for EntityTree {
    /// Unique name of this subscriber.
    fn name(&self) -> String {
        String::from("rerun.store_subscribers.EntityTree")
    }

    /// Always panics: the tree is updated manually, never through the subscriber machinery.
    #[allow(clippy::unimplemented)]
    fn on_events(&mut self, _events: &[StoreEvent]) {
        unimplemented!(
            r"EntityTree view is maintained manually, see `EntityTree::on_store_{{additions|deletions}}`"
        );
    }

    /// Type-erased shared access, for downcasting.
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    /// Type-erased exclusive access, for downcasting.
    fn as_any_mut(&mut self) -> &mut dyn std::any::Any {
        self
    }
}

/// Information about this specific entity (excluding children).
#[derive(Default)]
pub struct EntityInfo {
    /// Book-keeping around whether we should clear fields when data is added.
    ///
    /// Maps the [`RowId`] of each `Clear` recorded for this entity to the [`TimePoint`] at which
    /// it applies. When a component is logged on this entity for the first time, every entry in
    /// here is turned into a pending clear for that component.
    clears: BTreeMap<RowId, TimePoint>,

    /// Flat time histograms for each component of this [`EntityTree`].
    ///
    /// Keeps track of the _number of times a component is logged_ per time per timeline, only for
    /// this specific [`EntityTree`].
    /// A component logged twice at the same timestamp is counted twice.
    ///
    /// ⚠ Auto-generated instance keys are _not_ accounted for. ⚠
    pub components: BTreeMap<ComponentName, TimeHistogramPerTimeline>,
}

/// Info about stuff at a given [`EntityPath`], including all of its children, recursively.
#[derive(Default)]
pub struct SubtreeInfo {
    /// Book-keeping around whether we should clear recursively when data is added.
    ///
    /// Maps the [`RowId`] of each recursive `Clear` affecting this subtree to the [`TimePoint`]
    /// at which it applies. Newly created child nodes are seeded with a copy of this map.
    clears: BTreeMap<RowId, TimePoint>,

    /// Recursive time histogram for this [`EntityTree`].
    ///
    /// Keeps track of the _number of components logged_ per time per timeline, recursively across
    /// all of the [`EntityTree`]'s children.
    /// A component logged twice at the same timestamp is counted twice.
    ///
    /// ⚠ Auto-generated instance keys are _not_ accounted for. ⚠
    pub time_histogram: TimeHistogramPerTimeline,

    /// Number of bytes used by all arrow data in this subtree.
    ///
    /// Grows on additions and shrinks on deletions; read it through [`SubtreeInfo::data_bytes`].
    data_bytes: u64,
}

impl SubtreeInfo {
    /// Folds a single [`StoreEvent`] into the statistics of this subtree.
    ///
    /// Additions grow the time histogram and the byte count, deletions shrink them.
    /// Assumes the event has been filtered to be part of this subtree.
    fn on_event(&mut self, event: &StoreEvent) {
        use re_types_core::SizeBytes as _;

        match event.kind {
            StoreDiffKind::Addition => {
                self.time_histogram
                    .add(&event.times, event.num_components() as _);

                let added_bytes: u64 = event
                    .cells
                    .values()
                    .map(|cell| cell.total_size_bytes())
                    .sum();
                self.data_bytes += added_bytes;
            }
            StoreDiffKind::Deletion => {
                self.time_histogram
                    .remove(&event.timepoint(), event.num_components() as _);

                for cell in event.cells.values() {
                    let removed_bytes = cell.total_size_bytes();
                    self.data_bytes = match self.data_bytes.checked_sub(removed_bytes) {
                        Some(remaining_bytes) => remaining_bytes,
                        None => {
                            // We were asked to remove more than we ever accounted for:
                            // report it and clamp the counter at zero.
                            re_log::debug!(
                                store_id = %event.store_id,
                                entity_path = %event.diff.entity_path,
                                current = self.data_bytes,
                                removed = removed_bytes,
                                "book keeping underflowed"
                            );
                            u64::MIN
                        }
                    };
                }
            }
        }
    }

    /// Number of bytes used by all arrow data in this tree (including their schemas, but otherwise ignoring book-keeping overhead).
    #[inline]
    pub fn data_bytes(&self) -> u64 {
        self.data_bytes
    }
}

/// Maintains an optimized representation of a batch of [`StoreEvent`]s specifically designed to
/// accelerate garbage collection of [`EntityTree`]s.
///
/// Built once per batch with [`CompactedStoreEvents::new`].
///
/// See [`EntityTree::on_store_deletions`].
#[derive(Default)]
pub struct CompactedStoreEvents {
    /// What rows were deleted?
    pub row_ids: HashSet<RowId>,

    /// What time points were deleted for each entity+timeline+component?
    ///
    /// Only filled from events that are not static.
    pub temporal: IntMap<EntityPathHash, IntMap<Timeline, IntMap<ComponentName, Vec<TimeInt>>>>,

    /// For each entity+component, how many timeless entries were deleted?
    ///
    /// Only filled from static events; each event contributes the absolute value of its delta.
    pub timeless: IntMap<EntityPathHash, IntMap<ComponentName, u64>>,
}

impl CompactedStoreEvents {
    pub fn new(store_events: &[&StoreEvent]) -> Self {
        let mut this = CompactedStoreEvents {
            row_ids: store_events.iter().map(|event| event.row_id).collect(),
            temporal: Default::default(),
            timeless: Default::default(),
        };

        for event in store_events {
            if event.is_static() {
                let per_component = this.timeless.entry(event.entity_path.hash()).or_default();
                for component_name in event.cells.keys() {
                    *per_component.entry(*component_name).or_default() +=
                        event.delta().unsigned_abs();
                }
            } else {
                for &(timeline, time) in &event.times {
                    let per_timeline = this.temporal.entry(event.entity_path.hash()).or_default();
                    let per_component = per_timeline.entry(timeline).or_default();
                    for component_name in event.cells.keys() {
                        per_component.entry(*component_name).or_default().push(time);
                    }
                }
            }
        }

        this
    }
}

/// Cascaded clears that need to be applied to the [`DataStore`] as a result of modifying the [`EntityTree`].
///
/// When an [`EntityTree`] gets updated with new data, two cascading effects might happen:
///
/// 1. If the data contains a `Clear` component, then inserting it will trigger an immediate clear
///    at this specific timepoint, that can affect an arbitrary number of components and, if the `Clear`
///    is recursive, even an arbitrary number of entity paths.
///    That `Clear` then lives on and might affect data added later on, which leads us to
///    side-effect #2 described below.
///
/// 2. If data is inserted at an entity path that is under the influence of a previously logged
///    `Clear` component, then the insertion will trigger a pending clear for all components at
///    that path.
///
/// `Clear` components themselves are not affected by clears.
#[derive(Debug, Clone, Default)]
pub struct ClearCascade {
    /// [`ComponentPath`]s that should be cleared as a result of the cascade.
    ///
    /// Layout: [`RowId`] of the triggering `Clear` → affected [`EntityPath`] → the [`TimePoint`]
    /// to clear at, plus the set of [`ComponentPath`]s to clear for that entity.
    ///
    /// Keep in mind: these are the [`RowId`]s of the `Clear` components that triggered the
    /// cascades, they are therefore not unique and, by definition, illegal!
    pub to_be_cleared: BTreeMap<RowId, BTreeMap<EntityPath, (TimePoint, BTreeSet<ComponentPath>)>>,
}

impl ClearCascade {
    /// Returns `true` if the cascade contains nothing to clear.
    pub fn is_empty(&self) -> bool {
        // Exhaustive destructuring: adding a field to `ClearCascade` will fail to compile here,
        // forcing whoever adds it to decide how it affects emptiness.
        let Self { to_be_cleared } = self;
        to_be_cleared.is_empty()
    }
}

impl EntityTree {
    pub fn root() -> Self {
        Self::new(EntityPath::root(), Default::default())
    }

    /// Creates an empty (sub)tree at `path`.
    ///
    /// `recursive_clears` seeds both the entity-level and the subtree-level clear book-keeping
    /// of the new node; everything else starts out empty.
    pub fn new(path: EntityPath, recursive_clears: BTreeMap<RowId, TimePoint>) -> Self {
        let entity = EntityInfo {
            clears: recursive_clears.clone(),
            ..Default::default()
        };
        let subtree = SubtreeInfo {
            clears: recursive_clears,
            ..Default::default()
        };

        Self {
            path,
            children: BTreeMap::new(),
            entity,
            subtree,
        }
    }

    /// Returns `true` if this node has no child entities.
    ///
    /// Components logged on the node itself are not taken into account.
    pub fn is_leaf(&self) -> bool {
        self.children.is_empty()
    }

    /// Number of direct children plus number of distinct components recorded on this entity.
    ///
    /// A node for which this is zero holds nothing; [`EntityTree::on_store_deletions`] uses it to
    /// decide which children to drop.
    pub fn num_children_and_fields(&self) -> usize {
        self.children.len() + self.entity.components.len()
    }

    /// Number of static messages in this tree, or any child, recursively.
    ///
    /// Read from the recursive time histogram kept in [`SubtreeInfo`].
    pub fn num_static_messages_recursive(&self) -> u64 {
        self.subtree.time_histogram.num_static_messages()
    }

    /// Looks up the time histogram of a single component of this entity on a given timeline.
    ///
    /// Returns `None` if the component was never recorded on this entity, or if it has no
    /// histogram for `timeline`. Children are not considered.
    pub fn time_histogram_for_component(
        &self,
        timeline: &Timeline,
        component_name: impl Into<ComponentName>,
    ) -> Option<&crate::TimeHistogram> {
        let component_name = component_name.into();
        let per_timeline = self.entity.components.get(&component_name)?;
        per_timeline.get(timeline)
    }

    /// Updates the [`EntityTree`] by applying a batch of [`StoreEvent`]s.
    ///
    /// Returns a [`ClearCascade`] that describes the cascading side-effects to be applied to the
    /// [`DataStore`] as a result, if any.
    /// See [`ClearCascade`]'s documentation for more information.
    ///
    /// Only reacts to additions (`event.kind == StoreDiffKind::Addition`); every other event in
    /// the batch is skipped.
    pub fn on_store_additions(&mut self, events: &[StoreEvent]) -> ClearCascade {
        re_tracing::profile_function!();

        let mut cascade = ClearCascade::default();

        for event in events {
            if event.kind == StoreDiffKind::Addition {
                self.on_store_addition(event, &mut cascade);
            }
        }

        cascade
    }

    /// Applies a single addition event to the tree.
    ///
    /// Walks from `self` down to the node at `event.diff.entity_path`, creating any missing node
    /// on the way, and feeds the event to the [`SubtreeInfo`] of every node visited (`self`
    /// included). The node reached at the end of the walk then records the added data through
    /// `on_added_data`, which may extend `clear_cascade`.
    ///
    /// The walk follows every part of the entity path, so `self` is presumably expected to be
    /// the root of the tree.
    fn on_store_addition(&mut self, event: &StoreEvent, clear_cascade: &mut ClearCascade) {
        re_tracing::profile_function!();

        let entity_path = &event.diff.entity_path;

        // Book-keeping for each level in the hierarchy:
        let mut tree = self;
        tree.subtree.on_event(event);

        for (i, part) in entity_path.iter().enumerate() {
            // Descend one level. A child that has to be created is given the path prefix up to
            // and including `part`, and is seeded with a copy of the clears recorded for the
            // parent's subtree.
            tree = tree.children.entry(part.clone()).or_insert_with(|| {
                EntityTree::new(
                    entity_path.as_slice()[..=i].into(),
                    tree.subtree.clears.clone(),
                )
            });
            tree.subtree.on_event(event);
        }

        // Finally book-keeping for the entity where data was actually added:
        tree.on_added_data(clear_cascade, &event.diff);
    }

    /// Handles the addition of new data into the tree.
    ///
    /// For every component cell in `store_diff`:
    /// - the per-component time histogram of this entity is incremented by one;
    /// - if this is the first time the component is seen on this entity, every clear recorded
    ///   for the entity is added to `clear_cascade` as a pending clear for that component;
    /// - if the cell is a `ClearIsRecursive` component, the clear itself is applied through
    ///   `on_added_clear`.
    ///
    /// Updates the given [`ClearCascade`] with immediate and pending clears as a
    /// result of the operation.
    ///
    /// # Panics
    ///
    /// Panics if a `ClearIsRecursive` cell fails to deserialize.
    fn on_added_data(&mut self, clear_cascade: &mut ClearCascade, store_diff: &StoreDiff) {
        for (component_name, cell) in &store_diff.cells {
            let component_path =
                ComponentPath::new(store_diff.entity_path.clone(), *component_name);

            // Stays empty unless the component is new to this entity (see below).
            let mut pending_clears = vec![];

            let per_component = self
                .entity
                .components
                .entry(component_path.component_name)
                .or_insert_with(|| {
                    // If we needed to create a new leaf to hold this data, we also want to
                    // insert all of the historical pending clear operations.
                    pending_clears = self.entity.clears.clone().into_iter().collect_vec();
                    Default::default()
                });
            per_component.add(&store_diff.times, 1);

            // Is the newly added component under the influence of previously logged `Clear`
            // component?
            //
            // If so, this is one of two cascading side-effects that happen when updating the entity
            // tree: a pending clear.
            //
            // We need to inform the [`DataStore`] that it should insert a cleared batch for the
            // current component, _using the Timepoint and RowId of the previously logged clear_.
            //
            // ## RowId duplication
            //
            // We want to insert new data (empty cells) using an old RowId (specifically, the RowId
            // of the original insertion that was used to register the pending clear in the first
            // place).
            // By definition, this is illegal: RowIds are unique.
            //
            // On the other hand, the GC process is driven by RowId order, which means we must make
            // sure that the empty cell we're inserting uses a RowId with a similar timestamp as the
            // one used in the original `Clear` component cell, so they roughly get GC'd at the same time.
            //
            // This is fine, the insertion retry mechanism will make sure we get a unique RowId
            // that is still close to this one.

            for (pending_row_id, pending_timepoint) in pending_clears {
                let per_entity = clear_cascade
                    .to_be_cleared
                    .entry(pending_row_id)
                    .or_default();
                let (timepoint, component_paths) = per_entity
                    .entry(store_diff.entity_path.clone())
                    .or_default();
                // Merge with whatever timepoint is already registered for this clear + entity.
                *timepoint = pending_timepoint.union_max(timepoint);
                component_paths.insert(component_path.clone());
            }

            use re_types_core::components::ClearIsRecursive;
            if cell.component_name() == ClearIsRecursive::name() {
                // NOTE(review): a cell that cannot be deserialized panics here via `unwrap()`;
                // confirm that this can never happen for data that made it into the store.
                // A cell that deserializes to nothing is treated as a non-recursive clear.
                let is_recursive = cell
                    .try_to_native_mono::<ClearIsRecursive>()
                    .unwrap()
                    .map_or(false, |settings| settings.0);

                self.on_added_clear(clear_cascade, store_diff, is_recursive);
            }
        }
    }

    /// Handles the addition of new `Clear` component into the tree.
    ///
    /// Records the clear on this node and, if `is_recursive` is set, on every node below it, and
    /// registers every component currently present on those nodes (except the `Clear` components
    /// themselves) in `clear_cascade`, keyed by the `RowId` of `store_diff`.
    ///
    /// Updates the given [`ClearCascade`] as a result of the operation.
    ///
    /// Additional pending clear operations will be stored in the tree for future
    /// insertion.
    fn on_added_clear(
        &mut self,
        clear_cascade: &mut ClearCascade,
        store_diff: &StoreDiff,
        is_recursive: bool,
    ) {
        use re_types_core::{archetypes::Clear, components::ClearIsRecursive, Archetype as _};

        re_tracing::profile_function!();

        // Returns `true` for every component that is _not_ part of the `Clear` archetype
        // (i.e. neither its indicator nor `ClearIsRecursive`).
        fn filter_out_clear_components(comp_name: &ComponentName) -> bool {
            let is_clear_component = [
                Clear::indicator().name(), //
                ClearIsRecursive::name(),  //
            ]
            .contains(comp_name);
            !is_clear_component
        }

        // Records the clear on a single node and returns the paths of all the components of that
        // node that need to be cleared right away.
        fn clear_tree(
            tree: &mut EntityTree,
            is_recursive: bool,
            row_id: RowId,
            timepoint: TimePoint,
        ) -> impl IntoIterator<Item = ComponentPath> + '_ {
            if is_recursive {
                // Track that any future children need a Null at the right timepoint when added.
                let cur_timepoint = tree.subtree.clears.entry(row_id).or_default();
                *cur_timepoint = timepoint.clone().union_max(cur_timepoint);
            }

            // Track that any future fields need a Null at the right timepoint when added.
            let cur_timepoint = tree.entity.clears.entry(row_id).or_default();
            *cur_timepoint = timepoint.union_max(cur_timepoint);

            // For every existing field return a clear event.
            tree.entity
                .components
                .keys()
                // Don't clear `Clear` components, or we'd end up with recursive cascades!
                .filter(|comp_name| filter_out_clear_components(comp_name))
                .map(|component_name| ComponentPath::new(tree.path.clone(), *component_name))
        }

        let mut cleared_paths = BTreeSet::new();

        if is_recursive {
            // Walk this node and all of its descendants using an explicit stack.
            let mut stack = vec![];
            stack.push(self);
            while let Some(next) = stack.pop() {
                cleared_paths.extend(clear_tree(
                    next,
                    is_recursive,
                    store_diff.row_id,
                    store_diff.timepoint(),
                ));
                stack.extend(next.children.values_mut().collect::<Vec<&mut Self>>());
            }
        } else {
            // Flat clear: only this node is affected.
            cleared_paths.extend(clear_tree(
                self,
                is_recursive,
                store_diff.row_id,
                store_diff.timepoint(),
            ));
        }

        // Are there previous logged components under the influence of the newly logged `Clear`
        // component?
        //
        // If so, this is one of two cascading side-effects that happen when updating the entity
        // tree: an immediate clear.
        //
        // We need to inform the [`DataStore`] that it should insert a cleared batch for each of
        // these components, _using the Timepoint and RowId of the newly logged clear_.
        //
        // ## RowId duplication
        //
        // We want to insert new data (empty cells) using a single RowId (specifically, the RowId
        // that was used to log this new `Clear` component).
        // By definition, this is illegal: RowIds are unique.
        //
        // On the other hand, the GC process is driven by RowId order, which means we must make
        // sure that the empty cell we're inserting uses a RowId with a similar timestamp as the
        // one used by the `Clear` component cell, so they roughly get GC'd at the same time.
        //
        // This is fine, the insertion retry mechanism will make sure we get a unique RowId
        // that is still close to this one.

        for component_path in cleared_paths {
            // NOTE: the entry for this `RowId` is only created once there is at least one path to
            // clear, so a clear that affects nothing leaves the cascade untouched.
            let per_entity = clear_cascade
                .to_be_cleared
                .entry(store_diff.row_id)
                .or_default();
            let (timepoint, component_paths) = per_entity
                .entry(component_path.entity_path().clone())
                .or_default();

            *timepoint = store_diff.timepoint().union_max(timepoint);
            component_paths.insert(component_path.clone());
        }
    }

    /// Updates the [`EntityTree`] by applying a batch of deletion [`StoreEvent`]s, typically as
    /// the result of a garbage collection.
    ///
    /// For this node and, recursively, all of its children:
    /// - clears whose [`RowId`] is listed in `compacted.row_ids` are forgotten;
    /// - the per-component histograms of the entity are decremented, and components whose
    ///   histogram becomes empty are removed;
    /// - the recursive [`SubtreeInfo`] statistics are updated;
    /// - children that end up with neither children nor components are dropped.
    ///
    /// Nothing is returned and, unlike [`EntityTree::on_store_additions`], no cascade is produced.
    ///
    /// `store_events` is expected to contain only deletions
    /// (`event.kind == StoreDiffKind::Deletion`): the per-component histograms are decremented
    /// for every event matching this entity, without looking at `event.kind`.
    ///
    /// `compacted` must have been built from the same batch of events, see
    /// [`CompactedStoreEvents::new`].
    pub fn on_store_deletions(
        &mut self,
        store_events: &[&StoreEvent],
        compacted: &CompactedStoreEvents,
    ) {
        re_tracing::profile_function!();

        let Self {
            path,
            children,
            entity,
            subtree,
        } = self;

        // Only keep events relevant to this branch of the tree.
        let subtree_events = store_events
            .iter()
            .filter(|e| e.entity_path.starts_with(path))
            .copied() // NOTE: not actually copying, just removing the superfluous ref layer
            .collect_vec();

        {
            re_tracing::profile_scope!("entity");

            {
                re_tracing::profile_scope!("clears");
                entity
                    .clears
                    .retain(|row_id, _| !compacted.row_ids.contains(row_id));
            }

            re_tracing::profile_scope!("components");
            // Only events that target this exact entity affect its per-component histograms.
            for event in subtree_events.iter().filter(|e| &e.entity_path == path) {
                // The timepoint is the same for every component of the event: build it once
                // rather than once per component.
                let timepoint = event.timepoint();

                for component_name in event.cells.keys() {
                    if let Some(histo) = entity.components.get_mut(component_name) {
                        histo.remove(&timepoint, 1);
                        if histo.is_empty() {
                            entity.components.remove(component_name);
                        }
                    }
                }
            }
        }

        {
            re_tracing::profile_scope!("subtree");
            {
                re_tracing::profile_scope!("clears");
                subtree
                    .clears
                    .retain(|row_id, _| !compacted.row_ids.contains(row_id));
            }
            re_tracing::profile_scope!("on_event");
            for &event in &subtree_events {
                subtree.on_event(event);
            }
        }

        // Recurse even when no event matched this branch: the clears of the children still have
        // to be purged of the deleted rows. Children that are left empty are dropped.
        children.retain(|_, child| {
            child.on_store_deletions(&subtree_events, compacted);
            child.num_children_and_fields() > 0
        });
    }

    /// Returns the node found by following `path`, part by part, starting from `self`.
    ///
    /// An empty path yields `self`. Returns `None` as soon as one part of the path has no
    /// matching child.
    pub fn subtree(&self, path: &EntityPath) -> Option<&Self> {
        let mut node = self;
        for part in path.as_slice() {
            node = node.children.get(part)?;
        }
        Some(node)
    }

    /// Invokes `visitor` for `self` and all children recursively.
    ///
    /// Nodes are visited depth-first, parents before their children, and siblings in the order
    /// of the `children` map.
    pub fn visit_children_recursively(&self, visitor: &mut impl FnMut(&EntityPath, &EntityInfo)) {
        let mut pending = vec![self];

        while let Some(tree) = pending.pop() {
            visitor(&tree.path, &tree.entity);

            // Push in reverse so that the first child ends up on top of the stack and is
            // therefore visited, along with its whole subtree, before its siblings.
            pending.extend(tree.children.values().rev());
        }
    }
}