use std::sync::Arc;
use crate::bitmap::utils::count_zeros;
use crate::buffer::BytesAllocator;
use crate::{
array::*,
bitmap::{utils::bytes_for, Bitmap},
buffer::{Buffer, Bytes},
datatypes::{DataType, PhysicalType},
error::{Error, Result},
ffi::schema::get_child,
types::NativeType,
};
use super::ArrowArray;
pub unsafe fn try_from<A: ArrowArrayRef>(array: A) -> Result<Box<dyn Array>> {
use PhysicalType::*;
Ok(match array.data_type().to_physical_type() {
Null => Box::new(NullArray::try_from_ffi(array)?),
Boolean => Box::new(BooleanArray::try_from_ffi(array)?),
Primitive(primitive) => with_match_primitive_type!(primitive, |$T| {
Box::new(PrimitiveArray::<$T>::try_from_ffi(array)?)
}),
Utf8 => Box::new(Utf8Array::<i32>::try_from_ffi(array)?),
LargeUtf8 => Box::new(Utf8Array::<i64>::try_from_ffi(array)?),
Binary => Box::new(BinaryArray::<i32>::try_from_ffi(array)?),
LargeBinary => Box::new(BinaryArray::<i64>::try_from_ffi(array)?),
FixedSizeBinary => Box::new(FixedSizeBinaryArray::try_from_ffi(array)?),
List => Box::new(ListArray::<i32>::try_from_ffi(array)?),
LargeList => Box::new(ListArray::<i64>::try_from_ffi(array)?),
FixedSizeList => Box::new(FixedSizeListArray::try_from_ffi(array)?),
Struct => Box::new(StructArray::try_from_ffi(array)?),
Dictionary(key_type) => {
match_integer_type!(key_type, |$T| {
Box::new(DictionaryArray::<$T>::try_from_ffi(array)?)
})
}
Union => Box::new(UnionArray::try_from_ffi(array)?),
Map => Box::new(MapArray::try_from_ffi(array)?),
})
}
// NOTE(review): `ArrowArray` is populated with raw pointers (see `ArrowArray::new`), which is
// presumably why these marker traits are implemented by hand. The impls assert that the struct
// may be moved to, and shared between, threads; the invariant that makes this sound is not
// visible in this file — confirm.
unsafe impl Send for ArrowArray {}
unsafe impl Sync for ArrowArray {}
impl Drop for ArrowArray {
    /// Invokes the `release` callback, if one is set; otherwise does nothing.
    fn drop(&mut self) {
        if let Some(release) = self.release {
            // The callback receives a pointer to this very struct. The callback
            // installed by `ArrowArray::new` resets `release` to `None` itself.
            unsafe { release(self) }
        }
    }
}
/// `release` callback installed by `ArrowArray::new`.
///
/// Reclaims the boxed `PrivateData` behind `private_data`, frees every child and
/// the optional dictionary that `ArrowArray::new` leaked with `Box::into_raw`,
/// and finally clears `release`. A null `array` is ignored.
///
/// # Safety
/// `array`, when non-null, must point to an `ArrowArray` whose `private_data`
/// was produced by `ArrowArray::new` and has not been reclaimed yet.
unsafe extern "C" fn c_release_array(array: *mut ArrowArray) {
    let array = match array.as_mut() {
        Some(array) => array,
        None => return,
    };

    // Take ownership back; the box (and the array it holds) is dropped on return.
    let private = Box::from_raw(array.private_data as *mut PrivateData);

    // Dropping a child `ArrowArray` runs its own `Drop`, i.e. its release callback.
    for &child in private.children_ptr.iter() {
        drop(Box::from_raw(child));
    }

    if let Some(dictionary) = private.dictionary_ptr {
        drop(Box::from_raw(dictionary));
    }

    array.release = None;
}
/// Rust-side state behind an `ArrowArray` built by `ArrowArray::new`.
///
/// A boxed instance is stored (as a raw pointer) in `ArrowArray::private_data` and is
/// reclaimed by `c_release_array`.
// `array` is never read back in the code visible here, which is presumably why the
// `dead_code` lint is silenced.
#[allow(dead_code)]
struct PrivateData {
// The exported array. NOTE(review): presumably held so that the memory the buffer
// pointers refer to stays alive until release — confirm.
array: Box<dyn Array>,
// Buffer pointer table; `ArrowArray::buffers` points at its first element.
buffers_ptr: Box<[*const std::os::raw::c_void]>,
// Child pointer table; each entry was leaked with `Box::into_raw` and is freed in
// `c_release_array`.
children_ptr: Box<[*mut ArrowArray]>,
// Leaked dictionary array, if any; freed in `c_release_array`.
dictionary_ptr: Option<*mut ArrowArray>,
}
impl ArrowArray {
    /// Builds the C struct that exports `array`.
    ///
    /// Children and the dictionary (if any) are exported recursively and leaked
    /// with `Box::into_raw`; together with the buffer pointer table and `array`
    /// itself they are owned by a boxed `PrivateData` stored in `private_data`.
    /// Everything is reclaimed by `c_release_array`, which is installed as the
    /// `release` callback.
    pub(crate) fn new(array: Box<dyn Array>) -> Self {
        let (offset, buffers, children, dictionary) =
            offset_buffers_children_dictionary(array.as_ref());

        // Absent buffers are exported as null pointers.
        let buffers_ptr: Box<[*const std::os::raw::c_void]> = buffers
            .iter()
            .map(|slot| slot.map_or(std::ptr::null(), |ptr| ptr as *const std::os::raw::c_void))
            .collect();
        let n_buffers = buffers.len() as i64;

        let children_ptr: Box<[*mut ArrowArray]> = children
            .into_iter()
            .map(|child| Box::into_raw(Box::new(Self::new(child))))
            .collect();
        let n_children = children_ptr.len() as i64;

        let dictionary_ptr = dictionary.map(|values| Box::into_raw(Box::new(Self::new(values))));

        let length = array.len() as i64;
        let null_count = array.null_count() as i64;

        let mut private = Box::new(PrivateData {
            array,
            buffers_ptr,
            children_ptr,
            dictionary_ptr,
        });

        // The pointer tables live on the heap behind their own boxes, so these
        // addresses remain valid after `private` is turned into a raw pointer.
        let buffers_raw = private.buffers_ptr.as_mut_ptr();
        let children_raw = private.children_ptr.as_mut_ptr();
        let dictionary_raw = private.dictionary_ptr.unwrap_or(std::ptr::null_mut());

        Self {
            length,
            null_count,
            offset: offset as i64,
            n_buffers,
            n_children,
            buffers: buffers_raw,
            children: children_raw,
            dictionary: dictionary_raw,
            release: Some(c_release_array),
            private_data: Box::into_raw(private) as *mut ::std::os::raw::c_void,
        }
    }

