
feat(storage): support disk object store #2389

Merged: 11 commits from yiming/disk_object_store into main on May 16, 2022

Conversation

@wenym1 wenym1 (Contributor) commented May 9, 2022

What's changed and what's your intention?

In this PR, we implement a disk object store for future spill to disk support. The local file system is used to maintain the path hierarchy and store the object data.

Checklist

  • I have written necessary docs and comments
  • I have added necessary unit tests and integration tests

Refer to a related PR or issue link (optional)

part of #2384

@wenym1 wenym1 requested a review from hzxa21 May 9, 2022 10:18
@wenym1 wenym1 self-assigned this May 9, 2022
@codecov codecov bot commented May 9, 2022

Codecov Report

Merging #2389 (e15caf5) into main (965e671) will increase coverage by 0.73%.
The diff coverage is 82.98%.

@@            Coverage Diff             @@
##             main    #2389      +/-   ##
==========================================
+ Coverage   71.31%   72.04%   +0.73%     
==========================================
  Files         688      677      -11     
  Lines       86790    87988    +1198     
==========================================
+ Hits        61897    63394    +1497     
+ Misses      24893    24594     -299     
Flag Coverage Δ
rust 72.04% <82.98%> (+0.73%) ⬆️

Flags with carried forward coverage won't be shown.

Impacted Files Coverage Δ
src/storage/compactor/src/server.rs 0.00% <0.00%> (ø)
src/storage/src/object/mod.rs 13.63% <0.00%> (-1.75%) ⬇️
src/storage/src/object/s3.rs 0.00% <0.00%> (ø)
src/storage/src/store_impl.rs 5.79% <0.00%> (-0.99%) ⬇️
src/storage/src/object/mem.rs 78.21% <42.85%> (-0.14%) ⬇️
src/storage/src/hummock/sstable_store.rs 65.92% <50.00%> (+4.85%) ⬆️
src/storage/src/hummock/block_cache.rs 80.59% <55.55%> (+7.73%) ⬆️
src/storage/src/hummock/cache.rs 95.78% <88.00%> (+0.27%) ⬆️
src/storage/src/object/disk.rs 94.76% <94.76%> (ø)
src/storage/src/object/error.rs 38.09% <100.00%> (+15.87%) ⬆️
... and 216 more


@skyzh skyzh (Contributor) commented May 9, 2022

Do we have plans to support multiple object store backends for Hummock? If we want to spill to disk, I guess we are running two backends. I'm also wondering whether disk should be an object store. Maybe we only need a disk interface for spill-to-disk, without making it a kind of object store?

@wenym1 wenym1 (Contributor, Author) commented May 10, 2022

Do we have plans to support multiple object store backends for Hummock? If we want to spill to disk, I guess we are running two backends. I'm also wondering whether disk should be an object store.

Yes, in my development plan, we will have two object stores, one local and one remote. The sstable store will hold both and route requests according to the highest bit of the sstable id. A more complete PR is #2384; since it's too large, I split it into several smaller PRs like this one.

Maybe we only need a disk interface for spill-to-disk, without making it a kind of object store?

Since the spilled files will be in SST format, making it an object store lets us share some logic when we read SSTs and keeps the code neater. In a future PR I will introduce a stream-like uploader for the object store so that we don't have to buffer the whole SST in memory when using the disk object store.
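For illustration, a minimal sketch of the routing idea described above (the flag constant, the field names, and the placeholder ObjectStore trait are assumptions made for this sketch; the real layout lands in #2384):

use std::sync::Arc;

// Placeholder for the real trait in src/storage/src/object/mod.rs.
trait ObjectStore: Send + Sync {}

// Hypothetical flag: the highest bit of the u64 sstable id marks a local SST.
const LOCAL_SST_FLAG: u64 = 1 << 63;

struct SstableStore {
    local_store: Arc<dyn ObjectStore>,
    remote_store: Arc<dyn ObjectStore>,
}

impl SstableStore {
    // Route a request to the local or remote store based on the sstable id.
    fn store_for(&self, sst_id: u64) -> &dyn ObjectStore {
        if sst_id & LOCAL_SST_FLAG != 0 {
            self.local_store.as_ref()
        } else {
            self.remote_store.as_ref()
        }
    }
}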

@skyzh skyzh (Contributor) commented May 10, 2022

we will have two object stores

Looks good!

use crate::object::{ObjectError, ObjectResult};

pub async fn ensure_file_dir_exists(path: &Path) -> ObjectResult<()> {
    if let Some(dir) = path.parent() {
Contributor:

Should we also assert that path is not already a folder?

Contributor Author:

I think it's not necessary. If the current path is already a folder, the subsequent open will fail.

    Ok(())
}

async fn read(&self, path: &str, block_loc: Option<BlockLocation>) -> ObjectResult<Bytes> {
Contributor:

I guess this would be very inefficient for a disk object store, and would push the number of open file handles very high, which could easily exceed the file handle limit on most platforms (e.g. 256 on macOS).

For the disk object store, I would recommend keeping a cache of opened files.

Contributor Author:

Do you mean that after we finish using a file handle, we recycle it in a cache instead of closing it?

Contributor:

Yes. Also, we should use a single file handle (file object) per file and avoid wrapping it in a mutex. If there are multiple requests to a file, we should use pread to do positioned reads.

Contributor:

+1.
But it seems that tokio does not support pread.
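For context on the pread suggestion (and anticipating the workaround the author describes in the May 13 comment below): std exposes positioned reads via FileExt::read_exact_at, which takes &File and so needs no mutex, and spawn_blocking makes it usable from async code. A rough sketch, not the PR's actual code:

use std::fs::File;
use std::os::unix::fs::FileExt;
use std::sync::Arc;

// Positioned read on a shared handle: read_exact_at takes &self, so concurrent
// readers can share one File via Arc without a mutex.
async fn pread(file: Arc<File>, offset: u64, len: usize) -> std::io::Result<Vec<u8>> {
    tokio::task::spawn_blocking(move || {
        let mut buf = vec![0u8; len];
        file.read_exact_at(&mut buf, offset)?;
        Ok(buf)
    })
    .await
    .expect("blocking read task panicked")
}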

@@ -117,6 +121,15 @@ impl StateStoreImpl {
        state_store_stats.clone(),
    );
    in_mem_object_store.set_compactor_shutdown_sender(shutdown_sender);
} else if let ObjectStoreImpl::Disk(disk_object_store) = object_store.as_ref() {
    tracing::info!("start a compactor for local disk object store");
    let (_, shutdown_sender) = Compactor::start_compactor(
Contributor:

Shall we limit the capability of this compactor? e.g., only allow it to compact files on local disk.

Contributor Author:

The compactor is unaware of where a file comes from. It just fetches the table id from meta and the data from the object store. The code here only covers the case of possibly using the disk object store as the remote ground-truth store, similar to using the in-memory object store.

use std::fs::Metadata;
use std::path::Path;

use tokio::fs::{create_dir_all, File, OpenOptions};
Contributor:

Could we have a benchmark comparing tokio::fs and std/unix fs, for both bandwidth and latency?

Contributor Author:

Just added a simple bench. The bench runs serially in a single thread. The results are as follows; each read/write operates on 1 MB of data.

fs_operation/tokio      time:   [124.42 ms 126.53 ms 128.76 ms]
tokio stat: Write 1310 times, avg 5.948854961832061ms. Read 1310 times, avg 4.315267175572519ms

fs_operation/std        time:   [27.044 ms 27.805 ms 28.629 ms]
std stat: Write 3270 times, avg 0.0027522935779816515ms. Read 3270 times, avg 0.046788990825688076ms

Contributor:

Do we have plans to support multiple object store backends for Hummock? If we want to spill to disk, I guess we are running two backends. I'm also wondering whether disk should be an object store.

Yes, in my development plan, we will have two object stores, one local and one remote. The sstable store will hold both and route requests according to the highest bit of the sstable id. A more complete PR is #2384; since it's too large, I split it into several smaller PRs like this one.

Maybe we only need a disk interface for spill-to-disk, without making it a kind of object store?

Since the spilled files will be in SST format, making it an object store lets us share some logic when we read SSTs and keeps the code neater. In a future PR I will introduce a stream-like uploader for the object store so that we don't have to buffer the whole SST in memory when using the disk object store.

Would we implement it as a hybrid storage? Something like the following:

pub struct HybridObjectStore {
    local: Box<dyn ObjectStore>,
    remote: Box<dyn ObjectStore>,
}

impl ObjectStore for HybridObjectStore {
}

Contributor Author:

I have thought about it. If we implement it this way, I can't find an elegant way to specify which object store to use within the current ObjectStore interface. In the future, such a HybridObjectStore could be used with cache semantics, i.e. we first look up the local object store, which acts as a local cache, and on a cache miss we then look up the remote object store.
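For illustration, a rough sketch of that cache-semantics read path (the trait and types here are simplified stand-ins for the crate's ObjectStore, ObjectResult, and BlockLocation, using the async-trait crate; treating any local error as a miss is a shortcut for brevity):

use bytes::Bytes;

// Simplified stand-ins for the crate's types, just for this sketch.
type ObjectResult<T> = Result<T, std::io::Error>;

#[derive(Clone, Copy)]
pub struct BlockLocation {
    pub offset: usize,
    pub size: usize,
}

#[async_trait::async_trait]
pub trait ObjectStore: Send + Sync {
    async fn read(&self, path: &str, block_loc: Option<BlockLocation>) -> ObjectResult<Bytes>;
}

pub struct HybridObjectStore {
    local: Box<dyn ObjectStore>,
    remote: Box<dyn ObjectStore>,
}

impl HybridObjectStore {
    // Cache semantics: look up the local store first; on a miss, fall back to remote.
    // A real implementation would distinguish "not found" from other errors and
    // back-fill the local copy after a remote hit.
    pub async fn read(&self, path: &str, block_loc: Option<BlockLocation>) -> ObjectResult<Bytes> {
        match self.local.read(path, block_loc).await {
            Ok(bytes) => Ok(bytes),
            Err(_) => self.remote.read(path, block_loc).await,
        }
    }
}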

Comment on lines +938 to +942
#[cfg(debug_assertions)]
{
    assert!(!(*old_entry).is_in_lru());
    assert!((*new_entry).is_in_lru());
}
Member:

Use debug_assert here?

Contributor Author:

is_in_lru only compiles when debug_assertions is enabled, while the code generated by debug_assert! is if cfg!(debug_assertions) { ... }, whose body is still compiled even when debug_assertions is disabled. Therefore, if we use debug_assert! here we will get a compile error.

Contributor:

ditto

src/storage/src/object/disk.rs (outdated; resolved)
src/storage/src/object/disk.rs (outdated; resolved)
@BugenZhao BugenZhao requested a review from MrCroxx May 12, 2022 05:40
@wenym1 wenym1 (Contributor, Author) commented May 12, 2022

Just updated the benchmark. Added benchmarking with spawn_blocking(std_file.read/pread) and block-in-place pread.

The new benchmark results are as follows:

fs_operation/tokio/write    time:   [44.744 ms 45.613 ms 46.502 ms]
Bench tokio write: op 3270 times, avg time: 4.022018348623853 ms

fs_operation/tokio/read time:   [57.975 ms 59.133 ms 60.315 ms]
Bench tokio read: op 1630 times, avg time: 4.62760736196319 ms

fs_operation/tokio/blocking-read   time:   [14.325 ms 14.435 ms 14.547 ms]
Bench tokio blocking-read: op 6550 times, avg time: 0.35404580152671755 ms

fs_operation/tokio/blocking-pread     time:   [14.186 ms 14.347 ms 14.525 ms]
Bench tokio blocking-pread: op 6550 times, avg time: 0.32916030534351143 ms

fs_operation/std/write  time:   [10.288 ms 10.329 ms 10.409 ms] 
Bench std write: op 10110 times, avg time: 0.0016815034619188922 ms

fs_operation/std/read   time:   [11.364 ms 11.440 ms 11.521 ms]
Bench std read: op 10110 times, avg time: 0.004055390702274975 ms

fs_operation/std/pread  time:   [11.649 ms 11.727 ms 11.813 ms]
Bench std pread: op 10110 times, avg time: 0.001582591493570722 ms
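The benchmarks above compare spawn_blocking-based reads with a block-in-place pread; for reference, the block_in_place form looks roughly like this sketch (it only works on tokio's multi-threaded runtime; the function name is illustrative):

use std::fs::File;
use std::os::unix::fs::FileExt;

// Run the positioned read directly on the current worker thread, telling the
// runtime it may block. Panics if used on a current-thread runtime.
fn pread_block_in_place(file: &File, offset: u64, len: usize) -> std::io::Result<Vec<u8>> {
    tokio::task::block_in_place(|| {
        let mut buf = vec![0u8; len];
        file.read_exact_at(&mut buf, offset)?;
        Ok(buf)
    })
}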

@wenym1 wenym1 (Contributor, Author) commented May 13, 2022

Just added an opened-file cache. Used spawn_blocking as a workaround to call file.read_exact_at.

Also extracted the common logic of deduplicating concurrent requests for a cache entry into cache.lookup_with_request_dedup. The block cache and meta cache, which used cache.lookup_for_request, are refactored accordingly.
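For readers unfamiliar with the pattern, this is roughly what deduplicating concurrent requests means; the type below is purely illustrative and is not the cache's actual lookup_with_request_dedup implementation:

use std::collections::HashMap;
use std::hash::Hash;
use std::sync::Mutex;

use tokio::sync::broadcast;

// Illustrative only: ensure that, for a given key, only one caller performs the
// expensive fetch while concurrent callers wait for that result.
pub struct RequestDedup<K, V> {
    pending: Mutex<HashMap<K, broadcast::Sender<V>>>,
}

impl<K, V> RequestDedup<K, V>
where
    K: Hash + Eq + Clone,
    V: Clone,
{
    pub async fn get_or_fetch<F, Fut>(&self, key: K, fetch: F) -> V
    where
        F: FnOnce() -> Fut,
        Fut: std::future::Future<Output = V>,
    {
        // Either register ourselves as the fetcher or subscribe to the in-flight one.
        let mut wait_rx = {
            let mut pending = self.pending.lock().unwrap();
            match pending.get(&key) {
                Some(tx) => Some(tx.subscribe()),
                None => {
                    let (tx, _) = broadcast::channel(1);
                    pending.insert(key.clone(), tx);
                    None
                }
            }
        };
        if let Some(rx) = wait_rx.as_mut() {
            // Another caller is fetching this key: wait for its result.
            return rx.recv().await.expect("fetcher dropped without sending");
        }
        // We are the first requester: run the fetch and broadcast the result.
        let value = fetch().await;
        let tx = self.pending.lock().unwrap().remove(&key).expect("pending entry");
        let _ = tx.send(value.clone());
        value
    }
}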

Comment on lines +961 to 962
#[cfg(debug_assertions)]
assert!((*new_entry).is_in_lru());
Contributor:

Suggested change:
- #[cfg(debug_assertions)]
- assert!((*new_entry).is_in_lru());
+ debug_assert!((*new_entry).is_in_lru());

I guess this will work.

Contributor Author:

It won't work if we run cargo bench, where the unit tests are compiled with debug_assertions disabled.

The code of is_in_lru only exists when debug_assertions is enabled.

#[cfg(debug_assertions)]
fn is_in_lru(&self) -> bool {
    (self.flags & IN_LRU) > 0
}

while the code generated by debug_assert! is:

if cfg!(debug_assertions) {
 ...
}

and the use of is_in_lru still exists even when debug_assertions is not enabled, so when running cargo bench the compilation will fail.

Contributor:

Okay, I originally expected that debug_assert! is equivalent to #[cfg(debug_assertions)].


@skyzh skyzh (Contributor) left a comment

Rest LGTM, good work!

src/storage/src/hummock/cache.rs (resolved)
match self.lookup_for_request(hash, key.clone()) {
    LookupResult::Cached(entry) => Ok(entry),
    LookupResult::WaitPendingRequest(recv) => {
        let entry = recv.await.map_err(HummockError::other)?;
Contributor:

Better to return the original error to the caller, instead of "channel closed" or something equally unclear. Here we can only get a channel-closed error.

Contributor:

Previously, moka would return Arc<Error> to all callers.

Contributor:

The original error may not support Clone?
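One way to square these two points (a sketch of the idea, not the PR's code): have the fetching task wrap its error in Arc before forwarding the result to the deduplicated waiters, so everyone sees the original error even if it doesn't implement Clone. The types below are illustrative placeholders.

use std::sync::Arc;

use tokio::sync::oneshot;

// Illustrative placeholder types; the real cache has its own entry and error types.
type Entry = Arc<Vec<u8>>;
#[derive(Debug)]
pub struct FetchError(pub String);

// The task that performed the fetch forwards its result to every waiter that was
// deduplicated onto it. Arc makes the (non-Clone) error shareable across waiters.
pub fn notify_waiters(
    waiters: Vec<oneshot::Sender<Result<Entry, Arc<FetchError>>>>,
    fetch_result: Result<Entry, FetchError>,
) {
    let shared: Result<Entry, Arc<FetchError>> = fetch_result.map_err(Arc::new);
    for tx in waiters {
        // Cloning a Result of two Arcs is cheap.
        let _ = tx.send(shared.clone());
    }
}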


}
}

async fn readv(&self, path: &str, block_locs: Vec<BlockLocation>) -> ObjectResult<Vec<Bytes>> {
Contributor:

Use block_locs: impl AsRef<[BlockLocation]>, so that callers can pass either a reference &[BlockLocation { xxx }] or a vector.

impl Drop for InMemObjectStore {
    fn drop(&mut self) {
        if let Some(sender) = self.compactor_shutdown_sender.lock().take() {
            let _ = sender.send(());
Contributor:

Better to join the compactor thread (future); otherwise there will be unexpected errors on SIGINT.

Contributor:

Why notify the compactor to shut down in the object store? We should notify compactor shutdown when the state store is closed.
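A sketch of the "join the compactor" suggestion above (the handle and signal parameters are illustrative, not the actual return values of start_compactor):

use tokio::sync::oneshot;
use tokio::task::JoinHandle;

// Illustrative shutdown sequence: signal the compactor, then await its task so it
// has fully stopped before the process exits (e.g. on SIGINT).
pub async fn shutdown_compactor(shutdown_tx: oneshot::Sender<()>, compactor: JoinHandle<()>) {
    let _ = shutdown_tx.send(());
    if let Err(e) = compactor.await {
        tracing::warn!("compactor task exited abnormally: {}", e);
    }
}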

@Little-Wallace Little-Wallace (Contributor) left a comment

LGTM

@wenym1 wenym1 merged commit 10cce2e into main May 16, 2022
@wenym1 wenym1 deleted the yiming/disk_object_store branch May 16, 2022 08:12