use crate::{
cfg::{self, CfgPrivate},
clear::Clear,
page,
sync::{
alloc,
atomic::{
AtomicPtr, AtomicUsize,
Ordering::{self, *},
},
},
tid::Tid,
Pack,
};
use std::{fmt, ptr, slice};
/// The set of pages associated with one thread index.
///
/// Page state is kept in two parallel boxed slices indexed by page number;
/// `Shard::new` creates both with `C::MAX_PAGES` entries.
pub(crate) struct Shard<T, C: cfg::Config> {
/// Index of the thread this shard belongs to. Packed slot indices handed to
/// this shard are debug-asserted to unpack to this value.
pub(crate) tid: usize,
/// Per-page local state. Only reached through the private `local` accessor,
/// which debug-asserts that the calling thread's index equals `tid`.
local: Box<[page::Local]>,
/// Per-page shared state; every slot operation on this shard is dispatched
/// to the entry for the page that the slot index maps to.
shared: Box<[page::Shared<T, C>]>,
}
/// A fixed-size table of lazily allocated shards, indexed by thread index.
pub(crate) struct Array<T, C: cfg::Config> {
/// One slot per possible shard (`C::MAX_SHARDS` entries). A slot stays null
/// until `Array::current` is called by the thread with that index.
shards: Box<[Ptr<T, C>]>,
/// Highest shard index allocated so far (starts at 0). Iteration, debug
/// formatting and drop only look at slots `0..=max`.
max: AtomicUsize,
}
/// An atomic, nullable pointer to an `alloc::Track`-wrapped [`Shard`].
///
/// A null pointer means that no shard has been stored in this slot yet.
#[derive(Debug)]
struct Ptr<T, C: cfg::Config>(AtomicPtr<alloc::Track<Shard<T, C>>>);
/// Iterator over the allocated shards of an [`Array`], created by
/// `Array::iter_mut`.
///
/// Wraps a mutable slice iterator over the shard slots; slots that are still
/// null are skipped, and each item is yielded as a shared `&Shard`.
#[derive(Debug)]
pub(crate) struct IterMut<'a, T: 'a, C: cfg::Config + 'a>(slice::IterMut<'a, Ptr<T, C>>);
impl<T, C> Shard<T, C>
where
    C: cfg::Config,
{
    /// Runs `f` on the slot that the packed index `idx` refers to.
    ///
    /// Returns `None` when `idx` maps to a page number this shard does not
    /// have; otherwise returns whatever the page's own `with_slot` returns.
    #[inline(always)]
    pub(crate) fn with_slot<'a, U>(
        &'a self,
        idx: usize,
        f: impl FnOnce(&'a page::Slot<T, C>) -> Option<U>,
    ) -> Option<U> {
        debug_assert_eq_in_drop!(Tid::<C>::from_packed(idx).as_usize(), self.tid);
        let (addr, page_index) = page::indices::<C>(idx);

        test_println!("-> {:?}", addr);

        // An out-of-range page number short-circuits to `None`.
        self.shared.get(page_index)?.with_slot(addr, f)
    }

    /// Builds the shard for thread index `tid` with `C::MAX_PAGES` pages.
    ///
    /// Each page is constructed from its own size (`C::page_size`) and the
    /// summed size of all pages before it.
    pub(crate) fn new(tid: usize) -> Self {
        let mut pages = Vec::with_capacity(C::MAX_PAGES);
        // Running total of the sizes of the pages created so far.
        let mut total_sz = 0;
        for page_num in 0..C::MAX_PAGES {
            let size = C::page_size(page_num);
            let preceding = total_sz;
            total_sz += size;
            pages.push(page::Shared::new(size, preceding));
        }

        let local: Box<[page::Local]> = (0..C::MAX_PAGES).map(|_| page::Local::new()).collect();

        Self {
            tid,
            local,
            shared: pages.into_boxed_slice(),
        }
    }
}
impl<T, C> Shard<Option<T>, C>
where
    C: cfg::Config,
{
    /// Takes the value out of the slot at `idx`, using this shard's local
    /// state for the page.
    ///
    /// Returns `None` when `idx` maps to a page this shard does not have;
    /// otherwise returns the result of the page's `take`. Must be called on
    /// the shard's own thread (debug-asserted by `local`).
    pub(crate) fn take_local(&self, idx: usize) -> Option<T> {
        debug_assert_eq_in_drop!(Tid::<C>::from_packed(idx).as_usize(), self.tid);
        let (addr, page_index) = page::indices::<C>(idx);

        // The trace label names this function (it previously read
        // "remove_local", which made traces of take vs. remove ambiguous).
        test_println!("-> take_local {:?}", addr);

        self.shared
            .get(page_index)?
            .take(addr, C::unpack_gen(idx), self.local(page_index))
    }

    /// Takes the value out of the slot at `idx` from a thread other than the
    /// shard's own (debug-asserted), passing the page's `free_list()` to the
    /// page's `take`.
    ///
    /// Returns `None` when `idx` maps to a page this shard does not have.
    pub(crate) fn take_remote(&self, idx: usize) -> Option<T> {
        debug_assert_eq_in_drop!(Tid::<C>::from_packed(idx).as_usize(), self.tid);
        debug_assert!(Tid::<C>::current().as_usize() != self.tid);

        let (addr, page_index) = page::indices::<C>(idx);

        test_println!("-> take_remote {:?}; page {:?}", addr, page_index);

        let shared = self.shared.get(page_index)?;
        shared.take(addr, C::unpack_gen(idx), shared.free_list())
    }

    /// Removes the slot at `idx`, using this shard's local state for the page.
    ///
    /// Returns `false` when `idx` maps to a page this shard does not have;
    /// otherwise returns the result of the page's `remove`.
    pub(crate) fn remove_local(&self, idx: usize) -> bool {
        debug_assert_eq_in_drop!(Tid::<C>::from_packed(idx).as_usize(), self.tid);
        let (addr, page_index) = page::indices::<C>(idx);

        match self.shared.get(page_index) {
            Some(page) => page.remove(addr, C::unpack_gen(idx), self.local(page_index)),
            None => false,
        }
    }

    /// Removes the slot at `idx`, passing the page's `free_list()` to the
    /// page's `remove`.
    ///
    /// Returns `false` when `idx` maps to a page this shard does not have.
    pub(crate) fn remove_remote(&self, idx: usize) -> bool {
        debug_assert_eq_in_drop!(Tid::<C>::from_packed(idx).as_usize(), self.tid);
        let (addr, page_index) = page::indices::<C>(idx);

        match self.shared.get(page_index) {
            Some(shared) => shared.remove(addr, C::unpack_gen(idx), shared.free_list()),
            None => false,
        }
    }

    /// Iterates over the shared state of every page in this shard, in page
    /// order.
    pub(crate) fn iter(&self) -> std::slice::Iter<'_, page::Shared<Option<T>, C>> {
        self.shared.iter()
    }
}
impl<T, C> Shard<T, C>
where
    T: Clear + Default,
    C: cfg::Config,
{
    /// Offers `init` to each page in page order and returns the first `Some`
    /// result.
    ///
    /// Each page's `init_with` is called with that page's local state and
    /// `init`. Returns `None` if every page returns `None`. Must be called on
    /// the shard's own thread (debug-asserted by `local`).
    pub(crate) fn init_with<U>(
        &self,
        mut init: impl FnMut(usize, &page::Slot<T, C>) -> Option<U>,
    ) -> Option<U> {
        for (page_idx, page) in self.shared.iter().enumerate() {
            let local = self.local(page_idx);

            test_println!("-> page {}; {:?}; {:?}", page_idx, local, page);

            if let Some(res) = page.init_with(local, &mut init) {
                return Some(res);
            }
        }

        None
    }

    /// Calls the page's `mark_clear` for the slot at `idx` with this shard's
    /// local state for the page.
    ///
    /// Returns `false` when `idx` maps to a page this shard does not have.
    pub(crate) fn mark_clear_local(&self, idx: usize) -> bool {
        debug_assert_eq_in_drop!(Tid::<C>::from_packed(idx).as_usize(), self.tid);
        let (addr, page_index) = page::indices::<C>(idx);

        match self.shared.get(page_index) {
            Some(page) => page.mark_clear(addr, C::unpack_gen(idx), self.local(page_index)),
            None => false,
        }
    }

    /// Calls the page's `mark_clear` for the slot at `idx` with the page's
    /// `free_list()`.
    ///
    /// Returns `false` when `idx` maps to a page this shard does not have.
    pub(crate) fn mark_clear_remote(&self, idx: usize) -> bool {
        debug_assert_eq_in_drop!(Tid::<C>::from_packed(idx).as_usize(), self.tid);
        let (addr, page_index) = page::indices::<C>(idx);

        match self.shared.get(page_index) {
            Some(shared) => shared.mark_clear(addr, C::unpack_gen(idx), shared.free_list()),
            None => false,
        }
    }

    /// Clears the slot at `idx`, choosing the local or remote path by
    /// comparing the calling thread's index with this shard's `tid`.
    ///
    /// An acquire fence is issued first. The boolean result of the underlying
    /// clear is discarded.
    pub(crate) fn clear_after_release(&self, idx: usize) {
        crate::sync::atomic::fence(crate::sync::atomic::Ordering::Acquire);
        let tid = Tid::<C>::current().as_usize();
        // Arguments follow the labels in the format string: the shard's own
        // id first, then the calling thread's id (they used to be swapped).
        test_println!(
            "-> clear_after_release; self.tid={:?}; current.tid={:?};",
            self.tid,
            tid
        );
        if tid == self.tid {
            self.clear_local(idx);
        } else {
            self.clear_remote(idx);
        }
    }

    /// Calls the page's `clear` for the slot at `idx` with this shard's local
    /// state for the page; `false` if the page number is out of range.
    fn clear_local(&self, idx: usize) -> bool {
        debug_assert_eq_in_drop!(Tid::<C>::from_packed(idx).as_usize(), self.tid);
        let (addr, page_index) = page::indices::<C>(idx);

        match self.shared.get(page_index) {
            Some(page) => page.clear(addr, C::unpack_gen(idx), self.local(page_index)),
            None => false,
        }
    }

    /// Calls the page's `clear` for the slot at `idx` with the page's
    /// `free_list()`; `false` if the page number is out of range.
    fn clear_remote(&self, idx: usize) -> bool {
        debug_assert_eq_in_drop!(Tid::<C>::from_packed(idx).as_usize(), self.tid);
        let (addr, page_index) = page::indices::<C>(idx);

        match self.shared.get(page_index) {
            Some(shared) => shared.clear(addr, C::unpack_gen(idx), shared.free_list()),
            None => false,
        }
    }

    /// Returns the local state for page `i`.
    ///
    /// In debug builds this asserts that the calling thread's index equals
    /// this shard's `tid`.
    ///
    /// # Panics
    ///
    /// Panics if `i` is not a valid page number for this shard.
    #[inline(always)]
    fn local(&self, i: usize) -> &page::Local {
        #[cfg(debug_assertions)]
        debug_assert_eq_in_drop!(
            Tid::<C>::current().as_usize(),
            self.tid,
            "tried to access local data from another thread!"
        );

        &self.local[i]
    }
}
impl<T: fmt::Debug, C: cfg::Config> fmt::Debug for Shard<T, C> {
    /// Formats the shard as a `Shard` struct showing its shared pages; the
    /// `tid` field is included only when debug assertions are enabled.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut builder = f.debug_struct("Shard");

