Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] [Tiered caching] Introducing cache plugins and exposing Ehcache as one of the pluggable disk cache option #12509

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

## [Unreleased 2.x]
### Added
- [Tiered caching] Introducing cache plugins and exposing Ehcache as one of the pluggable disk cache option ([#11874](https:/opensearch-project/OpenSearch/pull/11874))
- Add support for dependencies in plugin descriptor properties with semver range ([#11441](https:/opensearch-project/OpenSearch/pull/11441))
- Add community_id ingest processor ([#12121](https:/opensearch-project/OpenSearch/pull/12121))
- Introduce query level setting `index.query.max_nested_depth` limiting nested queries ([#3268](https:/opensearch-project/OpenSearch/issues/3268)
Expand Down
17 changes: 17 additions & 0 deletions modules/cache-common/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

opensearchplugin {
description 'Module for caches which are optional and do not require additional security permission'
classname 'org.opensearch.cache.common.tier.TieredSpilloverCachePlugin'
}

test {
// TODO: Adding permission in plugin-security.policy doesn't seem to work.
systemProperty 'tests.security.manager', 'false'
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,335 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cache.common.tier;

import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.common.cache.CacheType;
import org.opensearch.common.cache.ICache;
import org.opensearch.common.cache.LoadAwareCacheLoader;
import org.opensearch.common.cache.RemovalListener;
import org.opensearch.common.cache.RemovalNotification;
import org.opensearch.common.cache.store.config.CacheConfig;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.concurrent.ReleasableLock;
import org.opensearch.common.util.iterable.Iterables;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;

/**
* This cache spillover the evicted items from heap tier to disk tier. All the new items are first cached on heap
* and the items evicted from on heap cache are moved to disk based cache. If disk based cache also gets full,
* then items are eventually evicted from it and removed which will result in cache miss.
*
* @param <K> Type of key
* @param <V> Type of value
*
* @opensearch.experimental
*/
@ExperimentalApi
public class TieredSpilloverCache<K, V> implements ICache<K, V> {

private final ICache<K, V> diskCache;
private final ICache<K, V> onHeapCache;
private final RemovalListener<K, V> removalListener;
ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
ReleasableLock readLock = new ReleasableLock(readWriteLock.readLock());
ReleasableLock writeLock = new ReleasableLock(readWriteLock.writeLock());
/**
* Maintains caching tiers in ascending order of cache latency.
*/
private final List<ICache<K, V>> cacheList;

TieredSpilloverCache(Builder<K, V> builder) {
Objects.requireNonNull(builder.onHeapCacheFactory, "onHeap cache builder can't be null");
Objects.requireNonNull(builder.diskCacheFactory, "disk cache builder can't be null");
this.removalListener = Objects.requireNonNull(builder.removalListener, "Removal listener can't be null");

this.onHeapCache = builder.onHeapCacheFactory.create(
new CacheConfig.Builder<K, V>().setRemovalListener(new RemovalListener<K, V>() {
@Override
public void onRemoval(RemovalNotification<K, V> notification) {
try (ReleasableLock ignore = writeLock.acquire()) {
diskCache.put(notification.getKey(), notification.getValue());
}
removalListener.onRemoval(notification);
}
})
.setKeyType(builder.cacheConfig.getKeyType())
.setValueType(builder.cacheConfig.getValueType())
.setSettings(builder.cacheConfig.getSettings())
.setWeigher(builder.cacheConfig.getWeigher())
.build(),
builder.cacheType,
builder.cacheFactories

);
this.diskCache = builder.diskCacheFactory.create(builder.cacheConfig, builder.cacheType, builder.cacheFactories);
this.cacheList = Arrays.asList(onHeapCache, diskCache);
}

// Package private for testing
ICache<K, V> getOnHeapCache() {
return onHeapCache;
}

// Package private for testing
ICache<K, V> getDiskCache() {
return diskCache;
}

@Override
public V get(K key) {
return getValueFromTieredCache().apply(key);
}

@Override
public void put(K key, V value) {
try (ReleasableLock ignore = writeLock.acquire()) {
onHeapCache.put(key, value);
}
}

@Override
public V computeIfAbsent(K key, LoadAwareCacheLoader<K, V> loader) throws Exception {

V cacheValue = getValueFromTieredCache().apply(key);
if (cacheValue == null) {
// Add the value to the onHeap cache. We are calling computeIfAbsent which does another get inside.
// This is needed as there can be many requests for the same key at the same time and we only want to load
// the value once.
V value = null;
try (ReleasableLock ignore = writeLock.acquire()) {
value = onHeapCache.computeIfAbsent(key, loader);
}
return value;
}
return cacheValue;
}

@Override
public void invalidate(K key) {
// We are trying to invalidate the key from all caches though it would be present in only of them.
// Doing this as we don't know where it is located. We could do a get from both and check that, but what will
// also trigger a hit/miss listener event, so ignoring it for now.
try (ReleasableLock ignore = writeLock.acquire()) {
for (ICache<K, V> cache : cacheList) {
cache.invalidate(key);
}
}
}

@Override
public void invalidateAll() {
try (ReleasableLock ignore = writeLock.acquire()) {
for (ICache<K, V> cache : cacheList) {
cache.invalidateAll();
}
}
}

/**
* Provides an iteration over both onHeap and disk keys. This is not protected from any mutations to the cache.
* @return An iterable over (onHeap + disk) keys
*/
@SuppressWarnings("unchecked")
@Override
public Iterable<K> keys() {
return Iterables.concat(onHeapCache.keys(), diskCache.keys());
}

@Override
public long count() {
long count = 0;
for (ICache<K, V> cache : cacheList) {
count += cache.count();
}
return count;
}

@Override
public void refresh() {
try (ReleasableLock ignore = writeLock.acquire()) {
for (ICache<K, V> cache : cacheList) {
cache.refresh();
}
}
}

@Override
public void close() throws IOException {
for (ICache<K, V> cache : cacheList) {
cache.close();
}
}

private Function<K, V> getValueFromTieredCache() {
return key -> {
try (ReleasableLock ignore = readLock.acquire()) {
for (ICache<K, V> cache : cacheList) {
V value = cache.get(key);
if (value != null) {
// update hit stats
return value;
} else {
// update miss stats
}
}
}
return null;
};
}

/**
* Factory to create TieredSpilloverCache objects.
*/
public static class TieredSpilloverCacheFactory implements ICache.Factory {

/**
* Defines cache name
*/
public static final String TIERED_SPILLOVER_CACHE_NAME = "tiered_spillover";

/**
* Default constructor
*/
public TieredSpilloverCacheFactory() {}

@Override
public <K, V> ICache<K, V> create(CacheConfig<K, V> config, CacheType cacheType, Map<String, Factory> cacheFactories) {
Settings settings = config.getSettings();
Setting<String> onHeapSetting = TieredSpilloverCacheSettings.TIERED_SPILLOVER_ONHEAP_STORE_NAME.getConcreteSettingForNamespace(
cacheType.getSettingPrefix()
);
String onHeapCacheStoreName = onHeapSetting.get(settings);
if (!cacheFactories.containsKey(onHeapCacheStoreName)) {
throw new IllegalArgumentException(
"No associated onHeapCache found for tieredSpilloverCache for " + "cacheType:" + cacheType
);
}
ICache.Factory onHeapCacheFactory = cacheFactories.get(onHeapCacheStoreName);

Setting<String> onDiskSetting = TieredSpilloverCacheSettings.TIERED_SPILLOVER_DISK_STORE_NAME.getConcreteSettingForNamespace(
cacheType.getSettingPrefix()
);
String diskCacheStoreName = onDiskSetting.get(settings);
if (!cacheFactories.containsKey(diskCacheStoreName)) {
throw new IllegalArgumentException(
"No associated diskCache found for tieredSpilloverCache for " + "cacheType:" + cacheType
);
}
ICache.Factory diskCacheFactory = cacheFactories.get(diskCacheStoreName);
return new Builder<K, V>().setDiskCacheFactory(diskCacheFactory)
.setOnHeapCacheFactory(onHeapCacheFactory)
.setRemovalListener(config.getRemovalListener())
.setCacheConfig(config)
.setCacheType(cacheType)
.build();
}

@Override
public String getCacheName() {
return TIERED_SPILLOVER_CACHE_NAME;
}
}

/**
* Builder object for tiered spillover cache.
* @param <K> Type of key
* @param <V> Type of value
*/
public static class Builder<K, V> {
private ICache.Factory onHeapCacheFactory;
private ICache.Factory diskCacheFactory;
private RemovalListener<K, V> removalListener;
private CacheConfig<K, V> cacheConfig;
private CacheType cacheType;
private Map<String, ICache.Factory> cacheFactories;

/**
* Default constructor
*/
public Builder() {}

/**
* Set onHeap cache factory
* @param onHeapCacheFactory Factory for onHeap cache.
* @return builder
*/
public Builder<K, V> setOnHeapCacheFactory(ICache.Factory onHeapCacheFactory) {
this.onHeapCacheFactory = onHeapCacheFactory;
return this;
}

/**
* Set disk cache factory
* @param diskCacheFactory Factory for disk cache.
* @return builder
*/
public Builder<K, V> setDiskCacheFactory(ICache.Factory diskCacheFactory) {
this.diskCacheFactory = diskCacheFactory;
return this;
}

/**
* Set removal listener for tiered cache.
* @param removalListener Removal listener
* @return builder
*/
public Builder<K, V> setRemovalListener(RemovalListener<K, V> removalListener) {
this.removalListener = removalListener;
return this;
}

/**
* Set cache config.
* @param cacheConfig cache config.
* @return builder
*/
public Builder<K, V> setCacheConfig(CacheConfig<K, V> cacheConfig) {
this.cacheConfig = cacheConfig;
return this;
}

/**
* Set cache type.
* @param cacheType Cache type
* @return builder
*/
public Builder<K, V> setCacheType(CacheType cacheType) {
this.cacheType = cacheType;
return this;
}

/**
* Set cache factories
* @param cacheFactories cache factories
* @return builder
*/
public Builder<K, V> setCacheFactories(Map<String, ICache.Factory> cacheFactories) {
this.cacheFactories = cacheFactories;
return this;
}

/**
* Build tiered spillover cache.
* @return TieredSpilloverCache
*/
public TieredSpilloverCache<K, V> build() {
return new TieredSpilloverCache<>(this);
}
}
}
Loading
Loading