Skip to content

Commit

Permalink
Capability header (#55)
Browse files Browse the repository at this point in the history
* add scaffold capability header

* add documentation

* move case-insensitivity of capability header to parse

---------

Co-authored-by: Tim Dikland <[email protected]>
  • Loading branch information
tdikland and TimDikland-DB authored May 7, 2024
1 parent 373726c commit d2dc4fb
Show file tree
Hide file tree
Showing 5 changed files with 185 additions and 0 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ delta_kernel = { git = "https://github.com/roeap/delta-kernel-rs", rev = "9c8f7f
"developer-visibility",
"default-engine",
] }
http = { version = "1.1.0" }
serde = { version = "1.0.152", features = ["derive"] }
thiserror = "1"
tower = { version = "0.4.13", features = ["limit", "filter", "util"] }
Expand Down
2 changes: 2 additions & 0 deletions delta-sharing/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ delta_kernel.workspace = true
serde.workspace = true
thiserror.workspace = true
url.workspace = true
http.workspace = true
tracing.workspace = true

# server dependencies (in alphabetical order)
object_store = { version = "0.9" }
Expand Down
179 changes: 179 additions & 0 deletions delta-sharing/core/src/capabilities.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
//! Capabilities of the client.
//!
//! The capabilities are communicated between the client and the server using the `delta-sharing-capabilities` header.
use std::str::FromStr;

use http::header::HeaderMap;

use crate::Error;

const DELTA_SHARING_CAPABILITIES: &str = "delta-sharing-capabilities";

/// The format of the response that the client can accept.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ResponseFormat {
/// API response in Parquet format.
Parquet,
/// Api response in Delta format.
Delta,
}

impl FromStr for ResponseFormat {
type Err = Error;

fn from_str(s: &str) -> Result<Self, Self::Err> {
match s.to_lowercase().as_str() {
"parquet" => Ok(Self::Parquet),
"delta" => Ok(Self::Delta),
_ => Err(Error::Generic(format!("Unknown response format: {}", s))),
}
}
}

/// Capabilities of the client.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Capabilities {
response_formats: Vec<ResponseFormat>,
reader_features: Vec<String>,
}

impl Capabilities {
/// Create a new [`Capabilities`] instance.
///
/// # Example
/// ```
/// use delta_sharing_core::capabilities::{Capabilities, ResponseFormat};
///
/// let capabilities = Capabilities::new(
/// vec![ResponseFormat::Delta],
/// vec!["deletionVectors".to_string()],
/// );
/// assert_eq!(capabilities.response_formats(), &[ResponseFormat::Delta]);
/// ```
pub fn new(response_formats: Vec<ResponseFormat>, reader_features: Vec<String>) -> Self {
Self {
response_formats,
reader_features: reader_features
.into_iter()
.map(|s| s.to_lowercase())
.collect(),
}
}

/// Returns the response formats that the client can accept.
///
/// # Example
/// ```
/// use delta_sharing_core::capabilities::{Capabilities, ResponseFormat};
///
/// let capabilities = Capabilities::new(
/// vec![ResponseFormat::Delta],
/// vec!["deletionVectors".to_string()],
/// );
/// assert_eq!(capabilities.response_formats(), &[ResponseFormat::Delta]);
/// ```
pub fn response_formats(&self) -> &[ResponseFormat] {
&self.response_formats
}

/// Returns the reader features that the client can accept.
///
/// # Example
/// ```
/// use delta_sharing_core::capabilities::{Capabilities, ResponseFormat};
///
/// let capabilities = Capabilities::new(
/// vec![ResponseFormat::Delta],
/// vec!["deletionVectors".to_string()],
/// );
/// assert_eq!(capabilities.reader_features(), &["deletionvectors"]);
pub fn reader_features(&self) -> &[String] {
self.reader_features.as_slice()
}
}

impl Default for Capabilities {
fn default() -> Self {
Self {
response_formats: vec![ResponseFormat::Parquet],
reader_features: vec![],
}
}
}

impl TryFrom<&HeaderMap> for Capabilities {
type Error = Error;

fn try_from(headers: &HeaderMap) -> Result<Self, Self::Error> {
let mut capabilities = Capabilities::default();
if let Some(header) = headers.get(DELTA_SHARING_CAPABILITIES) {
let capability_header = header.to_str().map_err(|e| {
Error::Generic(format!("Failed to parse capabilities header: {}", e))
})?;
for capability in capability_header.split(';') {
let (capability_key, capability_value) = capability.split_once('=').ok_or(
Error::Generic(format!("Failed to parse capability: {}", capability)),
)?;
match capability_key {
"responseformat" => {
capabilities.response_formats = capability_value
.split(',')
.flat_map(|s| s.trim().parse().ok())
.collect();
}
"readerfeatures" => {
capabilities.reader_features = capability_value
.split(',')
.map(|s| s.trim().to_lowercase())
.collect();
}
_ => {
tracing::warn!(
capability = capability_key,
"encountered unrecognized capability"
);
}
}
}
}

Ok(capabilities)
}
}

#[cfg(test)]
mod test {
use super::*;

#[test]
fn test_default_capabilities() {
let capabilities = Capabilities::default();
assert_eq!(
capabilities.response_formats(),
vec![ResponseFormat::Parquet]
);
assert_eq!(capabilities.reader_features(), Vec::<String>::new());
}

#[test]
fn test_capabilities_from_headers() {
let mut headers = HeaderMap::new();
headers.insert(
DELTA_SHARING_CAPABILITIES,
"responseformat=delta,Parquet;readerfeatures=feature1,feature2"
.parse()
.unwrap(),
);

let capabilities = Capabilities::try_from(&headers).unwrap();
assert_eq!(
capabilities.response_formats(),
vec![ResponseFormat::Delta, ResponseFormat::Parquet]
);
assert_eq!(
capabilities.reader_features(),
vec!["feature1".to_string(), "feature2".to_string()]
);
}
}
1 change: 1 addition & 0 deletions delta-sharing/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ pub mod types {

include!("gen/delta_sharing.v1.rs");
}
pub mod capabilities;
pub mod error;
#[cfg(feature = "memory")]
mod in_memory;
Expand Down

0 comments on commit d2dc4fb

Please sign in to comment.