        #[cfg(debug_assertions)]
        builder.field("tid", &self.tid);

        builder.field("shared", &self.shared);
        builder.finish()
    }
}
impl<T, C> Array<T, C>
where
C: cfg::Config,
{
pub(crate) fn new() -> Self {
let mut shards = Vec::with_capacity(C::MAX_SHARDS);
for _ in 0..C::MAX_SHARDS {
shards.push(Ptr::null());
}
Self {
shards: shards.into(),
max: AtomicUsize::new(0),
}
}
#[inline]
pub(crate) fn get(&self, idx: usize) -> Option<&Shard<T, C>> {
test_println!("-> get shard={}", idx);
self.shards.get(idx)?.load(Acquire)
}
#[inline]
pub(crate) fn current(&self) -> (Tid<C>, &Shard<T, C>) {
let tid = Tid::<C>::current();
test_println!("current: {:?}", tid);
let idx = tid.as_usize();
assert!(
idx < self.shards.len(),
"Thread count overflowed the configured max count. \
Thread index = {}, max threads = {}.",
idx,
C::MAX_SHARDS,
);
let shard = self.shards[idx].load(Relaxed).unwrap_or_else(|| {
let ptr = Box::into_raw(Box::new(alloc::Track::new(Shard::new(idx))));
test_println!("-> allocated new shard for index {} at {:p}", idx, ptr);
self.shards[idx].set(ptr);
let mut max = self.max.load(Acquire);
while max < idx {
match self.max.compare_exchange(max, idx, AcqRel, Acquire) {
Ok(_) => break,
Err(actual) => max = actual,
}
}
test_println!("-> highest index={}, prev={}", std::cmp::max(max, idx), max);
unsafe {
&*ptr
}
.get_ref()
});
(tid, shard)
}
pub(crate) fn iter_mut(&mut self) -> IterMut<'_, T, C> {
test_println!("Array::iter_mut");
let max = self.max.load(Acquire);
test_println!("-> highest index={}", max);
IterMut(self.shards[0..=max].iter_mut())
}
}
impl<T, C: cfg::Config> Drop for Array<T, C> {
    /// Frees every shard that was allocated in slots `0..=max`.
    fn drop(&mut self) {
        let highest = self.max.load(Acquire);
        for slot in self.shards[0..=highest].iter() {
            let raw = slot.0.load(Acquire);
            if !raw.is_null() {
                // SAFETY: non-null slot pointers come from `Box::into_raw` in
                // `Array::current`, and `drop` holds `&mut self`, so no other
                // reference to the array exists while the box is rebuilt and
                // freed here.
                let boxed = unsafe { Box::from_raw(raw) };
                drop(boxed);
            }
        }
    }
}
impl<T: fmt::Debug, C: cfg::Config> fmt::Debug for Array<T, C> {
    /// Formats slots `0..=max` as a map from the slot's pointer value to the
    /// shard stored there, or to `()` for slots that are still null.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let highest = self.max.load(Acquire);
        let mut map = f.debug_map();
        for slot in self.shards[0..=highest].iter() {
            let raw = slot.0.load(Acquire);
            match ptr::NonNull::new(raw) {
                Some(shard) => {
                    // SAFETY: the pointer is non-null, and non-null slot
                    // pointers are only freed when the array is dropped,
                    // which cannot overlap with this `&self` borrow.
                    map.entry(&format_args!("{:p}", raw), unsafe { shard.as_ref() });
                }
                None => {
                    map.entry(&format_args!("{:p}", raw), &());
                }
            }
        }
        map.finish()
    }
}
impl<T, C: cfg::Config> Ptr<T, C> {
    /// Creates an empty slot holding a null pointer.
    #[inline]
    fn null() -> Self {
        Self(AtomicPtr::new(ptr::null_mut()))
    }

