feat: add visitor SidecarVisitor and Sidecar action struct (#673)

## What changes are proposed in this pull request?

This PR introduces the `Sidecar` action and its associated
`SidecarVisitor`.

This action and visitor are used by the V2 checkpoints Reader/Writer
table feature.
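
The `Sidecar` struct's snake_case Rust fields appear in the log schema under camelCase names (`size_in_bytes` becomes `sizeInBytes`), as the schema test in this commit expects. A minimal sketch of that naming rule follows; `snake_to_camel` and `sidecar_log_field_names` are hypothetical helpers for illustration, not the kernel's `Schema` derive macro.

```rust
/// Converts a snake_case Rust field name to the camelCase name used in the log.
/// Hypothetical helper: the kernel's derive macro does this at compile time.
fn snake_to_camel(field: &str) -> String {
    let mut out = String::with_capacity(field.len());
    let mut upper_next = false;
    for ch in field.chars() {
        if ch == '_' {
            // Drop the underscore and capitalize the character that follows it.
            upper_next = true;
        } else if upper_next {
            out.extend(ch.to_uppercase());
            upper_next = false;
        } else {
            out.push(ch);
        }
    }
    out
}

/// Field names of the `Sidecar` struct from this PR, mapped to their log names.
fn sidecar_log_field_names() -> Vec<String> {
    ["path", "size_in_bytes", "modification_time", "tags"]
        .iter()
        .map(|f| snake_to_camel(f))
        .collect()
}
```

The resulting names match the `sidecar` fields used in the JSON test fixture: `path`, `sizeInBytes`, `modificationTime`, and `tags`.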

Edge cases:
- If a batch of actions includes two sidecar actions referencing the
same file, issue a warning rather than an error, and ignore the
duplicate file path to avoid re-reading the file.
- Return an error if the sidecar `path` field is a full file path instead
of just the filename. See issue #675.
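
The two edge cases above can be sketched as follows. This is only an illustration under stated assumptions: `SidecarRef`, `SidecarPathError`, and `collect_unique_sidecars` are hypothetical names, the "full path" check is approximated by looking for a `/` separator, and warnings are returned as strings rather than logged. The diff in this commit does not appear to include this logic, and the corresponding test items are struck through in the test list.

```rust
use std::collections::HashSet;

/// Hypothetical stand-in for the `Sidecar` action; only the path matters here.
#[derive(Debug, Clone, PartialEq)]
struct SidecarRef {
    path: String,
}

/// Hypothetical error type; the kernel has its own `Error` enum.
#[derive(Debug, PartialEq)]
enum SidecarPathError {
    NotAFileName(String),
}

/// Keeps the first occurrence of each path, records a warning for every
/// duplicate, and rejects any path that contains a directory separator.
fn collect_unique_sidecars(
    actions: Vec<SidecarRef>,
) -> Result<(Vec<SidecarRef>, Vec<String>), SidecarPathError> {
    let mut seen = HashSet::new();
    let mut unique = Vec::new();
    let mut warnings = Vec::new();
    for action in actions {
        // Assumption: a bare file name never contains '/'.
        if action.path.contains('/') {
            return Err(SidecarPathError::NotAFileName(action.path));
        }
        // `insert` returns false when the path was already seen.
        if seen.insert(action.path.clone()) {
            unique.push(action);
        } else {
            warnings.push(format!("duplicate sidecar path ignored: {}", action.path));
        }
    }
    Ok((unique, warnings))
}
```

Returning the warnings alongside the deduplicated list keeps the sketch free of a logging dependency; a real implementation would more likely emit them through its logging facility.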

resolves #668


Note: this PR also simplifies some conditional `cfg_attr` visibility flags, as
detailed in issue #672.



## How was this change tested?


- Ensure schema projection to `Sidecar` works
- Ensure that the visitor correctly extracts `Sidecar` actions
~- Ensure duplicate `paths` in sidecar actions are ignored~
~- Ensure `VisitorError` is returned when full file path is passed in
`path` field~
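
The extraction rule that the visitor test exercises (a row counts as a sidecar action only when its required `path` column is non-null) can be sketched without the kernel's types. Everything here is a simplified assumption: `SimpleSidecar` and `extract_sidecars` are hypothetical, columns are modeled as parallel slices of `Option` values instead of `GetData` getters, and `tags` is omitted.

```rust
/// Hypothetical, simplified stand-in for the `Sidecar` action in this PR.
#[derive(Debug, PartialEq)]
struct SimpleSidecar {
    path: String,
    size_in_bytes: i64,
    modification_time: i64,
}

/// Walks three parallel columns and builds one `SimpleSidecar` per row whose
/// `path` is present. Rows with a null `path` hold another action and are skipped.
fn extract_sidecars(
    paths: &[Option<&str>],
    sizes: &[Option<i64>],
    mtimes: &[Option<i64>],
) -> Result<Vec<SimpleSidecar>, String> {
    let mut sidecars = Vec::new();
    for (row, path) in paths.iter().enumerate() {
        if let Some(path) = path {
            // Once `path` is present, the other required fields must be too.
            let size_in_bytes = sizes
                .get(row)
                .copied()
                .flatten()
                .ok_or_else(|| format!("row {row}: missing sidecar.sizeInBytes"))?;
            let modification_time = mtimes
                .get(row)
                .copied()
                .flatten()
                .ok_or_else(|| format!("row {row}: missing sidecar.modificationTime"))?;
            sidecars.push(SimpleSidecar {
                path: path.to_string(),
                size_in_bytes,
                modification_time,
            });
        }
    }
    Ok(sidecars)
}
```

A batch that mixes action types has nulls in the sidecar columns for every non-sidecar row, which is why a single required column is enough to detect presence.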
sebastiantia authored Feb 5, 2025
1 parent 379f5e5 commit 2240154
Showing 2 changed files with 129 additions and 21 deletions.
58 changes: 51 additions & 7 deletions kernel/src/actions/mod.rs
@@ -45,6 +45,8 @@ pub(crate) const SET_TRANSACTION_NAME: &str = "txn";
pub(crate) const COMMIT_INFO_NAME: &str = "commitInfo";
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
pub(crate) const CDC_NAME: &str = "cdc";
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
pub(crate) const SIDECAR_NAME: &str = "sidecar";

static LOG_ADD_SCHEMA: LazyLock<SchemaRef> =
LazyLock::new(|| StructType::new([Option::<Add>::get_struct_field(ADD_NAME)]).into());
@@ -58,6 +60,7 @@ static LOG_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
Option::<SetTransaction>::get_struct_field(SET_TRANSACTION_NAME),
Option::<CommitInfo>::get_struct_field(COMMIT_INFO_NAME),
Option::<Cdc>::get_struct_field(CDC_NAME),
Option::<Sidecar>::get_struct_field(SIDECAR_NAME),
// We don't support the following actions yet
//Option::<DomainMetadata>::get_struct_field(DOMAIN_METADATA_NAME),
])
@@ -326,9 +329,8 @@ where

#[derive(Debug, Clone, PartialEq, Eq, Schema)]
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
#[cfg_attr(not(feature = "developer-visibility"), visibility::make(pub(crate)))]
#[cfg_attr(test, derive(Serialize, Default), serde(rename_all = "camelCase"))]
struct CommitInfo {
pub(crate) struct CommitInfo {
/// The time this logical file was created, as milliseconds since the epoch.
/// Read: optional, write: required (that is, kernel always writes).
pub(crate) timestamp: Option<i64>,
@@ -417,9 +419,8 @@ impl Add {

#[derive(Debug, Clone, PartialEq, Eq, Schema)]
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
#[cfg_attr(not(feature = "developer-visibility"), visibility::make(pub(crate)))]
#[cfg_attr(test, derive(Serialize, Default), serde(rename_all = "camelCase"))]
struct Remove {
pub(crate) struct Remove {
/// A relative path to a data file from the root of the table or an absolute path to a file
/// that should be added to the table. The path is a URI as specified by
/// [RFC 2396 URI Generic Syntax], which needs to be decoded to get the data file path.
@@ -468,9 +469,8 @@ struct Remove {

#[derive(Debug, Clone, PartialEq, Eq, Schema)]
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
#[cfg_attr(not(feature = "developer-visibility"), visibility::make(pub(crate)))]
#[cfg_attr(test, derive(Serialize, Default), serde(rename_all = "camelCase"))]
struct Cdc {
pub(crate) struct Cdc {
/// A relative path to a change data file from the root of the table or an absolute path to a
/// change data file that should be added to the table. The path is a URI as specified by
/// [RFC 2396 URI Generic Syntax], which needs to be decoded to get the file path.
@@ -511,6 +511,33 @@ pub struct SetTransaction {
pub last_updated: Option<i64>,
}

/// The sidecar action references a sidecar file which provides some of the checkpoint's
/// file actions. This action is only allowed in checkpoints following the V2 spec.
///
/// [More info]: https://github.com/delta-io/delta/blob/master/PROTOCOL.md#sidecar-file-information
#[allow(unused)] //TODO: Remove once we implement V2 checkpoint file processing
#[derive(Schema, Debug, PartialEq)]
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
pub(crate) struct Sidecar {
/// A path to a sidecar file that can be either:
/// - A relative path (just the file name) within the `_delta_log/_sidecars` directory.
/// - An absolute path
/// The path is a URI as specified by [RFC 2396 URI Generic Syntax], which needs to be decoded
/// to get the file path.
///
/// [RFC 2396 URI Generic Syntax]: https://www.ietf.org/rfc/rfc2396.txt
pub path: String,

/// The size of the sidecar file in bytes.
pub size_in_bytes: i64,

/// The time this logical file was created, as milliseconds since the epoch.
pub modification_time: i64,

/// A map containing any additional metadata about the logical file.
pub tags: Option<HashMap<String, String>>,
}

#[cfg(test)]
mod tests {
use std::sync::Arc;
@@ -637,7 +664,7 @@ mod tests {
fn test_cdc_schema() {
let schema = get_log_schema()
.project(&[CDC_NAME])
.expect("Couldn't get remove field");
.expect("Couldn't get cdc field");
let expected = Arc::new(StructType::new([StructField::nullable(
"cdc",
StructType::new([
@@ -654,6 +681,23 @@ mod tests {
assert_eq!(schema, expected);
}

#[test]
fn test_sidecar_schema() {
let schema = get_log_schema()
.project(&[SIDECAR_NAME])
.expect("Couldn't get sidecar field");
let expected = Arc::new(StructType::new([StructField::nullable(
"sidecar",
StructType::new([
StructField::not_null("path", DataType::STRING),
StructField::not_null("sizeInBytes", DataType::LONG),
StructField::not_null("modificationTime", DataType::LONG),
tags_field(),
]),
)]));
assert_eq!(schema, expected);
}

#[test]
fn test_transaction_schema() {
let schema = get_log_schema()
92 changes: 78 additions & 14 deletions kernel/src/actions/visitors.rs
@@ -12,14 +12,13 @@ use crate::{DeltaResult, Error};
use super::deletion_vector::DeletionVectorDescriptor;
use super::schemas::ToSchema as _;
use super::{
Add, Cdc, Format, Metadata, Protocol, Remove, SetTransaction, ADD_NAME, CDC_NAME,
METADATA_NAME, PROTOCOL_NAME, REMOVE_NAME, SET_TRANSACTION_NAME,
Add, Cdc, Format, Metadata, Protocol, Remove, SetTransaction, Sidecar, ADD_NAME, CDC_NAME,
METADATA_NAME, PROTOCOL_NAME, REMOVE_NAME, SET_TRANSACTION_NAME, SIDECAR_NAME,
};

#[derive(Default)]
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
#[cfg_attr(not(feature = "developer-visibility"), visibility::make(pub(crate)))]
struct MetadataVisitor {
pub(crate) struct MetadataVisitor {
pub(crate) metadata: Option<Metadata>,
}

@@ -114,8 +113,7 @@ impl RowVisitor for SelectionVectorVisitor {

#[derive(Default)]
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
#[cfg_attr(not(feature = "developer-visibility"), visibility::make(pub(crate)))]
struct ProtocolVisitor {
pub(crate) struct ProtocolVisitor {
pub(crate) protocol: Option<Protocol>,
}

@@ -318,15 +316,13 @@ impl RowVisitor for RemoveVisitor {
#[allow(unused)]
#[derive(Default)]
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
#[cfg_attr(not(feature = "developer-visibility"), visibility::make(pub(crate)))]
struct CdcVisitor {
pub(crate) struct CdcVisitor {
pub(crate) cdcs: Vec<Cdc>,
}

impl CdcVisitor {
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
#[cfg_attr(not(feature = "developer-visibility"), visibility::make(pub(crate)))]
fn visit_cdc<'a>(
pub(crate) fn visit_cdc<'a>(
row_index: usize,
path: String,
getters: &[&'a dyn GetData<'a>],
@@ -377,7 +373,6 @@ pub type SetTransactionMap = HashMap<String, SetTransaction>;
///
#[derive(Default, Debug)]
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
#[cfg_attr(not(feature = "developer-visibility"), visibility::make(pub(crate)))]
pub(crate) struct SetTransactionVisitor {
pub(crate) set_transactions: SetTransactionMap,
pub(crate) application_id: Option<String>,
@@ -393,8 +388,7 @@ impl SetTransactionVisitor {
}

#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
#[cfg_attr(not(feature = "developer-visibility"), visibility::make(pub(crate)))]
fn visit_txn<'a>(
pub(crate) fn visit_txn<'a>(
row_index: usize,
app_id: String,
getters: &[&'a dyn GetData<'a>],
@@ -444,6 +438,52 @@ impl RowVisitor for SetTransactionVisitor {
}
}

#[allow(unused)] //TODO: Remove once we implement V2 checkpoint file processing
#[derive(Default)]
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
pub(crate) struct SidecarVisitor {
pub(crate) sidecars: Vec<Sidecar>,
}

impl SidecarVisitor {
fn visit_sidecar<'a>(
row_index: usize,
path: String,
getters: &[&'a dyn GetData<'a>],
) -> DeltaResult<Sidecar> {
Ok(Sidecar {
path,
size_in_bytes: getters[1].get(row_index, "sidecar.sizeInBytes")?,
modification_time: getters[2].get(row_index, "sidecar.modificationTime")?,
tags: getters[3].get_opt(row_index, "sidecar.tags")?,
})
}
}

impl RowVisitor for SidecarVisitor {
fn selected_column_names_and_types(&self) -> (&'static [ColumnName], &'static [DataType]) {
static NAMES_AND_TYPES: LazyLock<ColumnNamesAndTypes> =
LazyLock::new(|| Sidecar::to_schema().leaves(SIDECAR_NAME));
NAMES_AND_TYPES.as_ref()
}
fn visit<'a>(&mut self, row_count: usize, getters: &[&'a dyn GetData<'a>]) -> DeltaResult<()> {
require!(
getters.len() == 4,
Error::InternalError(format!(
"Wrong number of SidecarVisitor getters: {}",
getters.len()
))
);
for i in 0..row_count {
// Since path column is required, use it to detect presence of a sidecar action
if let Some(path) = getters[0].get_opt(i, "sidecar.path")? {
self.sidecars.push(Self::visit_sidecar(i, path, getters)?);
}
}
Ok(())
}
}

/// Get a DV out of some engine data. The caller is responsible for slicing the `getters` slice such
/// that the first element contains the `storageType` element of the deletion vector.
pub(crate) fn visit_deletion_vector_at<'a>(
@@ -501,7 +541,8 @@ mod tests {
r#"{"commitInfo":{"timestamp":1677811178585,"operation":"WRITE","operationParameters":{"mode":"ErrorIfExists","partitionBy":"[]"},"isolationLevel":"WriteSerializable","isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputRows":"10","numOutputBytes":"635"},"engineInfo":"Databricks-Runtime/<unknown>","txnId":"a6a94671-55ef-450e-9546-b8465b9147de"}}"#,
r#"{"protocol":{"minReaderVersion":3,"minWriterVersion":7,"readerFeatures":["deletionVectors"],"writerFeatures":["deletionVectors"]}}"#,
r#"{"metaData":{"id":"testId","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"value\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{"delta.enableDeletionVectors":"true","delta.columnMapping.mode":"none", "delta.enableChangeDataFeed":"true"},"createdTime":1677811175819}}"#,
r#"{"cdc":{"path":"_change_data/age=21/cdc-00000-93f7fceb-281a-446a-b221-07b88132d203.c000.snappy.parquet","partitionValues":{"age":"21"},"size":1033,"dataChange":false}}"#
r#"{"cdc":{"path":"_change_data/age=21/cdc-00000-93f7fceb-281a-446a-b221-07b88132d203.c000.snappy.parquet","partitionValues":{"age":"21"},"size":1033,"dataChange":false}}"#,
r#"{"sidecar":{"path":"016ae953-37a9-438e-8683-9a9a4a79a395.parquet","sizeInBytes":9268,"modificationTime":1714496113961,"tags":{"tag_foo":"tag_bar"}}}"#,
]
.into();
let output_schema = get_log_schema().clone();
@@ -544,6 +585,29 @@ mod tests {
Ok(())
}

#[test]
fn test_parse_sidecar() -> DeltaResult<()> {
let data = action_batch();

let mut visitor = SidecarVisitor::default();
visitor.visit_rows_of(data.as_ref())?;

let sidecar1 = Sidecar {
path: "016ae953-37a9-438e-8683-9a9a4a79a395.parquet".into(),
size_in_bytes: 9268,
modification_time: 1714496113961,
tags: Some(HashMap::from([(
"tag_foo".to_string(),
"tag_bar".to_string(),
)])),
};

assert_eq!(visitor.sidecars.len(), 1);
assert_eq!(visitor.sidecars[0], sidecar1);

Ok(())
}

#[test]
fn test_parse_metadata() -> DeltaResult<()> {
let data = action_batch();