    /// Returns a struct with zero counts, null pointers and no `release` callback.
    pub fn empty() -> Self {
        Self {
            release: None,
            private_data: std::ptr::null_mut(),
            buffers: std::ptr::null_mut(),
            children: std::ptr::null_mut(),
            dictionary: std::ptr::null_mut(),
            n_buffers: 0,
            n_children: 0,
            length: 0,
            null_count: 0,
            offset: 0,
        }
    }

    /// The `length` field, cast to `usize`.
    pub(crate) fn len(&self) -> usize {
        let length = self.length;
        length as usize
    }

    /// The `offset` field, cast to `usize`.
    pub(crate) fn offset(&self) -> usize {
        let offset = self.offset;
        offset as usize
    }

    /// The `null_count` field, cast to `usize` (a negative value wraps).
    pub(crate) fn null_count(&self) -> usize {
        let null_count = self.null_count;
        null_count as usize
    }
}
/// Returns the pointer stored at position `index` of `array.buffers`, cast to `*mut T`.
///
/// # Errors
/// Returns an out-of-spec error when
/// * `array.buffers` is null,
/// * `array.buffers` is not aligned for a pointer,
/// * `index` is not below `array.n_buffers` (a negative `n_buffers` counts as zero), or
/// * the selected buffer pointer is null.
///
/// # Safety
/// `array.buffers` must point to at least `array.n_buffers` readable pointers.
/// The returned pointer is only checked for null; its alignment for `T` and the
/// extent of the memory behind it are not checked.
unsafe fn get_buffer_ptr<T: NativeType>(
    array: &ArrowArray,
    data_type: &DataType,
    index: usize,
) -> Result<*mut T> {
    if array.buffers.is_null() {
        return Err(Error::oos(format!(
            "An ArrowArray of type {data_type:?} must have non-null buffers"
        )));
    }

    if array
        .buffers
        .align_offset(std::mem::align_of::<*mut *const u8>())
        != 0
    {
        return Err(Error::oos(format!(
            "An ArrowArray of type {data_type:?}\nmust have buffer {index} aligned to type {}",
            std::any::type_name::<*mut *const u8>()
        )));
    }
    let buffers = array.buffers as *mut *const u8;

    // `n_buffers` is a signed 64-bit field filled in by the producer. A plain
    // `as usize` cast would turn a negative value into a huge bound and let any
    // index through, so a negative count is treated as "no buffers".
    let n_buffers = usize::try_from(array.n_buffers).unwrap_or(0);
    if index >= n_buffers {
        return Err(Error::oos(format!(
            "An ArrowArray of type {data_type:?}\nmust have buffer {index}."
        )));
    }

    let ptr = *buffers.add(index);
    if ptr.is_null() {
        return Err(Error::oos(format!(
            "An array of type {data_type:?}\nmust have a non-null buffer {index}"
        )));
    }

    Ok(ptr as *mut T)
}
/// Builds a `Buffer<T>` over buffer `index` of `array`.
///
/// The buffer length and the logical offset come from `buffer_len` and
/// `buffer_offset`. When the foreign pointer is aligned for `T` the memory is
/// wrapped without copying (`owner` keeps it alive) and then sliced to
/// `offset..len`. Otherwise the elements `offset..len` are copied out one by one
/// with unaligned reads.
///
/// # Errors
/// Propagates errors from `buffer_len` and `get_buffer_ptr`.
///
/// # Safety
/// Buffer `index` must hold at least `len` readable values of `T`, where `len`
/// is the value `buffer_len` returns for it.
unsafe fn create_buffer<T: NativeType>(
    array: &ArrowArray,
    data_type: &DataType,
    owner: InternalArrowArray,
    index: usize,
) -> Result<Buffer<T>> {
    let len = buffer_len(array, data_type, index)?;
    if len == 0 {
        return Ok(Buffer::new());
    }

    let offset = buffer_offset(array, data_type, index);
    let ptr: *mut T = get_buffer_ptr(array, data_type, index)?;

    if ptr.align_offset(std::mem::align_of::<T>()) == 0 {
        // Zero-copy path.
        let bytes = Bytes::from_foreign(ptr, len, BytesAllocator::InternalArrowArray(owner));
        Ok(Buffer::from_bytes(bytes).sliced(offset, len - offset))
    } else {
        // Copy path. A slice must never be formed over a misaligned pointer, so
        // each element is read individually. Reading starts at `offset` so the
        // result matches what the zero-copy path exposes after slicing.
        let values: Vec<T> = (offset..len)
            .map(|i| {
                // SAFETY: `i < len` and the caller guarantees `len` readable
                // elements behind `ptr`; `read_unaligned` has no alignment
                // requirement.
                unsafe { std::ptr::read_unaligned(ptr.add(i)) }
            })
            .collect();
        Ok(Buffer::from(values))
    }
}
/// Builds a `Bitmap` over buffer `index` of `array`, covering `array.length` bits
/// starting at bit `array.offset`.
///
/// An empty array yields an empty bitmap without touching the buffers.
///
/// The number of unset bits is taken from `array.null_count` only when
/// `is_validity` is true and the reported value is usable (non-negative and not
/// larger than the length). In every other case it is counted from the bits.
/// The Arrow C data interface allows producers to report `-1` for "not
/// computed", and a blind cast of that value would store `usize::MAX`.
///
/// # Errors
/// Propagates errors from `get_buffer_ptr` and `Bitmap::from_inner`.
///
/// # Panics
/// Panics if `array.length` or `array.offset` is negative.
///
/// # Safety
/// Buffer `index` must hold at least `bytes_for(offset + length)` readable bytes.
unsafe fn create_bitmap(
    array: &ArrowArray,
    data_type: &DataType,
    owner: InternalArrowArray,
    index: usize,
    is_validity: bool,
) -> Result<Bitmap> {
    let len: usize = array.length.try_into().expect("length to fit in `usize`");
    if len == 0 {
        return Ok(Bitmap::new());
    }

    let ptr = get_buffer_ptr(array, data_type, index)?;
    let offset: usize = array.offset.try_into().expect("offset to fit in `usize`");
    let bytes_len = bytes_for(offset + len);
    let bytes = Bytes::from_foreign(ptr, bytes_len, BytesAllocator::InternalArrowArray(owner));

    let null_count = match usize::try_from(array.null_count) {
        // Trust the producer only when it reported a plausible count.
        Ok(reported) if is_validity && reported <= len => reported,
        _ => count_zeros(bytes.as_ref(), offset, len),
    };

    Bitmap::from_inner(Arc::new(bytes), offset, len, null_count)
}
fn buffer_offset(array: &ArrowArray, data_type: &DataType, i: usize) -> usize {
use PhysicalType::*;
match (data_type.to_physical_type(), i) {
(LargeUtf8, 2) | (LargeBinary, 2) | (Utf8, 2) | (Binary, 2) => 0,
(FixedSizeBinary, 1) => {
if let DataType::FixedSizeBinary(size) = data_type.to_logical_type() {
let offset: usize = array.offset.try_into().expect("Offset to fit in `usize`");
offset * *size
} else {
unreachable!()
}
}
_ => array.offset.try_into().expect("Offset to fit in `usize`"),
}
}
/// Returns the number of elements buffer `i` of `array` must contain.
///
/// * Buffer 1 of fixed-size binary / fixed-size list: `size * (offset + length)`.
/// * Buffer 1 of offset-based types (utf8, binary, list, map and the large
///   variants): `offset + length + 1`.
/// * Buffer 2 of (large) utf8/binary: the last value stored in the offsets
///   buffer (buffer 1).
/// * Anything else: `offset + length`.
///
/// The last offset is read with an unaligned load. `create_buffer` accepts
/// misaligned foreign buffers (it copies them), so this function must not assume
/// the offsets buffer is aligned either.
///
/// # Safety
/// For `i == 2` on (large) utf8/binary arrays, `array.buffers` must hold at
/// least two pointers and the second must point to `offset + length + 1`
/// readable offsets. Nothing is validated here.
unsafe fn buffer_len(array: &ArrowArray, data_type: &DataType, i: usize) -> Result<usize> {
    Ok(match (data_type.to_physical_type(), i) {
        (PhysicalType::FixedSizeBinary, 1) => {
            if let DataType::FixedSizeBinary(size) = data_type.to_logical_type() {
                *size * (array.offset as usize + array.length as usize)
            } else {
                unreachable!()
            }
        }
        (PhysicalType::FixedSizeList, 1) => {
            if let DataType::FixedSizeList(_, size) = data_type.to_logical_type() {
                *size * (array.offset as usize + array.length as usize)
            } else {
                unreachable!()
            }
        }
        (PhysicalType::Utf8, 1)
        | (PhysicalType::LargeUtf8, 1)
        | (PhysicalType::Binary, 1)
        | (PhysicalType::LargeBinary, 1)
        | (PhysicalType::List, 1)
        | (PhysicalType::LargeList, 1)
        | (PhysicalType::Map, 1) => {
            // One more offset than there are elements.
            array.offset as usize + array.length as usize + 1
        }
        (PhysicalType::Utf8, 2) | (PhysicalType::Binary, 2) => {
            // The values buffer is as long as the last offset says.
            let len = buffer_len(array, data_type, 1)?;
            // Buffer 0 is the validity buffer, buffer 1 holds the `i32` offsets.
            let offsets = unsafe { *(array.buffers as *mut *const u8).add(1) } as *const i32;
            (unsafe { offsets.add(len - 1).read_unaligned() }) as usize
        }
        (PhysicalType::LargeUtf8, 2) | (PhysicalType::LargeBinary, 2) => {
            let len = buffer_len(array, data_type, 1)?;
            // Same as above with `i64` offsets.
            let offsets = unsafe { *(array.buffers as *mut *const u8).add(1) } as *const i64;
            (unsafe { offsets.add(len - 1).read_unaligned() }) as usize
        }
        _ => array.offset as usize + array.length as usize,
    })
}
/// Returns a view over child `index` of `array`, typed with the matching child
/// of `data_type` and keeping `parent` alive.
///
/// # Errors
/// Returns an error when `get_child` fails, when `array.children` is null, when
/// `index` is not below `array.n_children` (a negative count counts as zero), or
/// when the child pointer is null. The messages name the parent's data type —
/// the type of the array that is missing the child.
///
/// # Safety
/// `array.children` must point to at least `array.n_children` readable pointers,
/// each either null or pointing to a valid `ArrowArray`. The returned reference
/// is given a `'static` lifetime; it is only valid while the parent array has
/// not been released.
unsafe fn create_child(
    array: &ArrowArray,
    data_type: &DataType,
    parent: InternalArrowArray,
    index: usize,
) -> Result<ArrowArrayChild<'static>> {
    let child_type = get_child(data_type, index)?;

    if array.children.is_null() {
        return Err(Error::oos(format!(
            "An ArrowArray of type {data_type:?} must have non-null children"
        )));
    }

    // `n_children` is a signed field supplied by the producer; a negative value
    // must not wrap into a huge bound that lets any index through.
    let n_children = usize::try_from(array.n_children).unwrap_or(0);
    if index >= n_children {
        return Err(Error::oos(format!(
            "An ArrowArray of type {data_type:?}\nmust have child {index}."
        )));
    }

    let child_ptr = unsafe { *array.children.add(index) };
    if child_ptr.is_null() {
        return Err(Error::oos(format!(
            "An array of type {data_type:?}\nmust have a non-null child {index}"
        )));
    }

    let child = unsafe { &*child_ptr };
    Ok(ArrowArrayChild::new(child, child_type, parent))
}
/// Returns a view over the dictionary (values) array of `array`, or `None` when
/// `data_type` is not `DataType::Dictionary`.
///
/// # Errors
/// Returns an error when `data_type` is a dictionary type but `array.dictionary`
/// is null. The message names the dictionary data type — the type of the array
/// that is missing its dictionary.
///
/// # Safety
/// `array.dictionary`, when non-null, must point to a valid `ArrowArray`. The
/// returned reference is given a `'static` lifetime; it is only valid while the
/// parent array has not been released.
unsafe fn create_dictionary(
    array: &ArrowArray,
    data_type: &DataType,
    parent: InternalArrowArray,
) -> Result<Option<ArrowArrayChild<'static>>> {
    let values = match data_type {
        DataType::Dictionary(_, values, _) => values,
        _ => return Ok(None),
    };

    if array.dictionary.is_null() {
        return Err(Error::oos(format!(
            "An array of type {data_type:?}\nmust have a non-null dictionary"
        )));
    }

    let dictionary = unsafe { &*array.dictionary };
    // Clone the values type only once the array is known to be usable.
    Ok(Some(ArrowArrayChild::new(
        dictionary,
        values.as_ref().clone(),
        parent,
    )))
}
/// Read access to an FFI array: the raw `ArrowArray`, its `DataType`, and the
/// `InternalArrowArray` that owns the underlying memory.
///
/// The provided methods build bitmaps, buffers and child views by delegating to
/// the free functions of this module.
pub trait ArrowArrayRef: std::fmt::Debug {
    /// The number of buffers of the array.
    fn n_buffers(&self) -> usize;

