use std::sync::Arc;
use chrono::{
format::{parse, Parsed, StrftimeItems},
Datelike, Duration, FixedOffset, NaiveDate, NaiveDateTime, NaiveTime,
};
use crate::error::Result;
use crate::{
array::{PrimitiveArray, Utf8Array},
error::Error,
offset::Offset,
};
use crate::{
datatypes::{DataType, TimeUnit},
types::months_days_ns,
};
/// Number of seconds in a day.
pub const SECONDS_IN_DAY: i64 = 86_400;
/// Number of milliseconds in a second.
pub const MILLISECONDS: i64 = 1_000;
/// Number of microseconds in a second.
pub const MICROSECONDS: i64 = 1_000_000;
/// Number of nanoseconds in a second.
pub const NANOSECONDS: i64 = 1_000_000_000;
/// Number of milliseconds in a day.
pub const MILLISECONDS_IN_DAY: i64 = SECONDS_IN_DAY * MILLISECONDS;
/// Offset added to a days-since-UNIX-epoch count before it is handed to
/// `NaiveDate::from_num_days_from_ce_opt` (i.e. the days-from-CE number of 1970-01-01).
pub const EPOCH_DAYS_FROM_CE: i32 = 719_163;
/// Converts an `i32` number of days since the UNIX epoch into a [`NaiveDateTime`].
///
/// # Panics
/// Panics when [`date32_to_datetime_opt`] returns `None` for `v`.
#[inline]
pub fn date32_to_datetime(v: i32) -> NaiveDateTime {
    let converted = date32_to_datetime_opt(v);
    converted.expect("invalid or out-of-range datetime")
}
/// Converts an `i32` number of days since the UNIX epoch into a [`NaiveDateTime`].
///
/// The day count is scaled to seconds and used as a timestamp with a zero
/// sub-second part. Returns `None` when the timestamp cannot be represented.
#[inline]
pub fn date32_to_datetime_opt(v: i32) -> Option<NaiveDateTime> {
    let seconds_since_epoch = i64::from(v) * SECONDS_IN_DAY;
    NaiveDateTime::from_timestamp_opt(seconds_since_epoch, 0)
}
/// Converts an `i32` number of days since the UNIX epoch into a [`NaiveDate`].
///
/// # Panics
/// Panics when [`date32_to_date_opt`] returns `None` for `days`.
#[inline]
pub fn date32_to_date(days: i32) -> NaiveDate {
    let converted = date32_to_date_opt(days);
    converted.expect("out-of-range date")
}
/// Converts an `i32` number of days since the UNIX epoch into a [`NaiveDate`].
///
/// Returns `None` when the date is out of range, including when shifting the
/// day count by [`EPOCH_DAYS_FROM_CE`] would overflow an `i32`.
#[inline]
pub fn date32_to_date_opt(days: i32) -> Option<NaiveDate> {
    // A plain `+` would panic in debug builds (and wrap in release builds) for
    // day counts close to `i32::MAX`; an `_opt` function must report `None`.
    EPOCH_DAYS_FROM_CE
        .checked_add(days)
        .and_then(NaiveDate::from_num_days_from_ce_opt)
}
/// Converts an `i64` number of milliseconds since the UNIX epoch into a [`NaiveDateTime`].
///
/// Negative values (instants before the epoch) are supported, including those
/// that are not a whole number of seconds.
///
/// # Panics
/// Panics if the resulting datetime is invalid or out of range.
#[inline]
pub fn date64_to_datetime(v: i64) -> NaiveDateTime {
    // Euclidean division keeps the sub-second remainder in `0..1_000` even for
    // negative inputs; truncating `/` and `%` would give a negative nanosecond
    // part that wraps to an invalid `u32`.
    let seconds = v.div_euclid(MILLISECONDS);
    let nanoseconds = v.rem_euclid(MILLISECONDS) * MICROSECONDS;
    NaiveDateTime::from_timestamp_opt(seconds, nanoseconds as u32)
        .expect("invalid or out-of-range datetime")
}
/// Converts an `i64` number of milliseconds since the UNIX epoch into a [`NaiveDate`].
///
/// This is the date component of [`date64_to_datetime`], and panics whenever
/// that function panics.
#[inline]
pub fn date64_to_date(milliseconds: i64) -> NaiveDate {
    let datetime = date64_to_datetime(milliseconds);
    datetime.date()
}
/// Converts an `i32` number of seconds since midnight into a [`NaiveTime`].
///
/// # Panics
/// Panics if `v` is not a valid number of seconds from midnight.
#[inline]
pub fn time32s_to_time(v: i32) -> NaiveTime {
    let seconds_from_midnight = v as u32;
    let time = NaiveTime::from_num_seconds_from_midnight_opt(seconds_from_midnight, 0);
    time.expect("invalid time")
}
/// Converts an `i64` number of seconds into a [`Duration`].
#[inline]
pub fn duration_s_to_duration(v: i64) -> Duration {
Duration::seconds(v)
}
/// Converts an `i64` number of milliseconds into a [`Duration`].
#[inline]
pub fn duration_ms_to_duration(v: i64) -> Duration {
Duration::milliseconds(v)
}
/// Converts an `i64` number of microseconds into a [`Duration`].
#[inline]
pub fn duration_us_to_duration(v: i64) -> Duration {
Duration::microseconds(v)
}
/// Converts an `i64` number of nanoseconds into a [`Duration`].
#[inline]
pub fn duration_ns_to_duration(v: i64) -> Duration {
Duration::nanoseconds(v)
}
/// Converts an `i32` number of milliseconds since midnight into a [`NaiveTime`].
///
/// # Panics
/// Panics if `v` does not describe a valid time of day.
#[inline]
pub fn time32ms_to_time(v: i32) -> NaiveTime {
    let total_ms = i64::from(v);
    // Split into whole seconds and the leftover milliseconds (truncating division).
    let whole_seconds = total_ms / MILLISECONDS;
    let leftover_ms = total_ms % MILLISECONDS;
    // One millisecond is 1_000_000 nanoseconds.
    let nanos = leftover_ms * 1_000_000;
    let time = NaiveTime::from_num_seconds_from_midnight_opt(whole_seconds as u32, nanos as u32);
    time.expect("invalid time")
}
/// Converts an `i64` number of microseconds since midnight into a [`NaiveTime`].
///
/// # Panics
/// Panics when [`time64us_to_time_opt`] returns `None` for `v`.
#[inline]
pub fn time64us_to_time(v: i64) -> NaiveTime {
    let converted = time64us_to_time_opt(v);
    converted.expect("invalid time")
}
/// Converts an `i64` number of microseconds since midnight into a [`NaiveTime`].
///
/// Returns `None` if `v` does not describe a valid time of day. Negative values
/// and values whose second count does not fit in a `u32` are rejected rather
/// than being wrapped by a truncating cast.
#[inline]
pub fn time64us_to_time_opt(v: i64) -> Option<NaiveTime> {
    let seconds = v / MICROSECONDS;
    // Remaining microseconds, scaled by 1_000 to nanoseconds.
    let nanoseconds = v % MICROSECONDS * MILLISECONDS;
    // Guard the `as u32` casts below: without this, e.g. `2^32 * 1_000_000`
    // would silently wrap to midnight instead of being reported as invalid.
    if !(0..=i64::from(u32::MAX)).contains(&seconds) || nanoseconds < 0 {
        return None;
    }
    NaiveTime::from_num_seconds_from_midnight_opt(seconds as u32, nanoseconds as u32)
}
/// Converts an `i64` number of nanoseconds since midnight into a [`NaiveTime`].
///
/// # Panics
/// Panics when [`time64ns_to_time_opt`] returns `None` for `v`.
#[inline]
pub fn time64ns_to_time(v: i64) -> NaiveTime {
    let converted = time64ns_to_time_opt(v);
    converted.expect("invalid time")
}
/// Converts an `i64` number of nanoseconds since midnight into a [`NaiveTime`].
///
/// Returns `None` if `v` does not describe a valid time of day. Negative values
/// and values whose second count does not fit in a `u32` are rejected rather
/// than being wrapped by a truncating cast.
#[inline]
pub fn time64ns_to_time_opt(v: i64) -> Option<NaiveTime> {
    let seconds = v / NANOSECONDS;
    let nanoseconds = v % NANOSECONDS;
    // Guard the `as u32` casts below: without this, e.g. `2^32 * 1_000_000_000`
    // would silently wrap to midnight instead of being reported as invalid.
    if !(0..=i64::from(u32::MAX)).contains(&seconds) || nanoseconds < 0 {
        return None;
    }
    NaiveTime::from_num_seconds_from_midnight_opt(seconds as u32, nanoseconds as u32)
}
/// Converts a timestamp in seconds since the UNIX epoch into a [`NaiveDateTime`].
///
/// # Panics
/// Panics when [`timestamp_s_to_datetime_opt`] returns `None` for `seconds`.
#[inline]
pub fn timestamp_s_to_datetime(seconds: i64) -> NaiveDateTime {
    let converted = timestamp_s_to_datetime_opt(seconds);
    converted.expect("invalid or out-of-range datetime")
}
/// Converts a timestamp in seconds since the UNIX epoch into a [`NaiveDateTime`].
///
/// Returns `None` when the timestamp cannot be represented.
#[inline]
pub fn timestamp_s_to_datetime_opt(seconds: i64) -> Option<NaiveDateTime> {
    // Second-resolution timestamps carry no sub-second component.
    let subsecond_nanos = 0;
    NaiveDateTime::from_timestamp_opt(seconds, subsecond_nanos)
}
/// Converts a timestamp in milliseconds since the UNIX epoch into a [`NaiveDateTime`].
///
/// # Panics
/// Panics when [`timestamp_ms_to_datetime_opt`] returns `None` for `v`.
#[inline]
pub fn timestamp_ms_to_datetime(v: i64) -> NaiveDateTime {
    let converted = timestamp_ms_to_datetime_opt(v);
    converted.expect("invalid or out-of-range datetime")
}
/// Converts a timestamp in milliseconds since the UNIX epoch into a [`NaiveDateTime`].
///
/// Negative timestamps are split so that the sub-second part is always
/// non-negative (the second count is rounded towards negative infinity).
/// Returns `None` when the result cannot be represented.
#[inline]
pub fn timestamp_ms_to_datetime_opt(v: i64) -> Option<NaiveDateTime> {
    // Floor division yields the whole seconds; the Euclidean remainder is the
    // number of milliseconds past that second, in `0..1_000`.
    let whole_seconds = v.div_euclid(MILLISECONDS);
    let subsecond_nanos = v.rem_euclid(MILLISECONDS) * MICROSECONDS;
    NaiveDateTime::from_timestamp_opt(whole_seconds, subsecond_nanos as u32)
}
/// Converts a timestamp in microseconds since the UNIX epoch into a [`NaiveDateTime`].
///
/// # Panics
/// Panics when [`timestamp_us_to_datetime_opt`] returns `None` for `v`.
#[inline]
pub fn timestamp_us_to_datetime(v: i64) -> NaiveDateTime {
    let converted = timestamp_us_to_datetime_opt(v);
    converted.expect("invalid or out-of-range datetime")
}
/// Converts a timestamp in microseconds since the UNIX epoch into a [`NaiveDateTime`].
///
/// Negative timestamps are split so that the sub-second part is always
/// non-negative (the second count is rounded towards negative infinity).
/// Returns `None` when the result cannot be represented.
#[inline]
pub fn timestamp_us_to_datetime_opt(v: i64) -> Option<NaiveDateTime> {
    // Floor division yields the whole seconds; the Euclidean remainder is the
    // number of microseconds past that second, in `0..1_000_000`.
    let whole_seconds = v.div_euclid(MICROSECONDS);
    let subsecond_nanos = v.rem_euclid(MICROSECONDS) * MILLISECONDS;
    NaiveDateTime::from_timestamp_opt(whole_seconds, subsecond_nanos as u32)
}
/// Converts a timestamp in nanoseconds since the UNIX epoch into a [`NaiveDateTime`].
///
/// # Panics
/// Panics when [`timestamp_ns_to_datetime_opt`] returns `None` for `v`.
#[inline]
pub fn timestamp_ns_to_datetime(v: i64) -> NaiveDateTime {
    let converted = timestamp_ns_to_datetime_opt(v);
    converted.expect("invalid or out-of-range datetime")
}
/// Converts a timestamp in nanoseconds since the UNIX epoch into a [`NaiveDateTime`].
///
/// Negative timestamps are split so that the sub-second part is always
/// non-negative (the second count is rounded towards negative infinity).
/// Returns `None` when the result cannot be represented.
#[inline]
pub fn timestamp_ns_to_datetime_opt(v: i64) -> Option<NaiveDateTime> {
    // Floor division yields the whole seconds; the Euclidean remainder is the
    // number of nanoseconds past that second, in `0..1_000_000_000`.
    let whole_seconds = v.div_euclid(NANOSECONDS);
    let subsecond_nanos = v.rem_euclid(NANOSECONDS);
    NaiveDateTime::from_timestamp_opt(whole_seconds, subsecond_nanos as u32)
}
#[inline]
pub fn timestamp_to_naive_datetime(timestamp: i64, time_unit: TimeUnit) -> chrono::NaiveDateTime {
match time_unit {
TimeUnit::Second => timestamp_s_to_datetime(timestamp),
TimeUnit::Millisecond => timestamp_ms_to_datetime(timestamp),
TimeUnit::Microsecond => timestamp_us_to_datetime(timestamp),
TimeUnit::Nanosecond => timestamp_ns_to_datetime(timestamp),
}
}
#[inline]
pub fn timestamp_to_datetime<T: chrono::TimeZone>(
timestamp: i64,
time_unit: TimeUnit,
timezone: &T,
) -> chrono::DateTime<T> {
timezone.from_utc_datetime(×tamp_to_naive_datetime(timestamp, time_unit))
}
/// Returns the scale factor between two [`TimeUnit`]s.
///
/// Multiplying a value expressed in unit `b` by the returned factor gives the
/// same quantity expressed in unit `a`; e.g. `(Second, Millisecond)` yields
/// `0.001` and `(Millisecond, Second)` yields `1_000.0`.
pub fn timeunit_scale(a: TimeUnit, b: TimeUnit) -> f64 {
    match a {
        TimeUnit::Second => match b {
            TimeUnit::Second => 1.0,
            TimeUnit::Millisecond => 0.001,
            TimeUnit::Microsecond => 0.000_001,
            TimeUnit::Nanosecond => 0.000_000_001,
        },
        TimeUnit::Millisecond => match b {
            TimeUnit::Second => 1_000.0,
            TimeUnit::Millisecond => 1.0,
            TimeUnit::Microsecond => 0.001,
            TimeUnit::Nanosecond => 0.000_001,
        },
        TimeUnit::Microsecond => match b {
            TimeUnit::Second => 1_000_000.0,
            TimeUnit::Millisecond => 1_000.0,
            TimeUnit::Microsecond => 1.0,
            TimeUnit::Nanosecond => 0.001,
        },
        TimeUnit::Nanosecond => match b {
            TimeUnit::Second => 1_000_000_000.0,
            TimeUnit::Millisecond => 1_000_000.0,
            TimeUnit::Microsecond => 1_000.0,
            TimeUnit::Nanosecond => 1.0,
        },
    }
}
pub fn parse_offset(offset: &str) -> Result<FixedOffset> {
if offset == "UTC" {
return Ok(FixedOffset::east_opt(0).expect("FixedOffset::east out of bounds"));
}
let error = "timezone offset must be of the form [-]00:00";
let mut a = offset.split(':');
let first = a
.next()
.map(Ok)
.unwrap_or_else(|| Err(Error::InvalidArgumentError(error.to_string())))?;
let last = a
.next()
.map(Ok)
.unwrap_or_else(|| Err(Error::InvalidArgumentError(error.to_string())))?;
let hours: i32 = first
.parse()
.map_err(|_| Error::InvalidArgumentError(error.to_string()))?;
let minutes: i32 = last
.parse()
.map_err(|_| Error::InvalidArgumentError(error.to_string()))?;
Ok(FixedOffset::east_opt(hours * 60 * 60 + minutes * 60)
.expect("FixedOffset::east out of bounds"))
}
/// Parses `value` with the format `fmt` into a timestamp in nanoseconds.
///
/// Shorthand for [`utf8_to_timestamp_scalar`] with [`TimeUnit::Nanosecond`].
#[inline]
pub fn utf8_to_timestamp_ns_scalar<T: chrono::TimeZone>(
    value: &str,
    fmt: &str,
    tz: &T,
) -> Option<i64> {
    let unit = TimeUnit::Nanosecond;
    utf8_to_timestamp_scalar(value, fmt, tz, &unit)
}
/// Parses `value` with the strftime-style format `fmt` into a timestamp in unit `tu`.
///
/// The parsed fields must form a complete datetime with an offset. The result
/// counts from the UNIX epoch of the parsed instant; `tz` is only attached to
/// that instant and does not shift it.
///
/// Returns `None` when `value` does not match `fmt`, when the parsed fields do
/// not describe a valid datetime with an offset, or when the instant cannot be
/// represented as an `i64` number of nanoseconds (for [`TimeUnit::Nanosecond`]).
#[inline]
pub fn utf8_to_timestamp_scalar<T: chrono::TimeZone>(
    value: &str,
    fmt: &str,
    tz: &T,
    tu: &TimeUnit,
) -> Option<i64> {
    let mut parsed = Parsed::new();
    parse(&mut parsed, value, StrftimeItems::new(fmt)).ok()?;

    let naive_utc = parsed.to_datetime().ok()?.naive_utc();
    let datetime = tz.from_utc_datetime(&naive_utc);

    match tu {
        TimeUnit::Second => Some(datetime.timestamp()),
        TimeUnit::Millisecond => Some(datetime.timestamp_millis()),
        TimeUnit::Microsecond => Some(datetime.timestamp_micros()),
        // Out-of-range instants become `None` instead of panicking.
        TimeUnit::Nanosecond => datetime.timestamp_nanos_opt(),
    }
}
/// Parses `value` with the format `fmt` into a timezone-less timestamp in nanoseconds.
///
/// Shorthand for [`utf8_to_naive_timestamp_scalar`] with [`TimeUnit::Nanosecond`].
#[inline]
pub fn utf8_to_naive_timestamp_ns_scalar(value: &str, fmt: &str) -> Option<i64> {
    let unit = TimeUnit::Nanosecond;
    utf8_to_naive_timestamp_scalar(value, fmt, &unit)
}
/// Parses `value` with the strftime-style format `fmt` into a timezone-less
/// timestamp in unit `tu`.
///
/// The parsed fields are combined into a naive datetime assuming an offset of
/// zero.
///
/// Returns `None` when the parsed fields do not describe a valid naive
/// datetime, or when the datetime cannot be represented as an `i64` number of
/// nanoseconds (for [`TimeUnit::Nanosecond`]).
#[inline]
pub fn utf8_to_naive_timestamp_scalar(value: &str, fmt: &str, tu: &TimeUnit) -> Option<i64> {
    let mut parsed = Parsed::new();
    // NOTE(review): the outcome of `parse` is not inspected, so whatever fields
    // were filled in before a parse error are still used below. This existing
    // behavior is kept as-is; confirm whether it is intentional.
    let _ = parse(&mut parsed, value, StrftimeItems::new(fmt));

    let datetime = parsed.to_naive_datetime_with_offset(0).ok()?;

    match tu {
        TimeUnit::Second => Some(datetime.timestamp()),
        TimeUnit::Millisecond => Some(datetime.timestamp_millis()),
        TimeUnit::Microsecond => Some(datetime.timestamp_micros()),
        // Out-of-range datetimes become `None` instead of panicking.
        TimeUnit::Nanosecond => datetime.timestamp_nanos_opt(),
    }
}
/// Parses every valid slot of `array` with [`utf8_to_timestamp_ns_scalar`] and
/// collects the results into a nanosecond timestamp array tagged with `timezone`.
///
/// Null slots, and slots for which the scalar conversion returns `None`,
/// become null in the output.
fn utf8_to_timestamp_ns_impl<O: Offset, T: chrono::TimeZone>(
    array: &Utf8Array<O>,
    fmt: &str,
    timezone: Arc<String>,
    tz: T,
) -> PrimitiveArray<i64> {
    let timestamps = array.iter().map(|slot| {
        let text = slot?;
        utf8_to_timestamp_ns_scalar(text, fmt, &tz)
    });
    let data_type = DataType::Timestamp(TimeUnit::Nanosecond, Some(timezone));
    PrimitiveArray::from_trusted_len_iter(timestamps).to(data_type)
}
/// Parses `timezone` as a [`chrono_tz::Tz`].
///
/// # Errors
/// Returns [`Error::InvalidArgumentError`] when `timezone` cannot be parsed.
#[cfg(feature = "chrono-tz")]
#[cfg_attr(docsrs, doc(cfg(feature = "chrono-tz")))]
pub fn parse_offset_tz(timezone: &str) -> Result<chrono_tz::Tz> {
    match timezone.parse::<chrono_tz::Tz>() {
        Ok(tz) => Ok(tz),
        Err(_) => Err(Error::InvalidArgumentError(format!(
            "timezone \"{timezone}\" cannot be parsed"
        ))),
    }
}
/// Converts `array` to nanosecond timestamps using a timezone parsed by
/// [`parse_offset_tz`].
///
/// # Errors
/// Returns the error from [`parse_offset_tz`] when `timezone` cannot be parsed.
#[cfg(feature = "chrono-tz")]
#[cfg_attr(docsrs, doc(cfg(feature = "chrono-tz")))]
fn chrono_tz_utf_to_timestamp_ns<O: Offset>(
    array: &Utf8Array<O>,
    fmt: &str,
    timezone: Arc<String>,
) -> Result<PrimitiveArray<i64>> {
    let parsed_tz = parse_offset_tz(timezone.as_str());
    parsed_tz.map(|tz| utf8_to_timestamp_ns_impl(array, fmt, timezone, tz))
}
#[cfg(not(feature = "chrono-tz"))]
fn chrono_tz_utf_to_timestamp_ns<O: Offset>(
_: &Utf8Array<O>,
_: &str,
timezone: Arc<String>,
) -> Result<PrimitiveArray<i64>> {
Err(Error::InvalidArgumentError(format!(
"timezone \"{timezone}\" cannot be parsed (feature chrono-tz is not active)",
)))
}
/// Parses a [`Utf8Array`] into an array of nanosecond timestamps tagged with `timezone`.
///
/// `timezone` is first tried as a fixed offset via [`parse_offset`]; if that
/// fails, the conversion is delegated to `chrono_tz_utf_to_timestamp_ns`.
///
/// # Errors
/// Returns the error of the `chrono_tz_utf_to_timestamp_ns` fallback when
/// `timezone` is not a fixed offset and the fallback cannot handle it either.
pub fn utf8_to_timestamp_ns<O: Offset>(
    array: &Utf8Array<O>,
    fmt: &str,
    timezone: Arc<String>,
) -> Result<PrimitiveArray<i64>> {
    let fixed_offset = parse_offset(timezone.as_str());
    match fixed_offset {
        Ok(offset) => Ok(utf8_to_timestamp_ns_impl(array, fmt, timezone, offset)),
        Err(_) => chrono_tz_utf_to_timestamp_ns(array, fmt, timezone),
    }
}
/// Parses a [`Utf8Array`] into an array of timezone-less nanosecond timestamps.
///
/// Null slots, and slots for which [`utf8_to_naive_timestamp_ns_scalar`]
/// returns `None`, become null in the output.
pub fn utf8_to_naive_timestamp_ns<O: Offset>(
    array: &Utf8Array<O>,
    fmt: &str,
) -> PrimitiveArray<i64> {
    let timestamps = array.iter().map(|slot| {
        let text = slot?;
        utf8_to_naive_timestamp_ns_scalar(text, fmt)
    });
    let data_type = DataType::Timestamp(TimeUnit::Nanosecond, None);
    PrimitiveArray::from_trusted_len_iter(timestamps).to(data_type)
}
/// Returns the first day of the month that lies `months` months after (or,
/// for negative `months`, before) the given `year`/`month`.
///
/// `month` is 1-based (`1..=12`).
///
/// # Panics
/// Panics if the resulting date is invalid or out of range.
fn add_month(year: i32, month: u32, months: i32) -> chrono::NaiveDate {
    // Zero-based month index counted from year 0, computed in `i64` so the sum
    // cannot overflow for any `i32` inputs.
    let month_index = i64::from(year) * 12 + i64::from(month) - 1 + i64::from(months);
    // Euclidean division keeps the month in `0..12` for negative indices
    // (years <= 0), where truncating `/` and `%` would yield month 0 or less.
    let new_year = month_index.div_euclid(12);
    let new_month = month_index.rem_euclid(12) + 1;
    // A year outside the `i32` range is clamped; it is still far outside the
    // supported calendar range, so the lookup below reports it as invalid.
    let new_year = new_year.clamp(i64::from(i32::MIN), i64::from(i32::MAX)) as i32;
    chrono::NaiveDate::from_ymd_opt(new_year, new_month as u32, 1)
        .expect("invalid or out-of-range date")
}
/// Returns the signed number of days between the first day of `year`/`month`
/// and the first day of the month `months` months away from it.
///
/// # Panics
/// Panics if either date is invalid or out of range.
fn get_days_between_months(year: i32, month: u32, months: i32) -> i64 {
    let target = add_month(year, month, months);
    let origin =
        chrono::NaiveDate::from_ymd_opt(year, month, 1).expect("invalid or out-of-range date");
    let elapsed = target.signed_duration_since(origin);
    elapsed.num_days()
}
#[inline]
pub fn add_naive_interval(timestamp: i64, time_unit: TimeUnit, interval: months_days_ns) -> i64 {
let datetime = match time_unit {
TimeUnit::Second => timestamp_s_to_datetime(timestamp),
TimeUnit::Millisecond => timestamp_ms_to_datetime(timestamp),
TimeUnit::Microsecond => timestamp_us_to_datetime(timestamp),
TimeUnit::Nanosecond => timestamp_ns_to_datetime(timestamp),
};
let delta_days = get_days_between_months(datetime.year(), datetime.month(), interval.months())
+ interval.days() as i64;
let new_datetime_tz = datetime
+ chrono::Duration::nanoseconds(delta_days * 24 * 60 * 60 * 1_000_000_000 + interval.ns());
match time_unit {
TimeUnit::Second => new_datetime_tz.timestamp_millis() / 1000,
TimeUnit::Millisecond => new_datetime_tz.timestamp_millis(),
TimeUnit::Microsecond => new_datetime_tz.timestamp_nanos_opt().unwrap() / 1000,
TimeUnit::Nanosecond => new_datetime_tz.timestamp_nanos_opt().unwrap(),
}
}
/// Adds `interval` to a `timestamp` expressed in `time_unit`, doing the
/// calendar arithmetic in `timezone`, and returns the new timestamp in the
/// same unit.
///
/// The month part of the interval is converted to a whole number of days
/// relative to the month of `timestamp` as seen in `timezone`, then added
/// together with the day and nanosecond parts.
///
/// For [`TimeUnit::Microsecond`] a sub-microsecond remainder is rounded
/// towards negative infinity, matching the millisecond branch.
///
/// # Panics
/// Panics if `timestamp` or the resulting datetime is out of range, or if the
/// result cannot be represented as an `i64` number of nanoseconds when
/// `time_unit` is [`TimeUnit::Nanosecond`].
#[inline]
pub fn add_interval<T: chrono::TimeZone>(
    timestamp: i64,
    time_unit: TimeUnit,
    interval: months_days_ns,
    timezone: &T,
) -> i64 {
    let datetime_tz = timestamp_to_datetime(timestamp, time_unit, timezone);

    // Number of days spanned by the month component, plus the explicit days.
    let delta_days =
        get_days_between_months(datetime_tz.year(), datetime_tz.month(), interval.months())
            + interval.days() as i64;

    // Build the offset from separate day and nanosecond durations: multiplying
    // the day count out to nanoseconds overflows `i64` beyond roughly 292 years.
    let delta = chrono::Duration::days(delta_days) + chrono::Duration::nanoseconds(interval.ns());
    let new_datetime_tz = datetime_tz + delta;

    match time_unit {
        TimeUnit::Second => new_datetime_tz.timestamp_millis() / 1000,
        TimeUnit::Millisecond => new_datetime_tz.timestamp_millis(),
        // Read microseconds directly; going through nanoseconds would panic
        // for dates outside the (much narrower) nanosecond timestamp range.
        TimeUnit::Microsecond => new_datetime_tz.timestamp_micros(),
        TimeUnit::Nanosecond => new_datetime_tz
            .timestamp_nanos_opt()
            .expect("datetime out of range for a nanosecond timestamp"),
    }
}