Skip to content

Commit

Permalink
Revert "Reduce stack usage by boxing File in Dist, CachePolicy
Browse files Browse the repository at this point in the history
…and large futures" (#1003)

Reverts #947
  • Loading branch information
konstin authored Jan 19, 2024
1 parent 6748cf8 commit 692ab5b
Show file tree
Hide file tree
Showing 9 changed files with 25 additions and 65 deletions.
21 changes: 4 additions & 17 deletions crates/distribution-types/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ pub enum SourceDist {
#[derive(Debug, Clone)]
pub struct RegistryBuiltDist {
pub filename: WheelFilename,
pub file: Box<File>,
pub file: File,
pub index: IndexUrl,
}

Expand All @@ -172,7 +172,7 @@ pub struct PathBuiltDist {
#[derive(Debug, Clone)]
pub struct RegistrySourceDist {
pub filename: SourceDistFilename,
pub file: Box<File>,
pub file: File,
pub index: IndexUrl,
}

Expand Down Expand Up @@ -208,14 +208,14 @@ impl Dist {
DistFilename::WheelFilename(filename) => {
Self::Built(BuiltDist::Registry(RegistryBuiltDist {
filename,
file: Box::new(file),
file,
index,
}))
}
DistFilename::SourceDistFilename(filename) => {
Self::Source(SourceDist::Registry(RegistrySourceDist {
filename,
file: Box::new(file),
file,
index,
}))
}
Expand Down Expand Up @@ -865,16 +865,3 @@ impl Identifier for Dist {
}
}
}

#[cfg(test)]
mod test {
use crate::{BuiltDist, Dist, SourceDist};

/// Ensure that we don't accidentally grow the `Dist` sizes.
#[test]
fn dist_size() {
assert!(std::mem::size_of::<Dist>() <= 240);
assert!(std::mem::size_of::<BuiltDist>() <= 240);
assert!(std::mem::size_of::<SourceDist>() <= 168);
}
}
16 changes: 7 additions & 9 deletions crates/puffin-client/src/cached_client.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use futures::FutureExt;
use std::future::Future;
use std::time::SystemTime;

Expand Down Expand Up @@ -43,15 +42,14 @@ enum CachedResponse<Payload: Serialize> {
/// There was no prior cached response or the cache was outdated
///
/// The cache policy is `None` if it isn't storable
ModifiedOrNew(Response, Option<Box<CachePolicy>>),
ModifiedOrNew(Response, Option<CachePolicy>),
}

/// Serialize the actual payload together with its caching information
#[derive(Debug, Deserialize, Serialize)]
pub struct DataWithCachePolicy<Payload: Serialize> {
pub data: Payload,
// The cache policy is large (448 bytes at time of writing), reduce the stack size
cache_policy: Box<CachePolicy>,
cache_policy: CachePolicy,
}

/// Custom caching layer over [`reqwest::Client`] using `http-cache-semantics`.
Expand Down Expand Up @@ -90,7 +88,7 @@ impl CachedClient {
/// client.
#[instrument(skip_all)]
pub async fn get_cached_with_callback<
Payload: Serialize + DeserializeOwned + Send,
Payload: Serialize + DeserializeOwned,
CallBackError,
Callback,
CallbackReturn,
Expand Down Expand Up @@ -130,7 +128,7 @@ impl CachedClient {
None
};

let cached_response = self.send_cached(req, cached).boxed().await?;
let cached_response = self.send_cached(req, cached).await?;

let write_cache = info_span!("write_cache", file = %cache_entry.path().display());
match cached_response {
Expand Down Expand Up @@ -233,14 +231,14 @@ impl CachedClient {
debug!("Found not-modified response for: {url}");
CachedResponse::NotModified(DataWithCachePolicy {
data: cached.data,
cache_policy: Box::new(new_policy),
cache_policy: new_policy,
})
}
AfterResponse::Modified(new_policy, _parts) => {
debug!("Found modified response for: {url}");
CachedResponse::ModifiedOrNew(
res,
new_policy.is_storable().then(|| Box::new(new_policy)),
new_policy.is_storable().then_some(new_policy),
)
}
}
Expand Down Expand Up @@ -273,7 +271,7 @@ impl CachedClient {
CachePolicy::new(&converted_req.into_parts().0, &converted_res.into_parts().0);
Ok(CachedResponse::ModifiedOrNew(
res,
cache_policy.is_storable().then(|| Box::new(cache_policy)),
cache_policy.is_storable().then_some(cache_policy),
))
}
}
7 changes: 3 additions & 4 deletions crates/puffin-client/src/flat_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::collections::btree_map::Entry;
use std::collections::BTreeMap;
use std::path::PathBuf;

use futures::{FutureExt, StreamExt};
use futures::StreamExt;
use reqwest::Response;
use rustc_hash::FxHashMap;
use tracing::{debug, info_span, instrument, warn, Instrument};
Expand Down Expand Up @@ -120,7 +120,6 @@ impl<'a> FlatIndexClient<'a> {
.collect();
Ok(files)
}
.boxed()
.instrument(info_span!("parse_flat_index_html", url = % url))
};
let files = cached_client
Expand Down Expand Up @@ -219,7 +218,7 @@ impl FlatIndex {

let dist = Dist::Built(BuiltDist::Registry(RegistryBuiltDist {
filename,
file: Box::new(file),
file,
index,
}));
match distributions.0.entry(version) {
Expand All @@ -236,7 +235,7 @@ impl FlatIndex {
DistFilename::SourceDistFilename(filename) => {
let dist = Dist::Source(SourceDist::Registry(RegistrySourceDist {
filename: filename.clone(),
file: Box::new(file),
file,
index,
}));
match distributions.0.entry(filename.version.clone()) {
Expand Down
4 changes: 1 addition & 3 deletions crates/puffin-client/src/registry_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::str::FromStr;

use async_http_range_reader::{AsyncHttpRangeReader, AsyncHttpRangeReaderError};
use async_zip::tokio::read::seek::ZipFileReader;
use futures::{FutureExt, TryStreamExt};
use futures::TryStreamExt;
use reqwest::{Client, ClientBuilder, Response, StatusCode};
use reqwest_retry::policies::ExponentialBackoff;
use reqwest_retry::RetryTransientMiddleware;
Expand Down Expand Up @@ -206,7 +206,6 @@ impl RegistryClient {
}
}
}
.boxed()
.instrument(info_span!("parse_simple_api", package = %package_name))
};
let result = self
Expand Down Expand Up @@ -336,7 +335,6 @@ impl RegistryClient {
})?;
Ok(metadata)
}
.boxed()
.instrument(info_span!("read_metadata_range_request", wheel = %filename))
};

