Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix/templatestorage #1298

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 21 additions & 3 deletions common/src/main/java/eu/cloudnetservice/common/io/FileUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -241,14 +241,32 @@ public static void delete(@Nullable Path path) {
}

/**
* Resolves a random path in the temp directory of the cloud and creates all needed parents directories for a
* temporary file including the {@link FileUtil#TEMP_DIR}.
* Resolves a random path in the temp directory of the cloud and creates all needed parent directories for a temporary
* file including the {@link FileUtil#TEMP_DIR}. This method is equivalent to
* {@code FileUtil.createTempFile(UUID.randomUUID().toString())}
*
* @return the path to the temporary file.
* @see #createTempFile(String)
*/
public static @NonNull Path createTempFile() {
return createTempFile(UUID.randomUUID().toString());
}

/**
* Resolves a path in the temp directory of the cloud and creates all needed parent directories for a temporary file
* including the {@link FileUtil#TEMP_DIR}. The final name might not be the argument if the file already exists.
*
* @param name the preferred name of the temporary file.
* @return the path to the temporary file.
*/
public static @NonNull Path createTempFile(String name) {
createDirectory(TEMP_DIR);
return TEMP_DIR.resolve(UUID.randomUUID().toString());
int id = 0;
Path path;
do {
path = TEMP_DIR.resolve(id++ == 0 ? name : name + "-" + id);
} while (Files.exists(path));
return path;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,5 +72,13 @@ interface Callback {
void handleSessionComplete(
@NonNull ChunkSessionInformation information,
@NonNull InputStream dataInput) throws IOException;

/**
* @return whether the {@code dataInput} from {@link #handleSessionComplete(ChunkSessionInformation, InputStream)}
* should be closed after {@link #handleSessionComplete(ChunkSessionInformation, InputStream)} has finished
*/
default boolean autoClose() {
return true;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import eu.cloudnetservice.driver.network.chunk.TransferStatus;
import eu.cloudnetservice.driver.network.chunk.data.ChunkSessionInformation;
import java.io.IOException;
import java.io.InputStream;
import java.io.RandomAccessFile;
import java.nio.file.Files;
import java.nio.file.Path;
Expand Down Expand Up @@ -85,6 +86,7 @@ public DefaultFileChunkedPacketHandler(
try {
// create the file
if (Files.notExists(tempFilePath)) {
FileUtil.createDirectory(tempFilePath.getParent());
Files.createFile(tempFilePath);
}
// open the file
Expand Down Expand Up @@ -128,9 +130,15 @@ public boolean handleChunkPart(int chunkPosition, @NonNull DataBuf dataBuf) {
return true;
}
// delete the file after posting
try (var inputStream = Files.newInputStream(this.tempFilePath, StandardOpenOption.DELETE_ON_CLOSE)) {
InputStream inputStream = null;
try {
inputStream = Files.newInputStream(this.tempFilePath, StandardOpenOption.DELETE_ON_CLOSE);
this.writeCompleteHandler.handleSessionComplete(this.chunkSessionInformation, inputStream);
return true;
} finally {
if (inputStream != null && this.writeCompleteHandler.autoClose()) {
inputStream.close();
}
}
}
// not completed yet
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,11 +188,13 @@ private void registerTransformer(@NonNull TransformerRegistry transformerRegistr

@Inject
@Order(300)
private void registerDefaultListeners(@NonNull EventManager eventManager) {
private void registerDefaultListeners(@NonNull EventManager eventManager,
TemplateStorageCallbackListener templateStorageCallbackListener) {
eventManager.registerListener(TaskChannelMessageListener.class);
eventManager.registerListener(GroupChannelMessageListener.class);
eventManager.registerListener(ServiceChannelMessageListener.class);
eventManager.registerListener(TemplateStorageCallbackListener.class);
// We can't use the extension layer here, or we get multiple instances
eventManager.registerListener(templateStorageCallbackListener);
}

@Inject
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import eu.cloudnetservice.driver.template.TemplateStorageProvider;
import eu.cloudnetservice.wrapper.configuration.WrapperConfiguration;
import eu.cloudnetservice.wrapper.database.WrapperDatabaseProvider;
import eu.cloudnetservice.wrapper.network.chunk.TemplateStorageCallbackListener;
import eu.cloudnetservice.wrapper.permission.WrapperPermissionManagement;
import eu.cloudnetservice.wrapper.provider.WrapperCloudServiceProvider;
import eu.cloudnetservice.wrapper.provider.WrapperTemplateStorageProvider;
Expand Down Expand Up @@ -106,14 +107,15 @@ private RPCFactories() {
public static @NonNull TemplateStorageProvider provideTemplateStorageProvider(
@NonNull RPCFactory factory,
@NonNull NetworkClient networkClient,
@NonNull ComponentInfo componentInfo
@NonNull ComponentInfo componentInfo,
@NonNull TemplateStorageCallbackListener templateStorageCallbackListener
) {
return provideSpecial(
factory,
networkClient,
TemplateStorageProvider.class,
WrapperTemplateStorageProvider.class,
componentInfo, networkClient);
componentInfo, networkClient, templateStorageCallbackListener);
}

@Factory
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

package eu.cloudnetservice.driver.template.defaults;
package eu.cloudnetservice.wrapper.network;

import eu.cloudnetservice.common.io.FileUtil;
import eu.cloudnetservice.common.io.ListenableOutputStream;
Expand All @@ -28,12 +28,12 @@
import eu.cloudnetservice.driver.network.def.NetworkConstants;
import eu.cloudnetservice.driver.service.ServiceTemplate;
import eu.cloudnetservice.driver.template.TemplateStorage;
import eu.cloudnetservice.wrapper.network.chunk.TemplateStorageCallbackListener;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
Expand All @@ -49,23 +49,26 @@ public abstract class RemoteTemplateStorage implements TemplateStorage {

private final String name;
private final ComponentInfo componentInfo;
private final TemplateStorageCallbackListener templateStorageCallbackListener;
private final NetworkClient networkClient;

/**
* Constructs a new remote template storage instance.
*
* @param name the name of the storage which was created.
* @param componentInfo the information about the current component.
* @param networkClient the network client of the current component.
* @param name the name of the storage which was created.
* @param componentInfo the information about the current component.
* @param templateStorageCallbackListener the callback listener for file transfer
* @param networkClient the network client of the current component.
* @throws NullPointerException if the given name, component info or network client is null.
*/
public RemoteTemplateStorage(
@NonNull String name,
@NonNull ComponentInfo componentInfo,
@NonNull NetworkClient networkClient
TemplateStorageCallbackListener templateStorageCallbackListener, @NonNull NetworkClient networkClient
) {
this.name = name;
this.componentInfo = componentInfo;
this.templateStorageCallbackListener = templateStorageCallbackListener;
this.networkClient = networkClient;
}

Expand Down Expand Up @@ -112,9 +115,10 @@ public boolean deploy(@NonNull ServiceTemplate target, @NonNull InputStream inpu
* {@inheritDoc}
*/
@Override
public @Nullable InputStream zipTemplate(@NonNull ServiceTemplate template) throws IOException {
public @Nullable InputStream zipTemplate(@NonNull ServiceTemplate template) {
// send a request for the template to the node
var responseId = UUID.randomUUID();
this.templateStorageCallbackListener.startSession(responseId);
var response = ChannelMessage.builder()
.message("remote_templates_zip_template")
.channel(NetworkConstants.INTERNAL_MSG_CHANNEL)
Expand All @@ -124,10 +128,11 @@ public boolean deploy(@NonNull ServiceTemplate target, @NonNull InputStream inpu
.sendSingleQuery();
// check if we got a response
if (response == null || !response.content().readBoolean()) {
this.templateStorageCallbackListener.stopSession(responseId);
return null;
}
// the file is transferred and should be readable
return Files.newInputStream(FileUtil.TEMP_DIR.resolve(responseId.toString()), StandardOpenOption.DELETE_ON_CLOSE);
// the file is transferred, but may not be fully written yet
return this.templateStorageCallbackListener.waitForFile(responseId);
}

/**
Expand Down Expand Up @@ -190,9 +195,10 @@ public boolean deploy(@NonNull ServiceTemplate target, @NonNull InputStream inpu
public @Nullable InputStream newInputStream(
@NonNull ServiceTemplate template,
@NonNull String path
) throws IOException {
) {
// send a request for the file to the node
var responseId = UUID.randomUUID();
this.templateStorageCallbackListener.startSession(responseId);
var response = ChannelMessage.builder()
.message("remote_templates_template_file")
.channel(NetworkConstants.INTERNAL_MSG_CHANNEL)
Expand All @@ -202,9 +208,10 @@ public boolean deploy(@NonNull ServiceTemplate target, @NonNull InputStream inpu
.sendSingleQuery();
// check if we got a response
if (response == null || !response.content().readBoolean()) {
this.templateStorageCallbackListener.stopSession(responseId);
return null;
}
// the file is transferred and should be readable
return Files.newInputStream(FileUtil.TEMP_DIR.resolve(responseId.toString()), StandardOpenOption.DELETE_ON_CLOSE);
// the file is transferred, but may not be fully written yet
return this.templateStorageCallbackListener.waitForFile(responseId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,122 @@

package eu.cloudnetservice.wrapper.network.chunk;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.RemovalListener;
import com.github.benmanes.caffeine.cache.Scheduler;
import eu.cloudnetservice.common.concurrent.Task;
import eu.cloudnetservice.common.io.FileUtil;
import eu.cloudnetservice.driver.event.EventListener;
import eu.cloudnetservice.driver.event.events.chunk.ChunkedPacketSessionOpenEvent;
import eu.cloudnetservice.driver.network.chunk.ChunkedPacketHandler;
import eu.cloudnetservice.driver.network.chunk.data.ChunkSessionInformation;
import eu.cloudnetservice.driver.network.chunk.defaults.DefaultFileChunkedPacketHandler;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import java.io.InputStream;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import lombok.NonNull;
import org.jetbrains.annotations.Nullable;

public final class TemplateStorageCallbackListener {
@Singleton
public final class TemplateStorageCallbackListener implements ChunkedPacketHandler.Callback {

private final Cache<UUID, Task<InputStream>> activeSessions;

@Inject
public TemplateStorageCallbackListener() {
// We have 5 minutes to receive a file
this.activeSessions = Caffeine.newBuilder()
.expireAfterWrite(5, TimeUnit.MINUTES)
.scheduler(Scheduler.systemScheduler())
.removalListener(this.newRemovalListener()).build();
}

private RemovalListener<UUID, Task<InputStream>> newRemovalListener() {
return ($, value, cause) -> {
if (cause.wasEvicted() && value != null) {
value.completeExceptionally(new TimeoutException());
}
};
}

@EventListener
public void handle(@NonNull ChunkedPacketSessionOpenEvent event) {
if (event.session().transferChannel().equals("request_template_file_result")) {
if (!this.sessionExists(event.session().sessionUniqueId())) {
// No point in continuing when there is no running session.
// Might want to add some logging here as this is almost certainly caused by an error
return;
}
event.handler(new DefaultFileChunkedPacketHandler(
event.session(),
null,
FileUtil.TEMP_DIR.resolve(event.session().sessionUniqueId().toString())));
this,
FileUtil.createTempFile(event.session().sessionUniqueId().toString())));
}
}

/**
* This waits for a session started with {@link #startSession(UUID)} to complete. This also stops the session
*
* @param sessionId the session id
* @return the {@link InputStream} for the file in the session, or null if session was not found or failed
*/
public @Nullable InputStream waitForFile(UUID sessionId) {
var task = this.activeSessions.getIfPresent(sessionId);
if (task == null) {
return null;
}
var stream = task.getDef(null);
// Invalidate the session. No need to keep it in memory at this point
this.activeSessions.invalidate(sessionId);
return stream;
}

/**
* Stops the session
*
* @param sessionId the session id
*/
public void stopSession(UUID sessionId) {
this.activeSessions.invalidate(sessionId);
}

/**
* Check if a session exists
*
* @param sessionId the session id
* @return whether the session exists
*/
public boolean sessionExists(UUID sessionId) {
return this.activeSessions.getIfPresent(sessionId) != null;
}

/**
* This starts a session. This session will expire after 5 Minutes. {@link #stopSession(UUID)} should be called after
* this to prevent using memory until the session expires
*
* @param sessionId the session id
*/
public void startSession(UUID sessionId) {
this.activeSessions.put(sessionId, new Task<>());
}

@Override
public boolean autoClose() {
// We want to close this manually to maintain API
return false;
}

@Override
public void handleSessionComplete(ChunkSessionInformation information, InputStream dataInput) {
var sessionId = information.sessionUniqueId();
var task = this.activeSessions.getIfPresent(sessionId);
// Complete the session if we find it
if (task != null) {
task.complete(dataInput);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@
import eu.cloudnetservice.driver.service.ServiceTemplate;
import eu.cloudnetservice.driver.template.TemplateStorage;
import eu.cloudnetservice.driver.template.TemplateStorageProvider;
import eu.cloudnetservice.driver.template.defaults.RemoteTemplateStorage;
import eu.cloudnetservice.wrapper.network.RemoteTemplateStorage;
import eu.cloudnetservice.wrapper.network.chunk.TemplateStorageCallbackListener;
import lombok.NonNull;
import org.jetbrains.annotations.Nullable;

Expand All @@ -32,15 +33,17 @@ public abstract class WrapperTemplateStorageProvider implements TemplateStorageP
private final RPCSender rpcSender;
private final ComponentInfo componentInfo;
private final NetworkClient networkClient;
private final TemplateStorageCallbackListener templateStorageCallbackListener;

public WrapperTemplateStorageProvider(
@NonNull RPCSender sender,
@NonNull ComponentInfo componentInfo,
@NonNull NetworkClient networkClient
) {
@NonNull NetworkClient networkClient,
TemplateStorageCallbackListener templateStorageCallbackListener) {
this.rpcSender = sender;
this.componentInfo = componentInfo;
this.networkClient = networkClient;
this.templateStorageCallbackListener = templateStorageCallbackListener;
}

@Override
Expand All @@ -59,6 +62,7 @@ public WrapperTemplateStorageProvider(
this.rpcSender,
TemplateStorage.class,
GenerationContext.forClass(RemoteTemplateStorage.class).build()
).newInstance(new Object[]{storage, this.componentInfo, this.networkClient}, new Object[]{storage});
).newInstance(new Object[]{storage, this.componentInfo, this.templateStorageCallbackListener, this.networkClient},
new Object[]{storage});
}
}