From 9370046c13c51a74d941705a9ebb3b8a240015f8 Mon Sep 17 00:00:00 2001 From: KyJah Keys Date: Sat, 11 May 2024 12:55:09 -0400 Subject: [PATCH 1/3] fix(rust, python): fixed differences in storage options between log and object stores causing divergent dynamodb behavior --- crates/aws/src/storage.rs | 56 +++++++++++++++++++++++++-------------- 1 file changed, 36 insertions(+), 20 deletions(-) diff --git a/crates/aws/src/storage.rs b/crates/aws/src/storage.rs index e4116e216b..1165798962 100644 --- a/crates/aws/src/storage.rs +++ b/crates/aws/src/storage.rs @@ -1,5 +1,6 @@ //! AWS S3 storage backend. +use aws_config::meta::region::ProvideRegion; use aws_config::provider_config::ProviderConfig; use aws_config::{Region, SdkConfig}; use bytes::Bytes; @@ -13,6 +14,7 @@ use deltalake_core::storage::{ use deltalake_core::{DeltaResult, ObjectStoreError, Path}; use futures::stream::BoxStream; use futures::Future; +use tracing::warn; use std::collections::HashMap; use std::fmt::Debug; use std::ops::Range; @@ -67,9 +69,9 @@ impl ObjectStoreFactory for S3ObjectStoreFactory { fn parse_url_opts( &self, url: &Url, - options: &StorageOptions, + storage_options: &StorageOptions, ) -> DeltaResult<(ObjectStoreRef, Path)> { - let options = self.with_env_s3(options); + let options = self.with_env_s3(storage_options); let (inner, prefix) = parse_url_opts( url, options.0.iter().filter_map(|(key, value)| { @@ -87,7 +89,7 @@ impl ObjectStoreFactory for S3ObjectStoreFactory { { Ok((store, prefix)) } else { - let s3_options = S3StorageOptions::from_map(&options.0)?; + let s3_options = S3StorageOptions::from_map(&storage_options.0)?; let store = S3StorageBackend::try_new( store, @@ -140,7 +142,6 @@ impl S3StorageOptions { .filter(|(k, _)| !s3_constants::S3_OPTS.contains(&k.as_str())) .map(|(k, v)| (k.to_owned(), v.to_owned())) .collect(); - // Copy web identity values provided in options but not the environment into the environment // to get picked up by the `from_k8s_env` call in `get_web_identity_provider`. Self::ensure_env_var(options, s3_constants::AWS_REGION); @@ -175,10 +176,20 @@ impl S3StorageOptions { .unwrap_or(true); let imds_timeout = Self::u64_or_default(options, s3_constants::AWS_EC2_METADATA_TIMEOUT, 100); - - let region_provider = crate::credentials::new_region_provider(disable_imds, imds_timeout); - let region = execute_sdk_future(region_provider.region())?; - let provider_config = ProviderConfig::default().with_region(region); + let (loader, provider_config) = if let Some(endpoint_url) = str_option(options, s3_constants::AWS_ENDPOINT_URL) { + let (region_provider, provider_config) = Self::create_provider_config( + str_option(options, s3_constants::AWS_REGION) + .or_else(|| std::env::var("AWS_DEFAULT_REGION").ok()) + .map_or(Region::from_static("custom"), Region::new))?; + let loader = aws_config::from_env() + .endpoint_url(endpoint_url) + .region(region_provider); + (loader, provider_config) + } else { + let (region_provider, provider_config) = Self::create_provider_config( crate::credentials::new_region_provider(disable_imds, imds_timeout))?; + (aws_config::from_env().region(region_provider), provider_config) + }; + let credentials_provider = crate::credentials::ConfiguredCredentialChain::new( disable_imds, imds_timeout, @@ -186,30 +197,22 @@ impl S3StorageOptions { ); #[cfg(feature = "native-tls")] let sdk_config = execute_sdk_future( - aws_config::from_env() + loader .http_client(native::use_native_tls_client( str_option(options, s3_constants::AWS_ALLOW_HTTP) .map(|val| str_is_truthy(&val)) .unwrap_or(false), )) .credentials_provider(credentials_provider) - .region(region_provider) .load(), )?; #[cfg(feature = "rustls")] let sdk_config = execute_sdk_future( - aws_config::from_env() + loader .credentials_provider(credentials_provider) - .region(region_provider) .load(), )?; - let sdk_config = - if let Some(endpoint_url) = str_option(options, s3_constants::AWS_ENDPOINT_URL) { - sdk_config.to_builder().endpoint_url(endpoint_url).build() - } else { - sdk_config - }; Ok(Self { virtual_hosted_style_request, locking_provider: str_option(options, s3_constants::AWS_S3_LOCKING_PROVIDER), @@ -230,6 +233,11 @@ impl S3StorageOptions { self.sdk_config.region() } + fn create_provider_config(region_provider: T) -> DeltaResult<(T, ProviderConfig)> { + let region = execute_sdk_future(region_provider.region())?; + Ok((region_provider, ProviderConfig::default().with_region(region))) + } + fn u64_or_default(map: &HashMap, key: &str, default: u64) -> u64 { str_option(map, key) .and_then(|v| v.parse().ok()) @@ -494,8 +502,16 @@ pub mod s3_constants { } pub(crate) fn str_option(map: &HashMap, key: &str) -> Option { - map.get(key) - .map_or_else(|| std::env::var(key).ok(), |v| Some(v.to_owned())) + if let Some(s) = map.get(key) { + return Some(s.to_owned()); + } + + if let Some(s) = map.get(&key.to_ascii_lowercase()) { + warn!("********** found lowercase **********: {} : {}", key, s); + return Some(s.to_owned()); + } + + std::env::var(key).ok() } #[cfg(test)] From bb7ae280d2a6845c100dae030febb6f6522bd935 Mon Sep 17 00:00:00 2001 From: KyJah Keys Date: Sat, 11 May 2024 18:12:11 -0400 Subject: [PATCH 2/3] reformatted code --- crates/aws/src/storage.rs | 55 +++++++++++++++++++++++---------------- 1 file changed, 32 insertions(+), 23 deletions(-) diff --git a/crates/aws/src/storage.rs b/crates/aws/src/storage.rs index 1165798962..08159afda0 100644 --- a/crates/aws/src/storage.rs +++ b/crates/aws/src/storage.rs @@ -14,7 +14,6 @@ use deltalake_core::storage::{ use deltalake_core::{DeltaResult, ObjectStoreError, Path}; use futures::stream::BoxStream; use futures::Future; -use tracing::warn; use std::collections::HashMap; use std::fmt::Debug; use std::ops::Range; @@ -22,6 +21,7 @@ use std::str::FromStr; use std::sync::Arc; use std::time::Duration; use tokio::io::AsyncWrite; +use tracing::warn; use url::Url; use crate::errors::DynamoDbConfigError; @@ -176,20 +176,27 @@ impl S3StorageOptions { .unwrap_or(true); let imds_timeout = Self::u64_or_default(options, s3_constants::AWS_EC2_METADATA_TIMEOUT, 100); - let (loader, provider_config) = if let Some(endpoint_url) = str_option(options, s3_constants::AWS_ENDPOINT_URL) { - let (region_provider, provider_config) = Self::create_provider_config( - str_option(options, s3_constants::AWS_REGION) - .or_else(|| std::env::var("AWS_DEFAULT_REGION").ok()) - .map_or(Region::from_static("custom"), Region::new))?; - let loader = aws_config::from_env() - .endpoint_url(endpoint_url) - .region(region_provider); - (loader, provider_config) - } else { - let (region_provider, provider_config) = Self::create_provider_config( crate::credentials::new_region_provider(disable_imds, imds_timeout))?; - (aws_config::from_env().region(region_provider), provider_config) - }; - + let (loader, provider_config) = + if let Some(endpoint_url) = str_option(options, s3_constants::AWS_ENDPOINT_URL) { + let (region_provider, provider_config) = Self::create_provider_config( + str_option(options, s3_constants::AWS_REGION) + .or_else(|| std::env::var("AWS_DEFAULT_REGION").ok()) + .map_or(Region::from_static("custom"), Region::new), + )?; + let loader = aws_config::from_env() + .endpoint_url(endpoint_url) + .region(region_provider); + (loader, provider_config) + } else { + let (region_provider, provider_config) = Self::create_provider_config( + crate::credentials::new_region_provider(disable_imds, imds_timeout), + )?; + ( + aws_config::from_env().region(region_provider), + provider_config, + ) + }; + let credentials_provider = crate::credentials::ConfiguredCredentialChain::new( disable_imds, imds_timeout, @@ -207,11 +214,8 @@ impl S3StorageOptions { .load(), )?; #[cfg(feature = "rustls")] - let sdk_config = execute_sdk_future( - loader - .credentials_provider(credentials_provider) - .load(), - )?; + let sdk_config = + execute_sdk_future(loader.credentials_provider(credentials_provider).load())?; Ok(Self { virtual_hosted_style_request, @@ -233,9 +237,14 @@ impl S3StorageOptions { self.sdk_config.region() } - fn create_provider_config(region_provider: T) -> DeltaResult<(T, ProviderConfig)> { + fn create_provider_config( + region_provider: T, + ) -> DeltaResult<(T, ProviderConfig)> { let region = execute_sdk_future(region_provider.region())?; - Ok((region_provider, ProviderConfig::default().with_region(region))) + Ok(( + region_provider, + ProviderConfig::default().with_region(region), + )) } fn u64_or_default(map: &HashMap, key: &str, default: u64) -> u64 { @@ -510,7 +519,7 @@ pub(crate) fn str_option(map: &HashMap, key: &str) -> Option Date: Sun, 12 May 2024 07:01:24 -0400 Subject: [PATCH 3/3] removed debug statement --- crates/aws/src/storage.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/crates/aws/src/storage.rs b/crates/aws/src/storage.rs index 08159afda0..c12151089f 100644 --- a/crates/aws/src/storage.rs +++ b/crates/aws/src/storage.rs @@ -516,7 +516,6 @@ pub(crate) fn str_option(map: &HashMap, key: &str) -> Option