Expand Down
8 changes: 2 additions & 6 deletions crates/puffin-distribution/src/distribution_database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use std::str::FromStr;
use std::sync::Arc;

use fs_err::tokio as fs;
use futures::FutureExt;
use thiserror::Error;
use tokio::task::JoinError;
use tokio_util::compat::FuturesAsyncReadCompatExt;
Expand Down Expand Up @@ -220,7 +219,7 @@ impl<'a, Context: BuildContext + Send + Sync> DistributionDatabase<'a, Context>
let lock = self.locks.acquire(&dist).await;
let _guard = lock.lock().await;

let built_wheel = self.builder.download_and_build(source_dist).boxed().await?;
let built_wheel = self.builder.download_and_build(source_dist).await?;
Ok(LocalWheel::Built(BuiltWheel {
dist: dist.clone(),
path: built_wheel.path,
Expand All @@ -243,9 +242,7 @@ impl<'a, Context: BuildContext + Send + Sync> DistributionDatabase<'a, Context>
dist: &Dist,
) -> Result<(Metadata21, Option<Url>), DistributionDatabaseError> {
match dist {
Dist::Built(built_dist) => {
Ok((self.client.wheel_metadata(built_dist).boxed().await?, None))
}
Dist::Built(built_dist) => Ok((self.client.wheel_metadata(built_dist).await?, None)),
Dist::Source(source_dist) => {
// Optimization: Skip source dist download when we must not build them anyway.
if self.build_context.no_build() {
Expand All @@ -266,7 +263,6 @@ impl<'a, Context: BuildContext + Send + Sync> DistributionDatabase<'a, Context>
let metadata = self
.builder
.download_and_build_metadata(&source_dist)
.boxed()
.await?;
Ok((metadata, precise))
}
Expand Down
22 changes: 4 additions & 18 deletions crates/puffin-distribution/src/source/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::sync::Arc;

use anyhow::Result;
use fs_err::tokio as fs;
use futures::{FutureExt, TryStreamExt};
use futures::TryStreamExt;
use reqwest::Response;
use tempfile::TempDir;
use tokio_util::compat::FuturesAsyncReadCompatExt;
Expand Down Expand Up @@ -96,7 +96,6 @@ impl<'a, T: BuildContext> SourceDistCachedBuilder<'a, T> {
&cache_shard,
subdirectory.as_deref(),
)
.boxed()
.await?
}
SourceDist::Registry(registry_source_dist) => {
Expand Down Expand Up @@ -135,7 +134,6 @@ impl<'a, T: BuildContext> SourceDistCachedBuilder<'a, T> {
&cache_shard,
None,
)
.boxed()
.await?
}
SourceDist::Git(git_source_dist) => self.git(source_dist, git_source_dist).await?,
Expand Down Expand Up @@ -173,7 +171,6 @@ impl<'a, T: BuildContext> SourceDistCachedBuilder<'a, T> {
&cache_shard,
subdirectory.as_deref(),
)
.boxed()
.await?
}
SourceDist::Registry(registry_source_dist) => {
Expand All @@ -192,10 +189,7 @@ impl<'a, T: BuildContext> SourceDistCachedBuilder<'a, T> {
path: path.clone(),
editable: false,
};
return self
.path_metadata(source_dist, &path_source_dist)
.boxed()
.await;
return self.path_metadata(source_dist, &path_source_dist).await;
}
};

Expand All @@ -215,18 +209,13 @@ impl<'a, T: BuildContext> SourceDistCachedBuilder<'a, T> {
&cache_shard,
None,
)
.boxed()
.await?
}
SourceDist::Git(git_source_dist) => {
self.git_metadata(source_dist, git_source_dist)
.boxed()
.await?
self.git_metadata(source_dist, git_source_dist).await?
}
SourceDist::Path(path_source_dist) => {
self.path_metadata(source_dist, path_source_dist)
.boxed()
.await?
self.path_metadata(source_dist, path_source_dist).await?
}
};

