Skip to content

Commit

Permalink
Merge pull request #1946 from levydsa/replica-detection
Browse files Browse the repository at this point in the history
Use offline database database when sync API is available
  • Loading branch information
penberg authored Feb 11, 2025
2 parents c6e4e09 + 0518768 commit 7aad3e6
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 1 deletion.
11 changes: 11 additions & 0 deletions libsql/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,17 @@ cfg_replication_or_sync! {
self.frames_synced
}
}

}

cfg_sync! {
#[derive(Default)]
pub enum SyncProtocol {
#[default]
Auto,
V1,
V2,
}
}

enum DbType {
Expand Down
59 changes: 58 additions & 1 deletion libsql/src/database/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ impl Builder<()> {
http_request_callback: None,
namespace: None,
skip_safety_assert: false,
#[cfg(feature = "sync")]
sync_protocol: Default::default(),
},
}
}
Expand Down Expand Up @@ -222,6 +224,8 @@ cfg_replication! {
http_request_callback: Option<crate::util::HttpRequestCallback>,
namespace: Option<String>,
skip_safety_assert: bool,
#[cfg(feature = "sync")]
sync_protocol: super::SyncProtocol,
}

/// Local replica configuration type in [`Builder`].
Expand Down Expand Up @@ -274,6 +278,15 @@ cfg_replication! {
self
}

/// Set the duration at which the replicator will automatically call `sync` in the
/// background. The sync will continue for the duration that the resulted `Database`
/// type is alive for, once it is dropped the background task will get dropped and stop.
#[cfg(feature = "sync")]
pub fn sync_protocol(mut self, protocol: super::SyncProtocol) -> Builder<RemoteReplica> {
self.inner.sync_protocol = protocol;
self
}

pub fn http_request_callback<F>(mut self, f: F) -> Builder<RemoteReplica>
where
F: Fn(&mut http::Request<()>) + Send + Sync + 'static
Expand Down Expand Up @@ -326,7 +339,9 @@ cfg_replication! {
sync_interval,
http_request_callback,
namespace,
skip_safety_assert
skip_safety_assert,
#[cfg(feature = "sync")]
sync_protocol,
} = self.inner;

let connector = if let Some(connector) = connector {
Expand All @@ -342,6 +357,48 @@ cfg_replication! {
crate::util::ConnectorService::new(svc)
};

#[cfg(feature = "sync")]
{
use super::SyncProtocol;
match sync_protocol {
p @ (SyncProtocol::Auto | SyncProtocol::V2) => {
let client = hyper::client::Client::builder()
.build::<_, hyper::Body>(connector.clone());

let req = http::Request::get(format!("{url}/sync/0/0/0"))
.header("Authorization", format!("Bearer {}", auth_token))
.body(hyper::Body::empty())
.unwrap();

let res = client
.request(req)
.await
.map_err(|err| crate::Error::Sync(err.into()))?;

if matches!(p, SyncProtocol::V2) {
if !res.status().is_success() {
let status = res.status();
let body_bytes = hyper::body::to_bytes(res.into_body())
.await
.map_err(|err| crate::Error::Sync(err.into()))?;
let error_message = String::from_utf8_lossy(&body_bytes);
return Err(crate::Error::Sync(format!("HTTP error {}: {}", status, error_message).into()));
}
}

if res.status().is_success() {
return Builder::new_synced_database(path, url, auth_token)
.remote_writes(true)
.read_your_writes(read_your_writes)
.build()
.await;
}

}
SyncProtocol::V1 => {}
}
}

let path = path.to_str().ok_or(crate::Error::InvalidUTF8Path)?.to_owned();

let db = if !skip_safety_assert {
Expand Down
1 change: 1 addition & 0 deletions libsql/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ pub mod params;

cfg_sync! {
mod sync;
pub use database::SyncProtocol;
}

cfg_replication! {
Expand Down

0 comments on commit 7aad3e6

Please sign in to comment.