use std::{
    collections::{BTreeMap, BTreeSet},
    sync::Arc,
};

use indexmap::IndexMap;
use itertools::Itertools;
use parking_lot::RwLock;

use re_data_store::{DataStore, LatestAtQuery, TimeInt};
use re_log_types::EntityPath;
use re_types_core::{ComponentName, SizeBytes};

use crate::{CacheKey, Caches, LatestAtComponentResults, LatestAtResults, Promise};

// ---

impl Caches {
    /// Queries for the given `component_names` using latest-at semantics.
    ///
    /// See [`LatestAtResults`] for more information about how to handle the results.
    ///
    /// This is a cached API -- data will be lazily cached upon access.
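    ///
    /// ### Example
    ///
    /// A minimal sketch of a call site, assuming a `store`, a `timeline` and some
    /// previously logged `Position2D` data already exist (none of that setup is shown,
    /// hence the `ignore`):
    ///
    /// ```ignore
    /// use re_data_store::LatestAtQuery;
    /// use re_log_types::EntityPath;
    /// use re_types_core::Loggable as _;
    ///
    /// let query = LatestAtQuery::new(timeline, frame_nr);
    /// let results = caches.latest_at(
    ///     &store,
    ///     &query,
    ///     &EntityPath::from("points"),
    ///     [re_types::components::Position2D::name()],
    /// );
    /// // See `LatestAtResults` for how to resolve the returned promises.
    /// ```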
    pub fn latest_at(
        &self,
        store: &DataStore,
        query: &LatestAtQuery,
        entity_path: &EntityPath,
        component_names: impl IntoIterator<Item = ComponentName>,
    ) -> LatestAtResults {
        re_tracing::profile_function!(entity_path.to_string());

        let mut results = LatestAtResults::default();

        for component_name in component_names {
            let key = CacheKey::new(entity_path.clone(), query.timeline(), component_name);

            let cache = if crate::cacheable(component_name) {
                Arc::clone(
                    self.latest_at_per_cache_key
                        .write()
                        .entry(key.clone())
                        .or_insert_with(|| Arc::new(RwLock::new(LatestAtCache::new(key.clone())))),
                )
            } else {
                // If the component shouldn't be cached, simply instantiate a new cache for it.
                // It will be dropped when the user is done with it.
                Arc::new(RwLock::new(LatestAtCache::new(key.clone())))
            };

            let mut cache = cache.write();
            cache.handle_pending_invalidation();
            if let Some(cached) = cache.latest_at(store, query, entity_path, component_name) {
                results.add(component_name, cached);
            }
        }

        results
    }
}

// ---

/// Caches the results of `LatestAt` queries for a given [`CacheKey`].
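///
/// The two indices below share their buckets: e.g. if data was logged at `t=10` and
/// then queried at `t=10`, `t=11` and `t=12` (hypothetical timestamps), the cache ends
/// up looking like this:
/// ```text
/// per_data_time:  { 10 -> bucket }
/// per_query_time: { 10 -> bucket, 11 -> bucket, 12 -> bucket }
/// ```
/// where every entry points to the same `Arc`ed bucket.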
pub struct LatestAtCache {
    /// For debugging purposes.
    pub cache_key: CacheKey,

    /// Organized by _query_ time.
    ///
    /// If the data you're looking for isn't in here, try partially running the query and check
    /// if there is any data available for the resulting _data_ time in [`Self::per_data_time`].
    //
    // NOTE: `Arc` so we can share buckets across query time & data time.
    pub per_query_time: BTreeMap<TimeInt, Arc<LatestAtComponentResults>>,

    /// Organized by _data_ time.
    ///
    /// Due to how our latest-at semantics work, any number of queries at time `T+n` where `n >= 0`
    /// can result in a data time of `T`.
    //
    // NOTE: `Arc` so we can share buckets across query time & data time.
    pub per_data_time: BTreeMap<TimeInt, Arc<LatestAtComponentResults>>,

    /// These timestamps have been invalidated asynchronously.
    ///
    /// The next time this cache gets queried, it must remove any invalidated entries accordingly.
    ///
    /// Invalidation is deferred to query time because it is far more efficient that way: the frame
    /// time effectively behaves as a natural micro-batching mechanism.
    pub pending_invalidations: BTreeSet<TimeInt>,
}

impl LatestAtCache {
    #[inline]
    pub fn new(cache_key: CacheKey) -> Self {
        Self {
            cache_key,
            per_query_time: Default::default(),
            per_data_time: Default::default(),
            pending_invalidations: Default::default(),
        }
    }
}

impl std::fmt::Debug for LatestAtCache {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            cache_key,
            per_query_time,
            per_data_time,
            pending_invalidations: _,
        } = self;

        let mut strings = Vec::new();

        struct StatsPerBucket {
            query_times: BTreeSet<TimeInt>,
            data_time: TimeInt,
            total_size_bytes: u64,
        }

        let mut buckets: IndexMap<_, _> = per_data_time
            .iter()
            .map(|(&data_time, bucket)| {
                (
                    Arc::as_ptr(bucket),
                    StatsPerBucket {
                        query_times: Default::default(),
                        data_time,
                        total_size_bytes: bucket.total_size_bytes(),
                    },
                )
            })
            .collect();

        for (&query_time, bucket) in per_query_time {
            if let Some(bucket) = buckets.get_mut(&Arc::as_ptr(bucket)) {
                bucket.query_times.insert(query_time);
            }
        }

        for bucket in buckets.values() {
            strings.push(format!(
                "query_times=[{}] -> data_time={:?} ({})",
                bucket
                    .query_times
                    .iter()
                    .map(|t| cache_key.timeline.typ().format_utc(*t))
                    .collect_vec()
                    .join(", "),
                bucket.data_time.as_i64(),
                re_format::format_bytes(bucket.total_size_bytes as _),
            ));
        }

        if strings.is_empty() {
            return f.write_str("<empty>");
        }

        f.write_str(&strings.join("\n"))
    }
}

impl SizeBytes for LatestAtCache {
    #[inline]
    fn heap_size_bytes(&self) -> u64 {
        let Self {
            cache_key: _,
            per_query_time,
            per_data_time,
            pending_invalidations,
        } = self;

        let per_query_time = per_query_time
            .keys()
            .map(|k| k.total_size_bytes())
            .sum::<u64>();
        // NOTE: per query time buckets are just pointers, don't count them.

        let per_data_time_keys = per_data_time
            .keys()
            .map(|k| k.total_size_bytes())
            .sum::<u64>();
        let per_data_time_values = per_data_time
            .values()
            // NOTE: make sure to dereference the Arc, else this will account for zero (assumed amortized!)
            .map(|arc| (**arc).total_size_bytes())
            .sum::<u64>();

        let per_data_time = per_data_time_keys + per_data_time_values;
        let pending_invalidations = pending_invalidations.total_size_bytes();

        per_query_time + per_data_time + pending_invalidations
    }
}

impl LatestAtCache {
    /// Queries cached latest-at data for a single component.
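    ///
    /// Three outcomes are possible, from cheapest to most expensive:
    /// 1. the exact query time is already cached: return the existing bucket immediately;
    /// 2. the store query resolves to a data time that is already cached: share that
    ///    bucket with the new query time, avoiding join & deserialization costs;
    /// 3. complete cache miss: create a new bucket and index it under both the query
    ///    time and the data time.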
    pub fn latest_at(
        &mut self,
        store: &DataStore,
        query: &LatestAtQuery,
        entity_path: &EntityPath,
        component_name: ComponentName,
    ) -> Option<Arc<LatestAtComponentResults>> {
        re_tracing::profile_scope!("latest_at", format!("{query:?}"));

        let LatestAtCache {
            cache_key: _,
            per_query_time,
            per_data_time,
            pending_invalidations: _,
        } = self;

        let query_time_bucket_at_query_time = match per_query_time.entry(query.at()) {
            std::collections::btree_map::Entry::Occupied(entry) => {
                // Fastest path: we have an entry for this exact query time, no need to look any
                // further.
                return Some(Arc::clone(entry.get()));
            }
            std::collections::btree_map::Entry::Vacant(entry) => entry,
        };

        let result = store.latest_at(query, entity_path, component_name, &[component_name]);

        // NOTE: cannot `result.and_then(...)` or borrowck gets lost.
        if let Some((data_time, row_id, mut cells)) = result {
            // Fast path: we've run the query and realized that we already have the data for the resulting
            // _data_ time, so let's use that to avoid join & deserialization costs.
            if let Some(data_time_bucket_at_data_time) = per_data_time.get(&data_time) {
                query_time_bucket_at_query_time.insert(Arc::clone(data_time_bucket_at_data_time));

                // We now know for a fact that a query at that data time would yield the same
                // results: copy the bucket accordingly so that the next cache hit for that query
                // time ends up taking the fastest path.
                let query_time_bucket_at_data_time = per_query_time.entry(data_time);
                query_time_bucket_at_data_time
                    .and_modify(|v| *v = Arc::clone(data_time_bucket_at_data_time))
                    .or_insert(Arc::clone(data_time_bucket_at_data_time));

                return Some(Arc::clone(data_time_bucket_at_data_time));
            }

            // Soundness:
            // * `cells[0]` is guaranteed to exist since we passed in `&[component_name]`
            // * `cells[0]` is guaranteed to be `Some`, otherwise `result` itself would have
            //   been `None` in the first place
            let Some(cell) = cells[0].take() else {
                debug_assert!(cells[0].is_some(), "unreachable: `cells[0]` is missing");
                return None;
            };

            let bucket = Arc::new(LatestAtComponentResults {
                index: (data_time, row_id),
                promise: Some(Promise::new(cell)),
                cached_dense: Default::default(),
            });

            // Slowest path: this is a complete cache miss.
            {
                let query_time_bucket_at_query_time =
                    query_time_bucket_at_query_time.insert(Arc::clone(&bucket));

                let data_time_bucket_at_data_time = per_data_time.entry(data_time);
                data_time_bucket_at_data_time
                    .and_modify(|v| *v = Arc::clone(query_time_bucket_at_query_time))
                    .or_insert(Arc::clone(query_time_bucket_at_query_time));
            }

            Some(bucket)
        } else {
            None
        }
    }

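    /// Removes any cached entries that have been asynchronously invalidated since the
    /// last time this cache was queried.
    ///
    /// See [`Self::pending_invalidations`] for why invalidation is deferred to query
    /// time instead of being applied eagerly.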
    pub fn handle_pending_invalidation(&mut self) {
        let Self {
            cache_key: _,
            per_query_time,
            per_data_time,
            pending_invalidations,
        } = self;

        // First, remove any data indexed by a _query time_ at or more recent than the
        // oldest _data time_ that's been invalidated.
        //
        // Note that this data time might very well be `TimeInt::STATIC`, in which case the entire
        // query-time-based index will be dropped.
        if let Some(&oldest_data_time) = pending_invalidations.first() {
            per_query_time.retain(|&query_time, _| query_time < oldest_data_time);
        }

        // Second, remove any data indexed by _data time_, if it's been invalidated.
        let mut dropped_data_times = Vec::new();
        per_data_time.retain(|data_time, _| {
            if pending_invalidations.contains(data_time) {
                dropped_data_times.push(*data_time);
                false
            } else {
                true
            }
        });

        // TODO(#5974): Because of non-deterministic ordering and parallelism and all things of that
        // nature, it can happen that we try to handle pending invalidations before we even cached
        // the associated data.
        //
        // If that happens, the data will be cached after we've invalidated *nothing*, and will stay
        // there indefinitely since the cache doesn't have a dedicated GC yet.
        //
        // TL;DR: keep pending invalidations around for as long as we haven't had the
        // opportunity to actually invalidate the associated data.
        for data_time in dropped_data_times {
            pending_invalidations.remove(&data_time);
        }
    }
}
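
// ---
// A hedged, end-to-end sketch of how the pieces above fit together. `store`, `query`,
// `entity_path`, `component_name` and the timestamp `t` are assumptions, not fixtures
// from this crate:
//
//   let mut cache = LatestAtCache::new(key);
//   let _results = cache.latest_at(&store, &query, &entity_path, component_name);
//
//   // Something upstream invalidates the data at time `t` (e.g. it got overwritten)...
//   cache.pending_invalidations.insert(t);
//
//   // ...and the next query pass applies the invalidation before reading anything:
//   cache.handle_pending_invalidation();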