Expand Down Expand Up @@ -391,7 +380,6 @@ impl<'a, T: BuildContext> SourceDistCachedBuilder<'a, T> {
// If the backend supports `prepare_metadata_for_build_wheel`, use it.
if let Some(metadata) = self
.build_source_dist_metadata(source_dist, source_dist_entry.path(), subdirectory)
.boxed()
.await?
{
if let Ok(cached) = fs::read(cache_entry.path()).await {
Expand Down Expand Up @@ -576,7 +564,6 @@ impl<'a, T: BuildContext> SourceDistCachedBuilder<'a, T> {
// If the backend supports `prepare_metadata_for_build_wheel`, use it.
if let Some(metadata) = self
.build_source_dist_metadata(source_dist, &path_source_dist.path, None)
.boxed()
.await?
{
// Store the metadata for this build along with all the other builds.
Expand Down Expand Up @@ -725,7 +712,6 @@ impl<'a, T: BuildContext> SourceDistCachedBuilder<'a, T> {
// If the backend supports `prepare_metadata_for_build_wheel`, use it.
if let Some(metadata) = self
.build_source_dist_metadata(source_dist, fetch.path(), subdirectory.as_deref())
.boxed()
.await?
{
// Store the metadata for this build along with all the other builds.
Expand Down
5 changes: 2 additions & 3 deletions crates/puffin-installer/src/downloader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::cmp::Reverse;
use std::path::Path;
use std::sync::Arc;

use futures::{FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt};
use futures::{Stream, StreamExt, TryFutureExt, TryStreamExt};
use tokio::task::JoinError;
use tracing::{instrument, warn};
use url::Url;
Expand Down Expand Up @@ -68,7 +68,7 @@ impl<'a, Context: BuildContext + Send + Sync> Downloader<'a, Context> {
) -> impl Stream<Item = Result<CachedDist, Error>> + 'stream {
futures::stream::iter(distributions)
.map(|dist| async {
let wheel = self.get_wheel(dist, in_flight).boxed().await?;
let wheel = self.get_wheel(dist, in_flight).await?;
if let Some(reporter) = self.reporter.as_ref() {
reporter.on_progress(&wheel);
}
Expand Down Expand Up @@ -158,7 +158,6 @@ impl<'a, Context: BuildContext + Send + Sync> Downloader<'a, Context> {
let download: LocalWheel = self
.database
.get_or_build_wheel(dist.clone())
.boxed()
.map_err(|err| Error::Fetch(dist.clone(), err))
.await?;
let result = Self::unzip_wheel(download).await;
Expand Down
5 changes: 1 addition & 4 deletions crates/puffin-resolver/src/resolver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -683,7 +683,7 @@ impl<'a, Provider: ResolverProvider> Resolver<'a, Provider> {
/// Fetch the metadata for a stream of packages and versions.
async fn fetch(&self, request_stream: UnboundedReceiver<Request>) -> Result<(), ResolveError> {
let mut response_stream = request_stream
.map(|request| self.process_request(request).boxed())
.map(|request| self.process_request(request))
.buffer_unordered(50);

while let Some(response) = response_stream.next().await {
Expand Down Expand Up @@ -738,7 +738,6 @@ impl<'a, Provider: ResolverProvider> Resolver<'a, Provider> {
let version_map = self
.provider
.get_version_map(&package_name)
.boxed()
.await
.map_err(ResolveError::Client)?;
Ok(Some(Response::Package(package_name, version_map)))
Expand All @@ -749,7 +748,6 @@ impl<'a, Provider: ResolverProvider> Resolver<'a, Provider> {
let (metadata, precise) = self
.provider
.get_or_build_wheel_metadata(&dist)
.boxed()
.await
.map_err(|err| match dist.clone() {
Dist::Built(BuiltDist::Path(built_dist)) => {
Expand Down Expand Up @@ -802,7 +800,6 @@ impl<'a, Provider: ResolverProvider> Resolver<'a, Provider> {
let (metadata, precise) = self
.provider
.get_or_build_wheel_metadata(&dist)
.boxed()
.await
.map_err(|err| match dist.clone() {
Dist::Built(BuiltDist::Path(built_dist)) => {
Expand Down
2 changes: 1 addition & 1 deletion crates/puffin-traits/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ use puffin_interpreter::{Interpreter, Virtualenv};
/// them.

// TODO(konstin): Proper error types
pub trait BuildContext: Sync {
pub trait BuildContext {
type SourceDistBuilder: SourceBuildTrait + Send + Sync;

fn cache(&self) -> &Cache;
Expand Down

0 comments on commit 692ab5b

Please sign in to comment.