    /// Loads the pointer with the given memory ordering and returns the shard
    /// it points to, or `None` if the slot is still null.
    #[inline]
    fn load(&self, order: Ordering) -> Option<&Shard<T, C>> {
        let raw = self.0.load(order);
        test_println!("---> loaded={:p} (order={:?})", raw, order);
        if raw.is_null() {
            test_println!("---> null");
            None
        } else {
            // SAFETY: the pointer is non-null; the only writer visible in
            // this file (`set`) stores pointers obtained from
            // `Box::into_raw`, and they are only freed when the owning
            // `Array` is dropped.
            let track = unsafe { &*raw };
            Some(track.get_ref())
        }
    }

    /// Stores `new` into the slot, which must currently be null.
    ///
    /// # Panics
    ///
    /// Panics if the slot already holds a non-null pointer.
    #[inline]
    fn set(&self, new: *mut alloc::Track<Shard<T, C>>) {
        let swapped = self
            .0
            .compare_exchange(ptr::null_mut(), new, AcqRel, Acquire);
        swapped.expect("a shard can only be inserted by the thread that owns it, this is a bug!");
    }
}
impl<'a, T, C> Iterator for IterMut<'a, T, C>
where
    T: 'a,
    C: cfg::Config + 'a,
{
    type Item = &'a Shard<T, C>;

    /// Advances to the next slot that holds a shard, skipping null slots, and
    /// returns `None` once the underlying slice iterator is exhausted.
    fn next(&mut self) -> Option<Self::Item> {
        test_println!("IterMut::next");
        loop {
            let candidate = self.0.next();
            test_println!("-> next.is_some={}", candidate.is_some());
            match candidate {
                // No slots left to inspect.
                None => return None,
                Some(slot) => {
                    if let Some(shard) = slot.load(Acquire) {
                        test_println!("-> done");
                        return Some(shard);
                    }
                    // Null slot: keep scanning.
                }
            }
        }
    }
}