Skip to content

Commit

Permalink
feat: scaffold hybrid grpc/rest server architecture (#63)
Browse files Browse the repository at this point in the history
* feat: generate basic grpc service

* feat: baseline grpc discovery servicer

* feat: move rest routes to common

* refactor: reaname types module to models

* chore: dependencies

* perf: simplify default recipient

* chore: cleanup rest server

* feat: add grpc command
  • Loading branch information
roeap authored Nov 23, 2024
1 parent ee4e12e commit 1d9f7b6
Show file tree
Hide file tree
Showing 41 changed files with 2,685 additions and 1,899 deletions.
1,349 changes: 818 additions & 531 deletions Cargo.lock

Large diffs are not rendered by default.

28 changes: 15 additions & 13 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,27 +2,29 @@
members = ["delta-sharing/*"]

[workspace.dependencies]
async-trait = "0.1.64"
async-trait = "0.1.83"
chrono = { version = "0.4.38", features = ["serde"] }
clap = { version = "4.5.4", features = ["derive"] }
delta_kernel = { version = "0.1", features = [
clap = { version = "4.5.21", features = ["derive"] }
delta_kernel = { version = "0.2", features = [
"tokio",
"developer-visibility",
"default-engine",
] }
futures = { version = "0.3.28" }
futures = { version = "0.3.31" }
http = { version = "1.1.0" }
object_store = { version = "0.9" }
reqwest = { version = "0.12", default-features = false, features = [
hyper = { version = "1.5.1" }
object_store = { version = "0.10" }
reqwest = { version = "0.12.9", default-features = false, features = [
"rustls-tls-native-roots",
"http2",
] }
serde = { version = "1.0.152", features = ["derive"] }
serde_json = "1.0.92"
serde = { version = "1.0.215", features = ["derive"] }
serde_json = "1.0.133"
thiserror = "1"
tower = { version = "0.4.13", features = ["limit", "filter", "util"] }
tracing = { version = "0.1.37", features = ["log"] }
url = { version = "2.5.0", features = ["serde"] }
tonic = { version = "0.12.3" }
tower = { version = "0.5.1", features = ["limit", "filter", "util"] }
tracing = { version = "0.1.40", features = ["log"] }
url = { version = "2.5.4", features = ["serde"] }

[package]
name = "delta-sharing-legacy"
Expand All @@ -36,6 +38,7 @@ async-trait = { workspace = true }
chrono = { workspace = true }
clap = { workspace = true }
futures = { workspace = true }
hyper = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
tower = { workspace = true }
Expand All @@ -49,7 +52,6 @@ axum = "0.7.5"
axum-extra = { version = "0.9.2", features = ["json-lines"] }
deltalake = { version = "0.18", features = ["s3", "azure", "gcs"] }
futures-util = "0.3.28"
hyper = { version = "0.14.13" }
tokio = { version = "1.25.0", features = ["full", "rt-multi-thread"] }
config = "0.14.0"
colored = "2.0.0"
Expand All @@ -59,7 +61,7 @@ glob = "0.3.1"
git-version = "0.3.5"
jsonwebtoken = "9.3"
md5 = "0.7.0"
object_store = { version = "0.9", features = ["aws", "azure", "gcp"] }
object_store = { version = "0.10", features = ["aws", "azure", "gcp"] }
once_cell = "1.17.1"
rand = "0.8.5"
rusoto_core = "0.48.0"
Expand Down
9 changes: 8 additions & 1 deletion Justfile
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ generate:

# run the delta-sharing server with the dev config
do-it:
@RUST_BACKTRACE=1 cargo run -p delta-sharing-server -- --config {{ local_config }}
@RUST_BACKTRACE=1 cargo run -p delta-sharing server --config {{ local_config }}

# the the documentation (requires mdbook)
doc:
Expand All @@ -79,3 +79,10 @@ profile:
--validity 90 \
--shares asdf \
--secret secret

# run the delta-sharing server with the dev config
rest:
@RUST_LOG=DEBUG cargo run -p delta-sharing rest --config {{ local_config }}

grpc:
@RUST_LOG=DEBUG cargo run -p delta-sharing grpc --config {{ local_config }}
30 changes: 26 additions & 4 deletions buf.gen.yaml
Original file line number Diff line number Diff line change
@@ -1,11 +1,33 @@
version: v1
version: v2
plugins:
- plugin: buf.build/community/neoeinstein-prost:v0.3.1
- remote: buf.build/community/neoeinstein-prost:v0.3.1
out: delta-sharing/common/src/gen
opt:
- bytes=.
- compile_well_known_types
- extern_path=.google.protobuf=::pbjson_types
- file_descriptor_set
- plugin: buf.build/community/neoeinstein-prost-serde:v0.3.0
- file_descriptor_set=false
- remote: buf.build/community/neoeinstein-prost-serde:v0.3.0
out: delta-sharing/common/src/gen
opt:
# useful to build reusable extractors for axum
- ignore_unknown_fields=true
- remote: buf.build/community/neoeinstein-tonic:v0.4.0
out: delta-sharing/common/src/gen
opt:
- no_client=true
- extern_path=.google.protobuf=::pbjson_types
- remote: buf.build/community/neoeinstein-prost:v0.3.1
out: delta-sharing/client/src/gen
opt:
- bytes=.
- compile_well_known_types
- extern_path=.google.protobuf=::pbjson_types
- file_descriptor_set=false
- remote: buf.build/community/neoeinstein-prost-serde:v0.3.0
out: delta-sharing/client/src/gen
- remote: buf.build/community/neoeinstein-tonic:v0.4.0
out: delta-sharing/client/src/gen
opt:
- no_server=true
- extern_path=.google.protobuf=::pbjson_types
32 changes: 16 additions & 16 deletions delta-sharing/client/src/service.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,22 @@
use delta_sharing_common::types as t;
use delta_sharing_common::models::v1::*;
use reqwest::{header, Client, Method};

use crate::client::retry::RetryExt;
use crate::{ClientOptions, CredentialProvider, Error, Result, RetryConfig};

#[async_trait::async_trait]
pub trait ServiceClient: Send + Sync + 'static {
async fn list_shares(&self, request: t::ListSharesRequest) -> Result<t::ListSharesResponse>;
async fn get_share(&self, request: t::GetShareRequest) -> Result<t::GetShareResponse>;
async fn list_schemas(&self, request: t::ListSchemasRequest) -> Result<t::ListSchemasResponse>;
async fn list_shares(&self, request: ListSharesRequest) -> Result<ListSharesResponse>;
async fn get_share(&self, request: GetShareRequest) -> Result<GetShareResponse>;
async fn list_schemas(&self, request: ListSchemasRequest) -> Result<ListSchemasResponse>;
async fn list_schema_tables(
&self,
request: t::ListSchemaTablesRequest,
) -> Result<t::ListSchemaTablesResponse>;
request: ListSchemaTablesRequest,
) -> Result<ListSchemaTablesResponse>;
async fn list_share_tables(
&self,
request: t::ListShareTablesRequest,
) -> Result<t::ListShareTablesResponse>;
request: ListShareTablesRequest,
) -> Result<ListShareTablesResponse>;
}

pub struct RestServiceClient {
Expand Down Expand Up @@ -44,7 +44,7 @@ impl RestServiceClient {

#[async_trait::async_trait]
impl ServiceClient for RestServiceClient {
async fn list_shares(&self, request: t::ListSharesRequest) -> Result<t::ListSharesResponse> {
async fn list_shares(&self, request: ListSharesRequest) -> Result<ListSharesResponse> {
let url = self.endpoint.join("shares")?;
let cred = self.credential_provider.get_credential().await?;

Expand All @@ -71,7 +71,7 @@ impl ServiceClient for RestServiceClient {
})
}

async fn get_share(&self, request: t::GetShareRequest) -> Result<t::GetShareResponse> {
async fn get_share(&self, request: GetShareRequest) -> Result<GetShareResponse> {
let url = self.endpoint.join(&format!("shares/{}", request.share))?;
let cred = self.credential_provider.get_credential().await?;

Expand All @@ -95,7 +95,7 @@ impl ServiceClient for RestServiceClient {
})
}

async fn list_schemas(&self, request: t::ListSchemasRequest) -> Result<t::ListSchemasResponse> {
async fn list_schemas(&self, request: ListSchemasRequest) -> Result<ListSchemasResponse> {
let url = self
.endpoint
.join(&format!("shares/{}/schemas", request.share))?;
Expand Down Expand Up @@ -126,8 +126,8 @@ impl ServiceClient for RestServiceClient {

async fn list_schema_tables(
&self,
request: t::ListSchemaTablesRequest,
) -> Result<t::ListSchemaTablesResponse> {
request: ListSchemaTablesRequest,
) -> Result<ListSchemaTablesResponse> {
let url = self.endpoint.join(&format!(
"shares/{}/schemas/{}/tables",
request.share, request.schema
Expand Down Expand Up @@ -159,8 +159,8 @@ impl ServiceClient for RestServiceClient {

async fn list_share_tables(
&self,
request: t::ListShareTablesRequest,
) -> Result<t::ListShareTablesResponse> {
request: ListShareTablesRequest,
) -> Result<ListShareTablesResponse> {
let url = self
.endpoint
.join(&format!("shares/{}/all-tables", request.share))?;
Expand Down Expand Up @@ -192,7 +192,7 @@ impl ServiceClient for RestServiceClient {

fn add_pagination_query(
mut req: reqwest::RequestBuilder,
pagination: Option<t::Pagination>,
pagination: Option<Pagination>,
) -> reqwest::RequestBuilder {
if let Some(pagination) = pagination {
if let Some(max_results) = pagination.max_results {
Expand Down
14 changes: 7 additions & 7 deletions delta-sharing/client/src/sharing.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::sync::Arc;

use delta_sharing_common::{types as t, ListSharesRequest, Pagination, Share};
use delta_sharing_common::{models::v1::*, ListSharesRequest, Pagination, Share};
use futures::{Stream, TryStreamExt};

use crate::client::pagination::stream_paginated;
Expand Down Expand Up @@ -43,11 +43,11 @@ impl DeltaSharingClient {
&self,
share: impl Into<String>,
max_results: Option<i32>,
) -> impl Stream<Item = Result<t::Schema>> + '_ {
) -> impl Stream<Item = Result<Schema>> + '_ {
stream_paginated(
(share.into(), max_results),
move |(share, max_results), page_token| async move {
let req = t::ListSchemasRequest {
let req = ListSchemasRequest {
share: share.clone(),
pagination: Some(Pagination {
max_results,
Expand All @@ -71,11 +71,11 @@ impl DeltaSharingClient {
share: impl Into<String>,
schema: impl Into<String>,
max_results: Option<i32>,
) -> impl Stream<Item = Result<t::Table>> + '_ {
) -> impl Stream<Item = Result<Table>> + '_ {
stream_paginated(
(share.into(), schema.into(), max_results),
move |(share, schema, max_results), page_token| async move {
let req = t::ListSchemaTablesRequest {
let req = ListSchemaTablesRequest {
share: share.clone(),
schema: schema.clone(),
pagination: Some(Pagination {
Expand All @@ -99,11 +99,11 @@ impl DeltaSharingClient {
&self,
share: impl Into<String>,
max_results: Option<i32>,
) -> impl Stream<Item = Result<t::Table>> + '_ {
) -> impl Stream<Item = Result<Table>> + '_ {
stream_paginated(
(share.into(), max_results),
move |(share, max_results), page_token| async move {
let req = t::ListShareTablesRequest {
let req = ListShareTablesRequest {
share: share.clone(),
pagination: Some(Pagination {
max_results,
Expand Down
19 changes: 19 additions & 0 deletions delta-sharing/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,31 @@ authors = ["Robert Pack <[email protected]>"]
async-trait = { workspace = true }
chrono = { workspace = true }
delta_kernel = { workspace = true }
futures = { workspace = true }
http = { workspace = true }
hyper = { workspace = true }
object_store = { workspace = true }
serde = { workspace = true }
thiserror = { workspace = true }
tonic = { workspace = true }
tower = { workspace = true, features = ["make"] }
tracing = { workspace = true }
url = { workspace = true }

# server dependencies (in alphabetical order)
bytes = { version = "1.1" }
futures-util = "0.3.28"
http-body = { version = "1" }
http-body-util = "0.1.2"
hyper-util = { version = "0.1", features = ["full"] }
pbjson = { version = "0.7" }
pin-project = "1.1"
prost = { version = "0.13" }
serde_json = { version = "1.0" }
serde_yml = { version = "0.0.12" }
tokio = { version = "1", features = ["rt-multi-thread", "parking_lot"] }
tower-http = { version = "0.6", features = ["trace"] }
tracing-subscriber = { version = "0.3", features = ["tracing-log", "fmt"] }

# in-memory handler dependencies (in alphabetical order)
dashmap = { version = "6", optional = true }
Expand All @@ -37,6 +51,11 @@ axum = { version = "0.7.5", optional = true }

[dev-dependencies]
tokio = { version = "1", features = ["full"] }
tower = "*"
http = "*"
http-body-util = "*"
serde_json = "*"


[features]
default = ["memory", "profiles", "axum"]
Expand Down
45 changes: 44 additions & 1 deletion delta-sharing/common/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#[cfg(feature = "axum")]
use axum::extract::rejection::{PathRejection, QueryRejection};
use jsonwebtoken::errors::{Error as JwtError, ErrorKind as JwtErrorKind};
use tonic::Status;

// A convenience type for declaring Results in the Delta Sharing libraries.
pub type Result<T, E = Error> = std::result::Result<T, E>;
Expand All @@ -25,6 +26,9 @@ pub enum Error {
#[error("Generic error: {0}")]
Generic(String),

#[error("Failed to extract recipient from request")]
MissingRecipient,

#[cfg(feature = "axum")]
#[error("Axum path: {0}")]
AxumPath(#[from] PathRejection),
Expand All @@ -34,6 +38,38 @@ pub enum Error {
AxumQuery(#[from] QueryRejection),
}

impl Error {
pub fn generic(msg: impl Into<String>) -> Self {
Self::Generic(msg.into())
}
}

impl From<Error> for Status {
fn from(e: Error) -> Self {
match e {
Error::NotFound => Status::not_found("The requested resource does not exist."),
Error::NotAllowed => {
Status::permission_denied("The request is forbidden from being fulfilled.")
}
Error::Unauthenticated => Status::unauthenticated(
"The request is unauthenticated. The bearer token is missing or incorrect.",
),
Error::Kernel(error) => Status::internal(error.to_string()),
Error::InvalidTableLocation(location) => {
Status::internal(format!("Invalid table location: {}", location))
}
Error::MissingRecipient => {
Status::invalid_argument("Failed to extract recipient from request")
}
Error::Generic(message) => Status::internal(message),
#[cfg(feature = "axum")]
Error::AxumPath(rejection) => Status::internal(format!("Axum path: {}", rejection)),
#[cfg(feature = "axum")]
Error::AxumQuery(rejection) => Status::internal(format!("Axum query: {}", rejection)),
}
}
}

impl From<JwtError> for Error {
fn from(e: JwtError) -> Self {
match e.kind() {
Expand All @@ -56,7 +92,7 @@ mod server {
use tracing::error;

use super::Error;
use crate::types::ErrorResponse;
use crate::models::ErrorResponse;

const INTERNAL_ERROR: (StatusCode, &'static str) = (
StatusCode::INTERNAL_SERVER_ERROR,
Expand Down Expand Up @@ -92,6 +128,13 @@ mod server {
error!("Generic error: {}", message);
INTERNAL_ERROR
}
Error::MissingRecipient => {
error!("Failed to extract recipient from request");
(
StatusCode::BAD_REQUEST,
"Failed to extract recipient from request",
)
}
// TODO(roeap): what codes should these have?
#[cfg(feature = "axum")]
Error::AxumPath(rejection) => {
Expand Down
Loading

0 comments on commit 1d9f7b6

Please sign in to comment.