    /// The top-level array whose lifetime governs this array's memory.
    fn parent(&self) -> &InternalArrowArray;

    /// The raw C struct.
    fn array(&self) -> &ArrowArray;

    /// The data type of the array.
    fn data_type(&self) -> &DataType;

    /// A clone of [`ArrowArrayRef::parent`].
    fn owner(&self) -> InternalArrowArray {
        self.parent().clone()
    }

    /// The validity bitmap read from buffer 0, or `None` when the null count is zero.
    ///
    /// # Safety
    /// The array must satisfy the requirements of `create_bitmap`.
    unsafe fn validity(&self) -> Result<Option<Bitmap>> {
        match self.array().null_count() {
            0 => Ok(None),
            _ => {
                let bitmap = create_bitmap(self.array(), self.data_type(), self.owner(), 0, true)?;
                Ok(Some(bitmap))
            }
        }
    }

    /// Buffer `index` interpreted as values of `T`.
    ///
    /// # Safety
    /// The array must satisfy the requirements of `create_buffer`.
    unsafe fn buffer<T: NativeType>(&self, index: usize) -> Result<Buffer<T>> {
        let (array, data_type) = (self.array(), self.data_type());
        create_buffer::<T>(array, data_type, self.owner(), index)
    }

    /// Buffer `index` interpreted as a bitmap; its unset bits are counted.
    ///
    /// # Safety
    /// The array must satisfy the requirements of `create_bitmap`.
    unsafe fn bitmap(&self, index: usize) -> Result<Bitmap> {
        let (array, data_type) = (self.array(), self.data_type());
        create_bitmap(array, data_type, self.owner(), index, false)
    }

