use std::{iter::FromIterator, sync::Arc};
use crate::array::physical_binary::extend_validity;
use crate::array::TryExtendFromSelf;
use crate::bitmap::Bitmap;
use crate::{
array::{Array, MutableArray, TryExtend, TryPush},
bitmap::MutableBitmap,
datatypes::DataType,
error::Error,
trusted_len::TrustedLen,
types::NativeType,
};
use super::{check, PrimitiveArray};
#[derive(Debug, Clone)]
pub struct MutablePrimitiveArray<T: NativeType> {
data_type: DataType,
values: Vec<T>,
validity: Option<MutableBitmap>,
}
impl<T: NativeType> From<MutablePrimitiveArray<T>> for PrimitiveArray<T> {
fn from(other: MutablePrimitiveArray<T>) -> Self {
let validity = other.validity.and_then(|x| {
let bitmap: Bitmap = x.into();
if bitmap.unset_bits() == 0 {
None
} else {
Some(bitmap)
}
});
PrimitiveArray::<T>::new(other.data_type, other.values.into(), validity)
}
}
impl<T: NativeType, P: AsRef<[Option<T>]>> From<P> for MutablePrimitiveArray<T> {
fn from(slice: P) -> Self {
Self::from_trusted_len_iter(slice.as_ref().iter().map(|x| x.as_ref()))
}
}
impl<T: NativeType> MutablePrimitiveArray<T> {
pub fn new() -> Self {
Self::with_capacity(0)
}
pub fn with_capacity(capacity: usize) -> Self {
Self::with_capacity_from(capacity, T::PRIMITIVE.into())
}
pub fn try_new(
data_type: DataType,
values: Vec<T>,
validity: Option<MutableBitmap>,
) -> Result<Self, Error> {
check(&data_type, &values, validity.as_ref().map(|x| x.len()))?;
Ok(Self {
data_type,
values,
validity,
})
}
pub fn into_inner(self) -> (DataType, Vec<T>, Option<MutableBitmap>) {
(self.data_type, self.values, self.validity)
}
pub fn apply_values<F: Fn(&mut [T])>(&mut self, f: F) {
f(&mut self.values);
}
}
impl<T: NativeType> Default for MutablePrimitiveArray<T> {
fn default() -> Self {
Self::new()
}
}
impl<T: NativeType> From<DataType> for MutablePrimitiveArray<T> {
fn from(data_type: DataType) -> Self {
assert!(data_type.to_physical_type().eq_primitive(T::PRIMITIVE));
Self {
data_type,
values: Vec::<T>::new(),
validity: None,
}
}
}
impl<T: NativeType> MutablePrimitiveArray<T> {
pub fn with_capacity_from(capacity: usize, data_type: DataType) -> Self {
assert!(data_type.to_physical_type().eq_primitive(T::PRIMITIVE));
Self {
data_type,
values: Vec::<T>::with_capacity(capacity),
validity: None,
}
}
pub fn reserve(&mut self, additional: usize) {
self.values.reserve(additional);
if let Some(x) = self.validity.as_mut() {
x.reserve(additional)
}
}
#[inline]
pub fn push(&mut self, value: Option<T>) {
match value {
Some(value) => {
self.values.push(value);
match &mut self.validity {
Some(validity) => validity.push(true),
None => {}
}
}
None => {
self.values.push(T::default());
match &mut self.validity {
Some(validity) => validity.push(false),
None => {
self.init_validity();
}
}
}
}
}
pub fn pop(&mut self) -> Option<T> {
let value = self.values.pop()?;
self.validity
.as_mut()
.map(|x| x.pop()?.then(|| value))
.unwrap_or_else(|| Some(value))
}
#[inline]
pub fn extend_constant(&mut self, additional: usize, value: Option<T>) {
if let Some(value) = value {
self.values.resize(self.values.len() + additional, value);
if let Some(validity) = &mut self.validity {
validity.extend_constant(additional, true)
}
} else {
if let Some(validity) = &mut self.validity {
validity.extend_constant(additional, false)
} else {
let mut validity = MutableBitmap::with_capacity(self.values.capacity());
validity.extend_constant(self.len(), true);
validity.extend_constant(additional, false);
self.validity = Some(validity)
}
self.values
.resize(self.values.len() + additional, T::default());
}
}
#[inline]
pub fn extend_trusted_len<P, I>(&mut self, iterator: I)
where
P: std::borrow::Borrow<T>,
I: TrustedLen<Item = Option<P>>,
{
unsafe { self.extend_trusted_len_unchecked(iterator) }
}
#[inline]
pub unsafe fn extend_trusted_len_unchecked<P, I>(&mut self, iterator: I)
where
P: std::borrow::Borrow<T>,
I: Iterator<Item = Option<P>>,
{
if let Some(validity) = self.validity.as_mut() {
extend_trusted_len_unzip(iterator, validity, &mut self.values)
} else {
let mut validity = MutableBitmap::new();
validity.extend_constant(self.len(), true);
extend_trusted_len_unzip(iterator, &mut validity, &mut self.values);
self.validity = Some(validity);
}
}
#[inline]
pub fn extend_trusted_len_values<I>(&mut self, iterator: I)
where
I: TrustedLen<Item = T>,
{
unsafe { self.extend_trusted_len_values_unchecked(iterator) }
}
#[inline]
pub unsafe fn extend_trusted_len_values_unchecked<I>(&mut self, iterator: I)
where
I: Iterator<Item = T>,
{
self.values.extend(iterator);
self.update_all_valid();
}
#[inline]
pub fn extend_from_slice(&mut self, items: &[T]) {
self.values.extend_from_slice(items);
self.update_all_valid();
}
fn update_all_valid(&mut self) {
let len = self.len();
if let Some(validity) = self.validity.as_mut() {
validity.extend_constant(len - validity.len(), true);
}
}
fn init_validity(&mut self) {
let mut validity = MutableBitmap::with_capacity(self.values.capacity());
validity.extend_constant(self.len(), true);
validity.set(self.len() - 1, false);
self.validity = Some(validity)
}
#[inline]
pub fn to(self, data_type: DataType) -> Self {
Self::try_new(data_type, self.values, self.validity).unwrap()
}
pub fn into_arc(self) -> Arc<dyn Array> {
let a: PrimitiveArray<T> = self.into();
Arc::new(a)
}
pub fn shrink_to_fit(&mut self) {
self.values.shrink_to_fit();
if let Some(validity) = &mut self.validity {
validity.shrink_to_fit()
}
}
pub fn capacity(&self) -> usize {
self.values.capacity()
}
}
impl<T: NativeType> MutablePrimitiveArray<T> {
pub fn values(&self) -> &Vec<T> {
&self.values
}
pub fn values_mut_slice(&mut self) -> &mut [T] {
self.values.as_mut_slice()
}
}
impl<T: NativeType> MutablePrimitiveArray<T> {
pub fn set(&mut self, index: usize, value: Option<T>) {
assert!(index < self.len());
unsafe { self.set_unchecked(index, value) }
}
pub unsafe fn set_unchecked(&mut self, index: usize, value: Option<T>) {
*self.values.get_unchecked_mut(index) = value.unwrap_or_default();
if value.is_none() && self.validity.is_none() {
let mut validity = MutableBitmap::new();
validity.extend_constant(self.len(), true);
self.validity = Some(validity);
}
if let Some(x) = self.validity.as_mut() {
x.set_unchecked(index, value.is_some())
}
}
pub fn set_validity(&mut self, validity: Option<MutableBitmap>) {
if let Some(validity) = &validity {
assert_eq!(self.values.len(), validity.len())
}
self.validity = validity;
}
pub fn set_values(&mut self, values: Vec<T>) {
assert_eq!(values.len(), self.values.len());
self.values = values;
}
}
impl<T: NativeType> Extend<Option<T>> for MutablePrimitiveArray<T> {
fn extend<I: IntoIterator<Item = Option<T>>>(&mut self, iter: I) {
let iter = iter.into_iter();
self.reserve(iter.size_hint().0);
iter.for_each(|x| self.push(x))
}
}
impl<T: NativeType> TryExtend<Option<T>> for MutablePrimitiveArray<T> {
fn try_extend<I: IntoIterator<Item = Option<T>>>(&mut self, iter: I) -> Result<(), Error> {
self.extend(iter);
Ok(())
}
}
impl<T: NativeType> TryPush<Option<T>> for MutablePrimitiveArray<T> {
fn try_push(&mut self, item: Option<T>) -> Result<(), Error> {
self.push(item);
Ok(())
}
}
impl<T: NativeType> MutableArray for MutablePrimitiveArray<T> {
fn len(&self) -> usize {
self.values.len()
}
fn validity(&self) -> Option<&MutableBitmap> {
self.validity.as_ref()
}
fn as_box(&mut self) -> Box<dyn Array> {
PrimitiveArray::new(
self.data_type.clone(),
std::mem::take(&mut self.values).into(),
std::mem::take(&mut self.validity).map(|x| x.into()),
)
.boxed()
}
fn as_arc(&mut self) -> Arc<dyn Array> {
PrimitiveArray::new(
self.data_type.clone(),
std::mem::take(&mut self.values).into(),
std::mem::take(&mut self.validity).map(|x| x.into()),
)
.arced()
}
fn data_type(&self) -> &DataType {
&self.data_type
}
fn as_any(&self) -> &dyn std::any::Any {
self
}
fn as_mut_any(&mut self) -> &mut dyn std::any::Any {
self
}
fn push_null(&mut self) {
self.push(None)
}
fn reserve(&mut self, additional: usize) {
self.reserve(additional)
}
fn shrink_to_fit(&mut self) {
self.shrink_to_fit()
}
}
impl<T: NativeType> MutablePrimitiveArray<T> {
pub fn from_slice<P: AsRef<[T]>>(slice: P) -> Self {
Self::from_trusted_len_values_iter(slice.as_ref().iter().copied())
}
#[inline]
pub unsafe fn from_trusted_len_iter_unchecked<I, P>(iterator: I) -> Self
where
P: std::borrow::Borrow<T>,
I: Iterator<Item = Option<P>>,
{
let (validity, values) = trusted_len_unzip(iterator);
Self {
data_type: T::PRIMITIVE.into(),
values,
validity,
}
}
#[inline]
pub fn from_trusted_len_iter<I, P>(iterator: I) -> Self
where
P: std::borrow::Borrow<T>,
I: TrustedLen<Item = Option<P>>,
{
unsafe { Self::from_trusted_len_iter_unchecked(iterator) }
}
#[inline]
pub unsafe fn try_from_trusted_len_iter_unchecked<E, I, P>(
iter: I,
) -> std::result::Result<Self, E>
where
P: std::borrow::Borrow<T>,
I: IntoIterator<Item = std::result::Result<Option<P>, E>>,
{
let iterator = iter.into_iter();
let (validity, values) = try_trusted_len_unzip(iterator)?;
Ok(Self {
data_type: T::PRIMITIVE.into(),
values,
validity,
})
}
#[inline]
pub fn try_from_trusted_len_iter<E, I, P>(iterator: I) -> std::result::Result<Self, E>
where
P: std::borrow::Borrow<T>,
I: TrustedLen<Item = std::result::Result<Option<P>, E>>,
{
unsafe { Self::try_from_trusted_len_iter_unchecked(iterator) }
}
pub fn from_trusted_len_values_iter<I: TrustedLen<Item = T>>(iter: I) -> Self {
Self {
data_type: T::PRIMITIVE.into(),
values: iter.collect(),
validity: None,
}
}
pub fn from_vec(values: Vec<T>) -> Self {
Self::try_new(T::PRIMITIVE.into(), values, None).unwrap()
}
pub unsafe fn from_trusted_len_values_iter_unchecked<I: Iterator<Item = T>>(iter: I) -> Self {
Self {
data_type: T::PRIMITIVE.into(),
values: iter.collect(),
validity: None,
}
}
}
impl<T: NativeType, Ptr: std::borrow::Borrow<Option<T>>> FromIterator<Ptr>
for MutablePrimitiveArray<T>
{
fn from_iter<I: IntoIterator<Item = Ptr>>(iter: I) -> Self {
let iter = iter.into_iter();
let (lower, _) = iter.size_hint();
let mut validity = MutableBitmap::with_capacity(lower);
let values: Vec<T> = iter
.map(|item| {
if let Some(a) = item.borrow() {
validity.push(true);
*a
} else {
validity.push(false);
T::default()
}
})
.collect();
let validity = Some(validity);
Self {
data_type: T::PRIMITIVE.into(),
values,
validity,
}
}
}
#[inline]
pub(crate) unsafe fn extend_trusted_len_unzip<I, P, T>(
iterator: I,
validity: &mut MutableBitmap,
buffer: &mut Vec<T>,
) where
T: NativeType,
P: std::borrow::Borrow<T>,
I: Iterator<Item = Option<P>>,
{
let (_, upper) = iterator.size_hint();
let additional = upper.expect("trusted_len_unzip requires an upper limit");
validity.reserve(additional);
let values = iterator.map(|item| {
if let Some(item) = item {
validity.push_unchecked(true);
*item.borrow()
} else {
validity.push_unchecked(false);
T::default()
}
});
buffer.extend(values);
}
#[inline]
pub(crate) unsafe fn trusted_len_unzip<I, P, T>(iterator: I) -> (Option<MutableBitmap>, Vec<T>)
where
T: NativeType,
P: std::borrow::Borrow<T>,
I: Iterator<Item = Option<P>>,
{
let mut validity = MutableBitmap::new();
let mut buffer = Vec::<T>::new();
extend_trusted_len_unzip(iterator, &mut validity, &mut buffer);
let validity = Some(validity);
(validity, buffer)
}
#[inline]
pub(crate) unsafe fn try_trusted_len_unzip<E, I, P, T>(
iterator: I,
) -> std::result::Result<(Option<MutableBitmap>, Vec<T>), E>
where
T: NativeType,
P: std::borrow::Borrow<T>,
I: Iterator<Item = std::result::Result<Option<P>, E>>,
{
let (_, upper) = iterator.size_hint();
let len = upper.expect("trusted_len_unzip requires an upper limit");
let mut null = MutableBitmap::with_capacity(len);
let mut buffer = Vec::<T>::with_capacity(len);
let mut dst = buffer.as_mut_ptr();
for item in iterator {
let item = if let Some(item) = item? {
null.push(true);
*item.borrow()
} else {
null.push(false);
T::default()
};
std::ptr::write(dst, item);
dst = dst.add(1);
}
assert_eq!(
dst.offset_from(buffer.as_ptr()) as usize,
len,
"Trusted iterator length was not accurately reported"
);
buffer.set_len(len);
null.set_len(len);
let validity = Some(null);
Ok((validity, buffer))
}
impl<T: NativeType> PartialEq for MutablePrimitiveArray<T> {
fn eq(&self, other: &Self) -> bool {
self.iter().eq(other.iter())
}
}
impl<T: NativeType> TryExtendFromSelf for MutablePrimitiveArray<T> {
fn try_extend_from_self(&mut self, other: &Self) -> Result<(), Error> {
extend_validity(self.len(), &mut self.validity, &other.validity);
let slice = other.values.as_slice();
self.values.extend_from_slice(slice);
Ok(())
}
}