use std::ffi::{CStr, CString};
use std::ops::DerefMut;
use crate::{array::Array, datatypes::Field, error::Error};
use super::{export_array_to_c, export_field_to_c, import_array_from_c, import_field_from_c};
use super::{ArrowArray, ArrowArrayStream, ArrowSchema};
impl Drop for ArrowArrayStream {
    /// Invokes the stream's `release` callback, if one is still set.
    ///
    /// A stream whose `release` is `None` is left untouched.
    fn drop(&mut self) {
        if let Some(release_callback) = self.release {
            // SAFETY: the callback was stored alongside this stream and is
            // handed a pointer to that very stream.
            unsafe { release_callback(self) }
        }
    }
}
impl ArrowArrayStream {
    /// Creates a stream with every callback unset and a null `private_data`.
    ///
    /// Because `release` is `None`, dropping the returned value calls nothing.
    pub fn empty() -> Self {
        let private_data = std::ptr::null_mut();
        Self {
            release: None,
            get_last_error: None,
            get_next: None,
            get_schema: None,
            private_data,
        }
    }
}
unsafe fn handle_error(iter: &mut ArrowArrayStream) -> Error {
let error = unsafe { (iter.get_last_error.unwrap())(&mut *iter) };
if error.is_null() {
return Error::External(
"C stream".to_string(),
Box::new(Error::ExternalFormat("an unspecified error".to_string())),
);
}
let error = unsafe { CStr::from_ptr(error) };
Error::External(
"C stream".to_string(),
Box::new(Error::ExternalFormat(error.to_str().unwrap().to_string())),
)
}
/// Consumer-side reader over an [`ArrowArrayStream`].
///
/// Holds the stream handle together with the [`Field`] imported from the
/// stream's schema when the reader is constructed.
pub struct ArrowArrayStreamReader<Iter>
where
    Iter: DerefMut<Target = ArrowArrayStream>,
{
    /// Handle that dereferences to the underlying C stream.
    iter: Iter,
    /// Field describing the arrays produced by the stream.
    field: Field,
}
impl<Iter: DerefMut<Target = ArrowArrayStream>> ArrowArrayStreamReader<Iter> {
pub unsafe fn try_new(mut iter: Iter) -> Result<Self, Error> {
if iter.release.is_none() {
return Err(Error::InvalidArgumentError(
"The C stream was already released".to_string(),
));
};
if iter.get_next.is_none() {
return Err(Error::OutOfSpec(
"The C stream MUST contain a non-null get_next".to_string(),
));
};
if iter.get_last_error.is_none() {
return Err(Error::OutOfSpec(
"The C stream MUST contain a non-null get_last_error".to_string(),
));
};
let mut field = ArrowSchema::empty();
let status = if let Some(f) = iter.get_schema {
unsafe { (f)(&mut *iter, &mut field) }
} else {
return Err(Error::OutOfSpec(
"The C stream MUST contain a non-null get_schema".to_string(),
));
};
if status != 0 {
return Err(unsafe { handle_error(&mut iter) });
}
let field = unsafe { import_field_from_c(&field)? };
Ok(Self { iter, field })
}
pub fn field(&self) -> &Field {
&self.field
}
pub unsafe fn next(&mut self) -> Option<Result<Box<dyn Array>, Error>> {
let mut array = ArrowArray::empty();
let status = unsafe { (self.iter.get_next.unwrap())(&mut *self.iter, &mut array) };
if status != 0 {
return Some(Err(unsafe { handle_error(&mut self.iter) }));
}
array.release?;
unsafe { import_array_from_c(array, self.field.data_type.clone()) }
.map(Some)
.transpose()
}
}
/// Producer-side state stored behind `ArrowArrayStream::private_data` by
/// `export_iterator` and read back by the `extern "C"` callbacks in this file.
struct PrivateData {
// Source of the arrays handed out by `get_next`.
iter: Box<dyn Iterator<Item = Result<Box<dyn Array>, Error>>>,
// Field exported by `get_schema`; `get_next` also checks each item's data type against it.
field: Field,
// Message of the most recent failure, exposed through `get_last_error`;
// reset to `None` by `get_next` whenever it returns 0.
error: Option<CString>,
}
/// `get_next` callback of streams created by `export_iterator`.
///
/// Pulls the next item from the wrapped iterator and writes it to `array`:
///
/// * an item whose data type matches the stream's field is exported to
///   `array` and `0` is returned;
/// * an exhausted iterator writes an empty `ArrowArray` and returns `0`;
/// * an `Err` item, or an item of a different data type, stores a message
///   for `get_last_error` and returns `2001`.
///
/// `2001` is also returned when `iter` is null. The reader in this file
/// treats any non-zero status as an error.
///
/// Error messages are stripped of interior NUL bytes before being stored,
/// so an unusual message can never make this callback panic.
///
/// # Safety
///
/// A non-null `iter` must point to a stream whose `private_data` was set by
/// `export_iterator` and has not been released, and `array` must be valid
/// for writes of an `ArrowArray`.
unsafe extern "C" fn get_next(iter: *mut ArrowArrayStream, array: *mut ArrowArray) -> i32 {
    /// Converts `message` into a C string, dropping any interior NUL bytes
    /// that `CString::new` would otherwise reject.
    fn to_c_message(message: String) -> CString {
        let mut bytes = message.into_bytes();
        bytes.retain(|&byte| byte != 0);
        // No NUL byte is left, so construction cannot fail; the default
        // (empty) string is only a panic-free fallback.
        CString::new(bytes).unwrap_or_default()
    }

    if iter.is_null() {
        return 2001;
    }
    let private = &mut *((*iter).private_data as *mut PrivateData);
    match private.iter.next() {
        Some(Ok(item)) => {
            // The item must have the data type advertised by the schema.
            let item_dt = item.data_type();
            let expected_dt = private.field.data_type();
            if item_dt != expected_dt {
                let message = format!("The iterator produced an item of data type {item_dt:?} but the producer expects data type {expected_dt:?}");
                private.error = Some(to_c_message(message));
                return 2001;
            }
            std::ptr::write(array, export_array_to_c(item));
            private.error = None;
            0
        }
        Some(Err(err)) => {
            private.error = Some(to_c_message(err.to_string()));
            2001
        }
        None => {
            // End of stream: hand back an array with no release callback.
            let a = ArrowArray::empty();
            std::ptr::write_unaligned(array, a);
            private.error = None;
            0
        }
    }
}
/// `get_schema` callback of streams created by `export_iterator`.
///
/// Exports the stream's field into `schema` and returns `0`, or returns
/// `2001` without touching `schema` when `iter` is null.
///
/// # Safety
///
/// A non-null `iter` must point to a stream whose `private_data` holds a
/// live `PrivateData`, and `schema` must be valid for writes.
unsafe extern "C" fn get_schema(iter: *mut ArrowArrayStream, schema: *mut ArrowSchema) -> i32 {
    if !iter.is_null() {
        let state = (*iter).private_data.cast::<PrivateData>();
        let exported = export_field_to_c(&(*state).field);
        std::ptr::write(schema, exported);
        0
    } else {
        2001
    }
}
/// `get_last_error` callback of streams created by `export_iterator`.
///
/// Returns a pointer to the stored error message, or a null pointer when
/// `iter` is null or no error is currently recorded. The pointer refers to
/// memory owned by the stream's private state.
///
/// # Safety
///
/// A non-null `iter` must point to a stream whose `private_data` holds a
/// live `PrivateData`.
unsafe extern "C" fn get_last_error(iter: *mut ArrowArrayStream) -> *const ::std::os::raw::c_char {
    if iter.is_null() {
        return std::ptr::null();
    }
    let state = &mut *((*iter).private_data as *mut PrivateData);
    match &state.error {
        Some(message) => message.as_ptr(),
        None => std::ptr::null(),
    }
}
/// `release` callback of streams created by `export_iterator`.
///
/// Frees the boxed `PrivateData` behind `private_data` and then clears the
/// stream's `release` callback. Does nothing when `iter` is null.
///
/// # Safety
///
/// A non-null `iter` must point to a stream whose `private_data` came from
/// `Box::into_raw` on a `PrivateData` and has not been freed yet.
unsafe extern "C" fn release(iter: *mut ArrowArrayStream) {
    if !iter.is_null() {
        let state = (*iter).private_data.cast::<PrivateData>();
        // Reclaim ownership of the allocation so it is dropped here.
        drop(Box::from_raw(state));
        (*iter).release = None;
    }
}
/// Exports an iterator of arrays as an [`ArrowArrayStream`].
///
/// `iter` and `field` are moved into a heap-allocated private state whose
/// raw pointer is stored in the stream's `private_data`. The stream is wired
/// to this file's `get_schema`, `get_next`, `get_last_error` and `release`
/// callbacks; `release` frees the private state.
pub fn export_iterator(
    iter: Box<dyn Iterator<Item = Result<Box<dyn Array>, Error>>>,
    field: Field,
) -> ArrowArrayStream {
    let state = PrivateData {
        iter,
        field,
        error: None,
    };
    // Ownership of the allocation moves to the stream until `release` runs.
    let private_data = Box::into_raw(Box::new(state)).cast::<::std::os::raw::c_void>();
    ArrowArrayStream {
        get_schema: Some(get_schema),
        get_next: Some(get_next),
        get_last_error: Some(get_last_error),
        release: Some(release),
        private_data,
    }
}