Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue with Compression of Response Body #523

Open
kingsmen47 opened this issue Feb 3, 2025 · 0 comments
Open

Issue with Compression of Response Body #523

kingsmen47 opened this issue Feb 3, 2025 · 0 comments
Labels
bug Something isn't working

Comments

@kingsmen47
Copy link

Describe the bug

I am trying to enable compression for the response I am getting from the upstream backend using zstd algorithm level 3 but the issue is that the compression is failing over my response body and I am not able to understand why

Pingora info

Cargo.toml

name = "load_balancer"
version = "0.1.0"
edition = "2021"

[dependencies]
async-trait="0.1"
pingora-core = { version = "0.4", features = ["openssl"] }
pingora-http = { version = "0.4" }
pingora-proxy = { version = "0.4.0" }
pingora-load-balancing = { version = "0.4.0" }
prometheus = "0.12"
clap = "4.5.19"
tokio = { version = "1", features = ["full"] }
log = "0.4"
env_logger = "0.9"
structopt = "0.3.8"
lazy_static = "1.4"
axum = "0.7.7"
serde = "1.0.215"
axum-server = "0.7.1"
rand = "0.8.5"
openssl = "0.10"
rustls-pemfile = "2.1.2"
x509-parser = "0.16.0"
notify = "6.0.0"
regex = "1.10.0"
wasm-bindgen = "0.2.89"
wasmtime = "29.0.0"
anyhow = "1.0.71"
wasmtime-wasi = "29.0.0"
http = "1.0.0"
serde_json = "1.0.120"
bytes = "1.6.0"

Main.rs

#[async_trait]
impl ProxyHttp for MyGateway {
    type CTX = ResponseCompressionCtx;

    fn new_ctx(&self) -> Self::CTX {
        let mut ctx = ResponseCompressionCtx::new(
            3,      // compression level
            false,  // disable header compression
            false   // disable auto decompress
        );
        ctx.adjust_algorithm_level(Algorithm::Gzip, 3); // Ensure Zstd is supported
        ctx
    }

    // Add request filtering before WASM processing
    async fn request_filter(&self, session: &mut Session, _ctx: &mut Self::CTX) -> Result<bool> {
        if let Some(wasm) = &self.wasm_module {
            let req = Request {
                path: session.req_header().uri.path().to_string(),
                method: session.req_header().method.to_string(),
                headers: session.req_header().headers.iter()
                    .map(|(name, value)| (
                        name.as_str().to_string(),
                        value.to_str().unwrap_or_default().to_string(),
                    ))
                    .collect(),
                body: session.read_request_body().await
                    .ok()
                    .and_then(|b| b.map(|bytes: Bytes| bytes.to_vec())),
                remote_addr: session.client_addr().map(|addr: &pingora_core::protocols::l4::socket::SocketAddr| addr.to_string()),
            };

            if let Ok(res) = wasm.process_request(&req).await {
                if res.blocked {
                    let status = res.status.unwrap_or(403);
                    session.respond_error(status).await.map_err(|e| {
                        pingora_core::Error::because(ErrorType::InternalError, "Failed to process request", e)
                    })?;
                    return Ok(true);
                }
            }
        }
        Ok(false)
    }


 
    async fn response_filter(
        &self,
        session: &mut Session,
        upstream_response: &mut ResponseHeader,
        ctx: &mut Self::CTX,
    ) -> Result<()> {
        // Add custom headers
        upstream_response.insert_header("X-Proxy-Type", "WASM-Gateway")?;

        // Check if client accepts zstd compression
        if let Some(accept_encoding) = session.req_header().headers.get("accept-encoding") {
            if accept_encoding.to_str().unwrap_or("").contains("gzip") {
                // Set content-encoding header
                upstream_response.insert_header("content-encoding", "gzip")?;
            }
        }

        // Apply response header compression
        ctx.response_header_filter(upstream_response, true);

        Ok(())
    }

    fn response_body_filter(
        &self,
        session: &mut Session,
        body: &mut Option<Bytes>,
        end_of_stream: bool,
        ctx: &mut Self::CTX,
    ) -> Result<Option<std::time::Duration>>
    where
        Self::CTX: Send + Sync,
    {
        if let Some(accept_encoding) = session.req_header().headers.get("accept-encoding") {
            log::info!("Accept-Encoding: {:?}", accept_encoding);

            if accept_encoding.to_str().unwrap_or("").contains("gzip") {
                if let Some(ref body_ref) = body.as_ref() {
                    let original_size = body_ref.len();
                    log::info!("Original size: {} bytes", original_size);
                    log::info!("End of stream: {}", end_of_stream);

                    // Process the body directly without initialization
                    if let Some(compressed) = ctx.response_body_filter(Some(body_ref), end_of_stream) {
                        let compressed_size = compressed.len();
                        let compression_ratio = (1.0 - (compressed_size as f64 / original_size as f64)) * 100.0;

                        log::info!("Compression successful!");
                        log::info!("  Original size: {} bytes", original_size);
                        log::info!("  Compressed size: {} bytes", compressed_size);
                        log::info!("  Compression ratio: {:.2}%", compression_ratio);

                        *body = Some(compressed);
                    } else {
                        log::error!("❌ Compression failed on chunk of size {}", original_size);
                        // Fallback to uncompressed data
                        *body = Some((*body_ref).clone());
                    }
                }
            }
        }

        Ok(None)
    }

Terminal Logs

[2025-02-03T13:25:34Z INFO  pingora_core::server] Bootstrap starting
[2025-02-03T13:25:34Z INFO  pingora_core::server] Bootstrap done
[2025-02-03T13:25:34Z INFO  pingora_core::server] Server starting
[2025-02-03T13:25:37Z INFO  load_balancer] Routing request to internal service: localhost:3001 (service_id: suffix_service, route: /end)
[2025-02-03T13:25:40Z INFO  load_balancer] Accept-Encoding: "gzip, deflate, br, zstd"
[2025-02-03T13:25:40Z INFO  load_balancer] Original size: 3856 bytes
[2025-02-03T13:25:40Z INFO  load_balancer] End of stream: false
[2025-02-03T13:25:40Z ERROR load_balancer] ❌ Compression failed on chunk of size 3856
[2025-02-03T13:25:40Z INFO  load_balancer] Accept-Encoding: "gzip, deflate, br, zstd"
[2025-02-03T13:25:40Z INFO  load_balancer] Original size: 44318 bytes
[2025-02-03T13:25:40Z INFO  load_balancer] End of stream: true
[2025-02-03T13:25:40Z ERROR load_balancer] ❌ Compression failed on chunk of size 44318
[2025-02-03T13:25:40Z INFO  load_balancer] Request completed with code 200

Expected results

I was expecting to enforce compression if not already compressed for the response body.

Observed results

I was not able to compress the response body and I am facing an issue regarding the same

@johnhurt johnhurt added the bug Something isn't working label Feb 7, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Something isn't working
Projects
None yet
Development

No branches or pull requests

2 participants