    /// A view over child `index`.
    ///
    /// # Safety
    /// The array must satisfy the requirements of `create_child`.
    unsafe fn child(&self, index: usize) -> Result<ArrowArrayChild> {
        let (array, data_type) = (self.array(), self.data_type());
        create_child(array, data_type, self.parent().clone(), index)
    }

    /// A view over the dictionary, when the data type is a dictionary type.
    ///
    /// # Safety
    /// The array must satisfy the requirements of `create_dictionary`.
    unsafe fn dictionary(&self) -> Result<Option<ArrowArrayChild>> {
        let (array, data_type) = (self.array(), self.data_type());
        create_dictionary(array, data_type, self.parent().clone())
    }
}
/// An `ArrowArray` paired with its `DataType`.
///
/// Both parts sit behind an `Arc`, so cloning shares them instead of copying.
#[derive(Debug, Clone)]
pub struct InternalArrowArray {
// The C struct. When the last clone goes away the `Arc` drops it, which runs
// `ArrowArray`'s `Drop` (the release callback).
array: Arc<ArrowArray>,
// The data type describing `array`.
data_type: Arc<DataType>,
}
impl InternalArrowArray {
pub fn new(array: ArrowArray, data_type: DataType) -> Self {
Self {
array: Arc::new(array),
data_type: Arc::new(data_type),
}
}
}
impl ArrowArrayRef for InternalArrowArray {
    /// The number of buffers declared by the C struct, cast to `usize`.
    fn n_buffers(&self) -> usize {
        let declared = self.array.n_buffers;
        declared as usize
    }

    /// A top-level array is its own parent.
    fn parent(&self) -> &InternalArrowArray {
        self
    }

    /// The shared C struct.
    fn array(&self) -> &ArrowArray {
        &*self.array
    }

    /// The shared data type.
    fn data_type(&self) -> &DataType {
        &*self.data_type
    }
}
/// A borrowed view over an `ArrowArray` reached through another array (a child or a
/// dictionary), together with its own `DataType` and the owning parent.
#[derive(Debug)]
pub struct ArrowArrayChild<'a> {
// The referenced C struct.
array: &'a ArrowArray,
// The data type of `array`.
data_type: DataType,
// NOTE(review): presumably held so the memory behind `array` outlives this view — confirm.
parent: InternalArrowArray,
}
impl<'a> ArrowArrayRef for ArrowArrayChild<'a> {
fn data_type(&self) -> &DataType {
&self.data_type
}
fn parent(&self) -> &InternalArrowArray {
&self.parent
}
fn array(&self) -> &ArrowArray {
self.array
}
fn n_buffers(&self) -> usize {
self.array.n_buffers as usize
}
}
impl<'a> ArrowArrayChild<'a> {
fn new(array: &'a ArrowArray, data_type: DataType, parent: InternalArrowArray) -> Self {
Self {
array,
data_type,
parent,
}
}
}